Change XMPP lib

This commit is contained in:
Martin/Geno 2019-07-17 22:31:15 +02:00
parent 0a4adad430
commit b7c7e895b9
4 changed files with 198 additions and 168 deletions

View File

@ -2,6 +2,7 @@ package all
import ( import (
"github.com/bdlm/log" "github.com/bdlm/log"
"time"
"dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
@ -69,7 +70,19 @@ func (out *Output) sender() {
} }
func (out *Output) Send(e *log.Entry, to *database.Notify) bool { func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
before := time.Now()
logger := log.WithFields(e.Data)
logger = logger.WithField("msg", e.Message)
logger.Debugf("starting forward message")
out.channelNotify <- e out.channelNotify <- e
after := time.Now()
delta := after.Sub(before)
logger.WithField("ms", float64(delta)/float64(time.Millisecond)).Debugf("end forward message")
return true return true
} }

View File

@ -3,11 +3,10 @@ package xmpp
import ( import (
"regexp" "regexp"
xmpp_client "dev.sum7.eu/genofire/yaja/client"
xmpp "dev.sum7.eu/genofire/yaja/xmpp"
"dev.sum7.eu/genofire/yaja/xmpp/base"
"github.com/bdlm/log" "github.com/bdlm/log"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"gosrc.io/xmpp"
"gosrc.io/xmpp/stanza"
"dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
@ -20,8 +19,6 @@ const (
nickname = "logmania" nickname = "logmania"
) )
var historyMaxChars = 0
var logger = log.WithField("output", proto) var logger = log.WithField("output", proto)
type Output struct { type Output struct {
@ -29,12 +26,13 @@ type Output struct {
defaults []*database.Notify defaults []*database.Notify
channels map[string]bool channels map[string]bool
bot *bot.Bot bot *bot.Bot
client *xmpp_client.Client client xmpp.Sender
botOut chan interface{} botOut chan interface{}
logOut chan interface{} logOut chan interface{}
} }
type OutputConfig struct { type OutputConfig struct {
Address string `mapstructure:"address"`
JID string `mapstructure:"jid"` JID string `mapstructure:"jid"`
Password string `mapstructure:"password"` Password string `mapstructure:"password"`
Defaults map[string]bool `mapstructure:"default"` Defaults map[string]bool `mapstructure:"default"`
@ -47,53 +45,54 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out
return nil return nil
} }
jid := xmppbase.NewJID(config.JID) out := &Output{
client, err := xmpp_client.NewClient(jid, config.Password) channels: make(map[string]bool),
bot: bot,
}
router := xmpp.NewRouter()
router.HandleFunc("message", out.recvMessage)
router.HandleFunc("presence", out.recvPresence)
client, err := xmpp.NewClient(xmpp.Config{
Address: config.Address,
Jid: config.JID,
Password: config.Password,
}, router)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return nil return nil
} }
go func() { cm := xmpp.NewStreamManager(client, func(c xmpp.Sender) {
client.Start() out.client = c
log.Panic("closed connection")
}()
out := &Output{
channels: make(map[string]bool),
bot: bot,
client: client,
botOut: make(chan interface{}),
logOut: make(chan interface{}),
}
go out.sender()
go out.receiver()
logger.WithField("jid", config.JID).Info("startup") for to, muc := range config.Defaults {
def := &database.Notify{
for to, muc := range config.Defaults { Protocol: proto,
var def *database.Notify
pro := proto
if muc {
pro = protoGroup
}
if dbNotify, ok := db.NotifiesByAddress[pro+":"+to]; ok {
def = dbNotify
} else {
def = &database.Notify{
Protocol: pro,
To: to, To: to,
RegexIn: make(map[string]*regexp.Regexp), RegexIn: make(map[string]*regexp.Regexp),
MaxPrioIn: log.DebugLevel, MaxPrioIn: log.DebugLevel,
} }
out.Join(to) if muc {
def.Protocol = protoGroup
out.Join(to)
}
out.defaults = append(out.defaults, def)
} }
out.defaults = append(out.defaults, def) for _, toAddresses := range db.NotifiesByAddress {
} if toAddresses.Protocol == protoGroup {
for _, toAddresses := range db.NotifiesByAddress { out.Join(toAddresses.To)
if toAddresses.Protocol == protoGroup { }
out.Join(toAddresses.To)
} }
} logger.Info("join muc after connect")
})
go func() {
cm.Run()
log.Panic("closed connection")
}()
logger.WithField("jid", config.JID).Info("startup")
return out return out
} }
@ -103,17 +102,18 @@ func (out *Output) Default() []*database.Notify {
func (out *Output) Close() { func (out *Output) Close() {
for jid := range out.channels { for jid := range out.channels {
toJID := xmppbase.NewJID(jid) toJID, err := xmpp.NewJid(jid)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
Type: xmpp.PresenceTypeUnavailable,
})
if err != nil { if err != nil {
logger.Error("xmpp could not leave ", toJID.String(), " error:", err) logger.Error("xmpp could generate jid to leave ", jid, " error:", err)
}
toJID.Resource = nickname
if err = out.client.Send(stanza.Presence{Attrs: stanza.Attrs{
To: toJID.Full(),
Type: stanza.PresenceTypeUnavailable,
}}); err != nil {
logger.Error("xmpp could not leave ", toJID.Full(), " error:", err)
} }
} }
out.client.Close()
} }
func init() { func init() {

View File

@ -1,75 +1,109 @@
package xmpp package xmpp
import ( import (
xmpp "dev.sum7.eu/genofire/yaja/xmpp" "time"
"github.com/bdlm/log"
"gosrc.io/xmpp"
"gosrc.io/xmpp/stanza"
) )
func (out *Output) receiver() { func (out *Output) recvMessage(s xmpp.Sender, p stanza.Packet) {
for { before := time.Now()
element, more := out.client.Recv()
if !more { msg, ok := p.(stanza.Message)
log.Warn("could not receive new message, try later") if !ok {
continue logger.Errorf("blame gosrc.io/xmpp for routing: %s", p)
} return
out.recv(element)
} }
logger.WithFields(map[string]interface{}{
"sender": msg.From,
"request": msg.Body,
}).Debug("handling bot message")
from, err := xmpp.NewJid(msg.From)
if err != nil {
logger.Errorf("blame gosrc.io/xmpp for jid encoding: %s", msg.From)
return
}
fromBare := from.Bare()
fromLogmania := ""
if msg.Type == stanza.MessageTypeGroupchat {
fromLogmania = protoGroup + ":" + fromBare
} else {
fromLogmania = proto + ":" + fromBare
}
answer := out.bot.Handle(fromLogmania, msg.Body)
if answer == "" {
return
}
if err := s.Send(stanza.Message{Attrs: stanza.Attrs{To: fromBare, Type: msg.Type}, Body: answer}); err != nil {
logger.WithFields(map[string]interface{}{
"sender": fromLogmania,
"request": msg.Body,
"answer": answer,
}).Errorf("unable to send bot answer: %s", err)
}
after := time.Now()
delta := after.Sub(before)
logger.WithFields(map[string]interface{}{
"sender": fromLogmania,
"request": msg.Body,
"answer": answer,
"ms": float64(delta) / float64(time.Millisecond),
}).Debug("handled xmpp bot message")
} }
func (out *Output) recv(element interface{}) {
switch element.(type) { func (out *Output) recvPresence(s xmpp.Sender, p stanza.Packet) {
case *xmpp.PresenceClient: pres, ok := p.(stanza.Presence)
pres := element.(*xmpp.PresenceClient) if !ok {
sender := pres.From logger.Errorf("blame gosrc.io/xmpp for routing: %s", p)
logPres := logger.WithField("from", sender.Full()) return
switch pres.Type { }
case xmpp.PresenceTypeSubscribe: from, err := xmpp.NewJid(pres.From)
logPres.Debugf("recv presence subscribe") if err != nil {
pres.Type = xmpp.PresenceTypeSubscribed logger.Errorf("blame gosrc.io/xmpp for jid encoding: %s", pres.From)
pres.To = sender return
pres.From = nil }
out.botOut <- pres fromBare := from.Bare()
logPres.Debugf("accept new subscribe") logPres := logger.WithField("from", from)
pres.Type = xmpp.PresenceTypeSubscribe switch pres.Type {
pres.ID = "" case stanza.PresenceTypeSubscribe:
out.botOut <- pres logPres.Debugf("recv presence subscribe")
logPres.Info("request also subscribe") if err := s.Send(stanza.Presence{Attrs: stanza.Attrs{
case xmpp.PresenceTypeSubscribed: Type: stanza.PresenceTypeSubscribed,
logPres.Info("recv presence accepted subscribe") To: fromBare,
case xmpp.PresenceTypeUnsubscribe: Id: pres.Id,
logPres.Info("recv presence remove subscribe") }}); err != nil {
case xmpp.PresenceTypeUnsubscribed: logPres.WithField("user", pres.From).Errorf("answer of subscribe not send: %s", err)
logPres.Info("recv presence removed subscribe")
case xmpp.PresenceTypeUnavailable:
logPres.Debug("recv presence unavailable")
case "":
logPres.Debug("recv empty presence, maybe from joining muc")
return
default:
logPres.Warnf("recv presence unsupported: %s -> %s", pres.Type, xmpp.XMLChildrenString(pres))
}
case *xmpp.MessageClient:
msg := element.(*xmpp.MessageClient)
from := msg.From.Bare().String()
if msg.Type == xmpp.MessageTypeGroupchat {
from = protoGroup + ":" + from
} else {
from = proto + ":" + from
}
answer := out.bot.Handle(from, msg.Body)
if answer == "" {
return return
} }
to := msg.From logPres.Debugf("accept new subscribe")
if msg.Type == xmpp.MessageTypeGroupchat && !to.IsBare() {
to = to.Bare() if err := s.Send(stanza.Presence{Attrs: stanza.Attrs{
} Type: stanza.PresenceTypeSubscribe,
out.botOut <- &xmpp.MessageClient{ To: fromBare,
Type: msg.Type, }}); err != nil {
To: to, logPres.WithField("user", pres.From).Errorf("request of subscribe not send: %s", err)
Body: answer, return
} }
logPres.Info("request also subscribe")
case stanza.PresenceTypeSubscribed:
logPres.Info("recv presence accepted subscribe")
case stanza.PresenceTypeUnsubscribe:
logPres.Info("recv presence remove subscribe")
case stanza.PresenceTypeUnsubscribed:
logPres.Info("recv presence removed subscribe")
case stanza.PresenceTypeUnavailable:
logPres.Debug("recv presence unavailable")
case "":
logPres.Debug("recv empty presence, maybe from joining muc")
return
default:
logPres.Warnf("recv presence unsupported: %s -> %v", pres.Type, pres)
} }
} }

View File

@ -1,94 +1,77 @@
package xmpp package xmpp
import ( import (
"encoding/xml"
"strings" "strings"
xmpp "dev.sum7.eu/genofire/yaja/xmpp"
"dev.sum7.eu/genofire/yaja/xmpp/base"
"dev.sum7.eu/genofire/yaja/xmpp/x/muc"
"github.com/bdlm/log" "github.com/bdlm/log"
"gosrc.io/xmpp"
"gosrc.io/xmpp/stanza"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
) )
func (out *Output) Join(to string) { func (out *Output) Join(to string) {
toJID := xmppbase.NewJID(to) toJID, err := xmpp.NewJid(to)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
MUC: &xmuc.Base{
History: &xmuc.History{
MaxChars: &historyMaxChars,
},
},
})
if err != nil { if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err) logger.Errorf("jid not generate to join muc %s : %s", to, err)
return
}
toJID.Resource = nickname
if err = out.client.Send(stanza.Presence{Attrs: stanza.Attrs{To: toJID.Full()},
Extensions: []stanza.PresExtension{
stanza.MucPresence{
History: stanza.History{MaxStanzas: stanza.NewNullableInt(0)},
}},
}); err != nil {
logger.Errorf("muc not join %s : %s", toJID.Full(), err)
} else { } else {
out.channels[to] = true out.channels[to] = true
} }
} }
func (out *Output) sender() {
// priority of bot higher: https://groups.google.com/forum/#!topic/golang-nuts/M2xjN_yWBiQ
for {
select {
case el := <-out.botOut:
out.client.Send(el)
default:
select {
case el := <-out.logOut:
out.client.Send(el)
default:
}
}
}
}
func (out *Output) Send(e *log.Entry, to *database.Notify) bool { func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
if out.client == nil {
logger.Error("xmpp not connected (yet)")
return false
}
html, text := formatLog(e) html, text := formatLog(e)
if html == "" || text == "" { if html == "" || text == "" {
logger.Error("during format notify") logger.Error("during format notify")
return false return false
} }
html = strings.TrimRight(to.RunReplace(html), "\n") html = strings.TrimRight(to.RunReplace(html), "\n")
var body xmpp.XMLElement
xml.Unmarshal([]byte(html), &body)
text = strings.TrimRight(to.RunReplace(text), "\n") text = strings.TrimRight(to.RunReplace(text), "\n")
msg := stanza.Message{
Attrs: stanza.Attrs{
To: to.To,
},
Body: text,
Extensions: []stanza.MsgExtension{
stanza.HTML{Body: stanza.HTMLBody{InnerXML: html}},
},
}
if to.Protocol == protoGroup { if to.Protocol == protoGroup {
if _, ok := out.channels[to.To]; ok { if _, ok := out.channels[to.To]; ok {
toJID := xmppbase.NewJID(to.To) out.Join(to.To)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
MUC: &xmuc.Base{
History: &xmuc.History{
MaxChars: &historyMaxChars,
},
},
})
if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err)
} else {
out.channels[to.To] = true
}
} }
out.logOut <- &xmpp.MessageClient{ msg.Type = stanza.MessageTypeGroupchat
Type: xmpp.MessageTypeGroupchat, if err := out.client.Send(msg); err != nil {
To: xmppbase.NewJID(to.To), logger.WithFields(map[string]interface{}{
Body: text, "muc": to.To,
HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, "text": text,
}).Errorf("log message not forwarded: %s", err)
} }
return true return true
} }
if to.Protocol == proto { if to.Protocol == proto {
out.logOut <- &xmpp.MessageClient{ msg.Type = stanza.MessageTypeChat
Type: xmpp.MessageTypeChat, if err := out.client.Send(msg); err != nil {
To: xmppbase.NewJID(to.To), logger.WithFields(map[string]interface{}{
Body: text, "user": to.To,
HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}}, "text": text,
}).Errorf("log message not forwarded: %s", err)
} }
return true return true
} }