diff --git a/output/xmpp/format.go b/output/xmpp/format.go index 5cf81af..e174f07 100644 --- a/output/xmpp/format.go +++ b/output/xmpp/format.go @@ -122,5 +122,5 @@ func formatLog(entry *log.Entry) (string, string) { if err := tempLog.Execute(logLine, data); err != nil { return "formating error", "formating error" } - return logLine.String(), fmt.Sprintf("%s> %s: %s", data.Hostname, log.LevelString(entry.Level), entry.Message) + return logLine.String(), fmt.Sprintf("[%s] %s > %s", data.Hostname, log.LevelString(entry.Level), entry.Message) } diff --git a/output/xmpp/main.go b/output/xmpp/main.go index 551b9e9..0a01999 100644 --- a/output/xmpp/main.go +++ b/output/xmpp/main.go @@ -1,14 +1,11 @@ package xmpp import ( - "encoding/xml" "regexp" - "strings" xmpp_client "dev.sum7.eu/genofire/yaja/client" xmpp "dev.sum7.eu/genofire/yaja/xmpp" "dev.sum7.eu/genofire/yaja/xmpp/base" - "dev.sum7.eu/genofire/yaja/xmpp/x/muc" "github.com/bdlm/log" "github.com/mitchellh/mapstructure" @@ -29,10 +26,12 @@ var logger = log.WithField("output", proto) type Output struct { output.Output - defaults []*database.Notify - client *xmpp_client.Client - channels map[string]bool - formatter log.Formatter + defaults []*database.Notify + channels map[string]bool + bot *bot.Bot + client *xmpp_client.Client + botOut chan interface{} + logOut chan interface{} } type OutputConfig struct { @@ -47,7 +46,6 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out logger.Warnf("not able to decode data: %s", err) return nil } - channels := make(map[string]bool) jid := xmppbase.NewJID(config.JID) client, err := xmpp_client.NewClient(jid, config.Password) @@ -60,86 +58,18 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out client.Start() log.Panic("closed connection") }() - go func() { - for { - element, more := client.Recv() - if !more { - log.Warn("could not receive new message, try later") - continue - } - - switch element.(type) { - case *xmpp.PresenceClient: - pres := element.(*xmpp.PresenceClient) - sender := pres.From - logPres := logger.WithField("from", sender.Full()) - switch pres.Type { - case xmpp.PresenceTypeSubscribe: - logPres.Debugf("recv presence subscribe") - pres.Type = xmpp.PresenceTypeSubscribed - pres.To = sender - pres.From = nil - client.Send(pres) - logPres.Debugf("accept new subscribe") - - pres.Type = xmpp.PresenceTypeSubscribe - pres.ID = "" - client.Send(pres) - logPres.Info("request also subscribe") - case xmpp.PresenceTypeSubscribed: - logPres.Info("recv presence accepted subscribe") - case xmpp.PresenceTypeUnsubscribe: - logPres.Info("recv presence remove subscribe") - case xmpp.PresenceTypeUnsubscribed: - logPres.Info("recv presence removed subscribe") - case xmpp.PresenceTypeUnavailable: - logPres.Debug("recv presence unavailable") - case "": - logPres.Debug("recv empty presence, maybe from joining muc") - continue - default: - logPres.Warnf("recv presence unsupported: %s -> %s", pres.Type, xmpp.XMLChildrenString(pres)) - } - case *xmpp.MessageClient: - msg := element.(*xmpp.MessageClient) - from := msg.From.Bare().String() - if msg.Type == xmpp.MessageTypeGroupchat { - from = protoGroup + ":" + from - } else { - from = proto + ":" + from - } - - answer := bot.Handle(from, msg.Body) - if answer == "" { - continue - } - to := msg.From - if msg.Type == xmpp.MessageTypeGroupchat && !to.IsBare() { - to = to.Bare() - } - err := client.Send(&xmpp.MessageClient{ - Type: msg.Type, - To: to, - Body: answer, - }) - if err != nil { - logger.Error("xmpp to ", msg.From.String(), " error:", err) - } - } - } - }() + out := &Output{ + channels: make(map[string]bool), + bot: bot, + client: client, + botOut: make(chan interface{}), + logOut: make(chan interface{}), + } + go out.sender() + go out.receiver() logger.WithField("jid", config.JID).Info("startup") - out := &Output{ - channels: channels, - client: client, - formatter: &log.TextFormatter{ - DisableCaller: true, - DisableTimestamp: true, - }, - } - for to, muc := range config.Defaults { def := &database.Notify{ Protocol: proto, @@ -161,82 +91,10 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out return out } -func (out *Output) Join(to string) { - toJID := xmppbase.NewJID(to) - toJID.Resource = nickname - err := out.client.Send(&xmpp.PresenceClient{ - To: toJID, - MUC: &xmuc.Base{ - History: &xmuc.History{ - MaxChars: &historyMaxChars, - }, - }, - }) - if err != nil { - logger.Error("xmpp could not join ", toJID.String(), " error:", err) - } else { - out.channels[to] = true - } -} - func (out *Output) Default() []*database.Notify { return out.defaults } -func (out *Output) Send(e *log.Entry, to *database.Notify) bool { - html, text := formatLog(e) - if html == "" || text == "" { - logger.Error("during format notify") - return false - } - html = strings.TrimRight(to.RunReplace(html), "\n") - var body xmpp.XMLElement - xml.Unmarshal([]byte(html), &body) - - text = strings.TrimRight(to.RunReplace(text), "\n") - - if to.Protocol == protoGroup { - if _, ok := out.channels[to.To]; ok { - toJID := xmppbase.NewJID(to.To) - toJID.Resource = nickname - err := out.client.Send(&xmpp.PresenceClient{ - To: toJID, - MUC: &xmuc.Base{ - History: &xmuc.History{ - MaxChars: &historyMaxChars, - }, - }, - }) - if err != nil { - logger.Error("xmpp could not join ", toJID.String(), " error:", err) - } else { - out.channels[to.To] = true - } - } - if err := out.client.Send(&xmpp.MessageClient{ - Type: xmpp.MessageTypeGroupchat, - To: xmppbase.NewJID(to.To), - Body: text, - HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, - }); err != nil { - logger.Error("xmpp to ", to.To, " error:", err) - } - return true - } - if to.Protocol == proto { - if err := out.client.Send(&xmpp.MessageClient{ - Type: xmpp.MessageTypeChat, - To: xmppbase.NewJID(to.To), - Body: text, - HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, - }); err != nil { - logger.Error("xmpp to ", to, " error:", err) - } - return true - } - return false -} - func (out *Output) Close() { for jid := range out.channels { toJID := xmppbase.NewJID(jid) diff --git a/output/xmpp/recv.go b/output/xmpp/recv.go new file mode 100644 index 0000000..5b7d49e --- /dev/null +++ b/output/xmpp/recv.go @@ -0,0 +1,75 @@ +package xmpp + +import ( + xmpp "dev.sum7.eu/genofire/yaja/xmpp" + "github.com/bdlm/log" +) + +func (out *Output) receiver() { + for { + element, more := out.client.Recv() + if !more { + log.Warn("could not receive new message, try later") + continue + } + out.recv(element) + } +} +func (out *Output) recv(element interface{}) { + + switch element.(type) { + case *xmpp.PresenceClient: + pres := element.(*xmpp.PresenceClient) + sender := pres.From + logPres := logger.WithField("from", sender.Full()) + switch pres.Type { + case xmpp.PresenceTypeSubscribe: + logPres.Debugf("recv presence subscribe") + pres.Type = xmpp.PresenceTypeSubscribed + pres.To = sender + pres.From = nil + out.botOut <- pres + logPres.Debugf("accept new subscribe") + + pres.Type = xmpp.PresenceTypeSubscribe + pres.ID = "" + out.botOut <- pres + logPres.Info("request also subscribe") + case xmpp.PresenceTypeSubscribed: + logPres.Info("recv presence accepted subscribe") + case xmpp.PresenceTypeUnsubscribe: + logPres.Info("recv presence remove subscribe") + case xmpp.PresenceTypeUnsubscribed: + logPres.Info("recv presence removed subscribe") + case xmpp.PresenceTypeUnavailable: + logPres.Debug("recv presence unavailable") + case "": + logPres.Debug("recv empty presence, maybe from joining muc") + return + default: + logPres.Warnf("recv presence unsupported: %s -> %s", pres.Type, xmpp.XMLChildrenString(pres)) + } + case *xmpp.MessageClient: + msg := element.(*xmpp.MessageClient) + from := msg.From.Bare().String() + if msg.Type == xmpp.MessageTypeGroupchat { + from = protoGroup + ":" + from + } else { + from = proto + ":" + from + } + + answer := out.bot.Handle(from, msg.Body) + if answer == "" { + return + } + to := msg.From + if msg.Type == xmpp.MessageTypeGroupchat && !to.IsBare() { + to = to.Bare() + } + out.botOut <- &xmpp.MessageClient{ + Type: msg.Type, + To: to, + Body: answer, + } + } +} diff --git a/output/xmpp/send.go b/output/xmpp/send.go new file mode 100644 index 0000000..c47c06e --- /dev/null +++ b/output/xmpp/send.go @@ -0,0 +1,96 @@ +package xmpp + +import ( + "encoding/xml" + "strings" + + xmpp "dev.sum7.eu/genofire/yaja/xmpp" + "dev.sum7.eu/genofire/yaja/xmpp/base" + "dev.sum7.eu/genofire/yaja/xmpp/x/muc" + "github.com/bdlm/log" + + "dev.sum7.eu/genofire/logmania/database" +) + +func (out *Output) Join(to string) { + toJID := xmppbase.NewJID(to) + toJID.Resource = nickname + err := out.client.Send(&xmpp.PresenceClient{ + To: toJID, + MUC: &xmuc.Base{ + History: &xmuc.History{ + MaxChars: &historyMaxChars, + }, + }, + }) + if err != nil { + logger.Error("xmpp could not join ", toJID.String(), " error:", err) + } else { + out.channels[to] = true + } +} + +func (out *Output) sender() { + // priority of bot higher: https://groups.google.com/forum/#!topic/golang-nuts/M2xjN_yWBiQ + for { + select { + case el := <-out.botOut: + out.client.Send(el) + default: + select { + case el := <-out.logOut: + out.client.Send(el) + default: + } + } + } +} +func (out *Output) Send(e *log.Entry, to *database.Notify) bool { + html, text := formatLog(e) + if html == "" || text == "" { + logger.Error("during format notify") + return false + } + html = strings.TrimRight(to.RunReplace(html), "\n") + var body xmpp.XMLElement + xml.Unmarshal([]byte(html), &body) + + text = strings.TrimRight(to.RunReplace(text), "\n") + + if to.Protocol == protoGroup { + if _, ok := out.channels[to.To]; ok { + toJID := xmppbase.NewJID(to.To) + toJID.Resource = nickname + err := out.client.Send(&xmpp.PresenceClient{ + To: toJID, + MUC: &xmuc.Base{ + History: &xmuc.History{ + MaxChars: &historyMaxChars, + }, + }, + }) + if err != nil { + logger.Error("xmpp could not join ", toJID.String(), " error:", err) + } else { + out.channels[to.To] = true + } + } + out.logOut <- &xmpp.MessageClient{ + Type: xmpp.MessageTypeGroupchat, + To: xmppbase.NewJID(to.To), + Body: text, + HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, + } + return true + } + if to.Protocol == proto { + out.logOut <- &xmpp.MessageClient{ + Type: xmpp.MessageTypeChat, + To: xmppbase.NewJID(to.To), + Body: text, + HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, + } + return true + } + return false +}