cleanup xmpp + prio in sending

This commit is contained in:
Martin/Geno 2019-06-21 05:20:34 +02:00
parent 346f3544d5
commit 0ebee067dc
No known key found for this signature in database
GPG Key ID: 9D7D3C6BFF600C6A
4 changed files with 187 additions and 158 deletions

View File

@ -122,5 +122,5 @@ func formatLog(entry *log.Entry) (string, string) {
if err := tempLog.Execute(logLine, data); err != nil {
return "formating error", "formating error"
}
return logLine.String(), fmt.Sprintf("%s> %s: %s", data.Hostname, log.LevelString(entry.Level), entry.Message)
return logLine.String(), fmt.Sprintf("[%s] %s > %s", data.Hostname, log.LevelString(entry.Level), entry.Message)
}

View File

@ -1,14 +1,11 @@
package xmpp
import (
"encoding/xml"
"regexp"
"strings"
xmpp_client "dev.sum7.eu/genofire/yaja/client"
xmpp "dev.sum7.eu/genofire/yaja/xmpp"
"dev.sum7.eu/genofire/yaja/xmpp/base"
"dev.sum7.eu/genofire/yaja/xmpp/x/muc"
"github.com/bdlm/log"
"github.com/mitchellh/mapstructure"
@ -29,10 +26,12 @@ var logger = log.WithField("output", proto)
type Output struct {
output.Output
defaults []*database.Notify
client *xmpp_client.Client
channels map[string]bool
formatter log.Formatter
defaults []*database.Notify
channels map[string]bool
bot *bot.Bot
client *xmpp_client.Client
botOut chan interface{}
logOut chan interface{}
}
type OutputConfig struct {
@ -47,7 +46,6 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out
logger.Warnf("not able to decode data: %s", err)
return nil
}
channels := make(map[string]bool)
jid := xmppbase.NewJID(config.JID)
client, err := xmpp_client.NewClient(jid, config.Password)
@ -60,86 +58,18 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out
client.Start()
log.Panic("closed connection")
}()
go func() {
for {
element, more := client.Recv()
if !more {
log.Warn("could not receive new message, try later")
continue
}
switch element.(type) {
case *xmpp.PresenceClient:
pres := element.(*xmpp.PresenceClient)
sender := pres.From
logPres := logger.WithField("from", sender.Full())
switch pres.Type {
case xmpp.PresenceTypeSubscribe:
logPres.Debugf("recv presence subscribe")
pres.Type = xmpp.PresenceTypeSubscribed
pres.To = sender
pres.From = nil
client.Send(pres)
logPres.Debugf("accept new subscribe")
pres.Type = xmpp.PresenceTypeSubscribe
pres.ID = ""
client.Send(pres)
logPres.Info("request also subscribe")
case xmpp.PresenceTypeSubscribed:
logPres.Info("recv presence accepted subscribe")
case xmpp.PresenceTypeUnsubscribe:
logPres.Info("recv presence remove subscribe")
case xmpp.PresenceTypeUnsubscribed:
logPres.Info("recv presence removed subscribe")
case xmpp.PresenceTypeUnavailable:
logPres.Debug("recv presence unavailable")
case "":
logPres.Debug("recv empty presence, maybe from joining muc")
continue
default:
logPres.Warnf("recv presence unsupported: %s -> %s", pres.Type, xmpp.XMLChildrenString(pres))
}
case *xmpp.MessageClient:
msg := element.(*xmpp.MessageClient)
from := msg.From.Bare().String()
if msg.Type == xmpp.MessageTypeGroupchat {
from = protoGroup + ":" + from
} else {
from = proto + ":" + from
}
answer := bot.Handle(from, msg.Body)
if answer == "" {
continue
}
to := msg.From
if msg.Type == xmpp.MessageTypeGroupchat && !to.IsBare() {
to = to.Bare()
}
err := client.Send(&xmpp.MessageClient{
Type: msg.Type,
To: to,
Body: answer,
})
if err != nil {
logger.Error("xmpp to ", msg.From.String(), " error:", err)
}
}
}
}()
out := &Output{
channels: make(map[string]bool),
bot: bot,
client: client,
botOut: make(chan interface{}),
logOut: make(chan interface{}),
}
go out.sender()
go out.receiver()
logger.WithField("jid", config.JID).Info("startup")
out := &Output{
channels: channels,
client: client,
formatter: &log.TextFormatter{
DisableCaller: true,
DisableTimestamp: true,
},
}
for to, muc := range config.Defaults {
def := &database.Notify{
Protocol: proto,
@ -161,82 +91,10 @@ func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Out
return out
}
func (out *Output) Join(to string) {
toJID := xmppbase.NewJID(to)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
MUC: &xmuc.Base{
History: &xmuc.History{
MaxChars: &historyMaxChars,
},
},
})
if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err)
} else {
out.channels[to] = true
}
}
func (out *Output) Default() []*database.Notify {
return out.defaults
}
func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
html, text := formatLog(e)
if html == "" || text == "" {
logger.Error("during format notify")
return false
}
html = strings.TrimRight(to.RunReplace(html), "\n")
var body xmpp.XMLElement
xml.Unmarshal([]byte(html), &body)
text = strings.TrimRight(to.RunReplace(text), "\n")
if to.Protocol == protoGroup {
if _, ok := out.channels[to.To]; ok {
toJID := xmppbase.NewJID(to.To)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
MUC: &xmuc.Base{
History: &xmuc.History{
MaxChars: &historyMaxChars,
},
},
})
if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err)
} else {
out.channels[to.To] = true
}
}
if err := out.client.Send(&xmpp.MessageClient{
Type: xmpp.MessageTypeGroupchat,
To: xmppbase.NewJID(to.To),
Body: text,
HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}},
}); err != nil {
logger.Error("xmpp to ", to.To, " error:", err)
}
return true
}
if to.Protocol == proto {
if err := out.client.Send(&xmpp.MessageClient{
Type: xmpp.MessageTypeChat,
To: xmppbase.NewJID(to.To),
Body: text,
HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}},
}); err != nil {
logger.Error("xmpp to ", to, " error:", err)
}
return true
}
return false
}
func (out *Output) Close() {
for jid := range out.channels {
toJID := xmppbase.NewJID(jid)

75
output/xmpp/recv.go Normal file
View File

@ -0,0 +1,75 @@
package xmpp
import (
xmpp "dev.sum7.eu/genofire/yaja/xmpp"
"github.com/bdlm/log"
)
func (out *Output) receiver() {
for {
element, more := out.client.Recv()
if !more {
log.Warn("could not receive new message, try later")
continue
}
out.recv(element)
}
}
func (out *Output) recv(element interface{}) {
switch element.(type) {
case *xmpp.PresenceClient:
pres := element.(*xmpp.PresenceClient)
sender := pres.From
logPres := logger.WithField("from", sender.Full())
switch pres.Type {
case xmpp.PresenceTypeSubscribe:
logPres.Debugf("recv presence subscribe")
pres.Type = xmpp.PresenceTypeSubscribed
pres.To = sender
pres.From = nil
out.botOut <- pres
logPres.Debugf("accept new subscribe")
pres.Type = xmpp.PresenceTypeSubscribe
pres.ID = ""
out.botOut <- pres
logPres.Info("request also subscribe")
case xmpp.PresenceTypeSubscribed:
logPres.Info("recv presence accepted subscribe")
case xmpp.PresenceTypeUnsubscribe:
logPres.Info("recv presence remove subscribe")
case xmpp.PresenceTypeUnsubscribed:
logPres.Info("recv presence removed subscribe")
case xmpp.PresenceTypeUnavailable:
logPres.Debug("recv presence unavailable")
case "":
logPres.Debug("recv empty presence, maybe from joining muc")
return
default:
logPres.Warnf("recv presence unsupported: %s -> %s", pres.Type, xmpp.XMLChildrenString(pres))
}
case *xmpp.MessageClient:
msg := element.(*xmpp.MessageClient)
from := msg.From.Bare().String()
if msg.Type == xmpp.MessageTypeGroupchat {
from = protoGroup + ":" + from
} else {
from = proto + ":" + from
}
answer := out.bot.Handle(from, msg.Body)
if answer == "" {
return
}
to := msg.From
if msg.Type == xmpp.MessageTypeGroupchat && !to.IsBare() {
to = to.Bare()
}
out.botOut <- &xmpp.MessageClient{
Type: msg.Type,
To: to,
Body: answer,
}
}
}

96
output/xmpp/send.go Normal file
View File

@ -0,0 +1,96 @@
package xmpp
import (
"encoding/xml"
"strings"
xmpp "dev.sum7.eu/genofire/yaja/xmpp"
"dev.sum7.eu/genofire/yaja/xmpp/base"
"dev.sum7.eu/genofire/yaja/xmpp/x/muc"
"github.com/bdlm/log"
"dev.sum7.eu/genofire/logmania/database"
)
func (out *Output) Join(to string) {
toJID := xmppbase.NewJID(to)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
MUC: &xmuc.Base{
History: &xmuc.History{
MaxChars: &historyMaxChars,
},
},
})
if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err)
} else {
out.channels[to] = true
}
}
func (out *Output) sender() {
// priority of bot higher: https://groups.google.com/forum/#!topic/golang-nuts/M2xjN_yWBiQ
for {
select {
case el := <-out.botOut:
out.client.Send(el)
default:
select {
case el := <-out.logOut:
out.client.Send(el)
default:
}
}
}
}
func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
html, text := formatLog(e)
if html == "" || text == "" {
logger.Error("during format notify")
return false
}
html = strings.TrimRight(to.RunReplace(html), "\n")
var body xmpp.XMLElement
xml.Unmarshal([]byte(html), &body)
text = strings.TrimRight(to.RunReplace(text), "\n")
if to.Protocol == protoGroup {
if _, ok := out.channels[to.To]; ok {
toJID := xmppbase.NewJID(to.To)
toJID.Resource = nickname
err := out.client.Send(&xmpp.PresenceClient{
To: toJID,
MUC: &xmuc.Base{
History: &xmuc.History{
MaxChars: &historyMaxChars,
},
},
})
if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err)
} else {
out.channels[to.To] = true
}
}
out.logOut <- &xmpp.MessageClient{
Type: xmpp.MessageTypeGroupchat,
To: xmppbase.NewJID(to.To),
Body: text,
HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}},
}
return true
}
if to.Protocol == proto {
out.logOut <- &xmpp.MessageClient{
Type: xmpp.MessageTypeChat,
To: xmppbase.NewJID(to.To),
Body: text,
HTML: &xmpp.HTML{Body: xmpp.HTMLBody{Body: body}},
}
return true
}
return false
}