From e2562c93c22b0703479e5d17eaa35e29e9f607b4 Mon Sep 17 00:00:00 2001 From: Geno Date: Wed, 15 Sep 2021 18:42:08 +0200 Subject: [PATCH] distributor: use service discovery to check gateway (and fallback to demo) fix #12 --- distributor/main.go | 2 +- distributor/xmpp.go | 79 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/distributor/main.go b/distributor/main.go index 39383c1..90a6642 100644 --- a/distributor/main.go +++ b/distributor/main.go @@ -61,7 +61,7 @@ func main() { dbus = distributor.NewDBus("org.unifiedpush.Distributor.xmpp") dbus.StartHandling(&config.XMPP) - log.Info("startup") + log.Debug("startup ...") if err := config.XMPP.Run(dbus, store); err != nil { log.Errorf("startup xmpp: %v", err) } diff --git a/distributor/xmpp.go b/distributor/xmpp.go index 526cd27..d4bdf69 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -22,10 +22,16 @@ import ( "dev.sum7.eu/genofire/unified-push-xmpp/messages" ) +// Demo Server as fallback +var ( + XMPPUPDemoJID = jid.MustParse("up.chat.sum7.eu") +) + type XMPPService struct { Login string Password string Gateway string + gateway jid.JID dbus *distributor.DBus session *xmpp.Session store *storage.Storage @@ -63,7 +69,27 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error if err != nil { return err } - go s.checkServer() + go func() { + if err := s.checkServer(); err != nil { + log.Errorf("check server: %v", err) + } + }() + go func() { + if gateway, err := jid.Parse(s.Gateway); err != nil { + if err := s.checkGateway(); err != nil { + log.Panicf("no gateway found: %v", err) + } else { + log.Info("using found UnifiedPush") + } + } else { + if err := s.testAndUseGateway(gateway); err != nil { + log.Panic(err) + } else { + log.Info("using configured UnifiedPush") + } + } + }() + log.Debug("xmpp client is running") s.session.Serve(mux.New( // disco.Handle(), mux.MessageFunc("", xml.Name{Local: "subject"}, s.message), @@ -86,7 +112,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod return nil } from := msgHead.From.Bare().String() - if s.Gateway == "" || from != s.Gateway { + if from != s.gateway.String() { log.WithField("from", from).Info("message not from gateway, that is no notification") return nil } @@ -122,11 +148,13 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod // checkServer - background job func (s *XMPPService) checkServer() error { domain := s.session.LocalAddr().Domain() - log.Infof("your instant is %s - check running", domain) + logger := log.WithField("instance", domain) + logger.Debug("check running") info, err := disco.GetInfo(context.TODO(), "", domain, s.session) if err != nil { return err } + // check if server support msgoffline supportMSGOffline := false for _, f := range info.Features { @@ -138,9 +166,52 @@ func (s *XMPPService) checkServer() error { if !supportMSGOffline { log.Warn("your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection") } + logger.Info("your instance checked") return nil } +// checkGateway - background job +func (s *XMPPService) checkGateway() error { + domain := s.session.LocalAddr().Domain() + log.WithField("instance", domain).Infof("no gateway configured, try to find one on your instance") + iter := disco.FetchItemsIQ(context.TODO(), "", stanza.IQ{To: domain}, s.session) + if err := iter.Err(); err != nil { + iter.Close() + return err + } + addresses := []jid.JID{iter.Item().JID} + for iter.Next() { + if err := iter.Err(); err != nil { + iter.Close() + return err + } + addresses = append(addresses, iter.Item().JID) + } + iter.Close() + for _, j := range addresses { + log.Debugf("check for UnifiedPush gateway: %s", j) + if err := s.testAndUseGateway(j); err == nil { + return nil + } + } + log.WithField("gateway", XMPPUPDemoJID).Infof("no UnifiedPush gateway on your instance - try demo server") + return s.testAndUseGateway(XMPPUPDemoJID) +} +func (s *XMPPService) testAndUseGateway(address jid.JID) error { + info, err := disco.GetInfo(context.TODO(), "", address, s.session) + if err != nil { + return err + } + for _, f := range info.Features { + if f.Var == messages.Space { + s.gateway = address + log.WithField("gateway", s.gateway).Debug("tested UnifiedPush XMPP gateway should work") + return nil + } + } + return errors.New("this is no UnifiedPush gateway") +} + // Register handler of DBUS Distribution func (s *XMPPService) Register(appID, appToken string) (string, string, error) { publicToken := uuid.New().String() @@ -152,7 +223,7 @@ func (s *XMPPService) Register(appID, appToken string) (string, string, error) { iq := messages.RegisterIQ{ IQ: stanza.IQ{ Type: stanza.SetIQ, - To: jid.MustParse(s.Gateway), + To: s.gateway, }, } iq.Register.Token = &messages.TokenData{Body: publicToken}