[TASK] add job tag to influxdb database type (#44)

This commit is contained in:
kb-light 2017-06-01 18:17:32 +02:00 committed by Geno
parent dc24c8b250
commit 214a03866e
3 changed files with 81 additions and 0 deletions

View File

@ -67,6 +67,11 @@ address = "http://localhost:8086"
database = "ffhb" database = "ffhb"
username = "" username = ""
password = "" password = ""
# tagging of the data are optional
# be carefull tags by system would overright config
[database.connection.influxdb.tags]
site = "ffhb01"
system = "testing"
[[database.connection.logging]] [[database.connection.logging]]
enable = false enable = false

View File

@ -45,6 +45,12 @@ func (c Config) Username() string {
func (c Config) Password() string { func (c Config) Password() string {
return c["password"].(string) return c["password"].(string)
} }
func (c Config) Tags() map[string]interface{} {
if c["tags"] != nil {
return c["tags"].(map[string]interface{})
}
return nil
}
func init() { func init() {
database.RegisterAdapter("influxdb", Connect) database.RegisterAdapter("influxdb", Connect)
@ -79,6 +85,15 @@ func Connect(configuration interface{}) (database.Connection, error) {
} }
func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, t ...time.Time) { func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, t ...time.Time) {
if configTags := conn.config.Tags(); configTags != nil {
for tag, valueInterface := range configTags {
if value, ok := valueInterface.(string); ok && tags.Get([]byte(tag)) == nil {
tags.SetString(tag, value)
} else {
log.Println(name, "could not saved configured value of tag", tag)
}
}
}
point, err := client.NewPoint(name, tags.Map(), fields, t...) point, err := client.NewPoint(name, tags.Map(), fields, t...)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -0,0 +1,61 @@
package influxdb
import (
"testing"
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
"github.com/stretchr/testify/assert"
)
func TestAddPoint(t *testing.T) {
assert := assert.New(t)
// Test add Point without tags
connection := &Connection{
config: map[string]interface{}{},
points: make(chan *client.Point, 1),
}
connection.addPoint("name", models.Tags{}, models.Fields{"clients.total": 10}, time.Now())
point := <-connection.points
assert.NotNil(point)
tags := point.Tags()
assert.NotNil(tags)
assert.NotEqual(tags["testtag2"], "value")
// Test add Point with tags
connection.config["tags"] = map[string]interface{}{
"testtag": "value",
}
connection.addPoint("name", models.Tags{}, models.Fields{"clients.total": 10}, time.Now())
point = <-connection.points
assert.NotNil(point)
tags = point.Tags()
assert.NotNil(tags)
assert.Equal(tags["testtag"], "value")
assert.NotEqual(tags["testtag2"], "value")
// Tried to overright by config
connection.config["tags"] = map[string]interface{}{
"nodeid": "value",
}
tagsOrigin := models.Tags{}
tagsOrigin.SetString("nodeid", "collected")
connection.addPoint("name", tagsOrigin, models.Fields{"clients.total": 10}, time.Now())
point = <-connection.points
assert.NotNil(point)
tags = point.Tags()
assert.NotNil(tags)
assert.Equal(tags["nodeid"], "collected")
// Test panic if it was not possible to create a point
assert.Panics(func() {
connection.addPoint("name", models.Tags{}, nil, time.Now())
})
}