xmpp works with components (distribute Encoding hangs) - dirty without Webserver, and nice tokens
This commit is contained in:
parent
950a147042
commit
be6e6c4154
|
@ -4,9 +4,9 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/bdlm/log"
|
||||
|
@ -60,31 +60,12 @@ func (s *XMPPService) Run(dbus *distributor.DBus) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Send subscripe to ask for allowing sending IQ (Register/Unregister)
|
||||
err = s.session.Send(context.TODO(), stanza.Presence{
|
||||
Type: stanza.SubscribePresence,
|
||||
To: jid.MustParse(s.Gateway),
|
||||
}.Wrap(nil))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.session.Serve(mux.New(
|
||||
mux.MessageFunc("", xml.Name{Local: "subject"}, s.message),
|
||||
mux.PresenceFunc(stanza.SubscribePresence, xml.Name{}, s.autoSubscribe),
|
||||
))
|
||||
return nil
|
||||
}
|
||||
|
||||
// autoSubscribe to allow sending IQ
|
||||
func (s *XMPPService) autoSubscribe(presHead stanza.Presence, t xmlstream.TokenReadEncoder) error {
|
||||
log.WithField("p", presHead).Info("autoSubscribe")
|
||||
t.Encode(stanza.Presence{
|
||||
Type: stanza.SubscribedPresence,
|
||||
To: presHead.From,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// handler of incoming message - forward to DBUS
|
||||
func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||
d := xml.NewTokenDecoder(t)
|
||||
|
@ -121,6 +102,7 @@ func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncod
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register handler of DBUS Distribution
|
||||
func (s *XMPPService) Register(appName, token string) (string, string, error) {
|
||||
logger := log.WithFields(map[string]interface{}{
|
||||
|
@ -134,7 +116,7 @@ func (s *XMPPService) Register(appName, token string) (string, string, error) {
|
|||
},
|
||||
}
|
||||
externalToken := fmt.Sprintf("%s/%s", appName, token)
|
||||
iq.Register.Token = &messages.TokenData{ Body: externalToken }
|
||||
iq.Register.Token = &messages.TokenData{Body: externalToken}
|
||||
t, err := s.session.EncodeIQ(context.TODO(), iq)
|
||||
if err != nil {
|
||||
logger.Errorf("xmpp send IQ for register: %v", err)
|
||||
|
|
|
@ -25,3 +25,5 @@ require (
|
|||
)
|
||||
|
||||
replace dev.sum7.eu/genofire/unified-push-xmpp/messages => ../messages
|
||||
|
||||
replace mellium.im/xmpp => ../../../../mellium.im/xmpp
|
||||
|
|
102
gateway/xmpp.go
102
gateway/xmpp.go
|
@ -2,14 +2,14 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/bdlm/log"
|
||||
"mellium.im/sasl"
|
||||
"mellium.im/xmlstream"
|
||||
"mellium.im/xmpp"
|
||||
"mellium.im/xmpp/component"
|
||||
"mellium.im/xmpp/jid"
|
||||
"mellium.im/xmpp/mux"
|
||||
"mellium.im/xmpp/stanza"
|
||||
|
@ -18,27 +18,23 @@ import (
|
|||
)
|
||||
|
||||
type XMPPService struct {
|
||||
Login string
|
||||
Password string
|
||||
Addr string `toml:"address"`
|
||||
JID string `toml:"jid"`
|
||||
Secret string `toml:"secret"`
|
||||
session *xmpp.Session
|
||||
}
|
||||
|
||||
func (s *XMPPService) Run() error {
|
||||
var err error
|
||||
j := jid.MustParse(s.Login)
|
||||
if s.session, err = xmpp.DialClientSession(
|
||||
context.TODO(), j,
|
||||
xmpp.BindCustom(func(i jid.JID,r string) (jid.JID, error) {
|
||||
// Never run
|
||||
log.Infof("try to bind: %v with ressource %s", i, r)
|
||||
return j, nil
|
||||
}),
|
||||
xmpp.StartTLS(&tls.Config{
|
||||
ServerName: j.Domain().String(),
|
||||
}),
|
||||
// sasl.ScramSha1Plus <- problem with (my) ejabberd
|
||||
//xmpp.SASL("", xs.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain),
|
||||
xmpp.SASL("", s.Password, sasl.ScramSha1, sasl.Plain),
|
||||
j := jid.MustParse(s.JID)
|
||||
ctx := context.TODO()
|
||||
conn, err := net.Dial("tcp", s.Addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s.session, err = component.NewSession(
|
||||
ctx, j.Domain(),
|
||||
[]byte(s.Secret), conn,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -52,38 +48,27 @@ func (s *XMPPService) Run() error {
|
|||
log.Errorf("Error closing connection: %q", err)
|
||||
}
|
||||
}()
|
||||
// Send initial presence to let the server know we want to receive messages.
|
||||
/* Send initial presence to let the server know we want to receive messages.
|
||||
err = s.session.Send(context.TODO(), stanza.Presence{Type: stanza.AvailablePresence}.Wrap(nil))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}*/
|
||||
log.Infof("connected with %s", s.session.LocalAddr())
|
||||
s.session.Serve(mux.New(
|
||||
return s.session.Serve(mux.New(
|
||||
// register - get + set
|
||||
mux.IQFunc(stanza.SetIQ, xml.Name{Local: messages.LocalRegister, Space: messages.Space}, s.handleRegister),
|
||||
mux.IQFunc(stanza.GetIQ, xml.Name{Local: messages.LocalRegister, Space: messages.Space}, s.handleRegister),
|
||||
// unregister - get + set
|
||||
mux.IQFunc(stanza.SetIQ, xml.Name{Local: messages.LocalUnregister, Space: messages.Space}, s.handleUnregister),
|
||||
mux.IQFunc(stanza.GetIQ, xml.Name{Local: messages.LocalUnregister, Space: messages.Space}, s.handleUnregister),
|
||||
// auto accept
|
||||
mux.PresenceFunc(stanza.SubscribePresence, xml.Name{}, s.autoSubscribe),
|
||||
// mux.IQFunc("", xml.Name{}, s.handleDisco),
|
||||
))
|
||||
/* -
|
||||
return s.session.Serve(xmpp.HandlerFunc(func(t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
|
||||
log.Info(start)
|
||||
return nil
|
||||
}
|
||||
// autoSubscribe to allow sending IQ
|
||||
func (s *XMPPService) autoSubscribe(presHead stanza.Presence, t xmlstream.TokenReadEncoder) error {
|
||||
log.WithField("p", presHead).Info("autoSubscribe")
|
||||
// request eighter
|
||||
t.Encode(stanza.Presence{
|
||||
Type: stanza.SubscribePresence,
|
||||
To: presHead.From,
|
||||
})
|
||||
// agree
|
||||
t.Encode(stanza.Presence{
|
||||
Type: stanza.SubscribedPresence,
|
||||
To: presHead.From,
|
||||
})
|
||||
return nil
|
||||
}))
|
||||
*/
|
||||
}
|
||||
|
||||
func (s *XMPPService) handleRegister(iq stanza.IQ, t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
|
||||
|
@ -91,12 +76,13 @@ func (s *XMPPService) handleRegister(iq stanza.IQ, t xmlstream.TokenReadEncoder,
|
|||
IQ: stanza.IQ{
|
||||
ID: iq.ID,
|
||||
Type: stanza.ErrorIQ,
|
||||
From: iq.To,
|
||||
To: iq.From,
|
||||
},
|
||||
}
|
||||
defer func(){
|
||||
defer func() {
|
||||
if err := t.Encode(reply); err != nil {
|
||||
log.Errorf("sending response: %v", err)
|
||||
log.Errorf("sending register response: %v", err)
|
||||
}
|
||||
}()
|
||||
log.Infof("recieved iq: %v", iq)
|
||||
|
@ -104,19 +90,19 @@ func (s *XMPPService) handleRegister(iq stanza.IQ, t xmlstream.TokenReadEncoder,
|
|||
tokenData := messages.TokenData{}
|
||||
err := xml.NewTokenDecoder(t).Decode(&tokenData)
|
||||
if err != nil && err != io.EOF {
|
||||
log.Errorf("Error decoding message: %q", err)
|
||||
reply.Register.Error = &messages.ErrorData{ Body: "unable decode"}
|
||||
log.Errorf("decoding message: %q", err)
|
||||
reply.Register.Error = &messages.ErrorData{Body: "unable decode"}
|
||||
return nil
|
||||
}
|
||||
token := tokenData.Body
|
||||
if token == "" {
|
||||
log.Errorf("no token found: %v", token)
|
||||
reply.Register.Error = &messages.ErrorData{ Body: "no token"}
|
||||
reply.Register.Error = &messages.ErrorData{Body: "no token"}
|
||||
return nil
|
||||
}
|
||||
endpoint := "https://localhost/UP?token=" + token + "&to=" +iq.From.String()
|
||||
endpoint := "https://localhost/UP?token=" + token + "&to=" + iq.From.String()
|
||||
reply.IQ.Type = stanza.ResultIQ
|
||||
reply.Register.Endpoint = &messages.EndpointData{ Body: endpoint}
|
||||
reply.Register.Endpoint = &messages.EndpointData{Body: endpoint}
|
||||
log.Infof("generate respone: %v", endpoint)
|
||||
return nil
|
||||
}
|
||||
|
@ -125,17 +111,34 @@ func (s *XMPPService) handleUnregister(iq stanza.IQ, t xmlstream.TokenReadEncode
|
|||
IQ: stanza.IQ{
|
||||
ID: iq.ID,
|
||||
Type: stanza.ErrorIQ,
|
||||
From: iq.To,
|
||||
To: iq.From,
|
||||
},
|
||||
}
|
||||
defer func(){
|
||||
defer func() {
|
||||
if err := t.Encode(reply); err != nil {
|
||||
log.Errorf("sending unregister response: %v", err)
|
||||
}
|
||||
}()
|
||||
log.Infof("unregistered unhandled: %v", start)
|
||||
|
||||
reply.Unregister.Error = "not implemented"
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *XMPPService) handleDisco(iq stanza.IQ, t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
|
||||
reply := stanza.IQ{
|
||||
ID: iq.ID,
|
||||
Type: stanza.ErrorIQ,
|
||||
From: iq.To,
|
||||
To: iq.From,
|
||||
}
|
||||
defer func() {
|
||||
if err := t.Encode(reply); err != nil {
|
||||
log.Errorf("sending response: %v", err)
|
||||
}
|
||||
}()
|
||||
log.Infof("unhandled: %v", start)
|
||||
|
||||
reply.Unregister.Error = "not implemented"
|
||||
log.Infof("recieved iq: %v", iq)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -144,6 +147,7 @@ func (s *XMPPService) SendMessage(to, token, content string) error {
|
|||
return s.session.Encode(context.TODO(), messages.Message{
|
||||
Message: stanza.Message{
|
||||
To: jid.MustParse(to),
|
||||
From: jid.MustParse(s.JID),
|
||||
// Type: stanza.ChatMessage,
|
||||
Type: stanza.NormalMessage,
|
||||
},
|
||||
|
|
|
@ -17,7 +17,7 @@ const (
|
|||
// RegisterIQ with stanza
|
||||
type RegisterIQ struct {
|
||||
stanza.IQ
|
||||
Register struct{
|
||||
Register struct {
|
||||
XMLName xml.Name `xml:"unifiedpush.org register"`
|
||||
Token *TokenData `xml:"token"`
|
||||
Endpoint *EndpointData `xml:"endpoint"`
|
||||
|
|
Loading…
Reference in New Issue