Optimize bulk insertion to InfluxDB
This commit is contained in:
parent
dd643c469b
commit
948bfb291f
36
stats_db.go
36
stats_db.go
|
@ -10,7 +10,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
batchWaiting = time.Second * 5
|
||||
batchDuration = time.Second * 5
|
||||
)
|
||||
|
||||
type StatsDb struct {
|
||||
|
@ -106,44 +106,46 @@ func (c *StatsDb) worker() {
|
|||
|
||||
var bp client.BatchPoints
|
||||
var err error
|
||||
var dirty, closed bool
|
||||
var batchStarted time.Time
|
||||
var writeNow, closed bool
|
||||
timer := time.NewTimer(batchDuration)
|
||||
|
||||
for !closed {
|
||||
// create new batch points?
|
||||
if bp == nil {
|
||||
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// wait for new points
|
||||
select {
|
||||
case point, ok := <-c.points:
|
||||
if ok {
|
||||
bp.AddPoint(point)
|
||||
if !dirty {
|
||||
batchStarted = time.Now()
|
||||
dirty = true
|
||||
if bp == nil {
|
||||
// create new batch
|
||||
timer.Reset(batchDuration)
|
||||
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
bp.AddPoint(point)
|
||||
} else {
|
||||
closed = true
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
// nothing
|
||||
case <-timer.C:
|
||||
if bp == nil {
|
||||
timer.Reset(batchDuration)
|
||||
} else {
|
||||
writeNow = true
|
||||
}
|
||||
}
|
||||
|
||||
// write batch now?
|
||||
if (dirty && batchStarted.Add(batchWaiting).Before(time.Now())) || closed {
|
||||
if bp != nil && (writeNow || closed) {
|
||||
log.Println("saving", len(bp.Points()), "points")
|
||||
|
||||
if err = c.client.Write(bp); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dirty = false
|
||||
writeNow = false
|
||||
bp = nil
|
||||
}
|
||||
}
|
||||
|
||||
timer.Stop()
|
||||
c.wg.Done()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue