forked from genofire/unified-push-xmpp
This commit is contained in:
parent
4b06d6886a
commit
6f0b45ba7a
|
@ -6,10 +6,10 @@ import (
|
|||
"path/filepath"
|
||||
|
||||
"dev.sum7.eu/genofire/golang-lib/file"
|
||||
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
|
||||
"github.com/bdlm/log"
|
||||
"github.com/bdlm/std/logger"
|
||||
"unifiedpush.org/go/np2p_dbus/distributor"
|
||||
"unifiedpush.org/go/np2p_dbus/storage"
|
||||
)
|
||||
|
||||
var dbus *distributor.DBus
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"gorm.io/gorm"
|
||||
"unifiedpush.org/go/np2p_dbus/storage"
|
||||
)
|
||||
|
||||
func InitStorage(path string) (*Storage, error) {
|
||||
st, err := storage.InitStorage(path)
|
||||
db := st.DB()
|
||||
|
||||
if err := db.AutoMigrate(&latestID{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Storage{Storage: st, db: st.DB()}, err
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
*storage.Storage
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
func (d Storage) GetLatestNotif() (int64, error) {
|
||||
id := latestID{}
|
||||
result := d.db.First(&id)
|
||||
return id.ID, result.Error
|
||||
}
|
||||
|
||||
func (d Storage) SaveLatestNotif(time int64) error {
|
||||
id := latestID{}
|
||||
result := d.db.Model(&id).Where("1=1").Update("id", time)
|
||||
return result.Error
|
||||
}
|
||||
|
||||
type latestID struct {
|
||||
ID int64
|
||||
}
|
|
@ -14,12 +14,13 @@ import (
|
|||
"mellium.im/xmlstream"
|
||||
"mellium.im/xmpp"
|
||||
"mellium.im/xmpp/disco"
|
||||
"mellium.im/xmpp/history"
|
||||
"mellium.im/xmpp/jid"
|
||||
"mellium.im/xmpp/mux"
|
||||
"mellium.im/xmpp/stanza"
|
||||
"unifiedpush.org/go/np2p_dbus/distributor"
|
||||
"unifiedpush.org/go/np2p_dbus/storage"
|
||||
|
||||
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
|
||||
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
|
||||
)
|
||||
|
||||
|
@ -74,23 +75,43 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
|
|||
return err
|
||||
}
|
||||
go s.checkServer()
|
||||
go s.selectGateway()
|
||||
|
||||
go func() {
|
||||
s.selectGateway()
|
||||
latestID, _ := store.GetLatestNotif()
|
||||
log.Debug("Starting history fetch")
|
||||
q := history.Query{With: s.gateway, Start: time.UnixMilli(latestID)}
|
||||
_, _ = history.Fetch(context.TODO(), q, s.session.LocalAddr().Bare(), s.session)
|
||||
log.Debug("Finished history fetch")
|
||||
}()
|
||||
|
||||
log.Debug("xmpp client is running")
|
||||
s.session.Serve(mux.New(
|
||||
// disco.Handle(),
|
||||
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message),
|
||||
mux.MessageFunc("", xml.Name{Local: "subject"}, s.HandleMessage),
|
||||
history.Handle(history.NewHandler(s)),
|
||||
))
|
||||
return nil
|
||||
}
|
||||
|
||||
// handler of incoming message - forward to DBUS
|
||||
func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||
// handler of incoming HandleMessage - forward to DBUS
|
||||
func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||
logger := log.WithFields(map[string]interface{}{
|
||||
"to": msgHead.To.String(),
|
||||
"from": msgHead.From.String(),
|
||||
"id": msgHead.ID,
|
||||
})
|
||||
|
||||
d := xml.NewTokenDecoder(t)
|
||||
|
||||
//probably temporary workaround for history receiving
|
||||
if msgHead.From.String() == "" {
|
||||
_, _ = d.Token()
|
||||
_, _ = d.Token()
|
||||
_, _ = d.Token()
|
||||
msgHead.From = s.gateway
|
||||
}
|
||||
|
||||
msg := messages.MessageBody{}
|
||||
err := d.Decode(&msg)
|
||||
if err != nil && err != io.EOF {
|
||||
|
@ -107,14 +128,19 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
|
|||
"content": msg.Body,
|
||||
})
|
||||
|
||||
if err := s.store.SaveLatestNotif(time.Now().UnixMilli()); err != nil {
|
||||
logger.Errorf("Cannot save latest push id: %s", err)
|
||||
}
|
||||
|
||||
conn := s.store.GetConnectionbyPublic(msg.PublicToken)
|
||||
if conn == nil {
|
||||
logger.Warnf("no appID and appToken found for publicToken")
|
||||
return nil
|
||||
}
|
||||
from := msgHead.From.String()
|
||||
if settings := strings.Split(conn.Settings, ":"); len(settings) > 1 && settings[0] == from {
|
||||
log.WithField("from", from).Info("message not from gateway, that is no notification")
|
||||
if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) {
|
||||
log.WithField("from", from).WithField("set", settings[0]).Info("message not from gateway, that is no notification")
|
||||
|
||||
return nil
|
||||
}
|
||||
logger = logger.WithFields(map[string]interface{}{
|
||||
|
@ -127,6 +153,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
|
|||
logger.Errorf("Error send unified push: %q", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Infof("receive unified push")
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue