[TASK] database as socket output

This commit is contained in:
Martin Geno 2017-04-27 21:09:46 +02:00
parent fd7e712282
commit f9f50a4a54
No known key found for this signature in database
GPG Key ID: F0D39A37E925E941
4 changed files with 132 additions and 0 deletions

View File

@ -79,7 +79,18 @@ system = "testing"
enable = false enable = false
path = "/var/log/yanic.log" path = "/var/log/yanic.log"
[[database.connection.graphite]] [[database.connection.graphite]]
enable = false enable = false
address = "localhost:2003" address = "localhost:2003"
prefix = "freifunk" prefix = "freifunk"
[[database.connection.socket]]
enable = false
type = "tcp"
address = ":8081"
[[database.connection.socket]]
enable = false
type = "unix"
address = "/var/lib/collector/database.socket"

View File

@ -4,4 +4,5 @@ import (
_ "github.com/FreifunkBremen/yanic/database/graphite" _ "github.com/FreifunkBremen/yanic/database/graphite"
_ "github.com/FreifunkBremen/yanic/database/influxdb" _ "github.com/FreifunkBremen/yanic/database/influxdb"
_ "github.com/FreifunkBremen/yanic/database/logging" _ "github.com/FreifunkBremen/yanic/database/logging"
_ "github.com/FreifunkBremen/yanic/database/socket"
) )

View File

@ -0,0 +1,82 @@
package socket
/**
* This database type is just for,
* - debugging without a influxconn
* - example for other developers for new databases
*/
import (
"log"
"net"
"time"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)
type Connection struct {
database.Connection
config Config
listener net.Listener
clients map[net.Addr]net.Conn
}
type Config map[string]interface{}
func (c Config) Enable() bool {
return c["enable"].(bool)
}
func (c Config) Type() string {
return c["type"].(string)
}
func (c Config) Address() string {
return c["address"].(string)
}
func init() {
database.RegisterAdapter("socket", Connect)
}
func Connect(configuration interface{}) (database.Connection, error) {
var config Config
config = configuration.(map[string]interface{})
if !config.Enable() {
return nil, nil
}
ln, err := net.Listen(config.Type(), config.Address())
if err != nil {
return nil, err
}
conn := &Connection{config: config, listener: ln, clients: make(map[net.Addr]net.Conn)}
go conn.handleSocketConnection(ln)
log.Println("[socket-database] listen on: ", ln.Addr())
return conn, nil
}
func (conn *Connection) InsertNode(node *runtime.Node) {
conn.sendJSON(EventMessage{Event: "insert_node", Body: node})
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.sendJSON(EventMessage{Event: "insert_globals", Body: stats})
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
conn.sendJSON(EventMessage{Event: "prune_nodes"})
}
func (conn *Connection) Close() {
for _, c := range conn.clients {
err := c.Close()
if err != nil {
log.Println("[socket-database] client was not able to close:", err)
}
}
err := conn.listener.Close()
if err != nil {
log.Println("[socket-database] server was not able to close:", err)
}
}

View File

@ -0,0 +1,38 @@
package socket
import (
"encoding/json"
"log"
"net"
)
type EventMessage struct {
Event string `json:"event"`
Body interface{} `json:"body,omitempty"`
}
func (config *Connection) handleSocketConnection(ln net.Listener) {
for {
c, err := ln.Accept()
if err != nil {
log.Println("[socket-database] error during connection of a client", err)
continue
}
config.clients[c.RemoteAddr()] = c
}
}
func (conn *Connection) sendJSON(msg EventMessage) {
for i, c := range conn.clients {
d := json.NewEncoder(c)
err := d.Encode(&msg)
if err != nil {
err = c.Close()
if err != nil {
log.Println("[socket-database] connection could not close after error on sending event:", err)
}
delete(conn.clients, i)
}
}
}