forked from genofire/unified-push-xmpp
Compare commits
2 Commits
Author | SHA1 | Date |
---|---|---|
genofire | 612d577b21 | |
karmanyaahm | 8eb9ba32b5 |
|
@ -6,10 +6,10 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"dev.sum7.eu/genofire/golang-lib/file"
|
"dev.sum7.eu/genofire/golang-lib/file"
|
||||||
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
|
|
||||||
"github.com/bdlm/log"
|
"github.com/bdlm/log"
|
||||||
"github.com/bdlm/std/logger"
|
"github.com/bdlm/std/logger"
|
||||||
"unifiedpush.org/go/np2p_dbus/distributor"
|
"unifiedpush.org/go/np2p_dbus/distributor"
|
||||||
|
"unifiedpush.org/go/np2p_dbus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
var dbus *distributor.DBus
|
var dbus *distributor.DBus
|
||||||
|
|
|
@ -1,38 +0,0 @@
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"unifiedpush.org/go/np2p_dbus/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
func InitStorage(path string) (*Storage, error) {
|
|
||||||
st, err := storage.InitStorage(path)
|
|
||||||
db := st.DB()
|
|
||||||
|
|
||||||
if err := db.AutoMigrate(&latestID{}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Storage{Storage: st, db: st.DB()}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
type Storage struct {
|
|
||||||
*storage.Storage
|
|
||||||
db *gorm.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d Storage) GetLatestNotif() (int64, error) {
|
|
||||||
id := latestID{}
|
|
||||||
result := d.db.First(&id)
|
|
||||||
return id.ID, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d Storage) SaveLatestNotif(time int64) error {
|
|
||||||
id := latestID{}
|
|
||||||
result := d.db.Model(&id).Where("1=1").Update("id", time)
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
type latestID struct {
|
|
||||||
ID int64
|
|
||||||
}
|
|
|
@ -14,13 +14,12 @@ import (
|
||||||
"mellium.im/xmlstream"
|
"mellium.im/xmlstream"
|
||||||
"mellium.im/xmpp"
|
"mellium.im/xmpp"
|
||||||
"mellium.im/xmpp/disco"
|
"mellium.im/xmpp/disco"
|
||||||
"mellium.im/xmpp/history"
|
|
||||||
"mellium.im/xmpp/jid"
|
"mellium.im/xmpp/jid"
|
||||||
"mellium.im/xmpp/mux"
|
"mellium.im/xmpp/mux"
|
||||||
"mellium.im/xmpp/stanza"
|
"mellium.im/xmpp/stanza"
|
||||||
"unifiedpush.org/go/np2p_dbus/distributor"
|
"unifiedpush.org/go/np2p_dbus/distributor"
|
||||||
|
"unifiedpush.org/go/np2p_dbus/storage"
|
||||||
|
|
||||||
"dev.sum7.eu/genofire/unified-push-xmpp/distributor/storage"
|
|
||||||
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
|
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -75,43 +74,23 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go s.checkServer()
|
go s.checkServer()
|
||||||
|
go s.selectGateway()
|
||||||
go func() {
|
|
||||||
s.selectGateway()
|
|
||||||
latestID, _ := store.GetLatestNotif()
|
|
||||||
log.Debug("Starting history fetch")
|
|
||||||
q := history.Query{With: s.gateway, Start: time.UnixMilli(latestID)}
|
|
||||||
_, _ = history.Fetch(context.TODO(), q, s.session.LocalAddr().Bare(), s.session)
|
|
||||||
log.Debug("Finished history fetch")
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Debug("xmpp client is running")
|
log.Debug("xmpp client is running")
|
||||||
s.session.Serve(mux.New(
|
s.session.Serve(mux.New(
|
||||||
// disco.Handle(),
|
// disco.Handle(),
|
||||||
mux.MessageFunc("", xml.Name{Local: "subject"}, s.HandleMessage),
|
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message),
|
||||||
history.Handle(history.NewHandler(s)),
|
|
||||||
))
|
))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handler of incoming HandleMessage - forward to DBUS
|
// handler of incoming message - forward to DBUS
|
||||||
func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||||
logger := log.WithFields(map[string]interface{}{
|
logger := log.WithFields(map[string]interface{}{
|
||||||
"to": msgHead.To.String(),
|
"to": msgHead.To.String(),
|
||||||
"from": msgHead.From.String(),
|
"from": msgHead.From.String(),
|
||||||
"id": msgHead.ID,
|
"id": msgHead.ID,
|
||||||
})
|
})
|
||||||
|
|
||||||
d := xml.NewTokenDecoder(t)
|
d := xml.NewTokenDecoder(t)
|
||||||
|
|
||||||
//probably temporary workaround for history receiving
|
|
||||||
if msgHead.From.String() == "" {
|
|
||||||
_, _ = d.Token()
|
|
||||||
_, _ = d.Token()
|
|
||||||
_, _ = d.Token()
|
|
||||||
msgHead.From = s.gateway
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := messages.MessageBody{}
|
msg := messages.MessageBody{}
|
||||||
err := d.Decode(&msg)
|
err := d.Decode(&msg)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
|
@ -128,10 +107,6 @@ func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenRea
|
||||||
"content": msg.Body,
|
"content": msg.Body,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := s.store.SaveLatestNotif(time.Now().UnixMilli()); err != nil {
|
|
||||||
logger.Errorf("Cannot save latest push id: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
conn := s.store.GetConnectionbyPublic(msg.PublicToken)
|
conn := s.store.GetConnectionbyPublic(msg.PublicToken)
|
||||||
if conn == nil {
|
if conn == nil {
|
||||||
logger.Warnf("no appID and appToken found for publicToken")
|
logger.Warnf("no appID and appToken found for publicToken")
|
||||||
|
@ -139,8 +114,7 @@ func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenRea
|
||||||
}
|
}
|
||||||
from := msgHead.From.String()
|
from := msgHead.From.String()
|
||||||
if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) {
|
if settings := strings.Split(conn.Settings, ":"); !(len(settings) > 1 && settings[0] == from) {
|
||||||
log.WithField("from", from).WithField("set", settings[0]).Info("message not from gateway, that is no notification")
|
log.WithField("from", from).Info("message not from gateway, that is no notification")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
logger = logger.WithFields(map[string]interface{}{
|
logger = logger.WithFields(map[string]interface{}{
|
||||||
|
@ -153,7 +127,6 @@ func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenRea
|
||||||
logger.Errorf("Error send unified push: %q", err)
|
logger.Errorf("Error send unified push: %q", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Infof("receive unified push")
|
logger.Infof("receive unified push")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue