From e060934c6c83b9df1cb5fbe37706988f9378c897 Mon Sep 17 00:00:00 2001 From: Martin Geno Date: Sat, 26 Nov 2016 13:11:21 +0100 Subject: [PATCH 1/2] want to resolve #8 --- database/database.go | 29 ++++++++++++++++++++++++++--- models/config.go | 13 ++++++++----- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/database/database.go b/database/database.go index 12a8034..091bcd5 100644 --- a/database/database.go +++ b/database/database.go @@ -1,6 +1,7 @@ package database import ( + "fmt" "log" "sync" "time" @@ -13,7 +14,6 @@ import ( const ( MeasurementNode = "node" // Measurement for per-node statistics MeasurementGlobal = "global" // Measurement for summarized global statistics - batchDuration = time.Second * 5 batchMaxSize = 500 ) @@ -22,6 +22,7 @@ type DB struct { client client.Client points chan *client.Point wg sync.WaitGroup + quit chan struct{} } func New(config *models.Config) *DB { @@ -40,14 +41,21 @@ func New(config *models.Config) *DB { config: config, client: c, points: make(chan *client.Point, 1000), + quit: make(chan struct{}), } db.wg.Add(1) - go db.worker() + go db.addWorker() + go db.deleteWorker() return db } +func (db *DB) DeletePoints() { + query := fmt.Sprintf("delete from %s where time < now() - %dm", MeasurementNode, db.config.Influxdb.DeleteTill) + db.client.Query(client.NewQuery(query, db.config.Influxdb.Database, "m")) +} + func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) { point, err := client.NewPoint(name, tags.Map(), fields, time) if err != nil { @@ -63,13 +71,27 @@ func (db *DB) Add(nodeId string, node *models.Node) { } func (db *DB) Close() { + close(db.quit) close(db.points) db.wg.Wait() db.client.Close() } +func (db *DB) deleteWorker() { + duration := time.Minute * time.Duration(db.config.Influxdb.DeleteInterval) + ticker := time.NewTicker(duration) + for { + select { + case <-ticker.C: + db.DeletePoints() + case <-db.quit: + ticker.Stop() + return + } + } +} // stores data points in batches into the influxdb -func (db *DB) worker() { +func (db *DB) addWorker() { bpConfig := client.BatchPointsConfig{ Database: db.config.Influxdb.Database, Precision: "m", @@ -78,6 +100,7 @@ func (db *DB) worker() { var bp client.BatchPoints var err error var writeNow, closed bool + batchDuration := time.Second * time.Duration(db.config.Influxdb.SaveInterval) timer := time.NewTimer(batchDuration) for !closed { diff --git a/models/config.go b/models/config.go index fdd20e6..89b5901 100644 --- a/models/config.go +++ b/models/config.go @@ -35,11 +35,14 @@ type Config struct { MaxAge int `yaml:"max_age"` // Remove nodes after n days of inactivity } `yaml:"nodes"` Influxdb struct { - Enable bool `yaml:"enable"` - Addr string `yaml:"host"` - Database string `yaml:"database"` - Username string `yaml:"username"` - Password string `yaml:"password"` + Enable bool `yaml:"enable"` + Addr string `yaml:"host"` + Database string `yaml:"database"` + Username string `yaml:"username"` + Password string `yaml:"password"` + SaveInterval int `yaml:"saveinterval"` // Save nodes every n seconds + DeleteInterval int `yaml:"deleteinterval"` // Delete stats of nodes every n minutes + DeleteTill int `yaml:"deletetill"` // Delete stats of nodes till now-deletetill n minutes } } From 30e4fe3267be7c2bbeff8ec4845fddd216789fad Mon Sep 17 00:00:00 2001 From: Martin Geno Date: Tue, 29 Nov 2016 00:15:32 +0100 Subject: [PATCH 2/2] log on deleting node information --- database/database.go | 1 + 1 file changed, 1 insertion(+) diff --git a/database/database.go b/database/database.go index 091bcd5..babb62b 100644 --- a/database/database.go +++ b/database/database.go @@ -53,6 +53,7 @@ func New(config *models.Config) *DB { func (db *DB) DeletePoints() { query := fmt.Sprintf("delete from %s where time < now() - %dm", MeasurementNode, db.config.Influxdb.DeleteTill) + log.Println("delete", MeasurementNode, "older than", db.config.Influxdb.DeleteTill, "minutes") db.client.Query(client.NewQuery(query, db.config.Influxdb.Database, "m")) }