yanic/database/database.go

145 lines
3.1 KiB
Go
Raw Normal View History

2016-10-03 19:55:37 +02:00
// Package database stores node statistics into InfluxDB.
package database

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/FreifunkBremen/respond-collector/models"
	"github.com/influxdata/influxdb/client/v2"
	imodels "github.com/influxdata/influxdb/models"
)
// Measurement names and batching limits used by this package.
const (
	MeasurementNode   = "node"   // Measurement for per-node statistics
	MeasurementGlobal = "global" // Measurement for summarized global statistics
	batchMaxSize      = 500      // flush a pending batch once it holds this many points
)
type DB struct {
config *models.Config
client client.Client
points chan *client.Point
wg sync.WaitGroup
2016-11-26 13:11:21 +01:00
quit chan struct{}
2016-10-03 19:55:37 +02:00
}
func New(config *models.Config) *DB {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: config.Influxdb.Addr,
Username: config.Influxdb.Username,
Password: config.Influxdb.Password,
})
if err != nil {
panic(err)
}
db := &DB{
config: config,
client: c,
points: make(chan *client.Point, 1000),
2016-11-26 13:11:21 +01:00
quit: make(chan struct{}),
2016-10-03 19:55:37 +02:00
}
db.wg.Add(1)
2016-11-26 13:11:21 +01:00
go db.addWorker()
go db.deleteWorker()
2016-10-03 19:55:37 +02:00
return db
}
2016-11-26 13:11:21 +01:00
func (db *DB) DeletePoints() {
query := fmt.Sprintf("delete from %s where time < now() - %dm", MeasurementNode, db.config.Influxdb.DeleteTill)
2016-11-29 00:15:32 +01:00
log.Println("delete", MeasurementNode, "older than", db.config.Influxdb.DeleteTill, "minutes")
2016-11-26 13:11:21 +01:00
db.client.Query(client.NewQuery(query, db.config.Influxdb.Database, "m"))
}
2016-10-03 19:55:37 +02:00
// AddPoint builds a single data point and queues it on the spool channel
// for the batch writer. It panics when the point cannot be constructed
// from the given tags and fields.
func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) {
	p, err := client.NewPoint(name, tags.Map(), fields, time)
	if err != nil {
		// a malformed point is a programming error, not a runtime condition
		panic(err)
	}
	db.points <- p
}
// Add queues the statistics of a single node under the per-node
// measurement, stamped with the current time.
func (db *DB) Add(nodeId string, node *models.Node) {
	nodeTags, nodeFields := node.ToInflux()
	db.AddPoint(MeasurementNode, nodeTags, nodeFields, time.Now())
}
func (db *DB) Close() {
2016-11-26 13:11:21 +01:00
close(db.quit)
2016-10-03 19:55:37 +02:00
close(db.points)
db.wg.Wait()
db.client.Close()
}
2016-11-26 13:11:21 +01:00
// deleteWorker expires old data points on a fixed interval until the
// quit channel is closed.
func (db *DB) deleteWorker() {
	interval := time.Duration(db.config.Influxdb.DeleteInterval) * time.Minute
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-db.quit:
			return
		case <-ticker.C:
			db.DeletePoints()
		}
	}
}
2016-10-03 19:55:37 +02:00
// stores data points in batches into the influxdb
2016-11-26 13:11:21 +01:00
func (db *DB) addWorker() {
2016-10-03 19:55:37 +02:00
bpConfig := client.BatchPointsConfig{
Database: db.config.Influxdb.Database,
Precision: "m",
}
var bp client.BatchPoints
var err error
var writeNow, closed bool
2016-11-26 13:11:21 +01:00
batchDuration := time.Second * time.Duration(db.config.Influxdb.SaveInterval)
2016-10-03 19:55:37 +02:00
timer := time.NewTimer(batchDuration)
for !closed {
// wait for new points
select {
case point, ok := <-db.points:
if ok {
if bp == nil {
// create new batch
timer.Reset(batchDuration)
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
log.Fatal(err)
}
}
bp.AddPoint(point)
} else {
closed = true
}
case <-timer.C:
if bp == nil {
timer.Reset(batchDuration)
} else {
writeNow = true
}
}
// write batch now?
2016-10-04 14:54:19 +02:00
if bp != nil && (writeNow || closed || len(bp.Points()) >= batchMaxSize) {
2016-10-03 19:55:37 +02:00
log.Println("saving", len(bp.Points()), "points")
if err = db.client.Write(bp); err != nil {
log.Fatal(err)
}
writeNow = false
bp = nil
}
}
timer.Stop()
db.wg.Done()
}