Merge pull request #10 from FreifunkBremen/purge-influxdb

want to resolve #8:  Purge node specific statistics after several days
This commit is contained in:
Geno 2016-11-29 16:08:07 +01:00 committed by GitHub
commit 40fb10eb4e
2 changed files with 35 additions and 8 deletions

View File

@ -1,6 +1,7 @@
package database package database
import ( import (
"fmt"
"log" "log"
"sync" "sync"
"time" "time"
@ -13,7 +14,6 @@ import (
const ( const (
MeasurementNode = "node" // Measurement for per-node statistics MeasurementNode = "node" // Measurement for per-node statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics MeasurementGlobal = "global" // Measurement for summarized global statistics
batchDuration = time.Second * 5
batchMaxSize = 500 batchMaxSize = 500
) )
@ -22,6 +22,7 @@ type DB struct {
client client.Client client client.Client
points chan *client.Point points chan *client.Point
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{}
} }
func New(config *models.Config) *DB { func New(config *models.Config) *DB {
@ -40,14 +41,22 @@ func New(config *models.Config) *DB {
config: config, config: config,
client: c, client: c,
points: make(chan *client.Point, 1000), points: make(chan *client.Point, 1000),
quit: make(chan struct{}),
} }
db.wg.Add(1) db.wg.Add(1)
go db.worker() go db.addWorker()
go db.deleteWorker()
return db return db
} }
func (db *DB) DeletePoints() {
query := fmt.Sprintf("delete from %s where time < now() - %dm", MeasurementNode, db.config.Influxdb.DeleteTill)
log.Println("delete", MeasurementNode, "older than", db.config.Influxdb.DeleteTill, "minutes")
db.client.Query(client.NewQuery(query, db.config.Influxdb.Database, "m"))
}
func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) { func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) {
point, err := client.NewPoint(name, tags.Map(), fields, time) point, err := client.NewPoint(name, tags.Map(), fields, time)
if err != nil { if err != nil {
@ -63,13 +72,27 @@ func (db *DB) Add(nodeId string, node *models.Node) {
} }
func (db *DB) Close() { func (db *DB) Close() {
close(db.quit)
close(db.points) close(db.points)
db.wg.Wait() db.wg.Wait()
db.client.Close() db.client.Close()
} }
func (db *DB) deleteWorker() {
duration := time.Minute * time.Duration(db.config.Influxdb.DeleteInterval)
ticker := time.NewTicker(duration)
for {
select {
case <-ticker.C:
db.DeletePoints()
case <-db.quit:
ticker.Stop()
return
}
}
}
// stores data points in batches into the influxdb // stores data points in batches into the influxdb
func (db *DB) worker() { func (db *DB) addWorker() {
bpConfig := client.BatchPointsConfig{ bpConfig := client.BatchPointsConfig{
Database: db.config.Influxdb.Database, Database: db.config.Influxdb.Database,
Precision: "m", Precision: "m",
@ -78,6 +101,7 @@ func (db *DB) worker() {
var bp client.BatchPoints var bp client.BatchPoints
var err error var err error
var writeNow, closed bool var writeNow, closed bool
batchDuration := time.Second * time.Duration(db.config.Influxdb.SaveInterval)
timer := time.NewTimer(batchDuration) timer := time.NewTimer(batchDuration)
for !closed { for !closed {

View File

@ -40,6 +40,9 @@ type Config struct {
Database string `yaml:"database"` Database string `yaml:"database"`
Username string `yaml:"username"` Username string `yaml:"username"`
Password string `yaml:"password"` Password string `yaml:"password"`
SaveInterval int `yaml:"saveinterval"` // Save nodes every n seconds
DeleteInterval int `yaml:"deleteinterval"` // Delete stats of nodes every n minutes
DeleteTill int `yaml:"deletetill"` // Delete stats of nodes till now-deletetill n minutes
} }
} }