diff --git a/cmd/server.go b/cmd/server.go index 33042bf..29aea96 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -1,11 +1,13 @@ package cmd import ( + "net/http" "os" "os/signal" "syscall" "time" + "github.com/NYTimes/gziphandler" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -13,11 +15,11 @@ import ( "dev.sum7.eu/genofire/golang-lib/worker" "dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/database" + "dev.sum7.eu/genofire/logmania/input" + allInput "dev.sum7.eu/genofire/logmania/input/all" "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/notify" - allNotify "dev.sum7.eu/genofire/logmania/notify/all" - "dev.sum7.eu/genofire/logmania/receive" - allReceiver "dev.sum7.eu/genofire/logmania/receive/all" + "dev.sum7.eu/genofire/logmania/output" + allOutput "dev.sum7.eu/genofire/logmania/output/all" ) var ( @@ -25,8 +27,8 @@ var ( config *lib.Config db *database.DB dbSaveWorker *worker.Worker - notifier notify.Notifier - receiver receive.Receiver + out output.Output + in input.Input logChannel chan *log.Entry logmaniaBot *bot.Bot ) @@ -51,24 +53,40 @@ var serverCmd = &cobra.Command{ logmaniaBot = bot.NewBot(db) - notifier = allNotify.Init(&config.Notify, db, logmaniaBot) + out = allOutput.Init(config.Output, db, logmaniaBot) logChannel = make(chan *log.Entry) go func() { for a := range logChannel { - notifier.Send(a, nil) + out.Send(a, nil) } }() - if config.Notify.AlertCheck.Duration > time.Duration(time.Second) { - go db.Alert(config.Notify.AlertCheck.Duration, notifier.Send) + if config.AlertCheck.Duration > time.Duration(time.Second) { + go db.Alert(config.AlertCheck.Duration, out.Send) } log.Info("starting logmania") - receiver = allReceiver.Init(&config.Receive, logChannel) + if config.HTTPAddress != "" { + if config.Webroot != "" { + http.Handle("/", gziphandler.GzipHandler(http.FileServer(http.Dir(config.Webroot)))) + } - go receiver.Listen() + srv := &http.Server{ + Addr: config.HTTPAddress, + } + + go func() { + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + log.Panic(err) + } + }() + } + + in = allInput.Init(config.Input, logChannel) + + go in.Listen() // Wait for system signal sigchan := make(chan os.Signal, 1) @@ -92,8 +110,8 @@ var serverCmd = &cobra.Command{ func quit() { dbSaveWorker.Close() file.SaveJSON(config.DB, db) - receiver.Close() - notifier.Close() + in.Close() + out.Close() log.Info("quit of logmania") os.Exit(0) } @@ -106,12 +124,12 @@ func reload() { log.Errorf("reload: could not load '%s' for new configuration. Skip reload.", configPath) return } - receiver.Close() - receiver = allReceiver.Init(&config.Receive, logChannel) - go receiver.Listen() + in.Close() + in = allInput.Init(config.Input, logChannel) + go in.Listen() - notifier.Close() - notifier = allNotify.Init(&config.Notify, db, logmaniaBot) + out.Close() + out = allOutput.Init(config.Output, db, logmaniaBot) } func init() { diff --git a/database/main.go b/database/main.go index 4bb49b6..3bc5507 100644 --- a/database/main.go +++ b/database/main.go @@ -76,10 +76,10 @@ func (db *DB) Alert(expired time.Duration, send func(e *log.Entry, n *Notify) bo for range c { now := time.Now() for _, h := range db.Hosts { - if !h.Lastseen.Before(now.Add(expired * -2)) { + if h.Lastseen.Before(now.Add(expired * -1)) { continue } - if h.LastseenNotify.Year() <= 1 && h.Lastseen.Before(h.LastseenNotify) { + if h.Lastseen.After(h.LastseenNotify) { continue } h.LastseenNotify = now diff --git a/input/all/internal.go b/input/all/internal.go new file mode 100644 index 0000000..f880dc4 --- /dev/null +++ b/input/all/internal.go @@ -0,0 +1,46 @@ +package all + +import ( + log "github.com/sirupsen/logrus" + + "dev.sum7.eu/genofire/logmania/input" +) + +type Input struct { + input.Input + list []input.Input +} + +func Init(configInterface interface{}, exportChannel chan *log.Entry) input.Input { + config := configInterface.(map[string]interface{}) + + var list []input.Input + for inputType, init := range input.Register { + configForItem := config[inputType] + if configForItem == nil { + log.Warnf("the input type '%s' has no configuration\n", inputType) + continue + } + input := init(configForItem, exportChannel) + + if input == nil { + continue + } + list = append(list, input) + } + return &Input{ + list: list, + } +} + +func (in *Input) Listen() { + for _, item := range in.list { + go item.Listen() + } +} + +func (in *Input) Close() { + for _, item := range in.list { + item.Close() + } +} diff --git a/input/all/main.go b/input/all/main.go new file mode 100644 index 0000000..293a20f --- /dev/null +++ b/input/all/main.go @@ -0,0 +1,7 @@ +package all + +import ( + _ "dev.sum7.eu/genofire/logmania/input/journald_json" + _ "dev.sum7.eu/genofire/logmania/input/logrus" + _ "dev.sum7.eu/genofire/logmania/input/syslog" +) diff --git a/receive/journald_json/internal.go b/input/journald_json/internal.go similarity index 100% rename from receive/journald_json/internal.go rename to input/journald_json/internal.go diff --git a/input/journald_json/main.go b/input/journald_json/main.go new file mode 100644 index 0000000..1df8481 --- /dev/null +++ b/input/journald_json/main.go @@ -0,0 +1,77 @@ +package journald_json + +import ( + "net" + + "github.com/mitchellh/mapstructure" + log "github.com/sirupsen/logrus" + + "dev.sum7.eu/genofire/logmania/input" +) + +const inputType = "journald_json" + +var logger = log.WithField("input", inputType) + +type Input struct { + input.Input + exportChannel chan *log.Entry + serverSocket *net.UDPConn +} + +type InputConfig struct { + Type string `mapstructure:"type"` + Address string `mapstructure:"address"` +} + +func Init(configInterface interface{}, exportChannel chan *log.Entry) input.Input { + var config InputConfig + if err := mapstructure.Decode(configInterface, &config); err != nil { + logger.Warnf("not able to decode data: %s", err) + return nil + } + addr, err := net.ResolveUDPAddr(config.Type, config.Address) + ln, err := net.ListenUDP(config.Type, addr) + + if err != nil { + logger.Error("init ", err) + return nil + } + in := &Input{ + serverSocket: ln, + exportChannel: exportChannel, + } + + logger.Info("init") + + return in +} + +const maxDataGramSize = 8192 + +func (in *Input) Listen() { + logger.Info("listen") + for { + buf := make([]byte, maxDataGramSize) + n, src, err := in.serverSocket.ReadFromUDP(buf) + if err != nil { + logger.Warn("failed to accept connection", err) + continue + } + + raw := make([]byte, n) + copy(raw, buf) + entry := toLogEntry(raw, src.IP.String()) + if entry != nil { + in.exportChannel <- entry + } + } +} + +func (in *Input) Close() { + in.serverSocket.Close() +} + +func init() { + input.Add(inputType, Init) +} diff --git a/receive/logrus/client/main.go b/input/logrus/client/main.go similarity index 97% rename from receive/logrus/client/main.go rename to input/logrus/client/main.go index a7b8fa0..607fbfd 100644 --- a/receive/logrus/client/main.go +++ b/input/logrus/client/main.go @@ -4,7 +4,7 @@ import ( "io" websocketLib "dev.sum7.eu/genofire/golang-lib/websocket" - "dev.sum7.eu/genofire/logmania/receive/logrus" + "dev.sum7.eu/genofire/logmania/input/logrus" "github.com/google/uuid" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" diff --git a/receive/logrus/client/main_deadtest.go b/input/logrus/client/main_deadtest.go similarity index 100% rename from receive/logrus/client/main_deadtest.go rename to input/logrus/client/main_deadtest.go diff --git a/receive/logrus/main.go b/input/logrus/main.go similarity index 54% rename from receive/logrus/main.go rename to input/logrus/main.go index 6a24602..13f8e28 100644 --- a/receive/logrus/main.go +++ b/input/logrus/main.go @@ -6,28 +6,28 @@ import ( "dev.sum7.eu/genofire/golang-lib/websocket" log "github.com/sirupsen/logrus" - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/receive" + "dev.sum7.eu/genofire/logmania/input" ) +const inputType = "logrus" const WS_LOG_ENTRY = "log" -var logger = log.WithField("receive", "logrus") +var logger = log.WithField("input", inputType) -type Receiver struct { - receive.Receiver +type Input struct { + input.Input input chan *websocket.Message exportChannel chan *log.Entry serverSocket *websocket.Server } -func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver { +func Init(config interface{}, exportChannel chan *log.Entry) input.Input { inputMsg := make(chan *websocket.Message) ws := websocket.NewServer(inputMsg, websocket.NewSessionManager()) - http.HandleFunc("/receiver", ws.Handler) + http.HandleFunc("/input/"+inputType, ws.Handler) - recv := &Receiver{ + input := &Input{ input: inputMsg, serverSocket: ws, exportChannel: exportChannel, @@ -35,21 +35,21 @@ func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Rece logger.Info("init") - return recv + return input } -func (rc *Receiver) Listen() { +func (in *Input) Listen() { logger.Info("listen") - for msg := range rc.input { + for msg := range in.input { if event, ok := msg.Body.(log.Entry); ok { - rc.exportChannel <- &event + in.exportChannel <- &event } } } -func (rc *Receiver) Close() { +func (in *Input) Close() { } func init() { - receive.AddReceiver("websocket", Init) + input.Add(inputType, Init) } diff --git a/input/main.go b/input/main.go new file mode 100644 index 0000000..363af54 --- /dev/null +++ b/input/main.go @@ -0,0 +1,18 @@ +package input + +import ( + log "github.com/sirupsen/logrus" +) + +var Register = make(map[string]Init) + +type Input interface { + Listen() + Close() +} + +type Init func(interface{}, chan *log.Entry) Input + +func Add(name string, init Init) { + Register[name] = init +} diff --git a/receive/syslog/internal.go b/input/syslog/internal.go similarity index 100% rename from receive/syslog/internal.go rename to input/syslog/internal.go diff --git a/receive/syslog/internal_test.go b/input/syslog/internal_test.go similarity index 100% rename from receive/syslog/internal_test.go rename to input/syslog/internal_test.go diff --git a/input/syslog/main.go b/input/syslog/main.go new file mode 100644 index 0000000..79b1f17 --- /dev/null +++ b/input/syslog/main.go @@ -0,0 +1,77 @@ +package syslog + +import ( + "net" + + "github.com/mitchellh/mapstructure" + log "github.com/sirupsen/logrus" + + "dev.sum7.eu/genofire/logmania/input" +) + +const inputType = "syslog" + +var logger = log.WithField("input", inputType) + +type Input struct { + input.Input + exportChannel chan *log.Entry + serverSocket *net.UDPConn +} + +type InputConfig struct { + Type string `mapstructure:"type"` + Address string `mapstructure:"address"` +} + +func Init(configInterface interface{}, exportChannel chan *log.Entry) input.Input { + var config InputConfig + if err := mapstructure.Decode(configInterface, &config); err != nil { + logger.Warnf("not able to decode data: %s", err) + return nil + } + addr, err := net.ResolveUDPAddr(config.Type, config.Address) + ln, err := net.ListenUDP(config.Type, addr) + + if err != nil { + logger.Error("init ", err) + return nil + } + input := &Input{ + serverSocket: ln, + exportChannel: exportChannel, + } + + logger.Info("init") + + return input +} + +const maxDataGramSize = 8192 + +func (in *Input) Listen() { + logger.Info("listen") + for { + buf := make([]byte, maxDataGramSize) + n, src, err := in.serverSocket.ReadFromUDP(buf) + if err != nil { + logger.Warn("failed to accept connection", err) + continue + } + + raw := make([]byte, n) + copy(raw, buf) + entry := toLogEntry(raw, src.IP.String()) + if entry != nil { + in.exportChannel <- entry + } + } +} + +func (in *Input) Close() { + in.serverSocket.Close() +} + +func init() { + input.Add(inputType, Init) +} diff --git a/lib/config.go b/lib/config.go index 0f80b3a..fca33f9 100644 --- a/lib/config.go +++ b/lib/config.go @@ -3,37 +3,10 @@ package lib // Struct of the configuration // e.g. under dev.sum7.eu/genofire/logmania/logmania_example.conf type Config struct { - Notify NotifyConfig `toml:"notify"` - Receive ReceiveConfig `toml:"receive"` - DB string `toml:"database"` -} - -type NotifyConfig struct { - AlertCheck Duration `toml:"alert_check"` - Console bool `toml:"debug"` - XMPP struct { - JID string `toml:"jid"` - Password string `toml:"password"` - Defaults map[string]bool `toml:"default"` - } `toml:"xmpp"` - Websocket struct { - Address string `toml:"address"` - Webroot string `toml:"webroot"` - Default string `toml:"default"` - } `toml:"websocket"` - File struct { - Directory string `toml:"directory"` - Default string `toml:"default"` - } `toml:"file"` -} - -type ReceiveConfig struct { - Syslog struct { - Type string `toml:"type"` - Address string `toml:"address"` - } `toml:"syslog"` - JournaldJSON struct { - Type string `toml:"type"` - Address string `toml:"address"` - } `toml:"journald_json"` + DB string `toml:"database"` + HTTPAddress string `toml:"http_address"` + Webroot string `toml:"webroot"` + AlertCheck Duration `toml:"alert_check"` + Output map[string]interface{} `toml:"output"` + Input map[string]interface{} `toml:"input"` } diff --git a/logmania_example.conf b/logmania_example.conf index 9ceed3b..62db18f 100644 --- a/logmania_example.conf +++ b/logmania_example.conf @@ -1,26 +1,37 @@ -[receive.syslog] +database = "/tmp/logmania.state.json" + +# have to be mote then a minute +alert_check = "5m" + +# webserver +http_address = ":8080" +webroot = "./webroot/" + +######### +# Input # +######### + +[input.syslog] type = "udp" address = ":10001" -[receive.journald_json] +[input.journald_json] type = "udp" address = ":10002" -[notify] -state_file = "/tmp/logmania.state.json" -debug = true +########## +# Output # +########## -[file] +[output.file] directory = "/tmp/" default = "raw" -[notify.xmpp] +[output.xmpp] jid = "user@example.org" password = "password" # if boolean is true for muc either user chat default = { "log-raw@conference.example.org" = true, "person@example.org" = false } -[notify.websocket] -address = ":8080" -webroot = "./webroot/" +[output.websocket] default = "raw" diff --git a/notify/all/internal.go b/notify/all/internal.go deleted file mode 100644 index ddb29be..0000000 --- a/notify/all/internal.go +++ /dev/null @@ -1,76 +0,0 @@ -package all - -import ( - log "github.com/sirupsen/logrus" - - "dev.sum7.eu/genofire/logmania/bot" - "dev.sum7.eu/genofire/logmania/database" - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/notify" -) - -var logger = log.WithField("notify", "all") - -type Notifier struct { - notify.Notifier - list []notify.Notifier - db *database.DB - channelNotify chan *log.Entry -} - -func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { - var list []notify.Notifier - var defaults []*database.Notify - for _, init := range notify.NotifyRegister { - notify := init(config, db, bot) - - if notify == nil { - continue - } - list = append(list, notify) - def := notify.Default() - if def != nil { - continue - } - defaults = append(defaults, def...) - } - - db.DefaultNotify = defaults - - n := &Notifier{ - db: db, - list: list, - channelNotify: make(chan *log.Entry), - } - go n.sender() - return n -} - -func (n *Notifier) sender() { - for c := range n.channelNotify { - e, _, tos := n.db.SendTo(c) - for _, to := range tos { - send := false - for _, item := range n.list { - send = item.Send(e, to) - if send { - break - } - } - if !send { - logger.Warnf("notify not send to %s: [%s] %s", to.Address(), c.Level.String(), c.Message) - } - } - } -} - -func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { - n.channelNotify <- e - return true -} - -func (n *Notifier) Close() { - for _, item := range n.list { - item.Close() - } -} diff --git a/notify/all/main.go b/notify/all/main.go deleted file mode 100644 index b66a415..0000000 --- a/notify/all/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package all - -import ( - _ "dev.sum7.eu/genofire/logmania/notify/file" - _ "dev.sum7.eu/genofire/logmania/notify/websocket" - _ "dev.sum7.eu/genofire/logmania/notify/xmpp" -) diff --git a/notify/main.go b/notify/main.go deleted file mode 100644 index 9013071..0000000 --- a/notify/main.go +++ /dev/null @@ -1,23 +0,0 @@ -package notify - -import ( - log "github.com/sirupsen/logrus" - - "dev.sum7.eu/genofire/logmania/bot" - "dev.sum7.eu/genofire/logmania/database" - "dev.sum7.eu/genofire/logmania/lib" -) - -var NotifyRegister []NotifyInit - -type Notifier interface { - Default() []*database.Notify - Send(entry *log.Entry, to *database.Notify) bool - Close() -} - -type NotifyInit func(*lib.NotifyConfig, *database.DB, *bot.Bot) Notifier - -func AddNotifier(n NotifyInit) { - NotifyRegister = append(NotifyRegister, n) -} diff --git a/output/all/internal.go b/output/all/internal.go new file mode 100644 index 0000000..d9915ef --- /dev/null +++ b/output/all/internal.go @@ -0,0 +1,82 @@ +package all + +import ( + log "github.com/sirupsen/logrus" + + "dev.sum7.eu/genofire/logmania/bot" + "dev.sum7.eu/genofire/logmania/database" + "dev.sum7.eu/genofire/logmania/output" +) + +var logger = log.WithField("notify", "all") + +type Output struct { + output.Output + list []output.Output + db *database.DB + channelNotify chan *log.Entry +} + +func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output { + config := configInterface.(map[string]interface{}) + + var list []output.Output + var defaults []*database.Notify + for outputType, init := range output.Register { + configForItem := config[outputType] + if configForItem == nil { + log.Warnf("the input type '%s' has no configuration\n", outputType) + continue + } + notify := init(configForItem, db, bot) + + if notify == nil { + continue + } + list = append(list, notify) + def := notify.Default() + if def != nil { + continue + } + defaults = append(defaults, def...) + } + + db.DefaultNotify = defaults + + out := &Output{ + db: db, + list: list, + channelNotify: make(chan *log.Entry), + } + go out.sender() + return out +} + +func (out *Output) sender() { + for c := range out.channelNotify { + e, _, tos := out.db.SendTo(c) + for _, to := range tos { + send := false + for _, item := range out.list { + send = item.Send(e, to) + if send { + break + } + } + if !send { + logger.Warnf("notify not send to %s: [%s] %s", to.Address(), c.Level.String(), c.Message) + } + } + } +} + +func (out *Output) Send(e *log.Entry, to *database.Notify) bool { + out.channelNotify <- e + return true +} + +func (out *Output) Close() { + for _, item := range out.list { + item.Close() + } +} diff --git a/output/all/main.go b/output/all/main.go new file mode 100644 index 0000000..ca7e192 --- /dev/null +++ b/output/all/main.go @@ -0,0 +1,7 @@ +package all + +import ( + _ "dev.sum7.eu/genofire/logmania/output/file" + _ "dev.sum7.eu/genofire/logmania/output/websocket" + _ "dev.sum7.eu/genofire/logmania/output/xmpp" +) diff --git a/notify/file/main.go b/output/file/main.go similarity index 50% rename from notify/file/main.go rename to output/file/main.go index 0868f31..98f687b 100644 --- a/notify/file/main.go +++ b/output/file/main.go @@ -5,82 +5,92 @@ import ( "path" "regexp" + "github.com/mitchellh/mapstructure" log "github.com/sirupsen/logrus" "dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/database" - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/notify" + "dev.sum7.eu/genofire/logmania/output" ) const ( proto = "file" ) -var logger = log.WithField("notify", proto) +var logger = log.WithField("output", proto) -type Notifier struct { - notify.Notifier +type Output struct { + output.Output defaults []*database.Notify files map[string]*os.File formatter log.Formatter path string } -func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { - if config.File.Directory == "" { +type OutputConfig struct { + Directory string `mapstructure:"directory"` + Default string `mapstructure:"default"` +} + +func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output { + var config OutputConfig + if err := mapstructure.Decode(configInterface, &config); err != nil { + logger.Warnf("not able to decode data: %s", err) return nil } - logger.WithField("directory", config.File.Directory).Info("startup") + if config.Directory == "" { + return nil + } + logger.WithField("directory", config.Directory).Info("startup") var defaults []*database.Notify - if config.File.Default != "" { + if config.Default != "" { defaults = append(defaults, &database.Notify{ Protocol: proto, - To: config.File.Default, + To: config.Default, }) } - return &Notifier{ + return &Output{ defaults: defaults, files: make(map[string]*os.File), formatter: &log.JSONFormatter{}, - path: config.File.Directory, + path: config.Directory, } } -func (n *Notifier) Default() []*database.Notify { - return n.defaults +func (out *Output) Default() []*database.Notify { + return out.defaults } -func (n *Notifier) getFile(name string) *os.File { - if file, ok := n.files[name]; ok { +func (out *Output) getFile(name string) *os.File { + if file, ok := out.files[name]; ok { return file } if m, err := regexp.MatchString(`^[0-9A-Za-z_-]*$`, name); err != nil || !m { logger.Errorf("not allowed to use '%s:%s'", proto, name) return nil } - filename := path.Join(n.path, name+".json") + filename := path.Join(out.path, name+".json") file, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) if err != nil { logger.Errorf("could not open file: %s", err.Error()) return nil } - n.files[name] = file + out.files[name] = file return file } -func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { +func (out *Output) Send(e *log.Entry, to *database.Notify) bool { if to.Protocol != proto { return false } - byteText, err := n.formatter.Format(e) + byteText, err := out.formatter.Format(e) if err != nil { return false } text := to.RunReplace(string(byteText)) - file := n.getFile(to.To) + file := out.getFile(to.To) if file == nil { return false } @@ -89,5 +99,5 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { } func init() { - notify.AddNotifier(Init) + output.Add(proto, Init) } diff --git a/output/main.go b/output/main.go new file mode 100644 index 0000000..77b528d --- /dev/null +++ b/output/main.go @@ -0,0 +1,22 @@ +package output + +import ( + log "github.com/sirupsen/logrus" + + "dev.sum7.eu/genofire/logmania/bot" + "dev.sum7.eu/genofire/logmania/database" +) + +var Register = make(map[string]Init) + +type Output interface { + Default() []*database.Notify + Send(entry *log.Entry, to *database.Notify) bool + Close() +} + +type Init func(interface{}, *database.DB, *bot.Bot) Output + +func Add(name string, init Init) { + Register[name] = init +} diff --git a/notify/websocket/main.go b/output/websocket/main.go similarity index 56% rename from notify/websocket/main.go rename to output/websocket/main.go index 7a75bca..67a0ac4 100644 --- a/notify/websocket/main.go +++ b/output/websocket/main.go @@ -4,33 +4,41 @@ import ( "net/http" "dev.sum7.eu/genofire/golang-lib/websocket" + "github.com/mitchellh/mapstructure" log "github.com/sirupsen/logrus" "dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/database" - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/notify" + "dev.sum7.eu/genofire/logmania/output" ) const ( proto = "ws" ) -var logger = log.WithField("notify", proto) +var logger = log.WithField("output", proto) -type Notifier struct { - notify.Notifier +type Output struct { + output.Output defaults []*database.Notify ws *websocket.Server formatter log.Formatter } -func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { +type OutputConfig struct { + Default string `mapstructure:"default"` +} + +func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output { + var config OutputConfig + if err := mapstructure.Decode(configInterface, &config); err != nil { + logger.Warnf("not able to decode data: %s", err) + return nil + } inputMSG := make(chan *websocket.Message) ws := websocket.NewServer(inputMSG, nil) - http.HandleFunc("/ws", ws.Handler) - http.Handle("/", http.FileServer(http.Dir(config.Websocket.Webroot))) + http.HandleFunc("/output/ws", ws.Handler) go func() { for msg := range inputMSG { @@ -44,26 +52,16 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi } }() - srv := &http.Server{ - Addr: config.Websocket.Address, - } - - go func() { - if err := srv.ListenAndServe(); err != nil { - panic(err) - } - }() - - logger.WithField("http-socket", config.Websocket.Address).Info("startup") + logger.Info("startup") var defaults []*database.Notify - if config.Websocket.Default != "" { + if config.Default != "" { defaults = append(defaults, &database.Notify{ Protocol: proto, - To: config.Websocket.Default, + To: config.Default, }) } - return &Notifier{ + return &Output{ defaults: defaults, ws: ws, formatter: &log.TextFormatter{ @@ -72,16 +70,16 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi } } -func (n *Notifier) Default() []*database.Notify { - return n.defaults +func (out *Output) Default() []*database.Notify { + return out.defaults } -func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { +func (out *Output) Send(e *log.Entry, to *database.Notify) bool { if to.Protocol != proto { return false } - n.ws.SendAll(&websocket.Message{ + out.ws.SendAll(&websocket.Message{ Subject: to.Address(), Body: &log.Entry{ Buffer: e.Buffer, @@ -95,9 +93,9 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { return true } -func (n *Notifier) Close() { +func (out *Output) Close() { } func init() { - notify.AddNotifier(Init) + output.Add("websocket", Init) } diff --git a/notify/xmpp/main.go b/output/xmpp/main.go similarity index 76% rename from notify/xmpp/main.go rename to output/xmpp/main.go index eaed47d..3a04be5 100644 --- a/notify/xmpp/main.go +++ b/output/xmpp/main.go @@ -6,12 +6,12 @@ import ( xmpp_client "dev.sum7.eu/genofire/yaja/client" xmpp "dev.sum7.eu/genofire/yaja/xmpp" "dev.sum7.eu/genofire/yaja/xmpp/base" + "github.com/mitchellh/mapstructure" log "github.com/sirupsen/logrus" "dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/database" - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/notify" + "dev.sum7.eu/genofire/logmania/output" ) const ( @@ -20,20 +20,31 @@ const ( nickname = "logmania" ) -var logger = log.WithField("notify", proto) +var logger = log.WithField("output", proto) -type Notifier struct { - notify.Notifier +type Output struct { + output.Output defaults []*database.Notify client *xmpp_client.Client channels map[string]bool formatter log.Formatter } -func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { +type OutputConfig struct { + JID string `mapstructure:"jid"` + Password string `mapstructure:"password"` + Defaults map[string]bool `mapstructure:"default"` +} + +func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output { + var config OutputConfig + if err := mapstructure.Decode(configInterface, &config); err != nil { + logger.Warnf("not able to decode data: %s", err) + return nil + } channels := make(map[string]bool) - client, err := xmpp_client.NewClient(xmppbase.NewJID(config.XMPP.JID), config.XMPP.Password) + client, err := xmpp_client.NewClient(xmppbase.NewJID(config.JID), config.Password) if err != nil { logger.Error(err) return nil @@ -42,7 +53,7 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi for { if err := client.Start(); err != nil { log.Warn("close connection, try reconnect") - client.Connect(config.XMPP.Password) + client.Connect(config.Password) } else { log.Warn("closed connection") return @@ -130,10 +141,10 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi } } - logger.WithField("jid", config.XMPP.JID).Info("startup") + logger.WithField("jid", config.JID).Info("startup") var defaults []*database.Notify - for to, muc := range config.XMPP.Defaults { + for to, muc := range config.Defaults { def := &database.Notify{ Protocol: proto, To: to, @@ -143,7 +154,7 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi } defaults = append(defaults, def) } - return &Notifier{ + return &Output{ channels: channels, defaults: defaults, client: client, @@ -153,31 +164,31 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi } } -func (n *Notifier) Default() []*database.Notify { - return n.defaults +func (out *Output) Default() []*database.Notify { + return out.defaults } -func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { - textByte, err := n.formatter.Format(e) +func (out *Output) Send(e *log.Entry, to *database.Notify) bool { + textByte, err := out.formatter.Format(e) if err != nil { logger.Error("during format notify", err) return false } text := strings.TrimRight(to.RunReplace(string(textByte)), "\n") if to.Protocol == protoGroup { - if _, ok := n.channels[to.To]; ok { + if _, ok := out.channels[to.To]; ok { toJID := xmppbase.NewJID(to.To) toJID.Resource = nickname - err := n.client.Send(&xmpp.PresenceClient{ + err := out.client.Send(&xmpp.PresenceClient{ To: toJID, }) if err != nil { logger.Error("xmpp could not join ", toJID.String(), " error:", err) } else { - n.channels[to.To] = true + out.channels[to.To] = true } } - err := n.client.Send(&xmpp.MessageClient{ + err := out.client.Send(&xmpp.MessageClient{ Type: xmpp.MessageTypeGroupchat, To: xmppbase.NewJID(to.To), Body: text, @@ -188,7 +199,7 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { return true } if to.Protocol == proto { - err := n.client.Send(&xmpp.MessageClient{ + err := out.client.Send(&xmpp.MessageClient{ Type: xmpp.MessageTypeChat, To: xmppbase.NewJID(to.To), Body: text, @@ -201,11 +212,11 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { return false } -func (n *Notifier) Close() { - for jid := range n.channels { +func (out *Output) Close() { + for jid := range out.channels { toJID := xmppbase.NewJID(jid) toJID.Resource = nickname - err := n.client.Send(&xmpp.PresenceClient{ + err := out.client.Send(&xmpp.PresenceClient{ To: toJID, Type: xmpp.PresenceTypeUnavailable, }) @@ -213,9 +224,9 @@ func (n *Notifier) Close() { logger.Error("xmpp could not leave ", toJID.String(), " error:", err) } } - n.client.Close() + out.client.Close() } func init() { - notify.AddNotifier(Init) + output.Add(proto, Init) } diff --git a/receive/all/internal.go b/receive/all/internal.go deleted file mode 100644 index 7f0d27f..0000000 --- a/receive/all/internal.go +++ /dev/null @@ -1,40 +0,0 @@ -package all - -import ( - log "github.com/sirupsen/logrus" - - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/receive" -) - -type Receiver struct { - receive.Receiver - list []receive.Receiver -} - -func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver { - var list []receive.Receiver - for _, init := range receive.Register { - receiver := init(config, exportChannel) - - if receiver == nil { - continue - } - list = append(list, receiver) - } - return &Receiver{ - list: list, - } -} - -func (r *Receiver) Listen() { - for _, item := range r.list { - go item.Listen() - } -} - -func (r *Receiver) Close() { - for _, item := range r.list { - item.Close() - } -} diff --git a/receive/all/main.go b/receive/all/main.go deleted file mode 100644 index cc2c154..0000000 --- a/receive/all/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package all - -import ( - _ "dev.sum7.eu/genofire/logmania/receive/journald_json" - _ "dev.sum7.eu/genofire/logmania/receive/logrus" - _ "dev.sum7.eu/genofire/logmania/receive/syslog" -) diff --git a/receive/journald_json/main.go b/receive/journald_json/main.go deleted file mode 100644 index 28a1bdd..0000000 --- a/receive/journald_json/main.go +++ /dev/null @@ -1,65 +0,0 @@ -package journald_json - -import ( - "net" - - log "github.com/sirupsen/logrus" - - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/receive" -) - -var logger = log.WithField("receive", "journald_json") - -type Receiver struct { - receive.Receiver - exportChannel chan *log.Entry - serverSocket *net.UDPConn -} - -func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver { - addr, err := net.ResolveUDPAddr(config.JournaldJSON.Type, config.JournaldJSON.Address) - ln, err := net.ListenUDP(config.JournaldJSON.Type, addr) - - if err != nil { - logger.Error("init ", err) - return nil - } - recv := &Receiver{ - serverSocket: ln, - exportChannel: exportChannel, - } - - logger.Info("init") - - return recv -} - -const maxDataGramSize = 8192 - -func (rc *Receiver) Listen() { - logger.Info("listen") - for { - buf := make([]byte, maxDataGramSize) - n, src, err := rc.serverSocket.ReadFromUDP(buf) - if err != nil { - logger.Warn("failed to accept connection", err) - continue - } - - raw := make([]byte, n) - copy(raw, buf) - entry := toLogEntry(raw, src.IP.String()) - if entry != nil { - rc.exportChannel <- entry - } - } -} - -func (rc *Receiver) Close() { - rc.serverSocket.Close() -} - -func init() { - receive.AddReceiver("journald_json", Init) -} diff --git a/receive/main.go b/receive/main.go deleted file mode 100644 index 2206d87..0000000 --- a/receive/main.go +++ /dev/null @@ -1,19 +0,0 @@ -package receive - -import ( - "dev.sum7.eu/genofire/logmania/lib" - log "github.com/sirupsen/logrus" -) - -var Register = make(map[string]ReceiverInit) - -type Receiver interface { - Listen() - Close() -} - -type ReceiverInit func(*lib.ReceiveConfig, chan *log.Entry) Receiver - -func AddReceiver(name string, n ReceiverInit) { - Register[name] = n -} diff --git a/receive/syslog/main.go b/receive/syslog/main.go deleted file mode 100644 index 47bfb15..0000000 --- a/receive/syslog/main.go +++ /dev/null @@ -1,65 +0,0 @@ -package syslog - -import ( - "net" - - log "github.com/sirupsen/logrus" - - "dev.sum7.eu/genofire/logmania/lib" - "dev.sum7.eu/genofire/logmania/receive" -) - -var logger = log.WithField("receive", "syslog") - -type Receiver struct { - receive.Receiver - exportChannel chan *log.Entry - serverSocket *net.UDPConn -} - -func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver { - addr, err := net.ResolveUDPAddr(config.Syslog.Type, config.Syslog.Address) - ln, err := net.ListenUDP(config.Syslog.Type, addr) - - if err != nil { - logger.Error("init ", err) - return nil - } - recv := &Receiver{ - serverSocket: ln, - exportChannel: exportChannel, - } - - logger.Info("init") - - return recv -} - -const maxDataGramSize = 8192 - -func (rc *Receiver) Listen() { - logger.Info("listen") - for { - buf := make([]byte, maxDataGramSize) - n, src, err := rc.serverSocket.ReadFromUDP(buf) - if err != nil { - logger.Warn("failed to accept connection", err) - continue - } - - raw := make([]byte, n) - copy(raw, buf) - entry := toLogEntry(raw, src.IP.String()) - if entry != nil { - rc.exportChannel <- entry - } - } -} - -func (rc *Receiver) Close() { - rc.serverSocket.Close() -} - -func init() { - receive.AddReceiver("syslog", Init) -}