distributor: timeout XMPP requests

This commit is contained in:
Geno 2021-09-15 19:06:11 +02:00
parent 889563b8b2
commit 47b46549ee
1 changed files with 23 additions and 12 deletions

View File

@ -6,6 +6,7 @@ import (
"encoding/xml" "encoding/xml"
"errors" "errors"
"io" "io"
"time"
"github.com/bdlm/log" "github.com/bdlm/log"
"github.com/google/uuid" "github.com/google/uuid"
@ -42,8 +43,10 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
s.dbus = dbus s.dbus = dbus
s.store = store s.store = store
j := jid.MustParse(s.Login) j := jid.MustParse(s.Login)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
if s.session, err = xmpp.DialClientSession( if s.session, err = xmpp.DialClientSession(
context.TODO(), j, ctx, j,
xmpp.BindResource(), xmpp.BindResource(),
xmpp.StartTLS(&tls.Config{ xmpp.StartTLS(&tls.Config{
ServerName: j.Domain().String(), ServerName: j.Domain().String(),
@ -65,7 +68,7 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
} }
}() }()
// Send initial presence to let the server know we want to receive messages. // Send initial presence to let the server know we want to receive messages.
err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) err = s.session.Send(ctx, stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil))
if err != nil { if err != nil {
return err return err
} }
@ -79,13 +82,13 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error
if err := s.checkGateway(); err != nil { if err := s.checkGateway(); err != nil {
log.Panicf("no gateway found: %v", err) log.Panicf("no gateway found: %v", err)
} else { } else {
log.Info("using found UnifiedPush") log.WithField("gateway", s.gateway.String()).Info("using found UnifiedPush")
} }
} else { } else {
if err := s.testAndUseGateway(gateway); err != nil { if err := s.testAndUseGateway(gateway); err != nil {
log.Panic(err) log.Panic(err)
} else { } else {
log.Info("using configured UnifiedPush") log.WithField("gateway", s.gateway.String()).Info("using configured UnifiedPush")
} }
} }
}() }()
@ -148,9 +151,11 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
// checkServer - background job // checkServer - background job
func (s *XMPPService) checkServer() error { func (s *XMPPService) checkServer() error {
domain := s.session.LocalAddr().Domain() domain := s.session.LocalAddr().Domain()
logger := log.WithField("instance", domain) logger := log.WithField("instance", domain.String())
logger.Debug("check running") logger.Debug("check running")
info, err := disco.GetInfo(context.TODO(), "", domain, s.session) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
defer cancel()
info, err := disco.GetInfo(ctx, "", domain, s.session)
if err != nil { if err != nil {
return err return err
} }
@ -173,8 +178,10 @@ func (s *XMPPService) checkServer() error {
// checkGateway - background job // checkGateway - background job
func (s *XMPPService) checkGateway() error { func (s *XMPPService) checkGateway() error {
domain := s.session.LocalAddr().Domain() domain := s.session.LocalAddr().Domain()
log.WithField("instance", domain).Infof("no gateway configured, try to find one on your instance") log.WithField("instance", domain.String()).Infof("no gateway configured, try to find one on your instance")
iter := disco.FetchItemsIQ(context.TODO(), "", stanza.IQ{To: domain}, s.session) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
defer cancel()
iter := disco.FetchItemsIQ(ctx, "", stanza.IQ{To: domain}, s.session)
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
iter.Close() iter.Close()
return err return err
@ -194,18 +201,20 @@ func (s *XMPPService) checkGateway() error {
return nil return nil
} }
} }
log.WithField("gateway", XMPPUPDemoJID).Infof("no UnifiedPush gateway on your instance - try demo server") log.WithField("gateway", XMPPUPDemoJID.String()).Infof("no UnifiedPush gateway on your instance - try demo server")
return s.testAndUseGateway(XMPPUPDemoJID) return s.testAndUseGateway(XMPPUPDemoJID)
} }
func (s *XMPPService) testAndUseGateway(address jid.JID) error { func (s *XMPPService) testAndUseGateway(address jid.JID) error {
info, err := disco.GetInfo(context.TODO(), "", address, s.session) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
defer cancel()
info, err := disco.GetInfo(ctx, "", address, s.session)
if err != nil { if err != nil {
return err return err
} }
for _, f := range info.Features { for _, f := range info.Features {
if f.Var == messages.Space { if f.Var == messages.Space {
s.gateway = address s.gateway = address
log.WithField("gateway", s.gateway).Debug("tested UnifiedPush XMPP gateway should work") log.WithField("gateway", s.gateway.String()).Debug("tested UnifiedPush XMPP gateway should work")
return nil return nil
} }
} }
@ -227,7 +236,9 @@ func (s *XMPPService) Register(appID, appToken string) (string, string, error) {
}, },
} }
iq.Register.Token = &messages.TokenData{Body: publicToken} iq.Register.Token = &messages.TokenData{Body: publicToken}
t, err := s.session.EncodeIQ(context.TODO(), iq) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
defer cancel()
t, err := s.session.EncodeIQ(ctx, iq)
if err != nil { if err != nil {
logger.Errorf("xmpp send IQ for register: %v", err) logger.Errorf("xmpp send IQ for register: %v", err)
return "", "xmpp unable send iq to gateway", err return "", "xmpp unable send iq to gateway", err