From db0b61ab460d04b7b1331d7a097a64e206828fae Mon Sep 17 00:00:00 2001 From: genofire Date: Thu, 16 Sep 2021 15:33:13 +0200 Subject: [PATCH] distributor: register again, if gateway changed (configurable) --- distributor/config_example.toml | 5 ++- distributor/xmpp.go | 79 ++++++++++++++++++++------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/distributor/config_example.toml b/distributor/config_example.toml index 76e18b0..47ca5ba 100644 --- a/distributor/config_example.toml +++ b/distributor/config_example.toml @@ -4,4 +4,7 @@ storage_path = "" [xmpp] login = "YOUR-ACCOUNT@chat.sum7.eu" password = "CHANGEME" -gateway = "up.chat.sum7.eu" +# if not configured will try to find one on your instance and fallback to demo-server "up.chat.sum7.eu" +gateway = "" +# keep stored gateway of application or register they again on new gateway +keep_gateway = false diff --git a/distributor/xmpp.go b/distributor/xmpp.go index abbd360..9a87ce3 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -28,13 +28,14 @@ var ( ) type XMPPService struct { - Login string - Password string - Gateway string - gateway jid.JID - dbus *distributor.DBus - session *xmpp.Session - store *storage.Storage + Login string `toml:"login"` + Password string `toml:"password"` + Gateway string `toml:"gateway"` + KeepGateway bool `toml:"keep_gateway"` + gateway jid.JID + dbus *distributor.DBus + session *xmpp.Session + store *storage.Storage } func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error { @@ -71,26 +72,8 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error if err != nil { return err } - go func() { - if err := s.checkServer(); err != nil { - log.Errorf("check server: %v", err) - } - }() - go func() { - if gateway, err := jid.Parse(s.Gateway); err != nil { - if err := s.checkGateway(); err != nil { - log.Panicf("no gateway found: %v", err) - } else { - log.WithField("gateway", s.gateway.String()).Info("using found UnifiedPush") - } - } else { - if err := s.testAndUseGateway(gateway); err != nil { - log.Panic(err) - } else { - log.WithField("gateway", s.gateway.String()).Info("using configured UnifiedPush") - } - } - }() + go s.checkServer() + go s.selectGateway() log.Debug("xmpp client is running") s.session.Serve(mux.New( // disco.Handle(), @@ -149,7 +132,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod } // checkServer - background job -func (s *XMPPService) checkServer() error { +func (s *XMPPService) checkServer() { domain := s.session.LocalAddr().Domain() logger := log.WithField("instance", domain.String()) logger.Debug("check running") @@ -157,7 +140,8 @@ func (s *XMPPService) checkServer() error { defer cancel() info, err := disco.GetInfo(ctx, "", domain, s.session) if err != nil { - return err + log.Errorf("check server: %v", err) + return } // check if server support msgoffline @@ -172,11 +156,40 @@ func (s *XMPPService) checkServer() error { log.Warn("your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection") } logger.Info("your instance checked") - return nil + return } -// checkGateway - background job -func (s *XMPPService) checkGateway() error { +// selectGateway - background job +func (s *XMPPService) selectGateway() { + if gateway, err := jid.Parse(s.Gateway); err != nil { + if err := s.findGateway(); err != nil { + log.Panicf("no gateway found: %v", err) + } else { + log.WithField("gateway", s.gateway.String()).Info("using found UnifiedPush") + } + } else { + if err := s.testAndUseGateway(gateway); err != nil { + log.Panic(err) + } else { + log.WithField("gateway", s.gateway.String()).Info("using configured UnifiedPush") + } + } + // function to renew endpoint if new gateway was detected + if s.KeepGateway { + return + } + conns := s.store.GetUnequalSettings(s.gateway.String()) + if len(conns) <= 0 { + return + } + log.WithField("count", len(conns)).Info("register apps for new gateway") + for _, i := range conns { + s.Register(i.AppID, i.AppToken) + } +} + +// findGateway +func (s *XMPPService) findGateway() error { domain := s.session.LocalAddr().Domain() log.WithField("instance", domain.String()).Infof("no gateway configured, try to find one on your instance") ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) @@ -204,6 +217,8 @@ func (s *XMPPService) checkGateway() error { log.WithField("gateway", XMPPUPDemoJID.String()).Infof("no UnifiedPush gateway on your instance - try demo server") return s.testAndUseGateway(XMPPUPDemoJID) } + +// testAndUseGateway func (s *XMPPService) testAndUseGateway(address jid.JID) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) defer cancel()