From 47b46549ee0f51b76777213b8399655c01eff2f6 Mon Sep 17 00:00:00 2001 From: Geno Date: Wed, 15 Sep 2021 19:06:11 +0200 Subject: [PATCH] distributor: timeout XMPP requests --- distributor/xmpp.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/distributor/xmpp.go b/distributor/xmpp.go index d4bdf69..8e08ade 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -6,6 +6,7 @@ import ( "encoding/xml" "errors" "io" + "time" "github.com/bdlm/log" "github.com/google/uuid" @@ -42,8 +43,10 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error s.dbus = dbus s.store = store j := jid.MustParse(s.Login) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second)) + defer cancel() if s.session, err = xmpp.DialClientSession( - context.TODO(), j, + ctx, j, xmpp.BindResource(), xmpp.StartTLS(&tls.Config{ ServerName: j.Domain().String(), @@ -65,7 +68,7 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error } }() // Send initial presence to let the server know we want to receive messages. - err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) + err = s.session.Send(ctx, stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) if err != nil { return err } @@ -79,13 +82,13 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error if err := s.checkGateway(); err != nil { log.Panicf("no gateway found: %v", err) } else { - log.Info("using found UnifiedPush") + log.WithField("gateway", s.gateway.String()).Info("using found UnifiedPush") } } else { if err := s.testAndUseGateway(gateway); err != nil { log.Panic(err) } else { - log.Info("using configured UnifiedPush") + log.WithField("gateway", s.gateway.String()).Info("using configured UnifiedPush") } } }() @@ -148,9 +151,11 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod // checkServer - background job func (s *XMPPService) checkServer() error { domain := s.session.LocalAddr().Domain() - logger := log.WithField("instance", domain) + logger := log.WithField("instance", domain.String()) logger.Debug("check running") - info, err := disco.GetInfo(context.TODO(), "", domain, s.session) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) + defer cancel() + info, err := disco.GetInfo(ctx, "", domain, s.session) if err != nil { return err } @@ -173,8 +178,10 @@ func (s *XMPPService) checkServer() error { // checkGateway - background job func (s *XMPPService) checkGateway() error { domain := s.session.LocalAddr().Domain() - log.WithField("instance", domain).Infof("no gateway configured, try to find one on your instance") - iter := disco.FetchItemsIQ(context.TODO(), "", stanza.IQ{To: domain}, s.session) + log.WithField("instance", domain.String()).Infof("no gateway configured, try to find one on your instance") + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) + defer cancel() + iter := disco.FetchItemsIQ(ctx, "", stanza.IQ{To: domain}, s.session) if err := iter.Err(); err != nil { iter.Close() return err @@ -194,18 +201,20 @@ func (s *XMPPService) checkGateway() error { return nil } } - log.WithField("gateway", XMPPUPDemoJID).Infof("no UnifiedPush gateway on your instance - try demo server") + log.WithField("gateway", XMPPUPDemoJID.String()).Infof("no UnifiedPush gateway on your instance - try demo server") return s.testAndUseGateway(XMPPUPDemoJID) } func (s *XMPPService) testAndUseGateway(address jid.JID) error { - info, err := disco.GetInfo(context.TODO(), "", address, s.session) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) + defer cancel() + info, err := disco.GetInfo(ctx, "", address, s.session) if err != nil { return err } for _, f := range info.Features { if f.Var == messages.Space { s.gateway = address - log.WithField("gateway", s.gateway).Debug("tested UnifiedPush XMPP gateway should work") + log.WithField("gateway", s.gateway.String()).Debug("tested UnifiedPush XMPP gateway should work") return nil } } @@ -227,7 +236,9 @@ func (s *XMPPService) Register(appID, appToken string) (string, string, error) { }, } iq.Register.Token = &messages.TokenData{Body: publicToken} - t, err := s.session.EncodeIQ(context.TODO(), iq) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) + defer cancel() + t, err := s.session.EncodeIQ(ctx, iq) if err != nil { logger.Errorf("xmpp send IQ for register: %v", err) return "", "xmpp unable send iq to gateway", err