Browse Source

distributor: use storage always and keep gateway in storage connection settings

pull/15/head
genofire 10 months ago
parent
commit
9c4919007a
  1. 5
      distributor/go.mod
  2. 3
      distributor/go.sum
  3. 42
      distributor/xmpp.go

5
distributor/go.mod

@ -11,7 +11,7 @@ require (
mellium.im/sasl v0.2.1
mellium.im/xmlstream v0.15.3-0.20210221202126-7cc1407dad4c
mellium.im/xmpp v0.19.1-0.20210916033628-404b735d69e5
unifiedpush.org/go/np2p_dbus v0.0.0-20210916020553-6eec6f305585
unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3
)
require (
@ -29,9 +29,12 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/ini.v1 v1.63.1 // indirect
gorm.io/driver/sqlite v1.1.5 // indirect
gorm.io/gorm v1.21.15 // indirect
mellium.im/reader v0.1.0 // indirect
)
replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages
replace mellium.im/xmpp => ../../../../mellium.im/xmpp

3
distributor/go.sum

@ -700,6 +700,7 @@ gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8
gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ=
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.63.1 h1:WlmD2fPTg4maPpRITalGs62TK7VMMtP5E9CHH7aFy6Y=
gopkg.in/ini.v1 v1.63.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mail.v2 v2.3.1/go.mod h1:htwXN1Qh09vZJ1NVKxQqHPBaCBbzKhp5GzuJEA4VJWw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
@ -749,3 +750,5 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
unifiedpush.org/go/np2p_dbus v0.0.0-20210916020553-6eec6f305585 h1:KVZ5crwEMgSPdxum+abczj/5IWGhE603sPPItqoQDDY=
unifiedpush.org/go/np2p_dbus v0.0.0-20210916020553-6eec6f305585/go.mod h1:4ug2cMRBjAeOiQVmArby6gg9GH0ZYvb8WXZ+yhrgzlw=
unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3 h1:zGMxFWjJEnVw11fTHO32WpR2JaqZbDVRT7/IA6IWEc4=
unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3/go.mod h1:4ug2cMRBjAeOiQVmArby6gg9GH0ZYvb8WXZ+yhrgzlw=

42
distributor/xmpp.go

@ -9,7 +9,6 @@ import (
"time"
"github.com/bdlm/log"
"github.com/google/uuid"
"mellium.im/sasl"
"mellium.im/xmlstream"
"mellium.im/xmpp"
@ -114,11 +113,6 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
log.WithField("msg", msg).Errorf("error decoding message: %q", err)
return nil
}
from := msgHead.From.Bare().String()
if from != s.gateway.String() {
log.WithField("from", from).Info("message not from gateway, that is no notification")
return nil
}
if msg.Body == "" || msg.PublicToken == "" {
log.Infof("empty: %v", msgHead)
@ -134,6 +128,11 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
logger.Warnf("no appID and appToken found for publicToken")
return nil
}
from := msgHead.From.String()
if from != conn.Settings {
log.WithField("from", from).Info("message not from gateway, that is no notification")
return nil
}
logger = logger.WithFields(map[string]interface{}{
"appID": conn.AppID,
"appToken": conn.AppToken,
@ -224,19 +223,25 @@ func (s *XMPPService) testAndUseGateway(address jid.JID) error {
// Register handler of DBUS Distribution
func (s *XMPPService) Register(appID, appToken string) (string, string, error) {
publicToken := uuid.New().String()
logger := log.WithFields(map[string]interface{}{
"appID": appID,
"appToken": appToken,
"publicToken": publicToken,
"appID": appID,
"appToken": appToken,
})
conn := s.store.NewConnection(appID, appToken, s.gateway.String())
if conn == nil {
errStr := "error to store public token"
err := errors.New(errStr)
logger.WithField("error", err).Error("unable to register")
return "", errStr, err
}
logger = logger.WithField("publicToken", conn.PublicToken)
iq := messages.RegisterIQ{
IQ: stanza.IQ{
Type: stanza.SetIQ,
To: s.gateway,
},
}
iq.Register.Token = &messages.TokenData{Body: publicToken}
iq.Register.Token = &messages.TokenData{Body: conn.PublicToken}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
defer cancel()
t, err := s.session.EncodeIQ(ctx, iq)
@ -256,17 +261,8 @@ func (s *XMPPService) Register(appID, appToken string) (string, string, error) {
return "", "xmpp unable recv iq to gateway", err
}
if endpoint := iqRegister.Register.Endpoint; endpoint != nil {
logger = logger.WithField("endpoint", endpoint.Body)
// update Endpoint
conn := s.store.NewConnectionWithToken(appID, appToken, publicToken, endpoint.Body)
if conn == nil {
errStr := "error to store public token"
err = errors.New(errStr)
logger.WithField("error", err).Error("unable to register")
return "", errStr, err
}
logger.Info("success")
return conn.Endpoint, "", nil
logger.WithField("endpoint", endpoint.Body).Info("success")
return endpoint.Body, "", nil
}
errStr := "Unknown Error"
if errr := iqRegister.Register.Error; errr != nil {
@ -290,7 +286,7 @@ func (s *XMPPService) Unregister(appToken string) {
logger = logger.WithFields(map[string]interface{}{
"appID": conn.AppID,
"publicToken": conn.PublicToken,
"endpoint": conn.Endpoint,
"gateway": conn.Settings,
})
if err = s.dbus.NewConnector(conn.AppID).Unregistered(conn.AppToken); err != nil {
logger.WithField("error", err).Error("send unregister per dbus ")

Loading…
Cancel
Save