From 948bfb291fec06497718a770998f422c6662f64e Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Mon, 28 Mar 2016 15:46:22 +0200 Subject: [PATCH] Optimize bulk insertion to InfluxDB --- stats_db.go | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/stats_db.go b/stats_db.go index 70a4144..ab889b4 100644 --- a/stats_db.go +++ b/stats_db.go @@ -10,7 +10,7 @@ import ( ) const ( - batchWaiting = time.Second * 5 + batchDuration = time.Second * 5 ) type StatsDb struct { @@ -106,44 +106,46 @@ func (c *StatsDb) worker() { var bp client.BatchPoints var err error - var dirty, closed bool - var batchStarted time.Time + var writeNow, closed bool + timer := time.NewTimer(batchDuration) for !closed { - // create new batch points? - if bp == nil { - if bp, err = client.NewBatchPoints(bpConfig); err != nil { - panic(err) - } - } // wait for new points select { case point, ok := <-c.points: if ok { - bp.AddPoint(point) - if !dirty { - batchStarted = time.Now() - dirty = true + if bp == nil { + // create new batch + timer.Reset(batchDuration) + if bp, err = client.NewBatchPoints(bpConfig); err != nil { + panic(err) + } } + bp.AddPoint(point) } else { closed = true } - case <-time.After(time.Second): - // nothing + case <-timer.C: + if bp == nil { + timer.Reset(batchDuration) + } else { + writeNow = true + } } // write batch now? - if (dirty && batchStarted.Add(batchWaiting).Before(time.Now())) || closed { + if bp != nil && (writeNow || closed) { log.Println("saving", len(bp.Points()), "points") if err = c.client.Write(bp); err != nil { panic(err) } - dirty = false + writeNow = false bp = nil } } + timer.Stop() c.wg.Done() }