Rename and move database methods

This commit is contained in:
Julian Kornberger 2017-04-18 01:48:38 +02:00
parent c47214fc49
commit 8b8b1441ba
11 changed files with 79 additions and 73 deletions

View File

@ -80,7 +80,7 @@ func main() {
func importRRD(path string) { func importRRD(path string) {
log.Println("importing RRD from", path) log.Println("importing RRD from", path)
for ds := range rrd.Read(path) { for ds := range rrd.Read(path) {
connections.AddStatistics( connections.InsertGlobals(
&runtime.GlobalStats{ &runtime.GlobalStats{
Nodes: uint32(ds.Nodes), Nodes: uint32(ds.Nodes),
Clients: uint32(ds.Clients), Clients: uint32(ds.Clients),

View File

@ -30,15 +30,16 @@ func Connect(configuration interface{}) (database.Connection, error) {
} }
return &Connection{list: list}, nil return &Connection{list: list}, nil
} }
func (conn *Connection) AddNode(nodeID string, node *runtime.Node) {
func (conn *Connection) InsertNode(node *runtime.Node) {
for _, item := range conn.list { for _, item := range conn.list {
item.AddNode(nodeID, node) item.InsertNode(node)
} }
} }
func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) { func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
for _, item := range conn.list { for _, item := range conn.list {
item.AddStatistics(stats, time) item.InsertGlobals(stats, time)
} }
} }

View File

@ -8,11 +8,11 @@ import (
// Connection interface to use for implementation in e.g. influxdb // Connection interface to use for implementation in e.g. influxdb
type Connection interface { type Connection interface {
// AddNode stores data of a single node // InsertNode stores statistics per node
AddNode(nodeID string, node *runtime.Node) InsertNode(node *runtime.Node)
// AddStatistics stores global statistics // InsertGlobals stores global statistics
AddStatistics(stats *runtime.GlobalStats, time time.Time) InsertGlobals(stats *runtime.GlobalStats, time time.Time)
// PruneNodes prunes historical per-node data // PruneNodes prunes historical per-node data
PruneNodes(deleteAfter time.Duration) PruneNodes(deleteAfter time.Duration)

View File

@ -1,7 +1,6 @@
package influxdb package influxdb
import ( import (
"fmt"
"log" "log"
"sync" "sync"
"time" "time"
@ -10,7 +9,6 @@ import (
"github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/models"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
) )
const ( const (
@ -80,11 +78,6 @@ func Connect(configuration interface{}) (database.Connection, error) {
return db, nil return db, nil
} }
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}
func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, time time.Time) { func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, time time.Time) {
point, err := client.NewPoint(name, tags.Map(), fields, time) point, err := client.NewPoint(name, tags.Map(), fields, time)
if err != nil { if err != nil {
@ -93,36 +86,6 @@ func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fi
conn.points <- point conn.points <- point
} }
// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) {
now := time.Now()
for key, count := range m {
conn.addPoint(
name,
models.Tags{
models.Tag{Key: []byte("value"), Value: []byte(key)},
},
models.Fields{"count": count},
now,
)
}
}
// AddStatistics implementation of database
func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) {
conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time)
conn.addCounterMap(CounterMeasurementModel, stats.Models)
conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares)
}
// AddNode implementation of database
func (conn *Connection) AddNode(nodeID string, node *runtime.Node) {
tags, fields := nodeToInflux(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}
// Close all connection and clean up // Close all connection and clean up
func (conn *Connection) Close() { func (conn *Connection) Close() {
close(conn.points) close(conn.points)

View File

@ -0,0 +1,44 @@
package influxdb
import (
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/influxdata/influxdb/models"
)
// InsertGlobals implementation of database
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time)
conn.addCounterMap(CounterMeasurementModel, stats.Models)
conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares)
}
// GlobalStatsFields returns fields for InfluxDB
func GlobalStatsFields(stats *runtime.GlobalStats) map[string]interface{} {
return map[string]interface{}{
"nodes": stats.Nodes,
"gateways": stats.Gateways,
"clients.total": stats.Clients,
"clients.wifi": stats.ClientsWifi,
"clients.wifi24": stats.ClientsWifi24,
"clients.wifi5": stats.ClientsWifi5,
}
}
// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) {
now := time.Now()
for key, count := range m {
conn.addPoint(
name,
models.Tags{
models.Tag{Key: []byte("value"), Value: []byte(key)},
},
models.Fields{"count": count},
now,
)
}
}

View File

@ -1,15 +1,29 @@
package influxdb package influxdb
import ( import (
"fmt"
"strconv" "strconv"
"time"
client "github.com/influxdata/influxdb/client/v2"
models "github.com/influxdata/influxdb/models" models "github.com/influxdata/influxdb/models"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )
// NodeToInflux Returns tags and fields for InfluxDB // InsertNode implementation of database
func nodeToInflux(node *runtime.Node) (tags models.Tags, fields models.Fields) { func (conn *Connection) InsertNode(node *runtime.Node) {
tags, fields := buildNodeStats(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}
// returns tags and fields for InfluxDB
func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) {
stats := node.Statistics stats := node.Statistics
tags.SetString("nodeid", stats.NodeID) tags.SetString("nodeid", stats.NodeID)

View File

@ -66,7 +66,7 @@ func TestToInflux(t *testing.T) {
}, },
} }
tags, fields := nodeToInflux(node) tags, fields := buildNodeStats(node)
assert.Equal("foobar", tags.GetString("nodeid")) assert.Equal("foobar", tags.GetString("nodeid"))
assert.Equal("nobody", tags.GetString("owner")) assert.Equal("nobody", tags.GetString("owner"))

View File

@ -1,17 +0,0 @@
package influxdb
import (
"github.com/FreifunkBremen/yanic/runtime"
)
// GlobalStatsFields returns fields for InfluxDB
func GlobalStatsFields(stats *runtime.GlobalStats) map[string]interface{} {
return map[string]interface{}{
"nodes": stats.Nodes,
"gateways": stats.Gateways,
"clients.total": stats.Clients,
"clients.wifi": stats.ClientsWifi,
"clients.wifi24": stats.ClientsWifi24,
"clients.wifi5": stats.ClientsWifi5,
}
}

View File

@ -48,12 +48,12 @@ func Connect(configuration interface{}) (database.Connection, error) {
return &Connection{config: config, file: file}, nil return &Connection{config: config, file: file}, nil
} }
func (conn *Connection) AddNode(nodeID string, node *runtime.Node) { func (conn *Connection) InsertNode(node *runtime.Node) {
conn.log("AddNode: [", nodeID, "] clients: ", node.Statistics.Clients.Total) conn.log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total)
} }
func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) { func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.log("AddStatistics: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models)) conn.log("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models))
} }
func (conn *Connection) PruneNodes(deleteAfter time.Duration) { func (conn *Connection) PruneNodes(deleteAfter time.Duration) {

View File

@ -208,9 +208,10 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
node := coll.nodes.Update(nodeID, res) node := coll.nodes.Update(nodeID, res)
node.Address = addr.IP node.Address = addr.IP
// Store statistics in InfluxDB // Store statistics in database
if coll.db != nil && node.Statistics != nil { if coll.db != nil && node.Statistics != nil {
coll.db.AddNode(nodeID, node) node.Statistics.NodeID = nodeID
coll.db.InsertNode(node)
} }
} }
@ -251,5 +252,5 @@ func (coll *Collector) globalStatsWorker() {
func (coll *Collector) saveGlobalStats() { func (coll *Collector) saveGlobalStats() {
stats := runtime.NewGlobalStats(coll.nodes) stats := runtime.NewGlobalStats(coll.nodes)
coll.db.AddStatistics(stats, time.Now()) coll.db.InsertGlobals(stats, time.Now())
} }