diff --git a/output/all/internal.go b/output/all/internal.go index 2c9cff9..e19b389 100644 --- a/output/all/internal.go +++ b/output/all/internal.go @@ -2,6 +2,7 @@ package all import ( "github.com/bdlm/log" + "time" "dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/database" @@ -69,7 +70,19 @@ func (out *Output) sender() { } func (out *Output) Send(e *log.Entry, to *database.Notify) bool { + before := time.Now() + + logger := log.WithFields(e.Data) + logger = logger.WithField("msg", e.Message) + + logger.Debugf("starting forward message") + out.channelNotify <- e + + after := time.Now() + delta := after.Sub(before) + logger.WithField("ms", float64(delta)/float64(time.Millisecond)).Debugf("end forward message") + return true } diff --git a/output/xmpp/main.go b/output/xmpp/main.go index a5f426f..8503440 100644 --- a/output/xmpp/main.go +++ b/output/xmpp/main.go @@ -3,11 +3,10 @@ package xmpp import ( "regexp" - xmpp_client "dev.sum7.eu/genofire/yaja/client" - xmpp "dev.sum7.eu/genofire/yaja/xmpp" - "dev.sum7.eu/genofire/yaja/xmpp/base" "github.com/bdlm/log" "github.com/mitchellh/mapstructure" + "gosrc.io/xmpp" + "gosrc.io/xmpp/stanza" "dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/database" @@ -20,8 +19,6 @@ const ( nickname = "logmania" ) -var historyMaxChars = 0 - var logger = log.WithField("output", proto) type Output struct { @@ -29,12 +26,13 @@ type Output struct { defaults []*database.Notify channels map[string]bool bot *bot.Bot - client *xmpp_client.Client + client xmpp.Sender botOut chan interface{} logOut chan interface{} } type OutputConfig struct { + Address string `mapstructure:"address"` JID string `mapstructure:"jid"` Password string `mapstructure:"password"` Defaults map[string]bool `mapstructure:"default"` @@ -47,53 +45,54 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out return nil } - jid := xmppbase.NewJID(config.JID) - client, err := xmpp_client.NewClient(jid, config.Password) + out := &Output{ + channels: make(map[string]bool), + bot: bot, + } + + router := xmpp.NewRouter() + router.HandleFunc("message", out.recvMessage) + router.HandleFunc("presence", out.recvPresence) + + client, err := xmpp.NewClient(xmpp.Config{ + Address: config.Address, + Jid: config.JID, + Password: config.Password, + }, router) if err != nil { logger.Error(err) return nil } - go func() { - client.Start() - log.Panic("closed connection") - }() - out := &Output{ - channels: make(map[string]bool), - bot: bot, - client: client, - botOut: make(chan interface{}), - logOut: make(chan interface{}), - } - go out.sender() - go out.receiver() + cm := xmpp.NewStreamManager(client, func(c xmpp.Sender) { + out.client = c - logger.WithField("jid", config.JID).Info("startup") - - for to, muc := range config.Defaults { - var def *database.Notify - pro := proto - if muc { - pro = protoGroup - } - if dbNotify, ok := db.NotifiesByAddress[pro+":"+to]; ok { - def = dbNotify - } else { - def = &database.Notify{ - Protocol: pro, + for to, muc := range config.Defaults { + def := &database.Notify{ + Protocol: proto, To: to, RegexIn: make(map[string]*regexp.Regexp), MaxPrioIn: log.DebugLevel, } - out.Join(to) + if muc { + def.Protocol = protoGroup + out.Join(to) + } + out.defaults = append(out.defaults, def) } - out.defaults = append(out.defaults, def) - } - for _, toAddresses := range db.NotifiesByAddress { - if toAddresses.Protocol == protoGroup { - out.Join(toAddresses.To) + for _, toAddresses := range db.NotifiesByAddress { + if toAddresses.Protocol == protoGroup { + out.Join(toAddresses.To) + } } - } + logger.Info("join muc after connect") + }) + go func() { + cm.Run() + log.Panic("closed connection") + }() + + logger.WithField("jid", config.JID).Info("startup") return out } @@ -103,17 +102,18 @@ func (out *Output) Default() []*database.Notify { func (out *Output) Close() { for jid := range out.channels { - toJID := xmppbase.NewJID(jid) - toJID.Resource = nickname - err := out.client.Send(&xmpp.PresenceClient{ - To: toJID, - Type: xmpp.PresenceTypeUnavailable, - }) + toJID, err := xmpp.NewJid(jid) if err != nil { - logger.Error("xmpp could not leave ", toJID.String(), " error:", err) + logger.Error("xmpp could generate jid to leave ", jid, " error:", err) + } + toJID.Resource = nickname + if err = out.client.Send(stanza.Presence{Attrs: stanza.Attrs{ + To: toJID.Full(), + Type: stanza.PresenceTypeUnavailable, + }}); err != nil { + logger.Error("xmpp could not leave ", toJID.Full(), " error:", err) } } - out.client.Close() } func init() { diff --git a/output/xmpp/recv.go b/output/xmpp/recv.go index 5b7d49e..6f97b98 100644 --- a/output/xmpp/recv.go +++ b/output/xmpp/recv.go @@ -1,75 +1,109 @@ package xmpp import ( - xmpp "dev.sum7.eu/genofire/yaja/xmpp" - "github.com/bdlm/log" + "time" + + "gosrc.io/xmpp" + "gosrc.io/xmpp/stanza" ) -func (out *Output) receiver() { - for { - element, more := out.client.Recv() - if !more { - log.Warn("could not receive new message, try later") - continue - } - out.recv(element) +func (out *Output) recvMessage(s xmpp.Sender, p stanza.Packet) { + before := time.Now() + + msg, ok := p.(stanza.Message) + if !ok { + logger.Errorf("blame gosrc.io/xmpp for routing: %s", p) + return } + logger.WithFields(map[string]interface{}{ + "sender": msg.From, + "request": msg.Body, + }).Debug("handling bot message") + + from, err := xmpp.NewJid(msg.From) + if err != nil { + logger.Errorf("blame gosrc.io/xmpp for jid encoding: %s", msg.From) + return + } + + fromBare := from.Bare() + fromLogmania := "" + if msg.Type == stanza.MessageTypeGroupchat { + fromLogmania = protoGroup + ":" + fromBare + } else { + fromLogmania = proto + ":" + fromBare + } + + answer := out.bot.Handle(fromLogmania, msg.Body) + if answer == "" { + return + } + if err := s.Send(stanza.Message{Attrs: stanza.Attrs{To: fromBare, Type: msg.Type}, Body: answer}); err != nil { + logger.WithFields(map[string]interface{}{ + "sender": fromLogmania, + "request": msg.Body, + "answer": answer, + }).Errorf("unable to send bot answer: %s", err) + } + + after := time.Now() + delta := after.Sub(before) + + logger.WithFields(map[string]interface{}{ + "sender": fromLogmania, + "request": msg.Body, + "answer": answer, + "ms": float64(delta) / float64(time.Millisecond), + }).Debug("handled xmpp bot message") } -func (out *Output) recv(element interface{}) { - switch element.(type) { - case *xmpp.PresenceClient: - pres := element.(*xmpp.PresenceClient) - sender := pres.From - logPres := logger.WithField("from", sender.Full()) - switch pres.Type { - case xmpp.PresenceTypeSubscribe: - logPres.Debugf("recv presence subscribe") - pres.Type = xmpp.PresenceTypeSubscribed - pres.To = sender - pres.From = nil - out.botOut <- pres - logPres.Debugf("accept new subscribe") +func (out *Output) recvPresence(s xmpp.Sender, p stanza.Packet) { + pres, ok := p.(stanza.Presence) + if !ok { + logger.Errorf("blame gosrc.io/xmpp for routing: %s", p) + return + } + from, err := xmpp.NewJid(pres.From) + if err != nil { + logger.Errorf("blame gosrc.io/xmpp for jid encoding: %s", pres.From) + return + } + fromBare := from.Bare() + logPres := logger.WithField("from", from) - pres.Type = xmpp.PresenceTypeSubscribe - pres.ID = "" - out.botOut <- pres - logPres.Info("request also subscribe") - case xmpp.PresenceTypeSubscribed: - logPres.Info("recv presence accepted subscribe") - case xmpp.PresenceTypeUnsubscribe: - logPres.Info("recv presence remove subscribe") - case xmpp.PresenceTypeUnsubscribed: - logPres.Info("recv presence removed subscribe") - case xmpp.PresenceTypeUnavailable: - logPres.Debug("recv presence unavailable") - case "": - logPres.Debug("recv empty presence, maybe from joining muc") - return - default: - logPres.Warnf("recv presence unsupported: %s -> %s", pres.Type, xmpp.XMLChildrenString(pres)) - } - case *xmpp.MessageClient: - msg := element.(*xmpp.MessageClient) - from := msg.From.Bare().String() - if msg.Type == xmpp.MessageTypeGroupchat { - from = protoGroup + ":" + from - } else { - from = proto + ":" + from - } - - answer := out.bot.Handle(from, msg.Body) - if answer == "" { + switch pres.Type { + case stanza.PresenceTypeSubscribe: + logPres.Debugf("recv presence subscribe") + if err := s.Send(stanza.Presence{Attrs: stanza.Attrs{ + Type: stanza.PresenceTypeSubscribed, + To: fromBare, + Id: pres.Id, + }}); err != nil { + logPres.WithField("user", pres.From).Errorf("answer of subscribe not send: %s", err) return } - to := msg.From - if msg.Type == xmpp.MessageTypeGroupchat && !to.IsBare() { - to = to.Bare() - } - out.botOut <- &xmpp.MessageClient{ - Type: msg.Type, - To: to, - Body: answer, + logPres.Debugf("accept new subscribe") + + if err := s.Send(stanza.Presence{Attrs: stanza.Attrs{ + Type: stanza.PresenceTypeSubscribe, + To: fromBare, + }}); err != nil { + logPres.WithField("user", pres.From).Errorf("request of subscribe not send: %s", err) + return } + logPres.Info("request also subscribe") + case stanza.PresenceTypeSubscribed: + logPres.Info("recv presence accepted subscribe") + case stanza.PresenceTypeUnsubscribe: + logPres.Info("recv presence remove subscribe") + case stanza.PresenceTypeUnsubscribed: + logPres.Info("recv presence removed subscribe") + case stanza.PresenceTypeUnavailable: + logPres.Debug("recv presence unavailable") + case "": + logPres.Debug("recv empty presence, maybe from joining muc") + return + default: + logPres.Warnf("recv presence unsupported: %s -> %v", pres.Type, pres) } } diff --git a/output/xmpp/send.go b/output/xmpp/send.go index c47c06e..17a673c 100644 --- a/output/xmpp/send.go +++ b/output/xmpp/send.go @@ -1,94 +1,77 @@ package xmpp import ( - "encoding/xml" "strings" - xmpp "dev.sum7.eu/genofire/yaja/xmpp" - "dev.sum7.eu/genofire/yaja/xmpp/base" - "dev.sum7.eu/genofire/yaja/xmpp/x/muc" "github.com/bdlm/log" + "gosrc.io/xmpp" + "gosrc.io/xmpp/stanza" "dev.sum7.eu/genofire/logmania/database" ) func (out *Output) Join(to string) { - toJID := xmppbase.NewJID(to) - toJID.Resource = nickname - err := out.client.Send(&xmpp.PresenceClient{ - To: toJID, - MUC: &xmuc.Base{ - History: &xmuc.History{ - MaxChars: &historyMaxChars, - }, - }, - }) + toJID, err := xmpp.NewJid(to) if err != nil { - logger.Error("xmpp could not join ", toJID.String(), " error:", err) + logger.Errorf("jid not generate to join muc %s : %s", to, err) + return + } + toJID.Resource = nickname + + if err = out.client.Send(stanza.Presence{Attrs: stanza.Attrs{To: toJID.Full()}, + Extensions: []stanza.PresExtension{ + stanza.MucPresence{ + History: stanza.History{MaxStanzas: stanza.NewNullableInt(0)}, + }}, + }); err != nil { + logger.Errorf("muc not join %s : %s", toJID.Full(), err) } else { out.channels[to] = true } } -func (out *Output) sender() { - // priority of bot higher: https://groups.google.com/forum/#!topic/golang-nuts/M2xjN_yWBiQ - for { - select { - case el := <-out.botOut: - out.client.Send(el) - default: - select { - case el := <-out.logOut: - out.client.Send(el) - default: - } - } - } -} func (out *Output) Send(e *log.Entry, to *database.Notify) bool { + if out.client == nil { + logger.Error("xmpp not connected (yet)") + return false + } html, text := formatLog(e) if html == "" || text == "" { logger.Error("during format notify") return false } html = strings.TrimRight(to.RunReplace(html), "\n") - var body xmpp.XMLElement - xml.Unmarshal([]byte(html), &body) - text = strings.TrimRight(to.RunReplace(text), "\n") + msg := stanza.Message{ + Attrs: stanza.Attrs{ + To: to.To, + }, + Body: text, + Extensions: []stanza.MsgExtension{ + stanza.HTML{Body: stanza.HTMLBody{InnerXML: html}}, + }, + } if to.Protocol == protoGroup { if _, ok := out.channels[to.To]; ok { - toJID := xmppbase.NewJID(to.To) - toJID.Resource = nickname - err := out.client.Send(&xmpp.PresenceClient{ - To: toJID, - MUC: &xmuc.Base{ - History: &xmuc.History{ - MaxChars: &historyMaxChars, - }, - }, - }) - if err != nil { - logger.Error("xmpp could not join ", toJID.String(), " error:", err) - } else { - out.channels[to.To] = true - } + out.Join(to.To) } - out.logOut <- &xmpp.MessageClient{ - Type: xmpp.MessageTypeGroupchat, - To: xmppbase.NewJID(to.To), - Body: text, - HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, + msg.Type = stanza.MessageTypeGroupchat + if err := out.client.Send(msg); err != nil { + logger.WithFields(map[string]interface{}{ + "muc": to.To, + "text": text, + }).Errorf("log message not forwarded: %s", err) } return true } if to.Protocol == proto { - out.logOut <- &xmpp.MessageClient{ - Type: xmpp.MessageTypeChat, - To: xmppbase.NewJID(to.To), - Body: text, - HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, + msg.Type = stanza.MessageTypeChat + if err := out.client.Send(msg); err != nil { + logger.WithFields(map[string]interface{}{ + "user": to.To, + "text": text, + }).Errorf("log message not forwarded: %s", err) } return true }