[TASK] refactory to input and output

This commit is contained in:
Martin/Geno 2018-09-05 01:53:23 +02:00
parent d3318177aa
commit a9039fa290
No known key found for this signature in database
GPG Key ID: 9D7D3C6BFF600C6A
29 changed files with 511 additions and 456 deletions

View File

@ -1,11 +1,13 @@
package cmd package cmd
import ( import (
"net/http"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time" "time"
"github.com/NYTimes/gziphandler"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -13,11 +15,11 @@ import (
"dev.sum7.eu/genofire/golang-lib/worker" "dev.sum7.eu/genofire/golang-lib/worker"
"dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/input"
allInput "dev.sum7.eu/genofire/logmania/input/all"
"dev.sum7.eu/genofire/logmania/lib" "dev.sum7.eu/genofire/logmania/lib"
"dev.sum7.eu/genofire/logmania/notify" "dev.sum7.eu/genofire/logmania/output"
allNotify "dev.sum7.eu/genofire/logmania/notify/all" allOutput "dev.sum7.eu/genofire/logmania/output/all"
"dev.sum7.eu/genofire/logmania/receive"
allReceiver "dev.sum7.eu/genofire/logmania/receive/all"
) )
var ( var (
@ -25,8 +27,8 @@ var (
config *lib.Config config *lib.Config
db *database.DB db *database.DB
dbSaveWorker *worker.Worker dbSaveWorker *worker.Worker
notifier notify.Notifier out output.Output
receiver receive.Receiver in input.Input
logChannel chan *log.Entry logChannel chan *log.Entry
logmaniaBot *bot.Bot logmaniaBot *bot.Bot
) )
@ -51,24 +53,40 @@ var serverCmd = &cobra.Command{
logmaniaBot = bot.NewBot(db) logmaniaBot = bot.NewBot(db)
notifier = allNotify.Init(&config.Notify, db, logmaniaBot) out = allOutput.Init(config.Output, db, logmaniaBot)
logChannel = make(chan *log.Entry) logChannel = make(chan *log.Entry)
go func() { go func() {
for a := range logChannel { for a := range logChannel {
notifier.Send(a, nil) out.Send(a, nil)
} }
}() }()
if config.Notify.AlertCheck.Duration > time.Duration(time.Second) { if config.AlertCheck.Duration > time.Duration(time.Second) {
go db.Alert(config.Notify.AlertCheck.Duration, notifier.Send) go db.Alert(config.AlertCheck.Duration, out.Send)
} }
log.Info("starting logmania") log.Info("starting logmania")
receiver = allReceiver.Init(&config.Receive, logChannel) if config.HTTPAddress != "" {
if config.Webroot != "" {
http.Handle("/", gziphandler.GzipHandler(http.FileServer(http.Dir(config.Webroot))))
}
go receiver.Listen() srv := &http.Server{
Addr: config.HTTPAddress,
}
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Panic(err)
}
}()
}
in = allInput.Init(config.Input, logChannel)
go in.Listen()
// Wait for system signal // Wait for system signal
sigchan := make(chan os.Signal, 1) sigchan := make(chan os.Signal, 1)
@ -92,8 +110,8 @@ var serverCmd = &cobra.Command{
func quit() { func quit() {
dbSaveWorker.Close() dbSaveWorker.Close()
file.SaveJSON(config.DB, db) file.SaveJSON(config.DB, db)
receiver.Close() in.Close()
notifier.Close() out.Close()
log.Info("quit of logmania") log.Info("quit of logmania")
os.Exit(0) os.Exit(0)
} }
@ -106,12 +124,12 @@ func reload() {
log.Errorf("reload: could not load '%s' for new configuration. Skip reload.", configPath) log.Errorf("reload: could not load '%s' for new configuration. Skip reload.", configPath)
return return
} }
receiver.Close() in.Close()
receiver = allReceiver.Init(&config.Receive, logChannel) in = allInput.Init(config.Input, logChannel)
go receiver.Listen() go in.Listen()
notifier.Close() out.Close()
notifier = allNotify.Init(&config.Notify, db, logmaniaBot) out = allOutput.Init(config.Output, db, logmaniaBot)
} }
func init() { func init() {

View File

@ -76,10 +76,10 @@ func (db *DB) Alert(expired time.Duration, send func(e *log.Entry, n *Notify) bo
for range c { for range c {
now := time.Now() now := time.Now()
for _, h := range db.Hosts { for _, h := range db.Hosts {
if !h.Lastseen.Before(now.Add(expired * -2)) { if h.Lastseen.Before(now.Add(expired * -1)) {
continue continue
} }
if h.LastseenNotify.Year() <= 1 && h.Lastseen.Before(h.LastseenNotify) { if h.Lastseen.After(h.LastseenNotify) {
continue continue
} }
h.LastseenNotify = now h.LastseenNotify = now

46
input/all/internal.go Normal file
View File

@ -0,0 +1,46 @@
package all
import (
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/input"
)
type Input struct {
input.Input
list []input.Input
}
func Init(configInterface interface{}, exportChannel chan *log.Entry) input.Input {
config := configInterface.(map[string]interface{})
var list []input.Input
for inputType, init := range input.Register {
configForItem := config[inputType]
if configForItem == nil {
log.Warnf("the input type '%s' has no configuration\n", inputType)
continue
}
input := init(configForItem, exportChannel)
if input == nil {
continue
}
list = append(list, input)
}
return &Input{
list: list,
}
}
func (in *Input) Listen() {
for _, item := range in.list {
go item.Listen()
}
}
func (in *Input) Close() {
for _, item := range in.list {
item.Close()
}
}

7
input/all/main.go Normal file
View File

@ -0,0 +1,7 @@
package all
import (
_ "dev.sum7.eu/genofire/logmania/input/journald_json"
_ "dev.sum7.eu/genofire/logmania/input/logrus"
_ "dev.sum7.eu/genofire/logmania/input/syslog"
)

View File

@ -0,0 +1,77 @@
package journald_json
import (
"net"
"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/input"
)
const inputType = "journald_json"
var logger = log.WithField("input", inputType)
type Input struct {
input.Input
exportChannel chan *log.Entry
serverSocket *net.UDPConn
}
type InputConfig struct {
Type string `mapstructure:"type"`
Address string `mapstructure:"address"`
}
func Init(configInterface interface{}, exportChannel chan *log.Entry) input.Input {
var config InputConfig
if err := mapstructure.Decode(configInterface, &config); err != nil {
logger.Warnf("not able to decode data: %s", err)
return nil
}
addr, err := net.ResolveUDPAddr(config.Type, config.Address)
ln, err := net.ListenUDP(config.Type, addr)
if err != nil {
logger.Error("init ", err)
return nil
}
in := &Input{
serverSocket: ln,
exportChannel: exportChannel,
}
logger.Info("init")
return in
}
const maxDataGramSize = 8192
func (in *Input) Listen() {
logger.Info("listen")
for {
buf := make([]byte, maxDataGramSize)
n, src, err := in.serverSocket.ReadFromUDP(buf)
if err != nil {
logger.Warn("failed to accept connection", err)
continue
}
raw := make([]byte, n)
copy(raw, buf)
entry := toLogEntry(raw, src.IP.String())
if entry != nil {
in.exportChannel <- entry
}
}
}
func (in *Input) Close() {
in.serverSocket.Close()
}
func init() {
input.Add(inputType, Init)
}

View File

@ -4,7 +4,7 @@ import (
"io" "io"
websocketLib "dev.sum7.eu/genofire/golang-lib/websocket" websocketLib "dev.sum7.eu/genofire/golang-lib/websocket"
"dev.sum7.eu/genofire/logmania/receive/logrus" "dev.sum7.eu/genofire/logmania/input/logrus"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"

View File

@ -6,28 +6,28 @@ import (
"dev.sum7.eu/genofire/golang-lib/websocket" "dev.sum7.eu/genofire/golang-lib/websocket"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/lib" "dev.sum7.eu/genofire/logmania/input"
"dev.sum7.eu/genofire/logmania/receive"
) )
const inputType = "logrus"
const WS_LOG_ENTRY = "log" const WS_LOG_ENTRY = "log"
var logger = log.WithField("receive", "logrus") var logger = log.WithField("input", inputType)
type Receiver struct { type Input struct {
receive.Receiver input.Input
input chan *websocket.Message input chan *websocket.Message
exportChannel chan *log.Entry exportChannel chan *log.Entry
serverSocket *websocket.Server serverSocket *websocket.Server
} }
func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver { func Init(config interface{}, exportChannel chan *log.Entry) input.Input {
inputMsg := make(chan *websocket.Message) inputMsg := make(chan *websocket.Message)
ws := websocket.NewServer(inputMsg, websocket.NewSessionManager()) ws := websocket.NewServer(inputMsg, websocket.NewSessionManager())
http.HandleFunc("/receiver", ws.Handler) http.HandleFunc("/input/"+inputType, ws.Handler)
recv := &Receiver{ input := &Input{
input: inputMsg, input: inputMsg,
serverSocket: ws, serverSocket: ws,
exportChannel: exportChannel, exportChannel: exportChannel,
@ -35,21 +35,21 @@ func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Rece
logger.Info("init") logger.Info("init")
return recv return input
} }
func (rc *Receiver) Listen() { func (in *Input) Listen() {
logger.Info("listen") logger.Info("listen")
for msg := range rc.input { for msg := range in.input {
if event, ok := msg.Body.(log.Entry); ok { if event, ok := msg.Body.(log.Entry); ok {
rc.exportChannel <- &event in.exportChannel <- &event
} }
} }
} }
func (rc *Receiver) Close() { func (in *Input) Close() {
} }
func init() { func init() {
receive.AddReceiver("websocket", Init) input.Add(inputType, Init)
} }

18
input/main.go Normal file
View File

@ -0,0 +1,18 @@
package input
import (
log "github.com/sirupsen/logrus"
)
var Register = make(map[string]Init)
type Input interface {
Listen()
Close()
}
type Init func(interface{}, chan *log.Entry) Input
func Add(name string, init Init) {
Register[name] = init
}

77
input/syslog/main.go Normal file
View File

@ -0,0 +1,77 @@
package syslog
import (
"net"
"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/input"
)
const inputType = "syslog"
var logger = log.WithField("input", inputType)
type Input struct {
input.Input
exportChannel chan *log.Entry
serverSocket *net.UDPConn
}
type InputConfig struct {
Type string `mapstructure:"type"`
Address string `mapstructure:"address"`
}
func Init(configInterface interface{}, exportChannel chan *log.Entry) input.Input {
var config InputConfig
if err := mapstructure.Decode(configInterface, &config); err != nil {
logger.Warnf("not able to decode data: %s", err)
return nil
}
addr, err := net.ResolveUDPAddr(config.Type, config.Address)
ln, err := net.ListenUDP(config.Type, addr)
if err != nil {
logger.Error("init ", err)
return nil
}
input := &Input{
serverSocket: ln,
exportChannel: exportChannel,
}
logger.Info("init")
return input
}
const maxDataGramSize = 8192
func (in *Input) Listen() {
logger.Info("listen")
for {
buf := make([]byte, maxDataGramSize)
n, src, err := in.serverSocket.ReadFromUDP(buf)
if err != nil {
logger.Warn("failed to accept connection", err)
continue
}
raw := make([]byte, n)
copy(raw, buf)
entry := toLogEntry(raw, src.IP.String())
if entry != nil {
in.exportChannel <- entry
}
}
}
func (in *Input) Close() {
in.serverSocket.Close()
}
func init() {
input.Add(inputType, Init)
}

View File

@ -3,37 +3,10 @@ package lib
// Struct of the configuration // Struct of the configuration
// e.g. under dev.sum7.eu/genofire/logmania/logmania_example.conf // e.g. under dev.sum7.eu/genofire/logmania/logmania_example.conf
type Config struct { type Config struct {
Notify NotifyConfig `toml:"notify"`
Receive ReceiveConfig `toml:"receive"`
DB string `toml:"database"` DB string `toml:"database"`
} HTTPAddress string `toml:"http_address"`
type NotifyConfig struct {
AlertCheck Duration `toml:"alert_check"`
Console bool `toml:"debug"`
XMPP struct {
JID string `toml:"jid"`
Password string `toml:"password"`
Defaults map[string]bool `toml:"default"`
} `toml:"xmpp"`
Websocket struct {
Address string `toml:"address"`
Webroot string `toml:"webroot"` Webroot string `toml:"webroot"`
Default string `toml:"default"` AlertCheck Duration `toml:"alert_check"`
} `toml:"websocket"` Output map[string]interface{} `toml:"output"`
File struct { Input map[string]interface{} `toml:"input"`
Directory string `toml:"directory"`
Default string `toml:"default"`
} `toml:"file"`
}
type ReceiveConfig struct {
Syslog struct {
Type string `toml:"type"`
Address string `toml:"address"`
} `toml:"syslog"`
JournaldJSON struct {
Type string `toml:"type"`
Address string `toml:"address"`
} `toml:"journald_json"`
} }

View File

@ -1,26 +1,37 @@
[receive.syslog] database = "/tmp/logmania.state.json"
# have to be mote then a minute
alert_check = "5m"
# webserver
http_address = ":8080"
webroot = "./webroot/"
#########
# Input #
#########
[input.syslog]
type = "udp" type = "udp"
address = ":10001" address = ":10001"
[receive.journald_json] [input.journald_json]
type = "udp" type = "udp"
address = ":10002" address = ":10002"
[notify] ##########
state_file = "/tmp/logmania.state.json" # Output #
debug = true ##########
[file] [output.file]
directory = "/tmp/" directory = "/tmp/"
default = "raw" default = "raw"
[notify.xmpp] [output.xmpp]
jid = "user@example.org" jid = "user@example.org"
password = "password" password = "password"
# if boolean is true for muc either user chat # if boolean is true for muc either user chat
default = { "log-raw@conference.example.org" = true, "person@example.org" = false } default = { "log-raw@conference.example.org" = true, "person@example.org" = false }
[notify.websocket] [output.websocket]
address = ":8080"
webroot = "./webroot/"
default = "raw" default = "raw"

View File

@ -1,76 +0,0 @@
package all
import (
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/lib"
"dev.sum7.eu/genofire/logmania/notify"
)
var logger = log.WithField("notify", "all")
type Notifier struct {
notify.Notifier
list []notify.Notifier
db *database.DB
channelNotify chan *log.Entry
}
func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier {
var list []notify.Notifier
var defaults []*database.Notify
for _, init := range notify.NotifyRegister {
notify := init(config, db, bot)
if notify == nil {
continue
}
list = append(list, notify)
def := notify.Default()
if def != nil {
continue
}
defaults = append(defaults, def...)
}
db.DefaultNotify = defaults
n := &Notifier{
db: db,
list: list,
channelNotify: make(chan *log.Entry),
}
go n.sender()
return n
}
func (n *Notifier) sender() {
for c := range n.channelNotify {
e, _, tos := n.db.SendTo(c)
for _, to := range tos {
send := false
for _, item := range n.list {
send = item.Send(e, to)
if send {
break
}
}
if !send {
logger.Warnf("notify not send to %s: [%s] %s", to.Address(), c.Level.String(), c.Message)
}
}
}
}
func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool {
n.channelNotify <- e
return true
}
func (n *Notifier) Close() {
for _, item := range n.list {
item.Close()
}
}

View File

@ -1,7 +0,0 @@
package all
import (
_ "dev.sum7.eu/genofire/logmania/notify/file"
_ "dev.sum7.eu/genofire/logmania/notify/websocket"
_ "dev.sum7.eu/genofire/logmania/notify/xmpp"
)

View File

@ -1,23 +0,0 @@
package notify
import (
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/lib"
)
var NotifyRegister []NotifyInit
type Notifier interface {
Default() []*database.Notify
Send(entry *log.Entry, to *database.Notify) bool
Close()
}
type NotifyInit func(*lib.NotifyConfig, *database.DB, *bot.Bot) Notifier
func AddNotifier(n NotifyInit) {
NotifyRegister = append(NotifyRegister, n)
}

82
output/all/internal.go Normal file
View File

@ -0,0 +1,82 @@
package all
import (
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/output"
)
var logger = log.WithField("notify", "all")
type Output struct {
output.Output
list []output.Output
db *database.DB
channelNotify chan *log.Entry
}
func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output {
config := configInterface.(map[string]interface{})
var list []output.Output
var defaults []*database.Notify
for outputType, init := range output.Register {
configForItem := config[outputType]
if configForItem == nil {
log.Warnf("the input type '%s' has no configuration\n", outputType)
continue
}
notify := init(configForItem, db, bot)
if notify == nil {
continue
}
list = append(list, notify)
def := notify.Default()
if def != nil {
continue
}
defaults = append(defaults, def...)
}
db.DefaultNotify = defaults
out := &Output{
db: db,
list: list,
channelNotify: make(chan *log.Entry),
}
go out.sender()
return out
}
func (out *Output) sender() {
for c := range out.channelNotify {
e, _, tos := out.db.SendTo(c)
for _, to := range tos {
send := false
for _, item := range out.list {
send = item.Send(e, to)
if send {
break
}
}
if !send {
logger.Warnf("notify not send to %s: [%s] %s", to.Address(), c.Level.String(), c.Message)
}
}
}
}
func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
out.channelNotify <- e
return true
}
func (out *Output) Close() {
for _, item := range out.list {
item.Close()
}
}

7
output/all/main.go Normal file
View File

@ -0,0 +1,7 @@
package all
import (
_ "dev.sum7.eu/genofire/logmania/output/file"
_ "dev.sum7.eu/genofire/logmania/output/websocket"
_ "dev.sum7.eu/genofire/logmania/output/xmpp"
)

View File

@ -5,82 +5,92 @@ import (
"path" "path"
"regexp" "regexp"
"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/lib" "dev.sum7.eu/genofire/logmania/output"
"dev.sum7.eu/genofire/logmania/notify"
) )
const ( const (
proto = "file" proto = "file"
) )
var logger = log.WithField("notify", proto) var logger = log.WithField("output", proto)
type Notifier struct { type Output struct {
notify.Notifier output.Output
defaults []*database.Notify defaults []*database.Notify
files map[string]*os.File files map[string]*os.File
formatter log.Formatter formatter log.Formatter
path string path string
} }
func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { type OutputConfig struct {
if config.File.Directory == "" { Directory string `mapstructure:"directory"`
Default string `mapstructure:"default"`
}
func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output {
var config OutputConfig
if err := mapstructure.Decode(configInterface, &config); err != nil {
logger.Warnf("not able to decode data: %s", err)
return nil return nil
} }
logger.WithField("directory", config.File.Directory).Info("startup") if config.Directory == "" {
return nil
}
logger.WithField("directory", config.Directory).Info("startup")
var defaults []*database.Notify var defaults []*database.Notify
if config.File.Default != "" { if config.Default != "" {
defaults = append(defaults, &database.Notify{ defaults = append(defaults, &database.Notify{
Protocol: proto, Protocol: proto,
To: config.File.Default, To: config.Default,
}) })
} }
return &Notifier{ return &Output{
defaults: defaults, defaults: defaults,
files: make(map[string]*os.File), files: make(map[string]*os.File),
formatter: &log.JSONFormatter{}, formatter: &log.JSONFormatter{},
path: config.File.Directory, path: config.Directory,
} }
} }
func (n *Notifier) Default() []*database.Notify { func (out *Output) Default() []*database.Notify {
return n.defaults return out.defaults
} }
func (n *Notifier) getFile(name string) *os.File { func (out *Output) getFile(name string) *os.File {
if file, ok := n.files[name]; ok { if file, ok := out.files[name]; ok {
return file return file
} }
if m, err := regexp.MatchString(`^[0-9A-Za-z_-]*$`, name); err != nil || !m { if m, err := regexp.MatchString(`^[0-9A-Za-z_-]*$`, name); err != nil || !m {
logger.Errorf("not allowed to use '%s:%s'", proto, name) logger.Errorf("not allowed to use '%s:%s'", proto, name)
return nil return nil
} }
filename := path.Join(n.path, name+".json") filename := path.Join(out.path, name+".json")
file, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) file, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil { if err != nil {
logger.Errorf("could not open file: %s", err.Error()) logger.Errorf("could not open file: %s", err.Error())
return nil return nil
} }
n.files[name] = file out.files[name] = file
return file return file
} }
func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
if to.Protocol != proto { if to.Protocol != proto {
return false return false
} }
byteText, err := n.formatter.Format(e) byteText, err := out.formatter.Format(e)
if err != nil { if err != nil {
return false return false
} }
text := to.RunReplace(string(byteText)) text := to.RunReplace(string(byteText))
file := n.getFile(to.To) file := out.getFile(to.To)
if file == nil { if file == nil {
return false return false
} }
@ -89,5 +99,5 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool {
} }
func init() { func init() {
notify.AddNotifier(Init) output.Add(proto, Init)
} }

22
output/main.go Normal file
View File

@ -0,0 +1,22 @@
package output
import (
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database"
)
var Register = make(map[string]Init)
type Output interface {
Default() []*database.Notify
Send(entry *log.Entry, to *database.Notify) bool
Close()
}
type Init func(interface{}, *database.DB, *bot.Bot) Output
func Add(name string, init Init) {
Register[name] = init
}

View File

@ -4,33 +4,41 @@ import (
"net/http" "net/http"
"dev.sum7.eu/genofire/golang-lib/websocket" "dev.sum7.eu/genofire/golang-lib/websocket"
"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/lib" "dev.sum7.eu/genofire/logmania/output"
"dev.sum7.eu/genofire/logmania/notify"
) )
const ( const (
proto = "ws" proto = "ws"
) )
var logger = log.WithField("notify", proto) var logger = log.WithField("output", proto)
type Notifier struct { type Output struct {
notify.Notifier output.Output
defaults []*database.Notify defaults []*database.Notify
ws *websocket.Server ws *websocket.Server
formatter log.Formatter formatter log.Formatter
} }
func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { type OutputConfig struct {
Default string `mapstructure:"default"`
}
func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output {
var config OutputConfig
if err := mapstructure.Decode(configInterface, &config); err != nil {
logger.Warnf("not able to decode data: %s", err)
return nil
}
inputMSG := make(chan *websocket.Message) inputMSG := make(chan *websocket.Message)
ws := websocket.NewServer(inputMSG, nil) ws := websocket.NewServer(inputMSG, nil)
http.HandleFunc("/ws", ws.Handler) http.HandleFunc("/output/ws", ws.Handler)
http.Handle("/", http.FileServer(http.Dir(config.Websocket.Webroot)))
go func() { go func() {
for msg := range inputMSG { for msg := range inputMSG {
@ -44,26 +52,16 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi
} }
}() }()
srv := &http.Server{ logger.Info("startup")
Addr: config.Websocket.Address,
}
go func() {
if err := srv.ListenAndServe(); err != nil {
panic(err)
}
}()
logger.WithField("http-socket", config.Websocket.Address).Info("startup")
var defaults []*database.Notify var defaults []*database.Notify
if config.Websocket.Default != "" { if config.Default != "" {
defaults = append(defaults, &database.Notify{ defaults = append(defaults, &database.Notify{
Protocol: proto, Protocol: proto,
To: config.Websocket.Default, To: config.Default,
}) })
} }
return &Notifier{ return &Output{
defaults: defaults, defaults: defaults,
ws: ws, ws: ws,
formatter: &log.TextFormatter{ formatter: &log.TextFormatter{
@ -72,16 +70,16 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi
} }
} }
func (n *Notifier) Default() []*database.Notify { func (out *Output) Default() []*database.Notify {
return n.defaults return out.defaults
} }
func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
if to.Protocol != proto { if to.Protocol != proto {
return false return false
} }
n.ws.SendAll(&websocket.Message{ out.ws.SendAll(&websocket.Message{
Subject: to.Address(), Subject: to.Address(),
Body: &log.Entry{ Body: &log.Entry{
Buffer: e.Buffer, Buffer: e.Buffer,
@ -95,9 +93,9 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool {
return true return true
} }
func (n *Notifier) Close() { func (out *Output) Close() {
} }
func init() { func init() {
notify.AddNotifier(Init) output.Add("websocket", Init)
} }

View File

@ -6,12 +6,12 @@ import (
xmpp_client "dev.sum7.eu/genofire/yaja/client" xmpp_client "dev.sum7.eu/genofire/yaja/client"
xmpp "dev.sum7.eu/genofire/yaja/xmpp" xmpp "dev.sum7.eu/genofire/yaja/xmpp"
"dev.sum7.eu/genofire/yaja/xmpp/base" "dev.sum7.eu/genofire/yaja/xmpp/base"
"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/bot" "dev.sum7.eu/genofire/logmania/bot"
"dev.sum7.eu/genofire/logmania/database" "dev.sum7.eu/genofire/logmania/database"
"dev.sum7.eu/genofire/logmania/lib" "dev.sum7.eu/genofire/logmania/output"
"dev.sum7.eu/genofire/logmania/notify"
) )
const ( const (
@ -20,20 +20,31 @@ const (
nickname = "logmania" nickname = "logmania"
) )
var logger = log.WithField("notify", proto) var logger = log.WithField("output", proto)
type Notifier struct { type Output struct {
notify.Notifier output.Output
defaults []*database.Notify defaults []*database.Notify
client *xmpp_client.Client client *xmpp_client.Client
channels map[string]bool channels map[string]bool
formatter log.Formatter formatter log.Formatter
} }
func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifier { type OutputConfig struct {
JID string `mapstructure:"jid"`
Password string `mapstructure:"password"`
Defaults map[string]bool `mapstructure:"default"`
}
func Init(configInterface interface{}, db *database.DB, bot *bot.Bot) output.Output {
var config OutputConfig
if err := mapstructure.Decode(configInterface, &config); err != nil {
logger.Warnf("not able to decode data: %s", err)
return nil
}
channels := make(map[string]bool) channels := make(map[string]bool)
client, err := xmpp_client.NewClient(xmppbase.NewJID(config.XMPP.JID), config.XMPP.Password) client, err := xmpp_client.NewClient(xmppbase.NewJID(config.JID), config.Password)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return nil return nil
@ -42,7 +53,7 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi
for { for {
if err := client.Start(); err != nil { if err := client.Start(); err != nil {
log.Warn("close connection, try reconnect") log.Warn("close connection, try reconnect")
client.Connect(config.XMPP.Password) client.Connect(config.Password)
} else { } else {
log.Warn("closed connection") log.Warn("closed connection")
return return
@ -130,10 +141,10 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi
} }
} }
logger.WithField("jid", config.XMPP.JID).Info("startup") logger.WithField("jid", config.JID).Info("startup")
var defaults []*database.Notify var defaults []*database.Notify
for to, muc := range config.XMPP.Defaults { for to, muc := range config.Defaults {
def := &database.Notify{ def := &database.Notify{
Protocol: proto, Protocol: proto,
To: to, To: to,
@ -143,7 +154,7 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi
} }
defaults = append(defaults, def) defaults = append(defaults, def)
} }
return &Notifier{ return &Output{
channels: channels, channels: channels,
defaults: defaults, defaults: defaults,
client: client, client: client,
@ -153,31 +164,31 @@ func Init(config *lib.NotifyConfig, db *database.DB, bot *bot.Bot) notify.Notifi
} }
} }
func (n *Notifier) Default() []*database.Notify { func (out *Output) Default() []*database.Notify {
return n.defaults return out.defaults
} }
func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool { func (out *Output) Send(e *log.Entry, to *database.Notify) bool {
textByte, err := n.formatter.Format(e) textByte, err := out.formatter.Format(e)
if err != nil { if err != nil {
logger.Error("during format notify", err) logger.Error("during format notify", err)
return false return false
} }
text := strings.TrimRight(to.RunReplace(string(textByte)), "\n") text := strings.TrimRight(to.RunReplace(string(textByte)), "\n")
if to.Protocol == protoGroup { if to.Protocol == protoGroup {
if _, ok := n.channels[to.To]; ok { if _, ok := out.channels[to.To]; ok {
toJID := xmppbase.NewJID(to.To) toJID := xmppbase.NewJID(to.To)
toJID.Resource = nickname toJID.Resource = nickname
err := n.client.Send(&xmpp.PresenceClient{ err := out.client.Send(&xmpp.PresenceClient{
To: toJID, To: toJID,
}) })
if err != nil { if err != nil {
logger.Error("xmpp could not join ", toJID.String(), " error:", err) logger.Error("xmpp could not join ", toJID.String(), " error:", err)
} else { } else {
n.channels[to.To] = true out.channels[to.To] = true
} }
} }
err := n.client.Send(&xmpp.MessageClient{ err := out.client.Send(&xmpp.MessageClient{
Type: xmpp.MessageTypeGroupchat, Type: xmpp.MessageTypeGroupchat,
To: xmppbase.NewJID(to.To), To: xmppbase.NewJID(to.To),
Body: text, Body: text,
@ -188,7 +199,7 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool {
return true return true
} }
if to.Protocol == proto { if to.Protocol == proto {
err := n.client.Send(&xmpp.MessageClient{ err := out.client.Send(&xmpp.MessageClient{
Type: xmpp.MessageTypeChat, Type: xmpp.MessageTypeChat,
To: xmppbase.NewJID(to.To), To: xmppbase.NewJID(to.To),
Body: text, Body: text,
@ -201,11 +212,11 @@ func (n *Notifier) Send(e *log.Entry, to *database.Notify) bool {
return false return false
} }
func (n *Notifier) Close() { func (out *Output) Close() {
for jid := range n.channels { for jid := range out.channels {
toJID := xmppbase.NewJID(jid) toJID := xmppbase.NewJID(jid)
toJID.Resource = nickname toJID.Resource = nickname
err := n.client.Send(&xmpp.PresenceClient{ err := out.client.Send(&xmpp.PresenceClient{
To: toJID, To: toJID,
Type: xmpp.PresenceTypeUnavailable, Type: xmpp.PresenceTypeUnavailable,
}) })
@ -213,9 +224,9 @@ func (n *Notifier) Close() {
logger.Error("xmpp could not leave ", toJID.String(), " error:", err) logger.Error("xmpp could not leave ", toJID.String(), " error:", err)
} }
} }
n.client.Close() out.client.Close()
} }
func init() { func init() {
notify.AddNotifier(Init) output.Add(proto, Init)
} }

View File

@ -1,40 +0,0 @@
package all
import (
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/lib"
"dev.sum7.eu/genofire/logmania/receive"
)
type Receiver struct {
receive.Receiver
list []receive.Receiver
}
func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver {
var list []receive.Receiver
for _, init := range receive.Register {
receiver := init(config, exportChannel)
if receiver == nil {
continue
}
list = append(list, receiver)
}
return &Receiver{
list: list,
}
}
func (r *Receiver) Listen() {
for _, item := range r.list {
go item.Listen()
}
}
func (r *Receiver) Close() {
for _, item := range r.list {
item.Close()
}
}

View File

@ -1,7 +0,0 @@
package all
import (
_ "dev.sum7.eu/genofire/logmania/receive/journald_json"
_ "dev.sum7.eu/genofire/logmania/receive/logrus"
_ "dev.sum7.eu/genofire/logmania/receive/syslog"
)

View File

@ -1,65 +0,0 @@
package journald_json
import (
"net"
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/lib"
"dev.sum7.eu/genofire/logmania/receive"
)
var logger = log.WithField("receive", "journald_json")
type Receiver struct {
receive.Receiver
exportChannel chan *log.Entry
serverSocket *net.UDPConn
}
func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver {
addr, err := net.ResolveUDPAddr(config.JournaldJSON.Type, config.JournaldJSON.Address)
ln, err := net.ListenUDP(config.JournaldJSON.Type, addr)
if err != nil {
logger.Error("init ", err)
return nil
}
recv := &Receiver{
serverSocket: ln,
exportChannel: exportChannel,
}
logger.Info("init")
return recv
}
const maxDataGramSize = 8192
func (rc *Receiver) Listen() {
logger.Info("listen")
for {
buf := make([]byte, maxDataGramSize)
n, src, err := rc.serverSocket.ReadFromUDP(buf)
if err != nil {
logger.Warn("failed to accept connection", err)
continue
}
raw := make([]byte, n)
copy(raw, buf)
entry := toLogEntry(raw, src.IP.String())
if entry != nil {
rc.exportChannel <- entry
}
}
}
func (rc *Receiver) Close() {
rc.serverSocket.Close()
}
func init() {
receive.AddReceiver("journald_json", Init)
}

View File

@ -1,19 +0,0 @@
package receive
import (
"dev.sum7.eu/genofire/logmania/lib"
log "github.com/sirupsen/logrus"
)
var Register = make(map[string]ReceiverInit)
type Receiver interface {
Listen()
Close()
}
type ReceiverInit func(*lib.ReceiveConfig, chan *log.Entry) Receiver
func AddReceiver(name string, n ReceiverInit) {
Register[name] = n
}

View File

@ -1,65 +0,0 @@
package syslog
import (
"net"
log "github.com/sirupsen/logrus"
"dev.sum7.eu/genofire/logmania/lib"
"dev.sum7.eu/genofire/logmania/receive"
)
var logger = log.WithField("receive", "syslog")
type Receiver struct {
receive.Receiver
exportChannel chan *log.Entry
serverSocket *net.UDPConn
}
func Init(config *lib.ReceiveConfig, exportChannel chan *log.Entry) receive.Receiver {
addr, err := net.ResolveUDPAddr(config.Syslog.Type, config.Syslog.Address)
ln, err := net.ListenUDP(config.Syslog.Type, addr)
if err != nil {
logger.Error("init ", err)
return nil
}
recv := &Receiver{
serverSocket: ln,
exportChannel: exportChannel,
}
logger.Info("init")
return recv
}
const maxDataGramSize = 8192
func (rc *Receiver) Listen() {
logger.Info("listen")
for {
buf := make([]byte, maxDataGramSize)
n, src, err := rc.serverSocket.ReadFromUDP(buf)
if err != nil {
logger.Warn("failed to accept connection", err)
continue
}
raw := make([]byte, n)
copy(raw, buf)
entry := toLogEntry(raw, src.IP.String())
if entry != nil {
rc.exportChannel <- entry
}
}
}
func (rc *Receiver) Close() {
rc.serverSocket.Close()
}
func init() {
receive.AddReceiver("syslog", Init)
}