diff --git a/cmd/yanic/main.go b/cmd/yanic/main.go index 16b6831..ed23888 100644 --- a/cmd/yanic/main.go +++ b/cmd/yanic/main.go @@ -80,7 +80,7 @@ func main() { func importRRD(path string) { log.Println("importing RRD from", path) for ds := range rrd.Read(path) { - connections.AddStatistics( + connections.InsertGlobals( &runtime.GlobalStats{ Nodes: uint32(ds.Nodes), Clients: uint32(ds.Clients), diff --git a/database/all/internal.go b/database/all/internal.go index ddaefc8..ff1b816 100644 --- a/database/all/internal.go +++ b/database/all/internal.go @@ -30,15 +30,16 @@ func Connect(configuration interface{}) (database.Connection, error) { } return &Connection{list: list}, nil } -func (conn *Connection) AddNode(nodeID string, node *runtime.Node) { + +func (conn *Connection) InsertNode(node *runtime.Node) { for _, item := range conn.list { - item.AddNode(nodeID, node) + item.InsertNode(node) } } -func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) { +func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { for _, item := range conn.list { - item.AddStatistics(stats, time) + item.InsertGlobals(stats, time) } } diff --git a/database/database.go b/database/database.go index dbdae7d..01621e7 100644 --- a/database/database.go +++ b/database/database.go @@ -8,11 +8,11 @@ import ( // Connection interface to use for implementation in e.g. influxdb type Connection interface { - // AddNode stores data of a single node - AddNode(nodeID string, node *runtime.Node) + // InsertNode stores statistics per node + InsertNode(node *runtime.Node) - // AddStatistics stores global statistics - AddStatistics(stats *runtime.GlobalStats, time time.Time) + // InsertGlobals stores global statistics + InsertGlobals(stats *runtime.GlobalStats, time time.Time) // PruneNodes prunes historical per-node data PruneNodes(deleteAfter time.Duration) diff --git a/database/influxdb/database.go b/database/influxdb/database.go index b168308..7434556 100644 --- a/database/influxdb/database.go +++ b/database/influxdb/database.go @@ -1,7 +1,6 @@ package influxdb import ( - "fmt" "log" "sync" "time" @@ -10,7 +9,6 @@ import ( "github.com/influxdata/influxdb/models" "github.com/FreifunkBremen/yanic/database" - "github.com/FreifunkBremen/yanic/runtime" ) const ( @@ -80,11 +78,6 @@ func Connect(configuration interface{}) (database.Connection, error) { return db, nil } -func (conn *Connection) PruneNodes(deleteAfter time.Duration) { - query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second) - conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) -} - func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, time time.Time) { point, err := client.NewPoint(name, tags.Map(), fields, time) if err != nil { @@ -93,36 +86,6 @@ func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fi conn.points <- point } -// Saves the values of a CounterMap in the database. -// The key are used as 'value' tag. -// The value is used as 'counter' field. -func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) { - now := time.Now() - for key, count := range m { - conn.addPoint( - name, - models.Tags{ - models.Tag{Key: []byte("value"), Value: []byte(key)}, - }, - models.Fields{"count": count}, - now, - ) - } -} - -// AddStatistics implementation of database -func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) { - conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time) - conn.addCounterMap(CounterMeasurementModel, stats.Models) - conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares) -} - -// AddNode implementation of database -func (conn *Connection) AddNode(nodeID string, node *runtime.Node) { - tags, fields := nodeToInflux(node) - conn.addPoint(MeasurementNode, tags, fields, time.Now()) -} - // Close all connection and clean up func (conn *Connection) Close() { close(conn.points) diff --git a/database/influxdb/global.go b/database/influxdb/global.go new file mode 100644 index 0000000..de2495a --- /dev/null +++ b/database/influxdb/global.go @@ -0,0 +1,44 @@ +package influxdb + +import ( + "time" + + "github.com/FreifunkBremen/yanic/runtime" + "github.com/influxdata/influxdb/models" +) + +// InsertGlobals implementation of database +func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { + conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time) + conn.addCounterMap(CounterMeasurementModel, stats.Models) + conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares) +} + +// GlobalStatsFields returns fields for InfluxDB +func GlobalStatsFields(stats *runtime.GlobalStats) map[string]interface{} { + return map[string]interface{}{ + "nodes": stats.Nodes, + "gateways": stats.Gateways, + "clients.total": stats.Clients, + "clients.wifi": stats.ClientsWifi, + "clients.wifi24": stats.ClientsWifi24, + "clients.wifi5": stats.ClientsWifi5, + } +} + +// Saves the values of a CounterMap in the database. +// The key are used as 'value' tag. +// The value is used as 'counter' field. +func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) { + now := time.Now() + for key, count := range m { + conn.addPoint( + name, + models.Tags{ + models.Tag{Key: []byte("value"), Value: []byte(key)}, + }, + models.Fields{"count": count}, + now, + ) + } +} diff --git a/database/influxdb/stats_test.go b/database/influxdb/global_test.go similarity index 100% rename from database/influxdb/stats_test.go rename to database/influxdb/global_test.go diff --git a/database/influxdb/node.go b/database/influxdb/node.go index 9b17f59..5f23368 100644 --- a/database/influxdb/node.go +++ b/database/influxdb/node.go @@ -1,15 +1,29 @@ package influxdb import ( + "fmt" "strconv" + "time" + client "github.com/influxdata/influxdb/client/v2" models "github.com/influxdata/influxdb/models" "github.com/FreifunkBremen/yanic/runtime" ) -// NodeToInflux Returns tags and fields for InfluxDB -func nodeToInflux(node *runtime.Node) (tags models.Tags, fields models.Fields) { +// InsertNode implementation of database +func (conn *Connection) InsertNode(node *runtime.Node) { + tags, fields := buildNodeStats(node) + conn.addPoint(MeasurementNode, tags, fields, time.Now()) +} + +func (conn *Connection) PruneNodes(deleteAfter time.Duration) { + query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second) + conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) +} + +// returns tags and fields for InfluxDB +func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) { stats := node.Statistics tags.SetString("nodeid", stats.NodeID) diff --git a/database/influxdb/node_test.go b/database/influxdb/node_test.go index 08dc06c..3fa7f32 100644 --- a/database/influxdb/node_test.go +++ b/database/influxdb/node_test.go @@ -66,7 +66,7 @@ func TestToInflux(t *testing.T) { }, } - tags, fields := nodeToInflux(node) + tags, fields := buildNodeStats(node) assert.Equal("foobar", tags.GetString("nodeid")) assert.Equal("nobody", tags.GetString("owner")) diff --git a/database/influxdb/stats.go b/database/influxdb/stats.go deleted file mode 100644 index 0cc433e..0000000 --- a/database/influxdb/stats.go +++ /dev/null @@ -1,17 +0,0 @@ -package influxdb - -import ( - "github.com/FreifunkBremen/yanic/runtime" -) - -// GlobalStatsFields returns fields for InfluxDB -func GlobalStatsFields(stats *runtime.GlobalStats) map[string]interface{} { - return map[string]interface{}{ - "nodes": stats.Nodes, - "gateways": stats.Gateways, - "clients.total": stats.Clients, - "clients.wifi": stats.ClientsWifi, - "clients.wifi24": stats.ClientsWifi24, - "clients.wifi5": stats.ClientsWifi5, - } -} diff --git a/database/logging/file.go b/database/logging/file.go index 910a4d0..31863b7 100644 --- a/database/logging/file.go +++ b/database/logging/file.go @@ -48,12 +48,12 @@ func Connect(configuration interface{}) (database.Connection, error) { return &Connection{config: config, file: file}, nil } -func (conn *Connection) AddNode(nodeID string, node *runtime.Node) { - conn.log("AddNode: [", nodeID, "] clients: ", node.Statistics.Clients.Total) +func (conn *Connection) InsertNode(node *runtime.Node) { + conn.log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total) } -func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) { - conn.log("AddStatistics: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models)) +func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { + conn.log("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models)) } func (conn *Connection) PruneNodes(deleteAfter time.Duration) { diff --git a/respond/collector.go b/respond/collector.go index f6f5935..b5406c1 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -208,9 +208,10 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { node := coll.nodes.Update(nodeID, res) node.Address = addr.IP - // Store statistics in InfluxDB + // Store statistics in database if coll.db != nil && node.Statistics != nil { - coll.db.AddNode(nodeID, node) + node.Statistics.NodeID = nodeID + coll.db.InsertNode(node) } } @@ -251,5 +252,5 @@ func (coll *Collector) globalStatsWorker() { func (coll *Collector) saveGlobalStats() { stats := runtime.NewGlobalStats(coll.nodes) - coll.db.AddStatistics(stats, time.Now()) + coll.db.InsertGlobals(stats, time.Now()) }