Add global statistics
This commit is contained in:
parent
a60be980c5
commit
cbd8048d31
|
@ -170,7 +170,8 @@ func (nodes *Nodes) worker() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns global statistics for InfluxDB
|
// Returns global statistics for InfluxDB
|
||||||
func (nodes *Nodes) GlobalStats() (result GlobalStats) {
|
func (nodes *Nodes) GlobalStats() (result *GlobalStats) {
|
||||||
|
result = &GlobalStats{}
|
||||||
nodes.Lock()
|
nodes.Lock()
|
||||||
for _, node := range nodes.List {
|
for _, node := range nodes.List {
|
||||||
if node.Flags.Online {
|
if node.Flags.Online {
|
||||||
|
|
|
@ -16,13 +16,13 @@ import (
|
||||||
|
|
||||||
//Collector for a specificle respond messages
|
//Collector for a specificle respond messages
|
||||||
type Collector struct {
|
type Collector struct {
|
||||||
CollectType string
|
CollectType string
|
||||||
connection *net.UDPConn // UDP socket
|
connection *net.UDPConn // UDP socket
|
||||||
queue chan *Response // received responses
|
queue chan *Response // received responses
|
||||||
msgType reflect.Type
|
msgType reflect.Type
|
||||||
iface string // interface name for the multicast binding
|
multicastAddr string
|
||||||
db *database.DB
|
db *database.DB
|
||||||
nodes *models.Nodes
|
nodes *models.Nodes
|
||||||
// Ticker and stopper
|
// Ticker and stopper
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
stop chan interface{}
|
stop chan interface{}
|
||||||
|
@ -44,13 +44,12 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration,
|
||||||
conn.SetReadBuffer(maxDataGramSize)
|
conn.SetReadBuffer(maxDataGramSize)
|
||||||
|
|
||||||
collector := &Collector{
|
collector := &Collector{
|
||||||
CollectType: "nodeinfo statistics neighbours",
|
connection: conn,
|
||||||
connection: conn,
|
nodes: nodes,
|
||||||
nodes: nodes,
|
multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port),
|
||||||
iface: iface,
|
queue: make(chan *Response, 400),
|
||||||
queue: make(chan *Response, 400),
|
ticker: time.NewTicker(interval),
|
||||||
ticker: time.NewTicker(interval),
|
stop: make(chan interface{}, 1),
|
||||||
stop: make(chan interface{}, 1),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go collector.receiver()
|
go collector.receiver()
|
||||||
|
@ -69,15 +68,14 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration,
|
||||||
func (coll *Collector) Close() {
|
func (coll *Collector) Close() {
|
||||||
// stop ticker
|
// stop ticker
|
||||||
coll.ticker.Stop()
|
coll.ticker.Stop()
|
||||||
coll.stop <- nil
|
close(coll.stop)
|
||||||
|
|
||||||
coll.connection.Close()
|
coll.connection.Close()
|
||||||
close(coll.queue)
|
close(coll.queue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (coll *Collector) sendOnce() {
|
func (coll *Collector) sendOnce() {
|
||||||
coll.sendPacket(net.JoinHostPort(multiCastGroup+"%"+coll.iface, port))
|
coll.sendPacket(coll.multicastAddr)
|
||||||
log.Println("request", coll.CollectType)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (coll *Collector) sendPacket(address string) {
|
func (coll *Collector) sendPacket(address string) {
|
||||||
|
@ -86,7 +84,7 @@ func (coll *Collector) sendPacket(address string) {
|
||||||
log.Panic(err)
|
log.Panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := coll.connection.WriteToUDP([]byte("GET "+coll.CollectType), addr); err != nil {
|
if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), addr); err != nil {
|
||||||
log.Println("WriteToUDP failed:", err)
|
log.Println("WriteToUDP failed:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,6 +96,10 @@ func (coll *Collector) sender() {
|
||||||
case <-coll.stop:
|
case <-coll.stop:
|
||||||
return
|
return
|
||||||
case <-coll.ticker.C:
|
case <-coll.ticker.C:
|
||||||
|
// save global statistics
|
||||||
|
coll.db.AddPoint(database.MeasurementGlobal, nil, coll.nodes.GlobalStats().Fields(), time.Now())
|
||||||
|
|
||||||
|
// send the multicast packet to request per-node statistics
|
||||||
coll.sendOnce()
|
coll.sendOnce()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue