distributor: use service discovery to check gateway (and fallback to demo)

fix #12
This commit is contained in:
Geno 2021-09-15 18:42:08 +02:00
parent 63aa921252
commit e2562c93c2
2 changed files with 76 additions and 5 deletions

View File

@ -61,7 +61,7 @@ func main() {
dbus = distributor.NewDBus("org.unifiedpush.Distributor.xmpp") dbus = distributor.NewDBus("org.unifiedpush.Distributor.xmpp")
dbus.StartHandling(&config.XMPP) dbus.StartHandling(&config.XMPP)
log.Info("startup") log.Debug("startup ...")
if err := config.XMPP.Run(dbus, store); err != nil { if err := config.XMPP.Run(dbus, store); err != nil {
log.Errorf("startup xmpp: %v", err) log.Errorf("startup xmpp: %v", err)
} }

View File

@ -22,10 +22,16 @@ import (
"dev.sum7.eu/genofire/unified-push-xmpp/messages" "dev.sum7.eu/genofire/unified-push-xmpp/messages"
) )
// Demo Server as fallback
var (
XMPPUPDemoJID = jid.MustParse("up.chat.sum7.eu")
)
type XMPPService struct { type XMPPService struct {
Login string Login string
Password string Password string
Gateway string Gateway string
gateway jid.JID
dbus *distributor.DBus dbus *distributor.DBus
session *xmpp.Session session *xmpp.Session
store *storage.Storage store *storage.Storage
@ -63,7 +69,27 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
if err != nil { if err != nil {
return err return err
} }
go s.checkServer() go func() {
if err := s.checkServer(); err != nil {
log.Errorf("check server: %v", err)
}
}()
go func() {
if gateway, err := jid.Parse(s.Gateway); err != nil {
if err := s.checkGateway(); err != nil {
log.Panicf("no gateway found: %v", err)
} else {
log.Info("using found UnifiedPush")
}
} else {
if err := s.testAndUseGateway(gateway); err != nil {
log.Panic(err)
} else {
log.Info("using configured UnifiedPush")
}
}
}()
log.Debug("xmpp client is running")
s.session.Serve(mux.New( s.session.Serve(mux.New(
// disco.Handle(), // disco.Handle(),
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message), mux.MessageFunc("", xml.Name{Local: "subject"}, s.message),
@ -86,7 +112,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
return nil return nil
} }
from := msgHead.From.Bare().String() from := msgHead.From.Bare().String()
if s.Gateway == "" || from != s.Gateway { if from != s.gateway.String() {
log.WithField("from", from).Info("message not from gateway, that is no notification") log.WithField("from", from).Info("message not from gateway, that is no notification")
return nil return nil
} }
@ -122,11 +148,13 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
// checkServer - background job // checkServer - background job
func (s *XMPPService) checkServer() error { func (s *XMPPService) checkServer() error {
domain := s.session.LocalAddr().Domain() domain := s.session.LocalAddr().Domain()
log.Infof("your instant is %s - check running", domain) logger := log.WithField("instance", domain)
logger.Debug("check running")
info, err := disco.GetInfo(context.TODO(), "", domain, s.session) info, err := disco.GetInfo(context.TODO(), "", domain, s.session)
if err != nil { if err != nil {
return err return err
} }
// check if server support msgoffline // check if server support msgoffline
supportMSGOffline := false supportMSGOffline := false
for _, f := range info.Features { for _, f := range info.Features {
@ -138,9 +166,52 @@ func (s *XMPPService) checkServer() error {
if !supportMSGOffline { if !supportMSGOffline {
log.Warn("your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection") log.Warn("your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection")
} }
logger.Info("your instance checked")
return nil return nil
} }
// checkGateway - background job
func (s *XMPPService) checkGateway() error {
domain := s.session.LocalAddr().Domain()
log.WithField("instance", domain).Infof("no gateway configured, try to find one on your instance")
iter := disco.FetchItemsIQ(context.TODO(), "", stanza.IQ{To: domain}, s.session)
if err := iter.Err(); err != nil {
iter.Close()
return err
}
addresses := []jid.JID{iter.Item().JID}
for iter.Next() {
if err := iter.Err(); err != nil {
iter.Close()
return err
}
addresses = append(addresses, iter.Item().JID)
}
iter.Close()
for _, j := range addresses {
log.Debugf("check for UnifiedPush gateway: %s", j)
if err := s.testAndUseGateway(j); err == nil {
return nil
}
}
log.WithField("gateway", XMPPUPDemoJID).Infof("no UnifiedPush gateway on your instance - try demo server")
return s.testAndUseGateway(XMPPUPDemoJID)
}
func (s *XMPPService) testAndUseGateway(address jid.JID) error {
info, err := disco.GetInfo(context.TODO(), "", address, s.session)
if err != nil {
return err
}
for _, f := range info.Features {
if f.Var == messages.Space {
s.gateway = address
log.WithField("gateway", s.gateway).Debug("tested UnifiedPush XMPP gateway should work")
return nil
}
}
return errors.New("this is no UnifiedPush gateway")
}
// Register handler of DBUS Distribution // Register handler of DBUS Distribution
func (s *XMPPService) Register(appID, appToken string) (string, string, error) { func (s *XMPPService) Register(appID, appToken string) (string, string, error) {
publicToken := uuid.New().String() publicToken := uuid.New().String()
@ -152,7 +223,7 @@ func (s *XMPPService) Register(appID, appToken string) (string, string, error) {
iq := messages.RegisterIQ{ iq := messages.RegisterIQ{
IQ: stanza.IQ{ IQ: stanza.IQ{
Type: stanza.SetIQ, Type: stanza.SetIQ,
To: jid.MustParse(s.Gateway), To: s.gateway,
}, },
} }
iq.Register.Token = &messages.TokenData{Body: publicToken} iq.Register.Token = &messages.TokenData{Body: publicToken}