Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Karmanyaah Malhotra | 6f0b45ba7a |
|
@ -6,10 +6,10 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"dev.sum7.eu/genofire/golang-lib/file"
|
"dev.sum7.eu/genofire/golang-lib/file"
|
||||||
|
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
|
||||||
"github.com/bdlm/log"
|
"github.com/bdlm/log"
|
||||||
"github.com/bdlm/std/logger"
|
"github.com/bdlm/std/logger"
|
||||||
"unifiedpush.org/go/np2p_dbus/distributor"
|
"unifiedpush.org/go/np2p_dbus/distributor"
|
||||||
"unifiedpush.org/go/np2p_dbus/storage"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var dbus *distributor.DBus
|
var dbus *distributor.DBus
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"unifiedpush.org/go/np2p_dbus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitStorage(path string) (*Storage, error) {
|
||||||
|
st, err := storage.InitStorage(path)
|
||||||
|
db := st.DB()
|
||||||
|
|
||||||
|
if err := db.AutoMigrate(&latestID{}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Storage{Storage: st, db: st.DB()}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
type Storage struct {
|
||||||
|
*storage.Storage
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d Storage) GetLatestNotif() (int64, error) {
|
||||||
|
id := latestID{}
|
||||||
|
result := d.db.First(&id)
|
||||||
|
return id.ID, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d Storage) SaveLatestNotif(time int64) error {
|
||||||
|
id := latestID{}
|
||||||
|
result := d.db.Model(&id).Where("1=1").Update("id", time)
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
type latestID struct {
|
||||||
|
ID int64
|
||||||
|
}
|
|
@ -14,12 +14,13 @@ import (
|
||||||
"mellium.im/xmlstream"
|
"mellium.im/xmlstream"
|
||||||
"mellium.im/xmpp"
|
"mellium.im/xmpp"
|
||||||
"mellium.im/xmpp/disco"
|
"mellium.im/xmpp/disco"
|
||||||
|
"mellium.im/xmpp/history"
|
||||||
"mellium.im/xmpp/jid"
|
"mellium.im/xmpp/jid"
|
||||||
"mellium.im/xmpp/mux"
|
"mellium.im/xmpp/mux"
|
||||||
"mellium.im/xmpp/stanza"
|
"mellium.im/xmpp/stanza"
|
||||||
"unifiedpush.org/go/np2p_dbus/distributor"
|
"unifiedpush.org/go/np2p_dbus/distributor"
|
||||||
"unifiedpush.org/go/np2p_dbus/storage"
|
|
||||||
|
|
||||||
|
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
|
||||||
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
|
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -74,23 +75,43 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go s.checkServer()
|
go s.checkServer()
|
||||||
go s.selectGateway()
|
|
||||||
|
go func() {
|
||||||
|
s.selectGateway()
|
||||||
|
latestID, _ := store.GetLatestNotif()
|
||||||
|
log.Debug("Starting history fetch")
|
||||||
|
q := history.Query{With: s.gateway, Start: time.UnixMilli(latestID)}
|
||||||
|
_, _ = history.Fetch(context.TODO(), q, s.session.LocalAddr().Bare(), s.session)
|
||||||
|
log.Debug("Finished history fetch")
|
||||||
|
}()
|
||||||
|
|
||||||
log.Debug("xmpp client is running")
|
log.Debug("xmpp client is running")
|
||||||
s.session.Serve(mux.New(
|
s.session.Serve(mux.New(
|
||||||
// disco.Handle(),
|
// disco.Handle(),
|
||||||
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message),
|
mux.MessageFunc("", xml.Name{Local: "subject"}, s.HandleMessage),
|
||||||
|
history.Handle(history.NewHandler(s)),
|
||||||
))
|
))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handler of incoming message - forward to DBUS
|
// handler of incoming HandleMessage - forward to DBUS
|
||||||
func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||||
logger := log.WithFields(map[string]interface{}{
|
logger := log.WithFields(map[string]interface{}{
|
||||||
"to": msgHead.To.String(),
|
"to": msgHead.To.String(),
|
||||||
"from": msgHead.From.String(),
|
"from": msgHead.From.String(),
|
||||||
"id": msgHead.ID,
|
"id": msgHead.ID,
|
||||||
})
|
})
|
||||||
|
|
||||||
d := xml.NewTokenDecoder(t)
|
d := xml.NewTokenDecoder(t)
|
||||||
|
|
||||||
|
//probably temporary workaround for history receiving
|
||||||
|
if msgHead.From.String() == "" {
|
||||||
|
_, _ = d.Token()
|
||||||
|
_, _ = d.Token()
|
||||||
|
_, _ = d.Token()
|
||||||
|
msgHead.From = s.gateway
|
||||||
|
}
|
||||||
|
|
||||||
msg := messages.MessageBody{}
|
msg := messages.MessageBody{}
|
||||||
err := d.Decode(&msg)
|
err := d.Decode(&msg)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
|
@ -107,6 +128,10 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
|
||||||
"content": msg.Body,
|
"content": msg.Body,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err := s.store.SaveLatestNotif(time.Now().UnixMilli()); err != nil {
|
||||||
|
logger.Errorf("Cannot save latest push id: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
conn := s.store.GetConnectionbyPublic(msg.PublicToken)
|
conn := s.store.GetConnectionbyPublic(msg.PublicToken)
|
||||||
if conn == nil {
|
if conn == nil {
|
||||||
logger.Warnf("no appID and appToken found for publicToken")
|
logger.Warnf("no appID and appToken found for publicToken")
|
||||||
|
@ -114,7 +139,8 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
|
||||||
}
|
}
|
||||||
from := msgHead.From.String()
|
from := msgHead.From.String()
|
||||||
if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) {
|
if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) {
|
||||||
log.WithField("from", from).Info("message not from gateway, that is no notification")
|
log.WithField("from", from).WithField("set", settings[0]).Info("message not from gateway, that is no notification")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
logger = logger.WithFields(map[string]interface{}{
|
logger = logger.WithFields(map[string]interface{}{
|
||||||
|
@ -127,6 +153,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
|
||||||
logger.Errorf("Error send unified push: %q", err)
|
logger.Errorf("Error send unified push: %q", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Infof("receive unified push")
|
logger.Infof("receive unified push")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue