2021-09-09 23:12:29 +02:00
package main
import (
"context"
"crypto/tls"
"encoding/xml"
2021-09-11 17:16:38 +02:00
"errors"
2021-09-12 00:31:03 +02:00
"io"
2021-09-15 19:06:11 +02:00
"time"
2021-09-09 23:12:29 +02:00
"github.com/bdlm/log"
2021-09-14 21:37:37 +02:00
"github.com/google/uuid"
2021-09-09 23:12:29 +02:00
"mellium.im/sasl"
"mellium.im/xmlstream"
"mellium.im/xmpp"
2021-09-15 17:04:06 +02:00
"mellium.im/xmpp/disco"
2021-09-16 05:14:59 +02:00
"mellium.im/xmpp/history"
2021-09-09 23:12:29 +02:00
"mellium.im/xmpp/jid"
2021-09-11 17:16:38 +02:00
"mellium.im/xmpp/mux"
2021-09-09 23:12:29 +02:00
"mellium.im/xmpp/stanza"
"unifiedpush.org/go/np2p_dbus/distributor"
2021-09-14 21:37:37 +02:00
"unifiedpush.org/go/np2p_dbus/storage"
2021-09-11 17:16:38 +02:00
"dev.sum7.eu/genofire/unified-push-xmpp/messages"
2021-09-09 23:12:29 +02:00
)
2021-09-15 18:42:08 +02:00
// Demo Server as fallback
var (
XMPPUPDemoJID = jid . MustParse ( "up.chat.sum7.eu" )
)
2021-09-09 23:12:29 +02:00
type XMPPService struct {
Login string
Password string
Gateway string
2021-09-15 18:42:08 +02:00
gateway jid . JID
2021-09-09 23:12:29 +02:00
dbus * distributor . DBus
2021-09-12 00:31:03 +02:00
session * xmpp . Session
2021-09-14 21:37:37 +02:00
store * storage . Storage
2021-09-09 23:12:29 +02:00
}
2021-09-14 21:37:37 +02:00
func ( s * XMPPService ) Run ( dbus * distributor . DBus , store * storage . Storage ) error {
2021-09-11 17:16:38 +02:00
var err error
s . dbus = dbus
2021-09-14 21:37:37 +02:00
s . store = store
2021-09-11 17:16:38 +02:00
j := jid . MustParse ( s . Login )
2021-09-15 20:47:29 +02:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( time . Second * 10 ) )
2021-09-15 19:06:11 +02:00
defer cancel ( )
2021-09-11 17:16:38 +02:00
if s . session , err = xmpp . DialClientSession (
2021-09-15 19:06:11 +02:00
ctx , j ,
2021-09-09 23:12:29 +02:00
xmpp . BindResource ( ) ,
xmpp . StartTLS ( & tls . Config {
ServerName : j . Domain ( ) . String ( ) ,
} ) ,
//TODO sasl.ScramSha1Plus <- problem with (my) ejabberd
2021-09-11 17:16:38 +02:00
//xmpp.SASL("", s.Password, sasl.ScramSha1Plus, sasl.ScramSha1, sasl.Plain),
xmpp . SASL ( "" , s . Password , sasl . ScramSha1 , sasl . Plain ) ,
) ; err != nil {
2021-09-09 23:12:29 +02:00
return err
}
defer func ( ) {
2021-09-14 09:49:59 +02:00
log . Info ( "closing session" )
2021-09-11 17:16:38 +02:00
if err := s . session . Close ( ) ; err != nil {
2021-09-14 09:49:59 +02:00
log . Errorf ( "error closing session: %q" , err )
2021-09-09 23:12:29 +02:00
}
2021-09-14 09:49:59 +02:00
log . Println ( "closing connection" )
2021-09-11 17:16:38 +02:00
if err := s . session . Conn ( ) . Close ( ) ; err != nil {
2021-09-14 09:49:59 +02:00
log . Errorf ( "error closing connection: %q" , err )
2021-09-09 23:12:29 +02:00
}
} ( )
// Send initial presence to let the server know we want to receive messages.
2021-09-15 19:06:11 +02:00
err = s . session . Send ( ctx , stanza . Presence { Type : stanza . AvailablePresence } . Wrap ( nil ) )
2021-09-09 23:12:29 +02:00
if err != nil {
return err
}
2021-09-15 18:42:08 +02:00
go func ( ) {
if err := s . checkServer ( ) ; err != nil {
log . Errorf ( "check server: %v" , err )
}
} ( )
go func ( ) {
if gateway , err := jid . Parse ( s . Gateway ) ; err != nil {
if err := s . checkGateway ( ) ; err != nil {
log . Panicf ( "no gateway found: %v" , err )
} else {
2021-09-15 19:06:11 +02:00
log . WithField ( "gateway" , s . gateway . String ( ) ) . Info ( "using found UnifiedPush" )
2021-09-15 18:42:08 +02:00
}
} else {
if err := s . testAndUseGateway ( gateway ) ; err != nil {
log . Panic ( err )
} else {
2021-09-15 19:06:11 +02:00
log . WithField ( "gateway" , s . gateway . String ( ) ) . Info ( "using configured UnifiedPush" )
2021-09-15 18:42:08 +02:00
}
}
} ( )
log . Debug ( "xmpp client is running" )
2021-09-16 05:14:59 +02:00
go func ( ) {
log . Debug ( "starting fetch" )
q := history . Query { Start : time . Now ( ) . Add ( - 2 * time . Hour ) , With : s . gateway . Bare ( ) , Limit : 100 }
_ , _ = history . Fetch ( context . TODO ( ) , q , s . session . LocalAddr ( ) . Bare ( ) , s . session )
log . Debug ( "ending fetch" )
} ( )
2021-09-11 17:16:38 +02:00
s . session . Serve ( mux . New (
2021-09-15 17:04:06 +02:00
// disco.Handle(),
2021-09-16 05:14:59 +02:00
mux . MessageFunc ( "" , xml . Name { Local : "subject" } , s . HandleMessage ) ,
history . Handle ( history . NewHandler ( s ) ) ,
2021-09-09 23:12:29 +02:00
) )
return nil
}
2021-09-16 05:14:59 +02:00
// handler of incoming HandleMessage - forward to DBUS
func ( s * XMPPService ) HandleMessage ( msgHead stanza . Message , t xmlstream . TokenReadEncoder ) error {
2021-09-14 09:49:59 +02:00
logger := log . WithFields ( map [ string ] interface { } {
"to" : msgHead . To . String ( ) ,
"from" : msgHead . From . String ( ) ,
"id" : msgHead . ID ,
} )
2021-09-09 23:12:29 +02:00
d := xml . NewTokenDecoder ( t )
2021-09-11 17:16:38 +02:00
msg := messages . MessageBody { }
2021-09-09 23:12:29 +02:00
err := d . Decode ( & msg )
if err != nil && err != io . EOF {
2021-09-14 09:49:59 +02:00
log . WithField ( "msg" , msg ) . Errorf ( "error decoding message: %q" , err )
2021-09-09 23:12:29 +02:00
return nil
}
from := msgHead . From . Bare ( ) . String ( )
2021-09-15 18:42:08 +02:00
if from != s . gateway . String ( ) {
2021-09-09 23:12:29 +02:00
log . WithField ( "from" , from ) . Info ( "message not from gateway, that is no notification" )
return nil
}
2021-09-14 21:55:21 +02:00
if msg . Body == "" || msg . PublicToken == "" {
2021-09-09 23:12:29 +02:00
log . Infof ( "empty: %v" , msgHead )
return nil
}
2021-09-14 09:49:59 +02:00
logger = logger . WithFields ( map [ string ] interface { } {
2021-09-14 21:55:21 +02:00
"publicToken" : msg . PublicToken ,
2021-09-14 21:37:37 +02:00
"content" : msg . Body ,
2021-09-14 09:49:59 +02:00
} )
2021-09-09 23:12:29 +02:00
2021-09-14 21:55:21 +02:00
conn := s . store . GetConnectionbyPublic ( msg . PublicToken )
2021-09-14 21:37:37 +02:00
if conn == nil {
logger . Warnf ( "no appID and appToken found for publicToken" )
2021-09-16 03:44:35 +02:00
return nil
2021-09-11 17:16:38 +02:00
}
2021-09-14 09:49:59 +02:00
logger = logger . WithFields ( map [ string ] interface { } {
2021-09-14 21:37:37 +02:00
"appID" : conn . AppID ,
"appToken" : conn . AppToken ,
2021-09-14 09:49:59 +02:00
} )
2021-09-11 17:16:38 +02:00
if s . dbus .
2021-09-14 21:37:37 +02:00
NewConnector ( conn . AppID ) .
Message ( conn . AppToken , msg . Body , msgHead . ID ) != nil {
2021-09-14 09:49:59 +02:00
logger . Errorf ( "Error send unified push: %q" , err )
2021-09-09 23:12:29 +02:00
return nil
}
2021-09-16 01:04:43 +02:00
logger . Infof ( "receive unified push" )
2021-09-09 23:12:29 +02:00
return nil
}
2021-09-12 00:31:03 +02:00
2021-09-15 17:04:06 +02:00
// checkServer - background job
func ( s * XMPPService ) checkServer ( ) error {
domain := s . session . LocalAddr ( ) . Domain ( )
2021-09-15 19:06:11 +02:00
logger := log . WithField ( "instance" , domain . String ( ) )
2021-09-15 18:42:08 +02:00
logger . Debug ( "check running" )
2021-09-15 19:06:11 +02:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( time . Millisecond * 500 ) )
defer cancel ( )
info , err := disco . GetInfo ( ctx , "" , domain , s . session )
2021-09-15 17:04:06 +02:00
if err != nil {
return err
}
2021-09-15 18:42:08 +02:00
2021-09-15 17:04:06 +02:00
// check if server support msgoffline
supportMSGOffline := false
for _ , f := range info . Features {
if f . Var == "msgoffline" {
supportMSGOffline = true
break
}
}
if ! supportMSGOffline {
log . Warn ( "your server does not support offline messages (XEP-0160) - it is need to deliever messages later, if this distributer has current no connection" )
}
2021-09-15 18:42:08 +02:00
logger . Info ( "your instance checked" )
2021-09-15 17:04:06 +02:00
return nil
}
2021-09-15 18:42:08 +02:00
// checkGateway - background job
func ( s * XMPPService ) checkGateway ( ) error {
domain := s . session . LocalAddr ( ) . Domain ( )
2021-09-15 19:06:11 +02:00
log . WithField ( "instance" , domain . String ( ) ) . Infof ( "no gateway configured, try to find one on your instance" )
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( time . Millisecond * 500 ) )
defer cancel ( )
iter := disco . FetchItemsIQ ( ctx , "" , stanza . IQ { To : domain } , s . session )
2021-09-15 18:42:08 +02:00
if err := iter . Err ( ) ; err != nil {
iter . Close ( )
return err
}
addresses := [ ] jid . JID { iter . Item ( ) . JID }
for iter . Next ( ) {
if err := iter . Err ( ) ; err != nil {
iter . Close ( )
return err
}
addresses = append ( addresses , iter . Item ( ) . JID )
}
iter . Close ( )
for _ , j := range addresses {
log . Debugf ( "check for UnifiedPush gateway: %s" , j )
if err := s . testAndUseGateway ( j ) ; err == nil {
return nil
}
}
2021-09-15 19:06:11 +02:00
log . WithField ( "gateway" , XMPPUPDemoJID . String ( ) ) . Infof ( "no UnifiedPush gateway on your instance - try demo server" )
2021-09-15 18:42:08 +02:00
return s . testAndUseGateway ( XMPPUPDemoJID )
}
func ( s * XMPPService ) testAndUseGateway ( address jid . JID ) error {
2021-09-15 19:06:11 +02:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( time . Millisecond * 500 ) )
defer cancel ( )
info , err := disco . GetInfo ( ctx , "" , address , s . session )
2021-09-15 18:42:08 +02:00
if err != nil {
return err
}
for _ , f := range info . Features {
if f . Var == messages . Space {
s . gateway = address
2021-09-15 19:06:11 +02:00
log . WithField ( "gateway" , s . gateway . String ( ) ) . Debug ( "tested UnifiedPush XMPP gateway should work" )
2021-09-15 18:42:08 +02:00
return nil
}
}
return errors . New ( "this is no UnifiedPush gateway" )
}
2021-09-11 17:16:38 +02:00
// Register handler of DBUS Distribution
2021-09-14 21:37:37 +02:00
func ( s * XMPPService ) Register ( appID , appToken string ) ( string , string , error ) {
publicToken := uuid . New ( ) . String ( )
2021-09-11 17:16:38 +02:00
logger := log . WithFields ( map [ string ] interface { } {
2021-09-14 21:37:37 +02:00
"appID" : appID ,
"appToken" : appToken ,
"publicToken" : publicToken ,
2021-09-11 17:16:38 +02:00
} )
iq := messages . RegisterIQ {
IQ : stanza . IQ {
Type : stanza . SetIQ ,
2021-09-15 18:42:08 +02:00
To : s . gateway ,
2021-09-11 17:16:38 +02:00
} ,
}
2021-09-14 21:37:37 +02:00
iq . Register . Token = & messages . TokenData { Body : publicToken }
2021-09-15 19:06:11 +02:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( time . Millisecond * 500 ) )
defer cancel ( )
t , err := s . session . EncodeIQ ( ctx , iq )
2021-09-11 17:16:38 +02:00
if err != nil {
logger . Errorf ( "xmpp send IQ for register: %v" , err )
return "" , "xmpp unable send iq to gateway" , err
}
2021-09-14 09:14:07 +02:00
defer func ( ) {
if err := t . Close ( ) ; err != nil {
logger . Errorf ( "unable to close registration response %v" , err )
}
} ( )
2021-09-11 17:16:38 +02:00
d := xml . NewTokenDecoder ( t )
iqRegister := messages . RegisterIQ { }
if err := d . Decode ( & iqRegister ) ; err != nil {
logger . Errorf ( "xmpp recv IQ for register: %v" , err )
return "" , "xmpp unable recv iq to gateway" , err
}
if endpoint := iqRegister . Register . Endpoint ; endpoint != nil {
2021-09-15 20:57:41 +02:00
logger = logger . WithField ( "endpoint" , endpoint . Body )
2021-09-14 21:37:37 +02:00
// update Endpoint
conn := s . store . NewConnectionWithToken ( appID , appToken , publicToken , endpoint . Body )
2021-09-16 03:44:35 +02:00
if conn == nil {
2021-09-15 20:57:41 +02:00
errStr := "error to store public token"
err = errors . New ( errStr )
logger . WithField ( "error" , err ) . Error ( "unable to register" )
return "" , errStr , err
}
logger . Info ( "success" )
2021-09-16 04:44:49 +02:00
return conn . Settings , "" , nil
2021-09-11 17:16:38 +02:00
}
errStr := "Unknown Error"
if errr := iqRegister . Register . Error ; errr != nil {
errStr = errr . Body
}
err = errors . New ( errStr )
logger . WithField ( "error" , err ) . Error ( "unable to register" )
return "" , errStr , err
}
// Unregister handler of DBUS Distribution
2021-09-15 20:57:41 +02:00
func ( s * XMPPService ) Unregister ( appToken string ) {
logger := log . WithFields ( map [ string ] interface { } {
"appToken" : appToken ,
} )
conn , err := s . store . DeleteConnection ( appToken )
if err != nil {
log . WithField ( "error" , err ) . Error ( "delete connection on storage" )
return
}
logger = logger . WithFields ( map [ string ] interface { } {
2021-09-14 21:37:37 +02:00
"appID" : conn . AppID ,
"publicToken" : conn . PublicToken ,
2021-09-16 04:44:49 +02:00
"endpoint" : conn . Settings ,
2021-09-15 20:57:41 +02:00
} )
if err = s . dbus . NewConnector ( conn . AppID ) . Unregistered ( conn . AppToken ) ; err != nil {
logger . WithField ( "error" , err ) . Error ( "send unregister per dbus " )
return
}
logger . Info ( "distributor-unregister" )
2021-09-11 17:16:38 +02:00
}