Optimize batch insertion
This commit is contained in:
parent
d1b315aa6e
commit
497190f343
17
stats_db.go
17
stats_db.go
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
saveInterval = time.Second * 5
|
batchWaiting = time.Second * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
type StatsDb struct {
|
type StatsDb struct {
|
||||||
|
@ -99,8 +99,6 @@ func (c *StatsDb) Close() {
|
||||||
|
|
||||||
// stores data points in batches into the influxdb
|
// stores data points in batches into the influxdb
|
||||||
func (c *StatsDb) worker() {
|
func (c *StatsDb) worker() {
|
||||||
lastSent := time.Now()
|
|
||||||
open := true // channel open?
|
|
||||||
bpConfig := client.BatchPointsConfig{
|
bpConfig := client.BatchPointsConfig{
|
||||||
Database: config.Influxdb.Database,
|
Database: config.Influxdb.Database,
|
||||||
Precision: "m",
|
Precision: "m",
|
||||||
|
@ -108,9 +106,10 @@ func (c *StatsDb) worker() {
|
||||||
|
|
||||||
var bp client.BatchPoints
|
var bp client.BatchPoints
|
||||||
var err error
|
var err error
|
||||||
var dirty bool
|
var dirty, closed bool
|
||||||
|
var batchStarted time.Time
|
||||||
|
|
||||||
for open {
|
for !closed {
|
||||||
// create new batch points?
|
// create new batch points?
|
||||||
if bp == nil {
|
if bp == nil {
|
||||||
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
|
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
|
||||||
|
@ -123,22 +122,24 @@ func (c *StatsDb) worker() {
|
||||||
case point, ok := <-c.points:
|
case point, ok := <-c.points:
|
||||||
if ok {
|
if ok {
|
||||||
bp.AddPoint(point)
|
bp.AddPoint(point)
|
||||||
|
if !dirty {
|
||||||
|
batchStarted = time.Now()
|
||||||
dirty = true
|
dirty = true
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
open = false
|
closed = true
|
||||||
}
|
}
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
// nothing
|
// nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
// write batch now?
|
// write batch now?
|
||||||
if dirty && (!open || lastSent.Add(saveInterval).Before(time.Now())) {
|
if (dirty && batchStarted.Add(batchWaiting).Before(time.Now())) || closed {
|
||||||
log.Println("saving", len(bp.Points()), "points")
|
log.Println("saving", len(bp.Points()), "points")
|
||||||
|
|
||||||
if err = c.client.Write(bp); err != nil {
|
if err = c.client.Write(bp); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
lastSent = time.Now()
|
|
||||||
dirty = false
|
dirty = false
|
||||||
bp = nil
|
bp = nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue