From 497190f343abbb71294a57e1c54b3d75778788f2 Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Sat, 12 Mar 2016 16:27:52 +0100 Subject: [PATCH] Optimize batch insertion --- stats_db.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/stats_db.go b/stats_db.go index d4e0793..9bfd432 100644 --- a/stats_db.go +++ b/stats_db.go @@ -10,7 +10,7 @@ import ( ) const ( - saveInterval = time.Second * 5 + batchWaiting = time.Second * 5 ) type StatsDb struct { @@ -99,8 +99,6 @@ func (c *StatsDb) Close() { // stores data points in batches into the influxdb func (c *StatsDb) worker() { - lastSent := time.Now() - open := true // channel open? bpConfig := client.BatchPointsConfig{ Database: config.Influxdb.Database, Precision: "m", @@ -108,9 +106,10 @@ func (c *StatsDb) worker() { var bp client.BatchPoints var err error - var dirty bool + var dirty, closed bool + var batchStarted time.Time - for open { + for !closed { // create new batch points? if bp == nil { if bp, err = client.NewBatchPoints(bpConfig); err != nil { @@ -123,22 +122,24 @@ func (c *StatsDb) worker() { case point, ok := <-c.points: if ok { bp.AddPoint(point) - dirty = true + if !dirty { + batchStarted = time.Now() + dirty = true + } } else { - open = false + closed = true } case <-time.After(time.Second): // nothing } // write batch now? - if dirty && (!open || lastSent.Add(saveInterval).Before(time.Now())) { + if (dirty && batchStarted.Add(batchWaiting).Before(time.Now())) || closed { log.Println("saving", len(bp.Points()), "points") if err = c.client.Write(bp); err != nil { panic(err) } - lastSent = time.Now() dirty = false bp = nil }