From 0bfcd8a7201dc4143ecccc509ab361acbcb0fc70 Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Sat, 12 Mar 2016 03:58:36 +0100 Subject: [PATCH] Small refactoring --- main.go | 51 ++++++++++++++++++++++++++++----------------------- stats_db.go | 19 ++++++++----------- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/main.go b/main.go index 5672903..3ce1ea3 100644 --- a/main.go +++ b/main.go @@ -31,15 +31,9 @@ func main() { flag.Parse() config = models.ConfigReadFile(configFile) - collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval) - if config.Nodes.Enable { go nodes.Saver(config) } - if config.Nodes.AliasesEnable { - // FIXME what does this do? - //go aliases.Saver(config.Nodes.AliasesPath, saveInterval) - } if config.Webserver.Enable { if config.Webserver.WebsocketNode { @@ -54,28 +48,16 @@ func main() { } if config.Respondd.Enable { - multiCollector = respond.NewMultiCollector(collectInterval, func(addr net.UDPAddr, msg interface{}) { - switch msg := msg.(type) { - case *data.NodeInfo: - nodes.Get(msg.NodeId).Nodeinfo = msg - case *data.NeighbourStruct: - nodes.Get(msg.NodeId).Neighbours = msg - case *data.StatisticsStruct: - nodes.Get(msg.NodeId).Statistics = msg - if statsDb != nil { - statsDb.Add(msg) - } - default: - log.Println("unknown message:", msg) - } - }) + collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval) + multiCollector = respond.NewMultiCollector(collectInterval, onReceive) } - //TODO bad + // TODO bad if config.Webserver.Enable { log.Fatal(http.ListenAndServe(net.JoinHostPort(config.Webserver.Address, config.Webserver.Port), nil)) } - // Wait for End + + // Wait for INT/TERM sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) sig := <-sigs @@ -92,3 +74,26 @@ func main() { statsDb.Close() } } + +// called for every parsed announced-message +func onReceive(addr net.UDPAddr, msg interface{}) { + switch msg := msg.(type) { + + case *data.NodeInfo: + nodes.Get(msg.NodeId).Nodeinfo = msg + + case *data.NeighbourStruct: + nodes.Get(msg.NodeId).Neighbours = msg + + case *data.StatisticsStruct: + nodes.Get(msg.NodeId).Statistics = msg + + // store data? + if statsDb != nil { + statsDb.Add(msg) + } + + default: + log.Println("unknown message:", msg) + } +} diff --git a/stats_db.go b/stats_db.go index b998ba2..6046568 100644 --- a/stats_db.go +++ b/stats_db.go @@ -49,7 +49,7 @@ func (c *StatsDb) Add(stats *data.StatisticsStruct) { } fields := map[string]interface{}{ "load": stats.LoadAverage, - "processes.running": stats.Processes.Running, + "processes.open": stats.Processes.Running, "clients.wifi": stats.Clients.Wifi, "clients.total": stats.Clients.Total, "traffic.forward": stats.Traffic.Forward, @@ -76,8 +76,10 @@ func (c *StatsDb) Close() { c.client.Close() } +// stores data points in batches into the influxdb func (c *StatsDb) worker() { lastSent := time.Now() + open := true // channel open? bpConfig := client.BatchPointsConfig{ Database: config.Influxdb.Database, Precision: "m", @@ -85,10 +87,9 @@ func (c *StatsDb) worker() { var bp client.BatchPoints var err error - var abort bool var dirty bool - for { + for open { // create new batch points? if bp == nil { if bp, err = client.NewBatchPoints(bpConfig); err != nil { @@ -103,27 +104,23 @@ func (c *StatsDb) worker() { bp.AddPoint(point) dirty = true } else { - abort = true + open = false } case <-time.After(time.Second): // nothing } - // write now? - if dirty && (abort || lastSent.Add(saveInterval).Before(time.Now())) { + // write batch now? + if dirty && (!open || lastSent.Add(saveInterval).Before(time.Now())) { log.Println("saving", len(bp.Points()), "points") - if err := c.client.Write(bp); err != nil { + if err = c.client.Write(bp); err != nil { panic(err) } lastSent = time.Now() dirty = false bp = nil } - - if abort { - break - } } c.wg.Done()