diff --git a/models/nodes.go b/models/nodes.go index 4991304..9eaa516 100644 --- a/models/nodes.go +++ b/models/nodes.go @@ -170,7 +170,8 @@ func (nodes *Nodes) worker() { } // Returns global statistics for InfluxDB -func (nodes *Nodes) GlobalStats() (result GlobalStats) { +func (nodes *Nodes) GlobalStats() (result *GlobalStats) { + result = &GlobalStats{} nodes.Lock() for _, node := range nodes.List { if node.Flags.Online { diff --git a/respond/collector.go b/respond/collector.go index ce9ae34..d971b4d 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -16,13 +16,13 @@ import ( //Collector for a specificle respond messages type Collector struct { - CollectType string - connection *net.UDPConn // UDP socket - queue chan *Response // received responses - msgType reflect.Type - iface string // interface name for the multicast binding - db *database.DB - nodes *models.Nodes + CollectType string + connection *net.UDPConn // UDP socket + queue chan *Response // received responses + msgType reflect.Type + multicastAddr string + db *database.DB + nodes *models.Nodes // Ticker and stopper ticker *time.Ticker stop chan interface{} @@ -44,13 +44,12 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, conn.SetReadBuffer(maxDataGramSize) collector := &Collector{ - CollectType: "nodeinfo statistics neighbours", - connection: conn, - nodes: nodes, - iface: iface, - queue: make(chan *Response, 400), - ticker: time.NewTicker(interval), - stop: make(chan interface{}, 1), + connection: conn, + nodes: nodes, + multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port), + queue: make(chan *Response, 400), + ticker: time.NewTicker(interval), + stop: make(chan interface{}, 1), } go collector.receiver() @@ -69,15 +68,14 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, func (coll *Collector) Close() { // stop ticker coll.ticker.Stop() - coll.stop <- nil + close(coll.stop) coll.connection.Close() close(coll.queue) } func (coll *Collector) sendOnce() { - coll.sendPacket(net.JoinHostPort(multiCastGroup+"%"+coll.iface, port)) - log.Println("request", coll.CollectType) + coll.sendPacket(coll.multicastAddr) } func (coll *Collector) sendPacket(address string) { @@ -86,7 +84,7 @@ func (coll *Collector) sendPacket(address string) { log.Panic(err) } - if _, err := coll.connection.WriteToUDP([]byte("GET "+coll.CollectType), addr); err != nil { + if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), addr); err != nil { log.Println("WriteToUDP failed:", err) } } @@ -98,6 +96,10 @@ func (coll *Collector) sender() { case <-coll.stop: return case <-coll.ticker.C: + // save global statistics + coll.db.AddPoint(database.MeasurementGlobal, nil, coll.nodes.GlobalStats().Fields(), time.Now()) + + // send the multicast packet to request per-node statistics coll.sendOnce() } }