From 88975d256609a4674508afd6c53ca8d317573b90 Mon Sep 17 00:00:00 2001 From: Andreas Rammhold Date: Thu, 1 Jun 2017 22:52:14 +0200 Subject: [PATCH] [TASK] add database type graphite (#65) --- config_example.toml | 5 ++ database/all/main.go | 1 + database/graphite/database.go | 90 ++++++++++++++++++++++++++++ database/graphite/global.go | 33 ++++++++++ database/graphite/node.go | 109 ++++++++++++++++++++++++++++++++++ runtime/config_test.go | 6 ++ 6 files changed, 244 insertions(+) create mode 100644 database/graphite/database.go create mode 100644 database/graphite/global.go create mode 100644 database/graphite/node.go diff --git a/config_example.toml b/config_example.toml index fd215ab..c6e15ba 100644 --- a/config_example.toml +++ b/config_example.toml @@ -76,3 +76,8 @@ system = "testing" [[database.connection.logging]] enable = false path = "/var/log/yanic.log" + +[[database.connection.graphite]] +enable = false +address = "localhost:2003" +prefix = "freifunk" diff --git a/database/all/main.go b/database/all/main.go index 13b3846..fae0160 100644 --- a/database/all/main.go +++ b/database/all/main.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/FreifunkBremen/yanic/database/graphite" _ "github.com/FreifunkBremen/yanic/database/influxdb" _ "github.com/FreifunkBremen/yanic/database/logging" ) diff --git a/database/graphite/database.go b/database/graphite/database.go new file mode 100644 index 0000000..2cf8d97 --- /dev/null +++ b/database/graphite/database.go @@ -0,0 +1,90 @@ +package graphite + +import ( + "github.com/FreifunkBremen/yanic/database" + "github.com/fgrosse/graphigo" + "log" + "sync" +) + +const ( + MeasurementNode = "node" // Measurement for per-node statistics + MeasurementGlobal = "global" // Measurement for summarized global statistics + CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics + CounterMeasurementModel = "model" // Measurement for model statistics +) + +type Connection struct { + database.Connection + client graphigo.Client + points chan []graphigo.Metric + wg sync.WaitGroup +} + +type Config map[string]interface{} + +func (c Config) Address() string { + return c["address"].(string) +} + +func (c Config) Prefix() string { + return c["prefix"].(string) +} + +func (c Config) Enable() bool { + return c["enable"].(bool) +} + +func Connect(configuration interface{}) (database.Connection, error) { + var config Config + + config = configuration.(map[string]interface{}) + + if !config.Enable() { + return nil, nil + } + + con := &Connection{ + client: graphigo.Client{ + Address: config.Address(), + Prefix: config.Prefix(), + }, + points: make(chan []graphigo.Metric, 1000), + } + + if err := con.client.Connect(); err != nil { + return nil, err + } + + con.wg.Add(1) + go con.addWorker() + + return con, nil +} + +func (c *Connection) Close() { + close(c.points) + if c.client.Connection != nil { + c.client.Close() + } +} + +func (c *Connection) addWorker() { + defer c.wg.Done() + defer c.Close() + for point := range c.points { + err := c.client.SendAll(point) + if err != nil { + log.Fatal(err) + return + } + } +} + +func (c *Connection) addPoint(point []graphigo.Metric) { + c.points <- point +} + +func init() { + database.RegisterAdapter("graphite", Connect) +} diff --git a/database/graphite/global.go b/database/graphite/global.go new file mode 100644 index 0000000..00f2d31 --- /dev/null +++ b/database/graphite/global.go @@ -0,0 +1,33 @@ +package graphite + +import ( + "time" + + "github.com/FreifunkBremen/yanic/runtime" + "github.com/fgrosse/graphigo" +) + +func (c *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { + c.addPoint(GlobalStatsFields(stats)) + c.addCounterMap(CounterMeasurementModel, stats.Models, time) + c.addCounterMap(CounterMeasurementFirmware, stats.Firmwares, time) +} + +func GlobalStatsFields(stats *runtime.GlobalStats) []graphigo.Metric { + return []graphigo.Metric{ + {Name: MeasurementGlobal + ".nodes", Value: stats.Nodes}, + {Name: MeasurementGlobal + ".gateways", Value: stats.Gateways}, + {Name: MeasurementGlobal + ".clients.total", Value: stats.Clients}, + {Name: MeasurementGlobal + ".clients.wifi", Value: stats.ClientsWifi}, + {Name: MeasurementGlobal + ".clients.wifi24", Value: stats.ClientsWifi24}, + {Name: MeasurementGlobal + ".clients.wifi5", Value: stats.ClientsWifi5}, + } +} + +func (c *Connection) addCounterMap(name string, m runtime.CounterMap, t time.Time) { + var fields []graphigo.Metric + for key, count := range m { + fields = append(fields, graphigo.Metric{Name: name + `.` + key + `.count`, Value: count, Timestamp: t}) + } + c.addPoint(fields) +} diff --git a/database/graphite/node.go b/database/graphite/node.go new file mode 100644 index 0000000..2b04e9a --- /dev/null +++ b/database/graphite/node.go @@ -0,0 +1,109 @@ +package graphite + +import ( + "strings" + "time" + + "github.com/FreifunkBremen/yanic/runtime" + "github.com/fgrosse/graphigo" +) + +// PruneNode implementation of database +func (c *Connection) PruneNodes(deleteAfter time.Duration) { + // we can't really delete nodes from graphite remotely :( +} + +// InsertNode implementation of database +func (c *Connection) InsertNode(node *runtime.Node) { + var fields []graphigo.Metric + + stats := node.Statistics + + nodeinfo := node.Nodeinfo + + if nodeinfo == nil { + return + } + + node_prefix := MeasurementNode + `.` + stats.NodeID + `.` + strings.Replace(nodeinfo.Hostname, ".", "__", -1) + + addField := func(name string, value interface{}) { + fields = append(fields, graphigo.Metric{Name: node_prefix + "." + name, Value: value}) + } + + if neighbours := node.Neighbours; neighbours != nil { + vpn := 0 + if meshvpn := stats.MeshVPN; meshvpn != nil { + for _, group := range meshvpn.Groups { + for _, link := range group.Peers { + if link != nil && link.Established > 1 { + vpn++ + } + } + } + } + addField("neighbours.vpn", vpn) + // protocol: Batman Advance + batadv := 0 + for _, batadvNeighbours := range neighbours.Batadv { + batadv += len(batadvNeighbours.Neighbours) + } + addField("neighbours.batadv", batadv) + + // protocol: LLDP + lldp := 0 + for _, lldpNeighbours := range neighbours.LLDP { + lldp += len(lldpNeighbours) + } + addField("neighbours.lldp", lldp) + + // total is the sum of all protocols + addField("neighbours.total", batadv+lldp) + } + + if t := stats.Traffic.Rx; t != nil { + addField("traffic.rx.bytes", int64(t.Bytes)) + addField("traffic.rx.packets", t.Packets) + } + if t := stats.Traffic.Tx; t != nil { + addField("traffic.tx.bytes", int64(t.Bytes)) + addField("traffic.tx.packets", t.Packets) + addField("traffic.tx.dropped", t.Dropped) + } + if t := stats.Traffic.Forward; t != nil { + addField("traffic.forward.bytes", int64(t.Bytes)) + addField("traffic.forward.packets", t.Packets) + } + if t := stats.Traffic.MgmtRx; t != nil { + addField("traffic.mgmt_rx.bytes", int64(t.Bytes)) + addField("traffic.mgmt_rx.packets", t.Packets) + } + if t := stats.Traffic.MgmtTx; t != nil { + addField("traffic.mgmt_tx.bytes", int64(t.Bytes)) + addField("traffic.mgmt_tx.packets", t.Packets) + } + + for _, airtime := range stats.Wireless { + suffix := airtime.FrequencyName() + addField("airtime"+suffix+".chan_util", airtime.ChanUtil) + addField("airtime"+suffix+".rx_util", airtime.RxUtil) + addField("airtime"+suffix+".tx_util", airtime.TxUtil) + addField("airtime"+suffix+".noise", airtime.Noise) + addField("airtime"+suffix+".frequency", airtime.Frequency) + } + + addField("load", stats.LoadAverage) + addField("time.up", int64(stats.Uptime)) + addField("time.idle", int64(stats.Idletime)) + addField("proc.running", stats.Processes.Running) + addField("clients.wifi", stats.Clients.Wifi) + addField("clients.wifi24", stats.Clients.Wifi24) + addField("clients.wifi5", stats.Clients.Wifi5) + addField("clients.total", stats.Clients.Total) + addField("memory.buffers", stats.Memory.Buffers) + addField("memory.cached", stats.Memory.Cached) + addField("memory.free", stats.Memory.Free) + addField("memory.total", stats.Memory.Total) + + c.addPoint(fields) +} diff --git a/runtime/config_test.go b/runtime/config_test.go index 814ef42..a5110a3 100644 --- a/runtime/config_test.go +++ b/runtime/config_test.go @@ -30,4 +30,10 @@ func TestReadConfig(t *testing.T) { assert.Len(dbs, 1, "more influxdb are given") influxdb = dbs[0].(map[string]interface{}) assert.Equal(influxdb["database"], "ffhb") + + var graphitedb map[string]interface{} + dbs = config.Database.Connection["graphite"] + assert.Len(dbs, 1, "more graphitedb are given") + graphitedb = dbs[0].(map[string]interface{}) + assert.Equal(graphitedb["address"], "localhost:2003") }