WIP: testing xep-0313 history #16

Draft
karmanyaahm wants to merge 1 commits from karmanyaahm/unified-push-xmpp:history into main
3 changed files with 73 additions and 8 deletions

View File

@ -6,10 +6,10 @@ import (
"path/filepath" "path/filepath"
"dev.sum7.eu/genofire/golang-lib/file" "dev.sum7.eu/genofire/golang-lib/file"
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
"github.com/bdlm/log" "github.com/bdlm/log"
"github.com/bdlm/std/logger" "github.com/bdlm/std/logger"
"unifiedpush.org/go/np2p_dbus/distributor" "unifiedpush.org/go/np2p_dbus/distributor"
"unifiedpush.org/go/np2p_dbus/storage"
) )
var dbus *distributor.DBus var dbus *distributor.DBus

View File

@ -0,0 +1,38 @@
package storage
import (
"gorm.io/gorm"
"unifiedpush.org/go/np2p_dbus/storage"
)
func InitStorage(path string) (*Storage, error) {
st, err := storage.InitStorage(path)
db := st.DB()
if err := db.AutoMigrate(&latestID{}); err != nil {
return nil, err
}
return &Storage{Storage: st, db: st.DB()}, err
}
type Storage struct {
*storage.Storage
db *gorm.DB
}
func (d Storage) GetLatestNotif() (int64, error) {
id := latestID{}
result := d.db.First(&id)
return id.ID, result.Error
}
func (d Storage) SaveLatestNotif(time int64) error {
id := latestID{}
result := d.db.Model(&id).Where("1=1").Update("id", time)
return result.Error
}
type latestID struct {
ID int64
}

View File

@ -14,12 +14,13 @@ import (
"mellium.im/xmlstream" "mellium.im/xmlstream"
"mellium.im/xmpp" "mellium.im/xmpp"
"mellium.im/xmpp/disco" "mellium.im/xmpp/disco"
"mellium.im/xmpp/history"
"mellium.im/xmpp/jid" "mellium.im/xmpp/jid"
"mellium.im/xmpp/mux" "mellium.im/xmpp/mux"
"mellium.im/xmpp/stanza" "mellium.im/xmpp/stanza"
"unifiedpush.org/go/np2p_dbus/distributor" "unifiedpush.org/go/np2p_dbus/distributor"
"unifiedpush.org/go/np2p_dbus/storage"
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
"dev.sum7.eu/genofire/unified-push-xmpp/messages" "dev.sum7.eu/genofire/unified-push-xmpp/messages"
) )
@ -74,23 +75,43 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
return err return err
} }
go s.checkServer() go s.checkServer()
go s.selectGateway()
go func() {
s.selectGateway()
latestID, _ := store.GetLatestNotif()
log.Debug("Starting history fetch")
q := history.Query{With: s.gateway, Start: time.UnixMilli(latestID)}
_, _ = history.Fetch(context.TODO(), q, s.session.LocalAddr().Bare(), s.session)
log.Debug("Finished history fetch")
}()
log.Debug("xmpp client is running") log.Debug("xmpp client is running")
s.session.Serve(mux.New( s.session.Serve(mux.New(
// disco.Handle(), // disco.Handle(),
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message), mux.MessageFunc("", xml.Name{Local: "subject"}, s.HandleMessage),
history.Handle(history.NewHandler(s)),
)) ))
return nil return nil
} }
// handler of incoming message - forward to DBUS // handler of incoming HandleMessage - forward to DBUS
func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
logger := log.WithFields(map[string]interface{}{ logger := log.WithFields(map[string]interface{}{
"to": msgHead.To.String(), "to": msgHead.To.String(),
"from": msgHead.From.String(), "from": msgHead.From.String(),
"id": msgHead.ID, "id": msgHead.ID,
}) })
d := xml.NewTokenDecoder(t) d := xml.NewTokenDecoder(t)
//probably temporary workaround for history receiving
if msgHead.From.String() == "" {
_, _ = d.Token()
_, _ = d.Token()
_, _ = d.Token()
msgHead.From = s.gateway
}
msg := messages.MessageBody{} msg := messages.MessageBody{}
err := d.Decode(&msg) err := d.Decode(&msg)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
@ -107,14 +128,19 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
"content": msg.Body, "content": msg.Body,
}) })
if err := s.store.SaveLatestNotif(time.Now().UnixMilli()); err != nil {
logger.Errorf("Cannot save latest push id: %s", err)
}
conn := s.store.GetConnectionbyPublic(msg.PublicToken) conn := s.store.GetConnectionbyPublic(msg.PublicToken)
if conn == nil { if conn == nil {
logger.Warnf("no appID and appToken found for publicToken") logger.Warnf("no appID and appToken found for publicToken")
return nil return nil
} }
from := msgHead.From.String() from := msgHead.From.String()
if settings := strings.Split(conn.Settings, ":"); len(settings) > 1 && settings[0] == from { if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) {
log.WithField("from", from).Info("message not from gateway, that is no notification") log.WithField("from", from).WithField("set", settings[0]).Info("message not from gateway, that is no notification")
return nil return nil
} }
logger = logger.WithFields(map[string]interface{}{ logger = logger.WithFields(map[string]interface{}{
@ -127,6 +153,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
logger.Errorf("Error send unified push: %q", err) logger.Errorf("Error send unified push: %q", err)
return nil return nil
} }
logger.Infof("receive unified push") logger.Infof("receive unified push")
return nil return nil