198 lines
4.7 KiB
Go
198 lines
4.7 KiB
Go
package influxdb
|
|
|
|
import (
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
"unicode"
|
|
|
|
"github.com/bdlm/log"
|
|
"github.com/influxdata/influxdb1-client/models"
|
|
client "github.com/influxdata/influxdb1-client/v2"
|
|
|
|
"github.com/FreifunkBremen/yanic/database"
|
|
)
|
|
|
|
const (
|
|
MeasurementLink = "link" // Measurement for per-link statistics
|
|
MeasurementNode = "node" // Measurement for per-node statistics
|
|
MeasurementDHCP = "dhcp" // Measurement for DHCP server statistics
|
|
MeasurementGlobal = "global" // Measurement for summarized global statistics
|
|
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics
|
|
CounterMeasurementModel = "model" // Measurement for model statistics
|
|
CounterMeasurementAutoupdater = "autoupdater" // Measurement for autoupdater
|
|
batchMaxSize = 1000
|
|
batchTimeout = 5 * time.Second
|
|
)
|
|
|
|
type Connection struct {
|
|
database.Connection
|
|
config Config
|
|
client client.Client
|
|
points chan *client.Point
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type Config map[string]interface{}
|
|
|
|
func (c Config) Address() string {
|
|
return c["address"].(string)
|
|
}
|
|
func (c Config) Database() string {
|
|
return c["database"].(string)
|
|
}
|
|
func (c Config) Username() string {
|
|
return c["username"].(string)
|
|
}
|
|
func (c Config) Password() string {
|
|
return c["password"].(string)
|
|
}
|
|
func (c Config) InsecureSkipVerify() bool {
|
|
if d, ok := c["insecure_skip_verify"]; ok {
|
|
return d.(bool)
|
|
}
|
|
return false
|
|
}
|
|
func (c Config) Tags() map[string]interface{} {
|
|
if c["tags"] != nil {
|
|
return c["tags"].(map[string]interface{})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
database.RegisterAdapter("influxdb", Connect)
|
|
}
|
|
func Connect(configuration map[string]interface{}) (database.Connection, error) {
|
|
config := Config(configuration)
|
|
|
|
// Make client
|
|
c, err := client.NewHTTPClient(client.HTTPConfig{
|
|
Addr: config.Address(),
|
|
Username: config.Username(),
|
|
Password: config.Password(),
|
|
InsecureSkipVerify: config.InsecureSkipVerify(),
|
|
Timeout: batchTimeout,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, _, err = c.Ping(time.Millisecond * 50)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
db := &Connection{
|
|
config: config,
|
|
client: c,
|
|
points: make(chan *client.Point, batchMaxSize),
|
|
}
|
|
|
|
db.wg.Add(1)
|
|
go db.addWorker()
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func sanitizeValues(tags models.Tags) models.Tags {
|
|
// https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/
|
|
// Line protocol does not support the newline character \n in tag or field values.
|
|
// To be safe, remove all non-printable characters and spaces except ASCII space and U+0020.
|
|
for _, tag := range tags {
|
|
cleaned_value := strings.Map(func(r rune) rune {
|
|
if unicode.IsPrint(r) {
|
|
return r
|
|
}
|
|
return ' '
|
|
}, string(tag.Value))
|
|
|
|
if cleaned_value != string(tag.Value) {
|
|
tags.SetString(string(tag.Key), cleaned_value)
|
|
}
|
|
}
|
|
return tags
|
|
}
|
|
|
|
func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, t ...time.Time) {
|
|
if configTags := conn.config.Tags(); configTags != nil {
|
|
for tag, valueInterface := range configTags {
|
|
if value, ok := valueInterface.(string); ok && tags.Get([]byte(tag)) == nil {
|
|
tags.SetString(tag, value)
|
|
} else {
|
|
log.WithFields(map[string]interface{}{
|
|
"name": name,
|
|
"tag": tag,
|
|
}).Warnf("could not save tag configuration on point")
|
|
}
|
|
}
|
|
}
|
|
|
|
tags = sanitizeValues(tags)
|
|
|
|
point, err := client.NewPoint(name, tags.Map(), fields, t...)
|
|
if err != nil {
|
|
log.Panicf("could not save points: %s", err)
|
|
}
|
|
conn.points <- point
|
|
}
|
|
|
|
// Close all connection and clean up
|
|
func (conn *Connection) Close() {
|
|
close(conn.points)
|
|
conn.wg.Wait()
|
|
conn.client.Close()
|
|
}
|
|
|
|
// stores data points in batches into the influxdb
|
|
func (conn *Connection) addWorker() {
|
|
bpConfig := client.BatchPointsConfig{
|
|
Database: conn.config.Database(),
|
|
Precision: "m",
|
|
}
|
|
|
|
var bp client.BatchPoints
|
|
var err error
|
|
var writeNow, closed bool
|
|
timer := time.NewTimer(batchTimeout)
|
|
|
|
for !closed {
|
|
// wait for new points
|
|
select {
|
|
case point, ok := <-conn.points:
|
|
if ok {
|
|
if bp == nil {
|
|
// create new batch
|
|
timer.Reset(batchTimeout)
|
|
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|
|
bp.AddPoint(point)
|
|
} else {
|
|
closed = true
|
|
}
|
|
case <-timer.C:
|
|
if bp == nil {
|
|
timer.Reset(batchTimeout)
|
|
} else {
|
|
writeNow = true
|
|
}
|
|
}
|
|
|
|
// write batch now?
|
|
if bp != nil && (writeNow || closed || len(bp.Points()) >= batchMaxSize) {
|
|
log.WithField("count", len(bp.Points())).Info("saving points")
|
|
|
|
if err = conn.client.Write(bp); err != nil {
|
|
log.Error(err)
|
|
}
|
|
writeNow = false
|
|
bp = nil
|
|
}
|
|
}
|
|
timer.Stop()
|
|
conn.wg.Done()
|
|
}
|