distributor: register again, if gateway changed (configurable)

This commit is contained in:
genofire 2021-09-16 15:33:13 +02:00
parent 9c4919007a
commit db0b61ab46
2 changed files with 51 additions and 33 deletions

View File

@ -4,4 +4,7 @@ storage_path = ""
[xmpp] [xmpp]
login = "YOUR-ACCOUNT@chat.sum7.eu" login = "YOUR-ACCOUNT@chat.sum7.eu"
password = "CHANGEME" password = "CHANGEME"
gateway = "up.chat.sum7.eu" # if not configured will try to find one on your instance and fallback to demo-server "up.chat.sum7.eu"
gateway = ""
# keep stored gateway of application or register they again on new gateway
keep_gateway = false

View File

@ -28,13 +28,14 @@ var (
) )
type XMPPService struct { type XMPPService struct {
Login string Login string `toml:"login"`
Password string Password string `toml:"password"`
Gateway string Gateway string `toml:"gateway"`
gateway jid.JID KeepGateway bool `toml:"keep_gateway"`
dbus *distributor.DBus gateway jid.JID
session *xmpp.Session dbus *distributor.DBus
store *storage.Storage session *xmpp.Session
store *storage.Storage
} }
func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error { func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error {
@ -71,26 +72,8 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
if err != nil { if err != nil {
return err return err
} }
go func() { go s.checkServer()
if err := s.checkServer(); err != nil { go s.selectGateway()
log.Errorf("check server: %v", err)
}
}()
go func() {
if gateway, err := jid.Parse(s.Gateway); err != nil {
if err := s.checkGateway(); err != nil {
log.Panicf("no gateway found: %v", err)
} else {
log.WithField("gateway", s.gateway.String()).Info("using found UnifiedPush")
}
} else {
if err := s.testAndUseGateway(gateway); err != nil {
log.Panic(err)
} else {
log.WithField("gateway", s.gateway.String()).Info("using configured UnifiedPush")
}
}
}()
log.Debug("xmpp client is running") log.Debug("xmpp client is running")
s.session.Serve(mux.New( s.session.Serve(mux.New(
// disco.Handle(), // disco.Handle(),
@ -149,7 +132,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
} }
// checkServer - background job // checkServer - background job
func (s *XMPPService) checkServer() error { func (s *XMPPService) checkServer() {
domain := s.session.LocalAddr().Domain() domain := s.session.LocalAddr().Domain()
logger := log.WithField("instance", domain.String()) logger := log.WithField("instance", domain.String())
logger.Debug("check running") logger.Debug("check running")
@ -157,7 +140,8 @@ func (s *XMPPService) checkServer() error {
defer cancel() defer cancel()
info, err := disco.GetInfo(ctx, "", domain, s.session) info, err := disco.GetInfo(ctx, "", domain, s.session)
if err != nil { if err != nil {
return err log.Errorf("check server: %v", err)
return
} }
// check if server support msgoffline // check if server support msgoffline
@ -172,11 +156,40 @@ func (s *XMPPService) checkServer() error {
log.Warn("your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection") log.Warn("your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection")
} }
logger.Info("your instance checked") logger.Info("your instance checked")
return nil return
} }
// checkGateway - background job // selectGateway - background job
func (s *XMPPService) checkGateway() error { func (s *XMPPService) selectGateway() {
if gateway, err := jid.Parse(s.Gateway); err != nil {
if err := s.findGateway(); err != nil {
log.Panicf("no gateway found: %v", err)
} else {
log.WithField("gateway", s.gateway.String()).Info("using found UnifiedPush")
}
} else {
if err := s.testAndUseGateway(gateway); err != nil {
log.Panic(err)
} else {
log.WithField("gateway", s.gateway.String()).Info("using configured UnifiedPush")
}
}
// function to renew endpoint if new gateway was detected
if s.KeepGateway {
return
}
conns := s.store.GetUnequalSettings(s.gateway.String())
if len(conns) <= 0 {
return
}
log.WithField("count", len(conns)).Info("register apps for new gateway")
for _, i := range conns {
s.Register(i.AppID, i.AppToken)
}
}
// findGateway
func (s *XMPPService) findGateway() error {
domain := s.session.LocalAddr().Domain() domain := s.session.LocalAddr().Domain()
log.WithField("instance", domain.String()).Infof("no gateway configured, try to find one on your instance") log.WithField("instance", domain.String()).Infof("no gateway configured, try to find one on your instance")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
@ -204,6 +217,8 @@ func (s *XMPPService) checkGateway() error {
log.WithField("gateway", XMPPUPDemoJID.String()).Infof("no UnifiedPush gateway on your instance - try demo server") log.WithField("gateway", XMPPUPDemoJID.String()).Infof("no UnifiedPush gateway on your instance - try demo server")
return s.testAndUseGateway(XMPPUPDemoJID) return s.testAndUseGateway(XMPPUPDemoJID)
} }
// testAndUseGateway
func (s *XMPPService) testAndUseGateway(address jid.JID) error { func (s *XMPPService) testAndUseGateway(address jid.JID) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
defer cancel() defer cancel()