From 9c4919007a3f6c6d4c27cd7d7e4a2b2604432457 Mon Sep 17 00:00:00 2001 From: genofire Date: Thu, 16 Sep 2021 15:31:48 +0200 Subject: [PATCH] distributor: use storage always and keep gateway in storage connection settings --- distributor/go.mod | 5 ++++- distributor/go.sum | 3 +++ distributor/xmpp.go | 42 +++++++++++++++++++----------------------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/distributor/go.mod b/distributor/go.mod index bb76793..ab19954 100644 --- a/distributor/go.mod +++ b/distributor/go.mod @@ -11,7 +11,7 @@ require ( mellium.im/sasl v0.2.1 mellium.im/xmlstream v0.15.3-0.20210221202126-7cc1407dad4c mellium.im/xmpp v0.19.1-0.20210916033628-404b735d69e5 - unifiedpush.org/go/np2p_dbus v0.0.0-20210916020553-6eec6f305585 + unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3 ) require ( @@ -29,9 +29,12 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.5 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + gopkg.in/ini.v1 v1.63.1 // indirect gorm.io/driver/sqlite v1.1.5 // indirect gorm.io/gorm v1.21.15 // indirect mellium.im/reader v0.1.0 // indirect ) replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages + +replace mellium.im/xmpp => ../../../../mellium.im/xmpp diff --git a/distributor/go.sum b/distributor/go.sum index f768da9..716d5f2 100644 --- a/distributor/go.sum +++ b/distributor/go.sum @@ -700,6 +700,7 @@ gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8 gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.63.1 h1:WlmD2fPTg4maPpRITalGs62TK7VMMtP5E9CHH7aFy6Y= gopkg.in/ini.v1 v1.63.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/mail.v2 v2.3.1/go.mod h1:htwXN1Qh09vZJ1NVKxQqHPBaCBbzKhp5GzuJEA4VJWw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -749,3 +750,5 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= unifiedpush.org/go/np2p_dbus v0.0.0-20210916020553-6eec6f305585 h1:KVZ5crwEMgSPdxum+abczj/5IWGhE603sPPItqoQDDY= unifiedpush.org/go/np2p_dbus v0.0.0-20210916020553-6eec6f305585/go.mod h1:4ug2cMRBjAeOiQVmArby6gg9GH0ZYvb8WXZ+yhrgzlw= +unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3 h1:zGMxFWjJEnVw11fTHO32WpR2JaqZbDVRT7/IA6IWEc4= +unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3/go.mod h1:4ug2cMRBjAeOiQVmArby6gg9GH0ZYvb8WXZ+yhrgzlw= diff --git a/distributor/xmpp.go b/distributor/xmpp.go index b1793f0..abbd360 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -9,7 +9,6 @@ import ( "time" "github.com/bdlm/log" - "github.com/google/uuid" "mellium.im/sasl" "mellium.im/xmlstream" "mellium.im/xmpp" @@ -114,11 +113,6 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod log.WithField("msg", msg).Errorf("error decoding message: %q", err) return nil } - from := msgHead.From.Bare().String() - if from != s.gateway.String() { - log.WithField("from", from).Info("message not from gateway, that is no notification") - return nil - } if msg.Body == "" || msg.PublicToken == "" { log.Infof("empty: %v", msgHead) @@ -134,6 +128,11 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod logger.Warnf("no appID and appToken found for publicToken") return nil } + from := msgHead.From.String() + if from != conn.Settings { + log.WithField("from", from).Info("message not from gateway, that is no notification") + return nil + } logger = logger.WithFields(map[string]interface{}{ "appID": conn.AppID, "appToken": conn.AppToken, @@ -224,19 +223,25 @@ func (s *XMPPService) testAndUseGateway(address jid.JID) error { // Register handler of DBUS Distribution func (s *XMPPService) Register(appID, appToken string) (string, string, error) { - publicToken := uuid.New().String() logger := log.WithFields(map[string]interface{}{ - "appID": appID, - "appToken": appToken, - "publicToken": publicToken, + "appID": appID, + "appToken": appToken, }) + conn := s.store.NewConnection(appID, appToken, s.gateway.String()) + if conn == nil { + errStr := "error to store public token" + err := errors.New(errStr) + logger.WithField("error", err).Error("unable to register") + return "", errStr, err + } + logger = logger.WithField("publicToken", conn.PublicToken) iq := messages.RegisterIQ{ IQ: stanza.IQ{ Type: stanza.SetIQ, To: s.gateway, }, } - iq.Register.Token = &messages.TokenData{Body: publicToken} + iq.Register.Token = &messages.TokenData{Body: conn.PublicToken} ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500)) defer cancel() t, err := s.session.EncodeIQ(ctx, iq) @@ -256,17 +261,8 @@ func (s *XMPPService) Register(appID, appToken string) (string, string, error) { return "", "xmpp unable recv iq to gateway", err } if endpoint := iqRegister.Register.Endpoint; endpoint != nil { - logger = logger.WithField("endpoint", endpoint.Body) - // update Endpoint - conn := s.store.NewConnectionWithToken(appID, appToken, publicToken, endpoint.Body) - if conn == nil { - errStr := "error to store public token" - err = errors.New(errStr) - logger.WithField("error", err).Error("unable to register") - return "", errStr, err - } - logger.Info("success") - return conn.Endpoint, "", nil + logger.WithField("endpoint", endpoint.Body).Info("success") + return endpoint.Body, "", nil } errStr := "Unknown Error" if errr := iqRegister.Register.Error; errr != nil { @@ -290,7 +286,7 @@ func (s *XMPPService) Unregister(appToken string) { logger = logger.WithFields(map[string]interface{}{ "appID": conn.AppID, "publicToken": conn.PublicToken, - "endpoint": conn.Endpoint, + "gateway": conn.Settings, }) if err = s.dbus.NewConnector(conn.AppID).Unregistered(conn.AppToken); err != nil { logger.WithField("error", err).Error("send unregister per dbus ")