diff --git a/distributor/go.mod b/distributor/go.mod index 507d268..b79c5f4 100644 --- a/distributor/go.mod +++ b/distributor/go.mod @@ -10,7 +10,7 @@ require ( github.com/google/uuid v1.3.0 mellium.im/sasl v0.2.1 mellium.im/xmlstream v0.15.3-0.20210221202126-7cc1407dad4c - mellium.im/xmpp v0.19.0 + mellium.im/xmpp v0.19.1-0.20210901124536-6846e6241769 unifiedpush.org/go/np2p_dbus v0.0.0-20210916024230-0bcac24079b3 ) diff --git a/distributor/go.sum b/distributor/go.sum index 50f5bf6..cf10f76 100644 --- a/distributor/go.sum +++ b/distributor/go.sum @@ -741,8 +741,9 @@ mellium.im/sasl v0.2.1 h1:nspKSRg7/SyO0cRGY71OkfHab8tf9kCts6a6oTDut0w= mellium.im/sasl v0.2.1/go.mod h1:ROaEDLQNuf9vjKqE1SrAfnsobm2YKXT1gnN1uDp1PjQ= mellium.im/xmlstream v0.15.3-0.20210221202126-7cc1407dad4c h1:1RCzOXu94kvNjuCC89G+5XTP6GOdoDrLsYdGIryyc2Y= mellium.im/xmlstream v0.15.3-0.20210221202126-7cc1407dad4c/go.mod h1:7SUlP7f2qnMczK+Cu/OFgqaIhldMolVjo8np7xG41D0= -mellium.im/xmpp v0.19.0 h1:zgW0jOOxEfQn8v+zUsInaZikSa8BELpKpvnTYzM435E= mellium.im/xmpp v0.19.0/go.mod h1:zpU69WRb0YuYcVTM/GVweZQP8r48nj1Tlq8RLXUOSNE= +mellium.im/xmpp v0.19.1-0.20210901124536-6846e6241769 h1:V//oZVgzyQF427rqMAKFHfKLA7gbeTuPbENK3S8DPTU= +mellium.im/xmpp v0.19.1-0.20210901124536-6846e6241769/go.mod h1:zpU69WRb0YuYcVTM/GVweZQP8r48nj1Tlq8RLXUOSNE= nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/distributor/xmpp.go b/distributor/xmpp.go index 5818290..3a3c8f4 100644 --- a/distributor/xmpp.go +++ b/distributor/xmpp.go @@ -14,6 +14,7 @@ import ( "mellium.im/xmlstream" "mellium.im/xmpp" "mellium.im/xmpp/disco" + "mellium.im/xmpp/history" "mellium.im/xmpp/jid" "mellium.im/xmpp/mux" "mellium.im/xmpp/stanza" @@ -93,15 +94,24 @@ func (s *XMPPService) Run(dbus *distributor.DBus, store *storage.Storage) error } }() log.Debug("xmpp client is running") + + go func() { + log.Debug("starting fetch") + q := history.Query{Start: time.Now().Add(-2 * time.Hour), With: s.gateway.Bare(), Limit: 100} + _, _ = history.Fetch(context.TODO(), q, s.session.LocalAddr().Bare(), s.session) + log.Debug("ending fetch") + }() + s.session.Serve(mux.New( // disco.Handle(), - mux.MessageFunc("", xml.Name{Local: "subject"}, s.message), + mux.MessageFunc("", xml.Name{Local: "subject"}, s.HandleMessage), + history.Handle(history.NewHandler(s)), )) return nil } -// handler of incoming message - forward to DBUS -func (s *XMPPService) message(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { +// handler of incoming HandleMessage - forward to DBUS +func (s *XMPPService) HandleMessage(msgHead stanza.Message, t xmlstream.TokenReadEncoder) error { logger := log.WithFields(map[string]interface{}{ "to": msgHead.To.String(), "from": msgHead.From.String(),