diff --git a/logging/cmd/main.go b/logging/cmd/main.go new file mode 100644 index 0000000..b119eee --- /dev/null +++ b/logging/cmd/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/bdlm/log" + "github.com/satori/go.uuid" + "github.com/streadway/amqp" + + srv "dev.sum7.eu/genofire/microservices-collection/logging" +) + +const ServiceName = "eu.sum7.log-cmd" + +var ServiceID = uuid.Must(uuid.NewV4()) + +func main() { + log.SetFormatter(&log.TextFormatter{ + DisableTimestamp: true, + }) + + logLevel, err := strconv.ParseUint(os.Args[1], 10, 32) + if err != nil { + log.Fatalf("Failed to parse LogLevel: %s", err) + } + logMessage := srv.LogMessage{ + ServiceName: ServiceName, + ServiceID: ServiceID, + Time: time.Now(), + Level: uint32(logLevel), + Message: strings.Join(os.Args[2:], " "), + } + body, err := json.Marshal(logMessage) + if err != nil { + log.Fatalf("Failed to parse LogLevel: %s", err) + } + + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + log.Fatalf("Failed to connect to RabbitMQ: %s", err) + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %s", err) + } + defer ch.Close() + + if err = srv.ExchangeDeclare(ch); err != nil { + log.Fatalf("Failed to declare an exchange: %s", err) + } + + err = ch.Publish( + srv.EXCHANGE, + fmt.Sprintf("%d.%s", logLevel, ServiceName), + false, + false, + amqp.Publishing{ + ContentType: "application/json", + DeliveryMode: amqp.Persistent, + Body: body, + }) + if err != nil { + log.Fatalf("Failed to publish log message: %s", err) + } + + log.Printf(" [x] Sent %s", body) +} diff --git a/logging/service/main.go b/logging/service/main.go new file mode 100644 index 0000000..7fc418c --- /dev/null +++ b/logging/service/main.go @@ -0,0 +1,128 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/bdlm/log" + "github.com/bdlm/std/logger" + "github.com/streadway/amqp" + + "dev.sum7.eu/genofire/microservices-collection/lib" + srv "dev.sum7.eu/genofire/microservices-collection/logging" +) + +var logRecieved = false + +func main() { + flag.BoolVar(&logRecieved, "recieved", logRecieved, "show recieved log on console") + flag.Parse() + lib.LogUpdateConfig() + + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + log.Fatalf("Failed to connect to RabbitMQ: %s", err) + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %s", err) + } + defer ch.Close() + + if err = srv.ExchangeDeclare(ch); err != nil { + log.Fatalf("Failed to declare an exchange: %s", err) + } + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when usused + true, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + log.Fatalf("Failed to declare a queue: %s", err) + } + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + srv.EXCHANGE, // exchange + false, + nil) + if err != nil { + log.Fatalf("Failed to bind a queue: %s", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + log.Fatalf("Failed to register a consumer: %s", err) + } + + loggerRecieved := log.New() + loggerRecieved.SetLevel(logger.Trace) + + go func() { + for d := range msgs { + var logMessage *srv.LogMessage + if err := json.Unmarshal([]byte(d.Body), &logMessage); err != nil { + log.Fatal(err) + } + //TODO write to Database (to get last values for web client) + + // output to console + if !logRecieved { + continue + } + e := &log.Entry{ + Logger: loggerRecieved, + Time: logMessage.Time, + Data: logMessage.Data, + } + e = e.WithFields(map[string]interface{}{ + "service_name": logMessage.ServiceName, + "service_id": logMessage.ServiceID, + "log_level": logMessage.Level, + }) + lvl := logger.Level(uint32(logMessage.Level)) + if lvl >= logger.Trace { + e.Debug(fmt.Sprintf("[trace] %s", logMessage.Message)) + } else if lvl >= logger.Debug { + e.Debug(logMessage.Message) + } else if lvl >= logger.Info { + e.Info(logMessage.Message) + } else if lvl >= logger.Warn { + e.Warn(logMessage.Message) + } else if lvl >= logger.Error { + e.Error(logMessage.Message) + } else if lvl >= logger.Panic { + e.Error(fmt.Sprintf("[panic] %s", logMessage.Message)) + } else if lvl >= logger.Fatal { + e.Error(fmt.Sprintf("[fatal] %s", logMessage.Message)) + } + } + }() + + log.Debug("started") + + // Wait for INT/TERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.WithField("recieved", sig).Info("stopped") +} diff --git a/logging/srv.go b/logging/srv.go new file mode 100644 index 0000000..33e5d36 --- /dev/null +++ b/logging/srv.go @@ -0,0 +1,43 @@ +package logging + +import ( + "time" + + "github.com/bdlm/log" + "github.com/satori/go.uuid" + "github.com/streadway/amqp" +) + +const EXCHANGE = "sum7.logging" + +func ExchangeDeclare(ch *amqp.Channel) error { + return ch.ExchangeDeclare( + EXCHANGE, + "direct", + true, + false, + false, + false, + nil, + ) +} + +type LogMessage struct { + ServiceName string `json:"sname"` + ServiceID uuid.UUID `json:"sid"` + Time time.Time `json:"time"` + Level uint32 `json:"level"` + Message string `json:"message"` + Data map[string]interface{} `json:"data"` +} + +func LogMessageFromEntry(sname string, sid uuid.UUID, e *log.Entry) *LogMessage { + return &LogMessage{ + ServiceName: sname, + ServiceID: sid, + Time: e.Time, + Level: uint32(e.Level), + Message: e.Message, + Data: e.Data, + } +}