diff --git a/distributor/main.go b/distributor/main.go index 85fcc0c..f067cfe 100644 --- a/distributor/main.go +++ b/distributor/main.go @@ -6,10 +6,10 @@ import ( "path/filepath" "dev.sum7.eu/genofire/golang-lib/file" + "dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage" "github.com/bdlm/log" "github.com/bdlm/std/logger" "unifiedpush.org/go/np2p_dbus/distributor" - "unifiedpush.org/go/np2p_dbus/storage" ) var dbus *distributor.DBus diff --git a/distributor/storage/store.go b/distributor/storage/store.go new file mode 100644 index 0000000..953a15e --- /dev/null +++ b/distributor/storage/store.go @@ -0,0 +1,38 @@ +package storage + +import ( + "gorm.io/gorm" + "unifiedpush.org/go/np2p_dbus/storage" +) + +func InitStorage(path string) (*Storage, error) { + st, err := storage.InitStorage(path) + db := st.DB() + + if err := db.AutoMigrate(&latestID{}); err != nil { + return nil, err + } + + return &Storage{Storage: st, db: st.DB()}, err +} + +type Storage struct { + *storage.Storage + db *gorm.DB +} + +func (d Storage) GetLatestNotif() (int64, error) { + id := latestID{} + result := d.db.First(&id) + return id.ID, result.Error +} + +func (d Storage) SaveLatestNotif(time int64) error { + id := latestID{} + result := d.db.Model(&id).Where("1=1").Update("id", time) + return result.Error +} + +type latestID struct { + ID int64 +} diff --git a/distributor/xmpp.go b/distributor/xmpp.go index cba241a..7a81a89 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -14,12 +14,13 @@ import ( "mellium.im/xmlstream" "mellium.im/xmpp" "mellium.im/xmpp/disco" + "mellium.im/xmpp/history" "mellium.im/xmpp/jid" "mellium.im/xmpp/mux" "mellium.im/xmpp/stanza" "unifiedpush.org/go/np2p_dbus/distributor" - "unifiedpush.org/go/np2p_dbus/storage" + "dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage" "dev.sum7.eu/genofire/unified-push-xmpp/messages" ) @@ -74,23 +75,43 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error return err } go s.checkServer() - go s.selectGateway() + + go func() { + s.selectGateway() + latestID, _ := store.GetLatestNotif() + log.Debug("Starting history fetch") + q := history.Query{With: s.gateway, Start: time.UnixMilli(latestID)} + _, _ = history.Fetch(context.TODO(), q, s.session.LocalAddr().Bare(), s.session) + log.Debug("Finished history fetch") + }() + log.Debug("xmpp client is running") s.session.Serve(mux.New( // disco.Handle(), - mux.MessageFunc("", xml.Name{Local: "subject"}, s.message), + mux.MessageFunc("", xml.Name{Local: "subject"}, s.HandleMessage), + history.Handle(history.NewHandler(s)), )) return nil } -// handler of incoming message - forward to DBUS -func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { +// handler of incoming HandleMessage - forward to DBUS +func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { logger := log.WithFields(map[string]interface{}{ "to": msgHead.To.String(), "from": msgHead.From.String(), "id": msgHead.ID, }) + d := xml.NewTokenDecoder(t) + + //probably temporary workaround for history receiving + if msgHead.From.String() == "" { + _, _ = d.Token() + _, _ = d.Token() + _, _ = d.Token() + msgHead.From = s.gateway + } + msg := messages.MessageBody{} err := d.Decode(&msg) if err != nil && err != io.EOF { @@ -107,14 +128,19 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod "content": msg.Body, }) + if err := s.store.SaveLatestNotif(time.Now().UnixMilli()); err != nil { + logger.Errorf("Cannot save latest push id: %s", err) + } + conn := s.store.GetConnectionbyPublic(msg.PublicToken) if conn == nil { logger.Warnf("no appID and appToken found for publicToken") return nil } from := msgHead.From.String() - if settings := strings.Split(conn.Settings, ":"); len(settings) > 1 && settings[0] == from { - log.WithField("from", from).Info("message not from gateway, that is no notification") + if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) { + log.WithField("from", from).WithField("set", settings[0]).Info("message not from gateway, that is no notification") + return nil } logger = logger.WithFields(map[string]interface{}{ @@ -127,6 +153,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod logger.Errorf("Error send unified push: %q", err) return nil } + logger.Infof("receive unified push") return nil