Small refactoring

This commit is contained in:
Julian Kornberger 2016-03-12 03:58:36 +01:00
parent d275ecf48d
commit 0bfcd8a720
2 changed files with 36 additions and 34 deletions

49
main.go
View File

@ -31,15 +31,9 @@ func main() {
flag.Parse()
config = models.ConfigReadFile(configFile)
collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval)
if config.Nodes.Enable {
go nodes.Saver(config)
}
if config.Nodes.AliasesEnable {
// FIXME what does this do?
//go aliases.Saver(config.Nodes.AliasesPath, saveInterval)
}
if config.Webserver.Enable {
if config.Webserver.WebsocketNode {
@ -54,28 +48,16 @@ func main() {
}
if config.Respondd.Enable {
multiCollector = respond.NewMultiCollector(collectInterval, func(addr net.UDPAddr, msg interface{}) {
switch msg := msg.(type) {
case *data.NodeInfo:
nodes.Get(msg.NodeId).Nodeinfo = msg
case *data.NeighbourStruct:
nodes.Get(msg.NodeId).Neighbours = msg
case *data.StatisticsStruct:
nodes.Get(msg.NodeId).Statistics = msg
if statsDb != nil {
statsDb.Add(msg)
}
default:
log.Println("unknown message:", msg)
}
})
collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval)
multiCollector = respond.NewMultiCollector(collectInterval, onReceive)
}
// TODO bad
if config.Webserver.Enable {
log.Fatal(http.ListenAndServe(net.JoinHostPort(config.Webserver.Address, config.Webserver.Port), nil))
}
// Wait for End
// Wait for INT/TERM
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
@ -92,3 +74,26 @@ func main() {
statsDb.Close()
}
}
// called for every parsed announced-message
func onReceive(addr net.UDPAddr, msg interface{}) {
switch msg := msg.(type) {
case *data.NodeInfo:
nodes.Get(msg.NodeId).Nodeinfo = msg
case *data.NeighbourStruct:
nodes.Get(msg.NodeId).Neighbours = msg
case *data.StatisticsStruct:
nodes.Get(msg.NodeId).Statistics = msg
// store data?
if statsDb != nil {
statsDb.Add(msg)
}
default:
log.Println("unknown message:", msg)
}
}

View File

@ -49,7 +49,7 @@ func (c *StatsDb) Add(stats *data.StatisticsStruct) {
}
fields := map[string]interface{}{
"load": stats.LoadAverage,
"processes.running": stats.Processes.Running,
"processes.open": stats.Processes.Running,
"clients.wifi": stats.Clients.Wifi,
"clients.total": stats.Clients.Total,
"traffic.forward": stats.Traffic.Forward,
@ -76,8 +76,10 @@ func (c *StatsDb) Close() {
c.client.Close()
}
// stores data points in batches into the influxdb
func (c *StatsDb) worker() {
lastSent := time.Now()
open := true // channel open?
bpConfig := client.BatchPointsConfig{
Database: config.Influxdb.Database,
Precision: "m",
@ -85,10 +87,9 @@ func (c *StatsDb) worker() {
var bp client.BatchPoints
var err error
var abort bool
var dirty bool
for {
for open {
// create new batch points?
if bp == nil {
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
@ -103,27 +104,23 @@ func (c *StatsDb) worker() {
bp.AddPoint(point)
dirty = true
} else {
abort = true
open = false
}
case <-time.After(time.Second):
// nothing
}
// write now?
if dirty && (abort || lastSent.Add(saveInterval).Before(time.Now())) {
// write batch now?
if dirty && (!open || lastSent.Add(saveInterval).Before(time.Now())) {
log.Println("saving", len(bp.Points()), "points")
if err := c.client.Write(bp); err != nil {
if err = c.client.Write(bp); err != nil {
panic(err)
}
lastSent = time.Now()
dirty = false
bp = nil
}
if abort {
break
}
}
c.wg.Done()