Store statistics to influxdb

This commit is contained in:
Julian Kornberger 2016-03-12 03:36:02 +01:00
parent 037fe1de05
commit e9a9b2cf54
4 changed files with 152 additions and 0 deletions

View File

@ -18,3 +18,8 @@ nodes:
vpn_addresses: vpn_addresses:
- da:25:d6:5c:97:6f - da:25:d6:5c:97:6f
- da:62:f2:70:c8:8d - da:62:f2:70:c8:8d
influxdb:
enable: false
database: ffhb
username:
password:

11
main.go
View File

@ -21,6 +21,7 @@ var (
config *models.Config config *models.Config
wsserverForNodes *websocketserver.Server wsserverForNodes *websocketserver.Server
multiCollector *respond.MultiCollector multiCollector *respond.MultiCollector
statsDb *StatsDb
nodes = models.NewNodes() nodes = models.NewNodes()
//aliases = models.NewNodes() //aliases = models.NewNodes()
) )
@ -48,6 +49,10 @@ func main() {
http.Handle("/", http.FileServer(http.Dir(config.Webserver.Webroot))) http.Handle("/", http.FileServer(http.Dir(config.Webserver.Webroot)))
} }
if config.Influxdb.Enable {
statsDb = NewStatsDb()
}
if config.Respondd.Enable { if config.Respondd.Enable {
multiCollector = respond.NewMultiCollector(collectInterval, func(addr net.UDPAddr, msg interface{}) { multiCollector = respond.NewMultiCollector(collectInterval, func(addr net.UDPAddr, msg interface{}) {
switch msg := msg.(type) { switch msg := msg.(type) {
@ -57,6 +62,9 @@ func main() {
nodes.Get(msg.NodeId).Neighbours = msg nodes.Get(msg.NodeId).Neighbours = msg
case *data.StatisticsStruct: case *data.StatisticsStruct:
nodes.Get(msg.NodeId).Statistics = msg nodes.Get(msg.NodeId).Statistics = msg
if statsDb != nil {
statsDb.Add(msg)
}
default: default:
log.Println("unknown message:", msg) log.Println("unknown message:", msg)
} }
@ -80,4 +88,7 @@ func main() {
if multiCollector != nil { if multiCollector != nil {
multiCollector.Close() multiCollector.Close()
} }
if statsDb != nil {
statsDb.Close()
}
} }

View File

@ -32,6 +32,12 @@ type Config struct {
SaveInterval int `yaml:"saveinterval"` SaveInterval int `yaml:"saveinterval"`
VpnAddresses []string `yaml:"vpn_addresses"` VpnAddresses []string `yaml:"vpn_addresses"`
} `yaml:"nodes"` } `yaml:"nodes"`
Influxdb struct {
Enable bool `yaml:"enable"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
} }
//ConfigReadFile reads a Config models by path to a yml file //ConfigReadFile reads a Config models by path to a yml file

130
stats_db.go Normal file
View File

@ -0,0 +1,130 @@
package main
import (
"log"
"sync"
"time"
"github.com/ffdo/node-informant/gluon-collector/data"
"github.com/influxdata/influxdb/client/v2"
)
const (
saveInterval = time.Second * 5
)
type StatsDb struct {
points chan *client.Point
wg sync.WaitGroup
client client.Client
}
func NewStatsDb() *StatsDb {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
Username: config.Influxdb.Username,
Password: config.Influxdb.Password,
})
if err != nil {
panic(err)
}
db := &StatsDb{
client: c,
points: make(chan *client.Point, 500),
}
// start worker
db.wg.Add(1)
go db.worker()
return db
}
func (c *StatsDb) Add(stats *data.StatisticsStruct) {
tags := map[string]string{
"nodeid": stats.NodeId,
}
fields := map[string]interface{}{
"load": stats.LoadAverage,
"processes.running": stats.Processes.Running,
"clients.wifi": stats.Clients.Wifi,
"clients.total": stats.Clients.Total,
"traffic.forward": stats.Traffic.Forward,
"traffic.rx": stats.Traffic.Rx,
"traffic.tx": stats.Traffic.Tx,
"traffic.mgmt.rx": stats.Traffic.MgmtRx,
"traffic.mgmt.tx": stats.Traffic.MgmtTx,
"traffic.memory.buffers": stats.Memory.Buffers,
"traffic.memory.cached": stats.Memory.Cached,
"traffic.memory.free": stats.Memory.Free,
"traffic.memory.total": stats.Memory.Total,
}
point, err := client.NewPoint("node", tags, fields, time.Now())
if err != nil {
panic(err)
}
c.points <- point
}
func (c *StatsDb) Close() {
close(c.points)
c.wg.Wait()
c.client.Close()
}
func (c *StatsDb) worker() {
lastSent := time.Now()
bpConfig := client.BatchPointsConfig{
Database: config.Influxdb.Database,
Precision: "m",
}
var bp client.BatchPoints
var err error
var abort bool
var dirty bool
for {
// create new batch points?
if bp == nil {
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
panic(err)
}
}
// wait for new points
select {
case point, ok := <-c.points:
if ok {
bp.AddPoint(point)
dirty = true
} else {
abort = true
}
case <-time.After(time.Second):
// nothing
}
// write now?
if dirty && (abort || lastSent.Add(saveInterval).Before(time.Now())) {
log.Println("saving", len(bp.Points()), "points")
if err := c.client.Write(bp); err != nil {
panic(err)
}
lastSent = time.Now()
dirty = false
bp = nil
}
if abort {
break
}
}
c.wg.Done()
}