xmpp works - sadly with new generated ressource on gateway (we need an component)

This commit is contained in:
Geno 2021-09-11 17:16:38 +02:00
parent 7057859bfe
commit 950a147042
6 changed files with 230 additions and 66 deletions

View File

@ -12,6 +12,7 @@ require (
) )
require ( require (
dev.sum7.eu/genofire/unified-push-xmpp/messages v0.0.0-00010101000000-000000000000 // indirect
github.com/bdlm/std v1.0.1 // indirect github.com/bdlm/std v1.0.1 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect
@ -24,3 +25,5 @@ require (
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect
mellium.im/reader v0.1.0 // indirect mellium.im/reader v0.1.0 // indirect
) )
replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages

View File

@ -22,7 +22,7 @@ func main() {
} }
dbus = distributor.NewDBus("org.unifiedpush.Distributor.xmpp") dbus = distributor.NewDBus("org.unifiedpush.Distributor.xmpp")
dbus.StartHandling(handler{}) dbus.StartHandling(config)
log.Info("startup") log.Info("startup")
if err := config.Run(dbus); err != nil { if err := config.Run(dbus); err != nil {

View File

@ -5,15 +5,20 @@ import (
"crypto/tls" "crypto/tls"
"encoding/xml" "encoding/xml"
"io" "io"
"errors"
"fmt"
"strings"
"github.com/bdlm/log" "github.com/bdlm/log"
"mellium.im/sasl" "mellium.im/sasl"
"mellium.im/xmlstream" "mellium.im/xmlstream"
"mellium.im/xmpp" "mellium.im/xmpp"
"mellium.im/xmpp/mux"
"mellium.im/xmpp/jid" "mellium.im/xmpp/jid"
"mellium.im/xmpp/mux"
"mellium.im/xmpp/stanza" "mellium.im/xmpp/stanza"
"unifiedpush.org/go/np2p_dbus/distributor" "unifiedpush.org/go/np2p_dbus/distributor"
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
) )
type XMPPService struct { type XMPPService struct {
@ -21,58 +26,76 @@ type XMPPService struct {
Password string Password string
Gateway string Gateway string
dbus *distributor.DBus dbus *distributor.DBus
session *xmpp.Session
} }
func (xs *XMPPService) Run(dbus *distributor.DBus) error { func (s *XMPPService) Run(dbus *distributor.DBus) error {
xs.dbus = dbus var err error
j := jid.MustParse(xs.Login) s.dbus = dbus
s, err := xmpp.DialClientSession( j := jid.MustParse(s.Login)
if s.session, err = xmpp.DialClientSession(
context.TODO(), j, context.TODO(), j,
xmpp.BindResource(), xmpp.BindResource(),
xmpp.StartTLS(&tls.Config{ xmpp.StartTLS(&tls.Config{
ServerName: j.Domain().String(), ServerName: j.Domain().String(),
}), }),
//TODO sasl.ScramSha1Plus <- problem with (my) ejabberd //TODO sasl.ScramSha1Plus <- problem with (my) ejabberd
//xmpp.SASL("", xs.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain), //xmpp.SASL("", s.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain),
xmpp.SASL("", xs.Password, sasl.ScramSha1, sasl.Plain), xmpp.SASL("", s.Password, sasl.ScramSha1, sasl.Plain),
) ); err != nil {
if err != nil {
return err return err
} }
defer func() { defer func() {
log.Info("Closing session…") log.Info("Closing session…")
if err := s.Close(); err != nil { if err := s.session.Close(); err != nil {
log.Errorf("Error closing session: %q", err) log.Errorf("Error closing session: %q", err)
} }
log.Println("Closing conn…") log.Println("Closing conn…")
if err := s.Conn().Close(); err != nil { if err := s.session.Conn().Close(); err != nil {
log.Errorf("Error closing connection: %q", err) log.Errorf("Error closing connection: %q", err)
} }
}() }()
// Send initial presence to let the server know we want to receive messages. // Send initial presence to let the server know we want to receive messages.
err = s.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil))
if err != nil { if err != nil {
return err return err
} }
s.Serve(mux.New( // Send subscripe to ask for allowing sending IQ (Register/Unregister)
mux.MessageFunc("",xml.Name{Local: "subject"}, xs.message), err = s.session.Send(context.TODO(), stanza.Presence{
Type: stanza.SubscribePresence,
To: jid.MustParse(s.Gateway),
}.Wrap(nil))
if err != nil {
return err
}
s.session.Serve(mux.New(
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message),
mux.PresenceFunc(stanza.SubscribePresence, xml.Name{}, s.autoSubscribe),
)) ))
return nil return nil
} }
func (xs *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { // autoSubscribe to allow sending IQ
func (s *XMPPService) autoSubscribe(presHead stanza.Presence, t xmlstream.TokenReadEncoder) error {
log.WithField("p", presHead).Info("autoSubscribe")
t.Encode(stanza.Presence{
Type: stanza.SubscribedPresence,
To: presHead.From,
})
return nil
}
// handler of incoming message - forward to DBUS
func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
d := xml.NewTokenDecoder(t) d := xml.NewTokenDecoder(t)
msg := struct { msg := messages.MessageBody{}
Token string `xml:"subject"`
Body string `xml:"body"`
}{}
err := d.Decode(&msg) err := d.Decode(&msg)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
log.WithField("msg", msg).Errorf("Error decoding message: %q", err) log.WithField("msg", msg).Errorf("Error decoding message: %q", err)
return nil return nil
} }
from := msgHead.From.Bare().String() from := msgHead.From.Bare().String()
if xs.Gateway == "" || from != xs.Gateway { if s.Gateway == "" || from != s.Gateway {
log.WithField("from", from).Info("message not from gateway, that is no notification") log.WithField("from", from).Info("message not from gateway, that is no notification")
return nil return nil
} }
@ -82,10 +105,15 @@ func (xs *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEnco
return nil return nil
} }
token := strings.SplitN(msg.Token, "/", 2)
if len(token) >= 2 {
log.WithField("token", msg.Token).Errorf("unable to parse token")
return nil
}
//TODO Lockup for appid by token in storage //TODO Lockup for appid by token in storage
if xs.dbus. if s.dbus.
NewConnector("cc.malhotra.karmanyaah.testapp.golibrary"). NewConnector(token[0]).
Message(msg.Token, msg.Body, msgHead.ID) != nil { Message(token[1], msg.Body, msgHead.ID) != nil {
log.Errorf("Error send unified push: %q", err) log.Errorf("Error send unified push: %q", err)
return nil return nil
} }
@ -93,3 +121,49 @@ func (xs *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEnco
return nil return nil
} }
// Register handler of DBUS Distribution
func (s *XMPPService) Register(appName, token string) (string, string, error) {
logger := log.WithFields(map[string]interface{}{
"name": appName,
"token": token,
})
iq := messages.RegisterIQ{
IQ: stanza.IQ{
Type: stanza.SetIQ,
To: jid.MustParse(s.Gateway),
},
}
externalToken := fmt.Sprintf("%s/%s", appName, token)
iq.Register.Token = &messages.TokenData{ Body: externalToken }
t, err := s.session.EncodeIQ(context.TODO(), iq)
if err != nil {
logger.Errorf("xmpp send IQ for register: %v", err)
return "", "xmpp unable send iq to gateway", err
}
d := xml.NewTokenDecoder(t)
iqRegister := messages.RegisterIQ{}
if err := d.Decode(&iqRegister); err != nil {
logger.Errorf("xmpp recv IQ for register: %v", err)
return "", "xmpp unable recv iq to gateway", err
}
if endpoint := iqRegister.Register.Endpoint; endpoint != nil {
logger.WithField("endpoint", endpoint.Body).Info("success")
return endpoint.Body, "", nil
}
errStr := "Unknown Error"
if errr := iqRegister.Register.Error; errr != nil {
errStr = errr.Body
}
err = errors.New(errStr)
logger.WithField("error", err).Error("unable to register")
return "", errStr, err
}
// Unregister handler of DBUS Distribution
func (xs *XMPPService) Unregister(token string) {
log.WithFields(map[string]interface{}{
"token": token,
}).Info("distributor-unregister")
appID := ""
_ = xs.dbus.NewConnector(appID).Unregistered(token)
}

View File

@ -11,6 +11,7 @@ require (
) )
require ( require (
dev.sum7.eu/genofire/unified-push-xmpp/messages v0.0.0-00010101000000-000000000000 // indirect
github.com/bdlm/std v1.0.1 // indirect github.com/bdlm/std v1.0.1 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.1 // indirect github.com/naoina/toml v0.1.1 // indirect
@ -22,3 +23,5 @@ require (
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect
mellium.im/reader v0.1.0 // indirect mellium.im/reader v0.1.0 // indirect
) )
replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages

View File

@ -4,75 +4,150 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/xml" "encoding/xml"
"io"
"github.com/bdlm/log" "github.com/bdlm/log"
"mellium.im/sasl" "mellium.im/sasl"
"mellium.im/xmlstream" "mellium.im/xmlstream"
"mellium.im/xmpp/mux"
"mellium.im/xmpp" "mellium.im/xmpp"
"mellium.im/xmpp/jid" "mellium.im/xmpp/jid"
"mellium.im/xmpp/mux"
"mellium.im/xmpp/stanza" "mellium.im/xmpp/stanza"
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
) )
type XMPPService struct { type XMPPService struct {
Login string Login string
Password string Password string
session *xmpp.Session
} }
// XMLElement is for Unmarshal undefined structs a fallback - any hasn't matched element func (s *XMPPService) Run() error {
type XMLElement struct { var err error
XMLName xml.Name j := jid.MustParse(s.Login)
InnerXML string `xml:",innerxml"` if s.session, err = xmpp.DialClientSession(
}
func (xs *XMPPService) Run() error {
j := jid.MustParse(xs.Login)
s, err := xmpp.DialClientSession(
context.TODO(), j, context.TODO(), j,
xmpp.BindResource(), xmpp.BindCustom(func(i jid.JID,r string) (jid.JID, error) {
// Never run
log.Infof("try to bind: %v with ressource %s", i, r)
return j, nil
}),
xmpp.StartTLS(&tls.Config{ xmpp.StartTLS(&tls.Config{
ServerName: j.Domain().String(), ServerName: j.Domain().String(),
}), }),
// sasl.ScramSha1Plus <- problem with (my) ejabberd // sasl.ScramSha1Plus <- problem with (my) ejabberd
//xmpp.SASL("", xs.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain), //xmpp.SASL("", xs.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain),
xmpp.SASL("", xs.Password, sasl.ScramSha1, sasl.Plain), xmpp.SASL("", s.Password, sasl.ScramSha1, sasl.Plain),
) ); err != nil {
if err != nil {
return err return err
} }
defer func() { defer func() {
log.Info("Closing session…") log.Info("Closing session…")
if err := s.Close(); err != nil { if err := s.session.Close(); err != nil {
log.Errorf("Error closing session: %q", err) log.Errorf("Error closing session: %q", err)
} }
log.Println("Closing conn…") log.Println("Closing conn…")
if err := s.Conn().Close(); err != nil { if err := s.session.Conn().Close(); err != nil {
log.Errorf("Error closing connection: %q", err) log.Errorf("Error closing connection: %q", err)
} }
}() }()
// Send initial presence to let the server know we want to receive messages. // Send initial presence to let the server know we want to receive messages.
err = s.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil))
if err != nil { if err != nil {
return err return err
} }
// TODO log.Infof("connected with %s", s.session.LocalAddr())
err = s.Encode(context.TODO(), struct{ s.session.Serve(mux.New(
stanza.Message // register - get + set
Token string `xml:"subject,omitempty"` mux.IQFunc(stanza.SetIQ, xml.Name{Local: messages.LocalRegister, Space: messages.Space}, s.handleRegister),
Body string `xml:"body,omitempty"` mux.IQFunc(stanza.GetIQ, xml.Name{Local: messages.LocalRegister, Space: messages.Space}, s.handleRegister),
}{ // unregister - get + set
mux.IQFunc(stanza.SetIQ, xml.Name{Local: messages.LocalUnregister, Space: messages.Space}, s.handleUnregister),
mux.IQFunc(stanza.GetIQ, xml.Name{Local: messages.LocalUnregister, Space: messages.Space}, s.handleUnregister),
// auto accept
mux.PresenceFunc(stanza.SubscribePresence, xml.Name{}, s.autoSubscribe),
))
return nil
}
// autoSubscribe to allow sending IQ
func (s *XMPPService) autoSubscribe(presHead stanza.Presence, t xmlstream.TokenReadEncoder) error {
log.WithField("p", presHead).Info("autoSubscribe")
// request eighter
t.Encode(stanza.Presence{
Type: stanza.SubscribePresence,
To: presHead.From,
})
// agree
t.Encode(stanza.Presence{
Type: stanza.SubscribedPresence,
To: presHead.From,
})
return nil
}
func (s *XMPPService) handleRegister(iq stanza.IQ, t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
reply := messages.RegisterIQ{
IQ: stanza.IQ{
ID: iq.ID,
Type: stanza.ErrorIQ,
To: iq.From,
},
}
defer func(){
if err := t.Encode(reply); err != nil {
log.Errorf("sending response: %v", err)
}
}()
log.Infof("recieved iq: %v", iq)
tokenData := messages.TokenData{}
err := xml.NewTokenDecoder(t).Decode(&tokenData)
if err != nil && err != io.EOF {
log.Errorf("Error decoding message: %q", err)
reply.Register.Error = &messages.ErrorData{ Body: "unable decode"}
return nil
}
token := tokenData.Body
if token == "" {
log.Errorf("no token found: %v", token)
reply.Register.Error = &messages.ErrorData{ Body: "no token"}
return nil
}
endpoint := "https://localhost/UP?token=" + token + "&to=" +iq.From.String()
reply.IQ.Type = stanza.ResultIQ
reply.Register.Endpoint = &messages.EndpointData{ Body: endpoint}
log.Infof("generate respone: %v", endpoint)
return nil
}
func (s *XMPPService) handleUnregister(iq stanza.IQ, t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
reply := messages.UnregisterIQ{
IQ: stanza.IQ{
ID: iq.ID,
Type: stanza.ErrorIQ,
To: iq.From,
},
}
defer func(){
if err := t.Encode(reply); err != nil {
log.Errorf("sending response: %v", err)
}
}()
log.Infof("unhandled: %v", start)
reply.Unregister.Error = "not implemented"
return nil
}
// SendMessage of an UP Notification
func (s *XMPPService) SendMessage(to, token, content string) error {
return s.session.Encode(context.TODO(), messages.Message{
Message: stanza.Message{ Message: stanza.Message{
To: jid.MustParse("up-test@chat.sum7.eu"), To: jid.MustParse(to),
// Type: stanza.ChatMessage, // Type: stanza.ChatMessage,
Type: stanza.NormalMessage, Type: stanza.NormalMessage,
}, },
Token: "691499b4-adaf-4a92-b417-40e9a68f04a6", Token: token,
Body: "New Message ;) - Titel of UP-Developing", Body: content,
}) })
s.Serve(mux.New())
return nil
}
func (xs *XMPPService) HandleXMPP(t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
log.Infof("unhandled: %v", start)
return nil
} }

View File

@ -17,18 +17,27 @@ const (
// RegisterIQ with stanza // RegisterIQ with stanza
type RegisterIQ struct { type RegisterIQ struct {
stanza.IQ stanza.IQ
Register Register struct{
XMLName xml.Name `xml:"unifiedpush.org register"`
Token *TokenData `xml:"token"`
Endpoint *EndpointData `xml:"endpoint"`
Error *ErrorData `xml:"error"`
} `xml:"register"`
} }
// Register without stanza type TokenData struct {
type Register struct { XMLName xml.Name `xml:"token"`
XMLName xml.Name `xml:"unifiedpush.org register"` Body string `xml:",chardata"`
// set }
Token string `xml:"token,omitempty"`
// result type EndpointData struct {
Endpoint string `xml:"endpoint,omitempty"` XMLName xml.Name `xml:"endpoint"`
// error Body string `xml:",chardata"`
Error string `xml:"error,omitempty"` }
type ErrorData struct {
XMLName xml.Name `xml:"error"`
Body string `xml:",chardata"`
} }
// UnregisterIQ with stanza // UnregisterIQ with stanza