yanic/database/influxdb/database.go

183 lines
4.2 KiB
Go

package influxdb
import (
"fmt"
"log"
"sync"
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)
const (
MeasurementNode = "node" // Measurement for per-node statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics
CounterMeasurementModel = "model" // Measurement for model statistics
batchMaxSize = 500
batchTimeout = 5 * time.Second
)
type Connection struct {
database.Connection
config Config
client client.Client
points chan *client.Point
wg sync.WaitGroup
}
type Config map[string]interface{}
func (c Config) Enable() bool {
return c["enable"].(bool)
}
func (c Config) Address() string {
return c["address"].(string)
}
func (c Config) Database() string {
return c["database"].(string)
}
func (c Config) Username() string {
return c["username"].(string)
}
func (c Config) Password() string {
return c["password"].(string)
}
func init() {
database.AddDatabaseType("influxdb", Connect)
}
func Connect(configuration interface{}) (database.Connection, error) {
var config Config
config = configuration.(map[string]interface{})
if !config.Enable() {
return nil, nil
}
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: config.Address(),
Username: config.Username(),
Password: config.Password(),
})
if err != nil {
return nil, err
}
db := &Connection{
config: config,
client: c,
points: make(chan *client.Point, 1000),
}
db.wg.Add(1)
go db.addWorker()
return db, nil
}
func (conn *Connection) DeleteNode(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}
func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, time time.Time) {
point, err := client.NewPoint(name, tags.Map(), fields, time)
if err != nil {
panic(err)
}
conn.points <- point
}
// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) {
now := time.Now()
for key, count := range m {
conn.addPoint(
name,
models.Tags{
models.Tag{Key: []byte("value"), Value: []byte(key)},
},
models.Fields{"count": count},
now,
)
}
}
// AddStatistics implementation of database
func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) {
conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time)
conn.addCounterMap(CounterMeasurementModel, stats.Models)
conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares)
}
// AddNode implementation of database
func (conn *Connection) AddNode(nodeID string, node *runtime.Node) {
tags, fields := nodeToInflux(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}
// Close all connection and clean up
func (conn *Connection) Close() {
close(conn.points)
conn.wg.Wait()
conn.client.Close()
}
// stores data points in batches into the influxdb
func (conn *Connection) addWorker() {
bpConfig := client.BatchPointsConfig{
Database: conn.config.Database(),
Precision: "m",
}
var bp client.BatchPoints
var err error
var writeNow, closed bool
timer := time.NewTimer(batchTimeout)
for !closed {
// wait for new points
select {
case point, ok := <-conn.points:
if ok {
if bp == nil {
// create new batch
timer.Reset(batchTimeout)
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
log.Fatal(err)
}
}
bp.AddPoint(point)
} else {
closed = true
}
case <-timer.C:
if bp == nil {
timer.Reset(batchTimeout)
} else {
writeNow = true
}
}
// write batch now?
if bp != nil && (writeNow || closed || len(bp.Points()) >= batchMaxSize) {
log.Println("saving", len(bp.Points()), "points")
if err = conn.client.Write(bp); err != nil {
log.Fatal(err)
}
writeNow = false
bp = nil
}
}
timer.Stop()
conn.wg.Done()
}