diff --git a/distributor/go.mod b/distributor/go.mod index 5beb441..2415eeb 100644 --- a/distributor/go.mod +++ b/distributor/go.mod @@ -12,6 +12,7 @@ require ( ) require ( + dev.sum7.eu/genofire/unified-push-xmpp/messages v0.0.0-00010101000000-000000000000 // indirect github.com/bdlm/std v1.0.1 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect @@ -24,3 +25,5 @@ require ( golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect mellium.im/reader v0.1.0 // indirect ) + +replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages diff --git a/distributor/main.go b/distributor/main.go index 71b39f4..b780a6d 100644 --- a/distributor/main.go +++ b/distributor/main.go @@ -22,7 +22,7 @@ func main() { } dbus = distributor.NewDBus("org.unifiedpush.Distributor.xmpp") - dbus.StartHandling(handler{}) + dbus.StartHandling(config) log.Info("startup") if err := config.Run(dbus); err != nil { diff --git a/distributor/xmpp.go b/distributor/xmpp.go index 3cdc1e4..6bf3079 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -5,15 +5,20 @@ import ( "crypto/tls" "encoding/xml" "io" + "errors" + "fmt" + "strings" "github.com/bdlm/log" "mellium.im/sasl" "mellium.im/xmlstream" "mellium.im/xmpp" - "mellium.im/xmpp/mux" "mellium.im/xmpp/jid" + "mellium.im/xmpp/mux" "mellium.im/xmpp/stanza" "unifiedpush.org/go/np2p_dbus/distributor" + + "dev.sum7.eu/genofire/unified-push-xmpp/messages" ) type XMPPService struct { @@ -21,58 +26,76 @@ type XMPPService struct { Password string Gateway string dbus *distributor.DBus + session *xmpp.Session } -func (xs *XMPPService) Run(dbus *distributor.DBus) error { - xs.dbus = dbus - j := jid.MustParse(xs.Login) - s, err := xmpp.DialClientSession( +func (s *XMPPService) Run(dbus *distributor.DBus) error { + var err error + s.dbus = dbus + j := jid.MustParse(s.Login) + if s.session, err = xmpp.DialClientSession( context.TODO(), j, xmpp.BindResource(), xmpp.StartTLS(&tls.Config{ ServerName: j.Domain().String(), }), //TODO sasl.ScramSha1Plus <- problem with (my) ejabberd - //xmpp.SASL("", xs.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain), - xmpp.SASL("", xs.Password, sasl.ScramSha1, sasl.Plain), - ) - if err != nil { + //xmpp.SASL("", s.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain), + xmpp.SASL("", s.Password, sasl.ScramSha1, sasl.Plain), + ); err != nil { return err } defer func() { log.Info("Closing session…") - if err := s.Close(); err != nil { + if err := s.session.Close(); err != nil { log.Errorf("Error closing session: %q", err) } log.Println("Closing conn…") - if err := s.Conn().Close(); err != nil { + if err := s.session.Conn().Close(); err != nil { log.Errorf("Error closing connection: %q", err) } }() // Send initial presence to let the server know we want to receive messages. - err = s.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) + err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) if err != nil { return err } - s.Serve(mux.New( - mux.MessageFunc("",xml.Name{Local: "subject"}, xs.message), + // Send subscripe to ask for allowing sending IQ (Register/Unregister) + err = s.session.Send(context.TODO(), stanza.Presence{ + Type: stanza.SubscribePresence, + To: jid.MustParse(s.Gateway), + }.Wrap(nil)) + if err != nil { + return err + } + s.session.Serve(mux.New( + mux.MessageFunc("", xml.Name{Local: "subject"}, s.message), + mux.PresenceFunc(stanza.SubscribePresence, xml.Name{}, s.autoSubscribe), )) return nil } -func (xs *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { +// autoSubscribe to allow sending IQ +func (s *XMPPService) autoSubscribe(presHead stanza.Presence, t xmlstream.TokenReadEncoder) error { + log.WithField("p", presHead).Info("autoSubscribe") + t.Encode(stanza.Presence{ + Type: stanza.SubscribedPresence, + To: presHead.From, + }) + return nil +} + +// handler of incoming message - forward to DBUS +func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { d := xml.NewTokenDecoder(t) - msg := struct { - Token string `xml:"subject"` - Body string `xml:"body"` - }{} + msg := messages.MessageBody{} err := d.Decode(&msg) if err != nil && err != io.EOF { log.WithField("msg", msg).Errorf("Error decoding message: %q", err) return nil } from := msgHead.From.Bare().String() - if xs.Gateway == "" || from != xs.Gateway { + if s.Gateway == "" || from != s.Gateway { log.WithField("from", from).Info("message not from gateway, that is no notification") return nil } @@ -82,10 +105,15 @@ func (xs *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEnco return nil } + token := strings.SplitN(msg.Token, "/", 2) + if len(token) >= 2 { + log.WithField("token", msg.Token).Errorf("unable to parse token") + return nil + } //TODO Lockup for appid by token in storage - if xs.dbus. - NewConnector("cc.malhotra.karmanyaah.testapp.golibrary"). - Message(msg.Token, msg.Body, msgHead.ID) != nil { + if s.dbus. + NewConnector(token[0]). + Message(token[1], msg.Body, msgHead.ID) != nil { log.Errorf("Error send unified push: %q", err) return nil } @@ -93,3 +121,49 @@ func (xs *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEnco return nil } +// Register handler of DBUS Distribution +func (s *XMPPService) Register(appName, token string) (string, string, error) { + logger := log.WithFields(map[string]interface{}{ + "name": appName, + "token": token, + }) + iq := messages.RegisterIQ{ + IQ: stanza.IQ{ + Type: stanza.SetIQ, + To: jid.MustParse(s.Gateway), + }, + } + externalToken := fmt.Sprintf("%s/%s", appName, token) + iq.Register.Token = &messages.TokenData{ Body: externalToken } + t, err := s.session.EncodeIQ(context.TODO(), iq) + if err != nil { + logger.Errorf("xmpp send IQ for register: %v", err) + return "", "xmpp unable send iq to gateway", err + } + d := xml.NewTokenDecoder(t) + iqRegister := messages.RegisterIQ{} + if err := d.Decode(&iqRegister); err != nil { + logger.Errorf("xmpp recv IQ for register: %v", err) + return "", "xmpp unable recv iq to gateway", err + } + if endpoint := iqRegister.Register.Endpoint; endpoint != nil { + logger.WithField("endpoint", endpoint.Body).Info("success") + return endpoint.Body, "", nil + } + errStr := "Unknown Error" + if errr := iqRegister.Register.Error; errr != nil { + errStr = errr.Body + } + err = errors.New(errStr) + logger.WithField("error", err).Error("unable to register") + return "", errStr, err +} + +// Unregister handler of DBUS Distribution +func (xs *XMPPService) Unregister(token string) { + log.WithFields(map[string]interface{}{ + "token": token, + }).Info("distributor-unregister") + appID := "" + _ = xs.dbus.NewConnector(appID).Unregistered(token) +} diff --git a/gateway/go.mod b/gateway/go.mod index 58500b2..f88573a 100644 --- a/gateway/go.mod +++ b/gateway/go.mod @@ -11,6 +11,7 @@ require ( ) require ( + dev.sum7.eu/genofire/unified-push-xmpp/messages v0.0.0-00010101000000-000000000000 // indirect github.com/bdlm/std v1.0.1 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/toml v0.1.1 // indirect @@ -22,3 +23,5 @@ require ( golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect mellium.im/reader v0.1.0 // indirect ) + +replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages diff --git a/gateway/xmpp.go b/gateway/xmpp.go index 39acd39..71c9dca 100644 --- a/gateway/xmpp.go +++ b/gateway/xmpp.go @@ -4,75 +4,150 @@ import ( "context" "crypto/tls" "encoding/xml" + "io" "github.com/bdlm/log" "mellium.im/sasl" "mellium.im/xmlstream" - "mellium.im/xmpp/mux" "mellium.im/xmpp" "mellium.im/xmpp/jid" + "mellium.im/xmpp/mux" "mellium.im/xmpp/stanza" + + "dev.sum7.eu/genofire/unified-push-xmpp/messages" ) type XMPPService struct { Login string Password string + session *xmpp.Session } -// XMLElement is for Unmarshal undefined structs a fallback - any hasn't matched element -type XMLElement struct { - XMLName xml.Name - InnerXML string `xml:",innerxml"` -} - -func (xs *XMPPService) Run() error { - j := jid.MustParse(xs.Login) - s, err := xmpp.DialClientSession( +func (s *XMPPService) Run() error { + var err error + j := jid.MustParse(s.Login) + if s.session, err = xmpp.DialClientSession( context.TODO(), j, - xmpp.BindResource(), + xmpp.BindCustom(func(i jid.JID,r string) (jid.JID, error) { + // Never run + log.Infof("try to bind: %v with ressource %s", i, r) + return j, nil + }), xmpp.StartTLS(&tls.Config{ ServerName: j.Domain().String(), }), // sasl.ScramSha1Plus <- problem with (my) ejabberd //xmpp.SASL("", xs.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain), - xmpp.SASL("", xs.Password, sasl.ScramSha1, sasl.Plain), - ) - if err != nil { + xmpp.SASL("", s.Password, sasl.ScramSha1, sasl.Plain), + ); err != nil { return err } defer func() { log.Info("Closing session…") - if err := s.Close(); err != nil { + if err := s.session.Close(); err != nil { log.Errorf("Error closing session: %q", err) } log.Println("Closing conn…") - if err := s.Conn().Close(); err != nil { + if err := s.session.Conn().Close(); err != nil { log.Errorf("Error closing connection: %q", err) } }() // Send initial presence to let the server know we want to receive messages. - err = s.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) + err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil)) if err != nil { return err } - // TODO - err = s.Encode(context.TODO(), struct{ - stanza.Message - Token string `xml:"subject,omitempty"` - Body string `xml:"body,omitempty"` - }{ + log.Infof("connected with %s", s.session.LocalAddr()) + s.session.Serve(mux.New( + // register - get + set + mux.IQFunc(stanza.SetIQ, xml.Name{Local: messages.LocalRegister, Space: messages.Space}, s.handleRegister), + mux.IQFunc(stanza.GetIQ, xml.Name{Local: messages.LocalRegister, Space: messages.Space}, s.handleRegister), + // unregister - get + set + mux.IQFunc(stanza.SetIQ, xml.Name{Local: messages.LocalUnregister, Space: messages.Space}, s.handleUnregister), + mux.IQFunc(stanza.GetIQ, xml.Name{Local: messages.LocalUnregister, Space: messages.Space}, s.handleUnregister), + // auto accept + mux.PresenceFunc(stanza.SubscribePresence, xml.Name{}, s.autoSubscribe), + )) + return nil +} +// autoSubscribe to allow sending IQ +func (s *XMPPService) autoSubscribe(presHead stanza.Presence, t xmlstream.TokenReadEncoder) error { + log.WithField("p", presHead).Info("autoSubscribe") + // request eighter + t.Encode(stanza.Presence{ + Type: stanza.SubscribePresence, + To: presHead.From, + }) + // agree + t.Encode(stanza.Presence{ + Type: stanza.SubscribedPresence, + To: presHead.From, + }) + return nil +} + +func (s *XMPPService) handleRegister(iq stanza.IQ, t xmlstream.TokenReadEncoder, start *xml.StartElement) error { + reply := messages.RegisterIQ{ + IQ: stanza.IQ{ + ID: iq.ID, + Type: stanza.ErrorIQ, + To: iq.From, + }, + } + defer func(){ + if err := t.Encode(reply); err != nil { + log.Errorf("sending response: %v", err) + } + }() + log.Infof("recieved iq: %v", iq) + + tokenData := messages.TokenData{} + err := xml.NewTokenDecoder(t).Decode(&tokenData) + if err != nil && err != io.EOF { + log.Errorf("Error decoding message: %q", err) + reply.Register.Error = &messages.ErrorData{ Body: "unable decode"} + return nil + } + token := tokenData.Body + if token == "" { + log.Errorf("no token found: %v", token) + reply.Register.Error = &messages.ErrorData{ Body: "no token"} + return nil + } + endpoint := "https://localhost/UP?token=" + token + "&to=" +iq.From.String() + reply.IQ.Type = stanza.ResultIQ + reply.Register.Endpoint = &messages.EndpointData{ Body: endpoint} + log.Infof("generate respone: %v", endpoint) + return nil +} +func (s *XMPPService) handleUnregister(iq stanza.IQ, t xmlstream.TokenReadEncoder, start *xml.StartElement) error { + reply := messages.UnregisterIQ{ + IQ: stanza.IQ{ + ID: iq.ID, + Type: stanza.ErrorIQ, + To: iq.From, + }, + } + defer func(){ + if err := t.Encode(reply); err != nil { + log.Errorf("sending response: %v", err) + } + }() + log.Infof("unhandled: %v", start) + + reply.Unregister.Error = "not implemented" + return nil +} + +// SendMessage of an UP Notification +func (s *XMPPService) SendMessage(to, token, content string) error { + return s.session.Encode(context.TODO(), messages.Message{ Message: stanza.Message{ - To: jid.MustParse("up-test@chat.sum7.eu"), + To: jid.MustParse(to), // Type: stanza.ChatMessage, Type: stanza.NormalMessage, }, - Token: "691499b4-adaf-4a92-b417-40e9a68f04a6", - Body: "New Message ;) - Titel of UP-Developing", + Token: token, + Body: content, }) - s.Serve(mux.New()) - return nil -} -func (xs *XMPPService) HandleXMPP(t xmlstream.TokenReadEncoder, start *xml.StartElement) error { - log.Infof("unhandled: %v", start) - return nil } diff --git a/messages/main.go b/messages/main.go index 32223b0..2dd1f4b 100644 --- a/messages/main.go +++ b/messages/main.go @@ -17,18 +17,27 @@ const ( // RegisterIQ with stanza type RegisterIQ struct { stanza.IQ - Register + Register struct{ + XMLName xml.Name `xml:"unifiedpush.org register"` + Token *TokenData `xml:"token"` + Endpoint *EndpointData `xml:"endpoint"` + Error *ErrorData `xml:"error"` + } `xml:"register"` } -// Register without stanza -type Register struct { - XMLName xml.Name `xml:"unifiedpush.org register"` - // set - Token string `xml:"token,omitempty"` - // result - Endpoint string `xml:"endpoint,omitempty"` - // error - Error string `xml:"error,omitempty"` +type TokenData struct { + XMLName xml.Name `xml:"token"` + Body string `xml:",chardata"` +} + +type EndpointData struct { + XMLName xml.Name `xml:"endpoint"` + Body string `xml:",chardata"` +} + +type ErrorData struct { + XMLName xml.Name `xml:"error"` + Body string `xml:",chardata"` } // UnregisterIQ with stanza