From e9a9b2cf548301dc73b2f73304f1bdb7473df4f6 Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Sat, 12 Mar 2016 03:36:02 +0100 Subject: [PATCH] Store statistics to influxdb --- config_example.yaml | 5 ++ main.go | 11 ++++ models/config.go | 6 ++ stats_db.go | 130 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 152 insertions(+) create mode 100644 stats_db.go diff --git a/config_example.yaml b/config_example.yaml index 11ef097..8db21be 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -18,3 +18,8 @@ nodes: vpn_addresses: - da:25:d6:5c:97:6f - da:62:f2:70:c8:8d +influxdb: + enable: false + database: ffhb + username: + password: diff --git a/main.go b/main.go index 4cdca87..5672903 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ var ( config *models.Config wsserverForNodes *websocketserver.Server multiCollector *respond.MultiCollector + statsDb *StatsDb nodes = models.NewNodes() //aliases = models.NewNodes() ) @@ -48,6 +49,10 @@ func main() { http.Handle("/", http.FileServer(http.Dir(config.Webserver.Webroot))) } + if config.Influxdb.Enable { + statsDb = NewStatsDb() + } + if config.Respondd.Enable { multiCollector = respond.NewMultiCollector(collectInterval, func(addr net.UDPAddr, msg interface{}) { switch msg := msg.(type) { @@ -57,6 +62,9 @@ func main() { nodes.Get(msg.NodeId).Neighbours = msg case *data.StatisticsStruct: nodes.Get(msg.NodeId).Statistics = msg + if statsDb != nil { + statsDb.Add(msg) + } default: log.Println("unknown message:", msg) } @@ -80,4 +88,7 @@ func main() { if multiCollector != nil { multiCollector.Close() } + if statsDb != nil { + statsDb.Close() + } } diff --git a/models/config.go b/models/config.go index d05100c..0f35088 100644 --- a/models/config.go +++ b/models/config.go @@ -32,6 +32,12 @@ type Config struct { SaveInterval int `yaml:"saveinterval"` VpnAddresses []string `yaml:"vpn_addresses"` } `yaml:"nodes"` + Influxdb struct { + Enable bool `yaml:"enable"` + Database string `yaml:"database"` + Username string `yaml:"username"` + Password string `yaml:"password"` + } } //ConfigReadFile reads a Config models by path to a yml file diff --git a/stats_db.go b/stats_db.go new file mode 100644 index 0000000..b998ba2 --- /dev/null +++ b/stats_db.go @@ -0,0 +1,130 @@ +package main + +import ( + "log" + "sync" + "time" + + "github.com/ffdo/node-informant/gluon-collector/data" + "github.com/influxdata/influxdb/client/v2" +) + +const ( + saveInterval = time.Second * 5 +) + +type StatsDb struct { + points chan *client.Point + wg sync.WaitGroup + client client.Client +} + +func NewStatsDb() *StatsDb { + // Make client + c, err := client.NewHTTPClient(client.HTTPConfig{ + Addr: "http://localhost:8086", + Username: config.Influxdb.Username, + Password: config.Influxdb.Password, + }) + + if err != nil { + panic(err) + } + + db := &StatsDb{ + client: c, + points: make(chan *client.Point, 500), + } + + // start worker + db.wg.Add(1) + go db.worker() + + return db +} + +func (c *StatsDb) Add(stats *data.StatisticsStruct) { + tags := map[string]string{ + "nodeid": stats.NodeId, + } + fields := map[string]interface{}{ + "load": stats.LoadAverage, + "processes.running": stats.Processes.Running, + "clients.wifi": stats.Clients.Wifi, + "clients.total": stats.Clients.Total, + "traffic.forward": stats.Traffic.Forward, + "traffic.rx": stats.Traffic.Rx, + "traffic.tx": stats.Traffic.Tx, + "traffic.mgmt.rx": stats.Traffic.MgmtRx, + "traffic.mgmt.tx": stats.Traffic.MgmtTx, + "traffic.memory.buffers": stats.Memory.Buffers, + "traffic.memory.cached": stats.Memory.Cached, + "traffic.memory.free": stats.Memory.Free, + "traffic.memory.total": stats.Memory.Total, + } + + point, err := client.NewPoint("node", tags, fields, time.Now()) + if err != nil { + panic(err) + } + c.points <- point +} + +func (c *StatsDb) Close() { + close(c.points) + c.wg.Wait() + c.client.Close() +} + +func (c *StatsDb) worker() { + lastSent := time.Now() + bpConfig := client.BatchPointsConfig{ + Database: config.Influxdb.Database, + Precision: "m", + } + + var bp client.BatchPoints + var err error + var abort bool + var dirty bool + + for { + // create new batch points? + if bp == nil { + if bp, err = client.NewBatchPoints(bpConfig); err != nil { + panic(err) + } + } + + // wait for new points + select { + case point, ok := <-c.points: + if ok { + bp.AddPoint(point) + dirty = true + } else { + abort = true + } + case <-time.After(time.Second): + // nothing + } + + // write now? + if dirty && (abort || lastSent.Add(saveInterval).Before(time.Now())) { + log.Println("saving", len(bp.Points()), "points") + + if err := c.client.Write(bp); err != nil { + panic(err) + } + lastSent = time.Now() + dirty = false + bp = nil + } + + if abort { + break + } + } + + c.wg.Done() +}