r2p_graph/influxdb.go

116 lines
2.2 KiB
Go
Raw Normal View History

2018-09-29 00:38:54 +02:00
package main
import (
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
)
const (
batchMaxSize = 1000
batchTimeout = 5 * time.Second
)
type InfluxDBConnection struct {
dbname string
client client.Client
points chan *client.Point
wg sync.WaitGroup
}
func InfluxDBConnect(config client.HTTPConfig, dbname string) (*InfluxDBConnection, error) {
// Make client
c, err := client.NewHTTPClient(config)
if err != nil {
return nil, err
}
_, _, err = c.Ping(time.Millisecond * 50)
if err != nil {
return nil, err
}
db := &InfluxDBConnection{
dbname: dbname,
client: c,
points: make(chan *client.Point, batchMaxSize),
}
db.wg.Add(1)
go db.addWorker()
return db, nil
}
func (conn *InfluxDBConnection) addPoint(name string, tags models.Tags, fields models.Fields) {
point, err := client.NewPoint(name, tags.Map(), fields, time.Now())
if err != nil {
panic(err)
}
conn.points <- point
}
// Close all connection and clean up
func (conn *InfluxDBConnection) Close() {
close(conn.points)
conn.wg.Wait()
conn.client.Close()
}
// stores data points in batches into the influxdb
func (conn *InfluxDBConnection) addWorker() {
bpConfig := client.BatchPointsConfig{
Database: conn.dbname,
Precision: "m",
}
var bp client.BatchPoints
var err error
var writeNow, closed bool
timer := time.NewTimer(batchTimeout)
for !closed {
// wait for new points
select {
case point, ok := <-conn.points:
if ok {
if bp == nil {
// create new batch
timer.Reset(batchTimeout)
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
log.Fatal(err)
}
}
bp.AddPoint(point)
} else {
closed = true
}
case <-timer.C:
if bp == nil {
timer.Reset(batchTimeout)
} else {
writeNow = true
}
}
// write batch now?
if bp != nil && (writeNow || closed || len(bp.Points()) >= batchMaxSize) {
log.Infof("saving %d points", len(bp.Points()))
if err = conn.client.Write(bp); err != nil {
log.Errorf("influxdb write error: %s", err.Error())
}
writeNow = false
bp = nil
}
}
timer.Stop()
conn.wg.Done()
}