diff --git a/database/database.go b/database/database.go index 12a8034..091bcd5 100644 --- a/database/database.go +++ b/database/database.go @@ -1,6 +1,7 @@ package database import ( + "fmt" "log" "sync" "time" @@ -13,7 +14,6 @@ import ( const ( MeasurementNode = "node" // Measurement for per-node statistics MeasurementGlobal = "global" // Measurement for summarized global statistics - batchDuration = time.Second * 5 batchMaxSize = 500 ) @@ -22,6 +22,7 @@ type DB struct { client client.Client points chan *client.Point wg sync.WaitGroup + quit chan struct{} } func New(config *models.Config) *DB { @@ -40,14 +41,21 @@ func New(config *models.Config) *DB { config: config, client: c, points: make(chan *client.Point, 1000), + quit: make(chan struct{}), } db.wg.Add(1) - go db.worker() + go db.addWorker() + go db.deleteWorker() return db } +func (db *DB) DeletePoints() { + query := fmt.Sprintf("delete from %s where time < now() - %dm", MeasurementNode, db.config.Influxdb.DeleteTill) + db.client.Query(client.NewQuery(query, db.config.Influxdb.Database, "m")) +} + func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) { point, err := client.NewPoint(name, tags.Map(), fields, time) if err != nil { @@ -63,13 +71,27 @@ func (db *DB) Add(nodeId string, node *models.Node) { } func (db *DB) Close() { + close(db.quit) close(db.points) db.wg.Wait() db.client.Close() } +func (db *DB) deleteWorker() { + duration := time.Minute * time.Duration(db.config.Influxdb.DeleteInterval) + ticker := time.NewTicker(duration) + for { + select { + case <-ticker.C: + db.DeletePoints() + case <-db.quit: + ticker.Stop() + return + } + } +} // stores data points in batches into the influxdb -func (db *DB) worker() { +func (db *DB) addWorker() { bpConfig := client.BatchPointsConfig{ Database: db.config.Influxdb.Database, Precision: "m", @@ -78,6 +100,7 @@ func (db *DB) worker() { var bp client.BatchPoints var err error var writeNow, closed bool + batchDuration := time.Second * time.Duration(db.config.Influxdb.SaveInterval) timer := time.NewTimer(batchDuration) for !closed { diff --git a/models/config.go b/models/config.go index fdd20e6..89b5901 100644 --- a/models/config.go +++ b/models/config.go @@ -35,11 +35,14 @@ type Config struct { MaxAge int `yaml:"max_age"` // Remove nodes after n days of inactivity } `yaml:"nodes"` Influxdb struct { - Enable bool `yaml:"enable"` - Addr string `yaml:"host"` - Database string `yaml:"database"` - Username string `yaml:"username"` - Password string `yaml:"password"` + Enable bool `yaml:"enable"` + Addr string `yaml:"host"` + Database string `yaml:"database"` + Username string `yaml:"username"` + Password string `yaml:"password"` + SaveInterval int `yaml:"saveinterval"` // Save nodes every n seconds + DeleteInterval int `yaml:"deleteinterval"` // Delete stats of nodes every n minutes + DeleteTill int `yaml:"deletetill"` // Delete stats of nodes till now-deletetill n minutes } }