diff --git a/database/database.go b/database/database.go new file mode 100644 index 0000000..5e88dc9 --- /dev/null +++ b/database/database.go @@ -0,0 +1,120 @@ +package database + +import ( + "log" + "sync" + "time" + + "github.com/FreifunkBremen/respond-collector/models" + "github.com/influxdata/influxdb/client/v2" + imodels "github.com/influxdata/influxdb/models" +) + +const ( + MeasurementNode = "node" // Measurement for per-node statistics + MeasurementGlobal = "global" // Measurement for summarized global statistics + batchDuration = time.Second * 5 + batchMaxSize = 500 +) + +type DB struct { + config *models.Config + client client.Client + points chan *client.Point + wg sync.WaitGroup +} + +func New(config *models.Config) *DB { + // Make client + c, err := client.NewHTTPClient(client.HTTPConfig{ + Addr: config.Influxdb.Addr, + Username: config.Influxdb.Username, + Password: config.Influxdb.Password, + }) + + if err != nil { + panic(err) + } + + db := &DB{ + config: config, + client: c, + points: make(chan *client.Point, 1000), + } + + db.wg.Add(1) + go db.worker() + + return db +} + +func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) { + point, err := client.NewPoint(name, tags.Map(), fields, time) + if err != nil { + panic(err) + } + db.points <- point +} + +// Add data for a single node +func (db *DB) Add(nodeId string, node *models.Node) { + tags, fields := node.ToInflux() + db.AddPoint(MeasurementNode, tags, fields, time.Now()) +} + +func (db *DB) Close() { + close(db.points) + db.wg.Wait() + db.client.Close() +} + +// stores data points in batches into the influxdb +func (db *DB) worker() { + bpConfig := client.BatchPointsConfig{ + Database: db.config.Influxdb.Database, + Precision: "m", + } + + var bp client.BatchPoints + var err error + var writeNow, closed bool + timer := time.NewTimer(batchDuration) + + for !closed { + // wait for new points + select { + case point, ok := <-db.points: + if ok { + if bp == nil { + // create new batch + timer.Reset(batchDuration) + if bp, err = client.NewBatchPoints(bpConfig); err != nil { + log.Fatal(err) + } + } + bp.AddPoint(point) + } else { + closed = true + } + case <-timer.C: + if bp == nil { + timer.Reset(batchDuration) + } else { + writeNow = true + } + } + + // write batch now? + if bp != nil && (writeNow || closed) || len(bp.Points()) >= batchMaxSize { + log.Println("saving", len(bp.Points()), "points") + + if err = db.client.Write(bp); err != nil { + log.Fatal(err) + } + writeNow = false + bp = nil + } + } + timer.Stop() + db.wg.Done() +} diff --git a/main.go b/main.go index f10e0a9..10e4838 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( "github.com/julienschmidt/httprouter" "github.com/FreifunkBremen/respond-collector/api" - "github.com/FreifunkBremen/respond-collector/data" + "github.com/FreifunkBremen/respond-collector/database" "github.com/FreifunkBremen/respond-collector/models" "github.com/FreifunkBremen/respond-collector/respond" ) @@ -23,7 +23,7 @@ var ( configFile string config *models.Config collector *respond.Collector - statsDb *StatsDb + db *database.DB nodes *models.Nodes ) @@ -31,15 +31,15 @@ func main() { flag.StringVar(&configFile, "config", "config.yml", "path of configuration file (default:config.yaml)") flag.Parse() config = models.ReadConfigFile(configFile) - nodes = models.NewNodes(config) if config.Influxdb.Enable { - statsDb = NewStatsDb() + db = database.New(config) } + nodes = models.NewNodes(config) if config.Respondd.Enable { collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval) - collector = respond.NewCollector("nodeinfo statistics neighbours", collectInterval, onReceive, config.Respondd.Interface) + collector = respond.NewCollector(db, nodes, collectInterval, config.Respondd.Interface) } if config.Webserver.Enable { @@ -70,33 +70,7 @@ func main() { if collector != nil { collector.Close() } - if statsDb != nil { - statsDb.Close() - } -} - -// called for every parsed announced-message -func onReceive(addr net.UDPAddr, res *data.ResponseData) { - - // Search for NodeID - var nodeId string - if val := res.NodeInfo; val != nil { - nodeId = val.NodeId - } else if val := res.Neighbours; val != nil { - nodeId = val.NodeId - } else if val := res.Statistics; val != nil { - nodeId = val.NodeId - } - - // Updates nodes if NodeID found - if len(nodeId) != 12 { - log.Printf("invalid NodeID '%s' from %s", nodeId, addr.String()) - return - } - - node := nodes.Update(nodeId, res) - - if statsDb != nil && node.Statistics != nil { - statsDb.Add(nodeId, node) + if db != nil { + db.Close() } } diff --git a/models/node.go b/models/node.go new file mode 100644 index 0000000..d871a62 --- /dev/null +++ b/models/node.go @@ -0,0 +1,95 @@ +package models + +import ( + "strconv" + + "github.com/FreifunkBremen/respond-collector/data" + "github.com/FreifunkBremen/respond-collector/jsontime" + "github.com/FreifunkBremen/respond-collector/meshviewer" + imodels "github.com/influxdata/influxdb/models" +) + +// Node struct +type Node struct { + Firstseen jsontime.Time `json:"firstseen"` + Lastseen jsontime.Time `json:"lastseen"` + Flags *meshviewer.Flags `json:"flags,omitempty"` + Statistics *data.Statistics `json:"statistics"` + Nodeinfo *data.NodeInfo `json:"nodeinfo"` + Neighbours *data.Neighbours `json:"-"` +} + +// Returns tags and fields for InfluxDB +func (node *Node) ToInflux() (tags imodels.Tags, fields imodels.Fields) { + stats := node.Statistics + + tags.SetString("nodeid", stats.NodeId) + + fields = map[string]interface{}{ + "load": stats.LoadAverage, + "time.up": int64(stats.Uptime), + "time.idle": int64(stats.Idletime), + "proc.running": stats.Processes.Running, + "clients.wifi": stats.Clients.Wifi, + "clients.wifi24": stats.Clients.Wifi24, + "clients.wifi5": stats.Clients.Wifi5, + "clients.total": stats.Clients.Total, + "memory.buffers": stats.Memory.Buffers, + "memory.cached": stats.Memory.Cached, + "memory.free": stats.Memory.Free, + "memory.total": stats.Memory.Total, + } + + if nodeinfo := node.Nodeinfo; nodeinfo != nil { + if owner := nodeinfo.Owner; owner != nil { + tags.SetString("owner", owner.Contact) + } + if wireless := nodeinfo.Wireless; wireless != nil { + fields["wireless.txpower24"] = wireless.TxPower24 + fields["wireless.txpower5"] = wireless.TxPower5 + } + // morpheus needs + tags.SetString("hostname", nodeinfo.Hostname) + } + + if t := stats.Traffic.Rx; t != nil { + fields["traffic.rx.bytes"] = int64(t.Bytes) + fields["traffic.rx.packets"] = t.Packets + } + if t := stats.Traffic.Tx; t != nil { + fields["traffic.tx.bytes"] = int64(t.Bytes) + fields["traffic.tx.packets"] = t.Packets + fields["traffic.tx.dropped"] = t.Dropped + } + if t := stats.Traffic.Forward; t != nil { + fields["traffic.forward.bytes"] = int64(t.Bytes) + fields["traffic.forward.packets"] = t.Packets + } + if t := stats.Traffic.MgmtRx; t != nil { + fields["traffic.mgmt_rx.bytes"] = int64(t.Bytes) + fields["traffic.mgmt_rx.packets"] = t.Packets + } + if t := stats.Traffic.MgmtTx; t != nil { + fields["traffic.mgmt_tx.bytes"] = int64(t.Bytes) + fields["traffic.mgmt_tx.packets"] = t.Packets + } + if w := stats.Wireless; w != nil { + addAirtime := func(suffix string, time *data.WirelessAirtime) { + fields["airtime"+suffix+".chan_util"] = time.ChanUtil + fields["airtime"+suffix+".rx_util"] = time.RxUtil + fields["airtime"+suffix+".tx_util"] = time.TxUtil + fields["airtime"+suffix+".noise"] = time.Noise + fields["airtime"+suffix+".frequency"] = time.Frequency + tags.SetString("frequency"+suffix, strconv.Itoa(int(time.Frequency))) + } + + if time := w.Airtime24; time != nil { + addAirtime("24", w.Airtime24) + } + if time := w.Airtime5; time != nil { + addAirtime("5", w.Airtime5) + } + } + + return +} diff --git a/models/nodes.go b/models/nodes.go index c98c875..4991304 100644 --- a/models/nodes.go +++ b/models/nodes.go @@ -13,16 +13,6 @@ import ( "github.com/FreifunkBremen/respond-collector/meshviewer" ) -// Node struct -type Node struct { - Firstseen jsontime.Time `json:"firstseen"` - Lastseen jsontime.Time `json:"lastseen"` - Flags *meshviewer.Flags `json:"flags,omitempty"` - Statistics *data.Statistics `json:"statistics"` - Nodeinfo *data.NodeInfo `json:"nodeinfo"` - Neighbours *data.Neighbours `json:"-"` -} - // Nodes struct: cache DB of Node's structs type Nodes struct { Version int `json:"version"` @@ -32,6 +22,14 @@ type Nodes struct { sync.RWMutex } +type GlobalStats struct { + Nodes uint32 + Clients uint32 + ClientsWifi uint32 + ClientsWifi24 uint32 + ClientsWifi5 uint32 +} + // NewNodes create Nodes structs func NewNodes(config *Config) *Nodes { nodes := &Nodes{ @@ -171,33 +169,32 @@ func (nodes *Nodes) worker() { } } -func (nodes *Nodes) GetStats() map[string]interface{} { - var nodesCount uint32 - var clientsCount uint32 - var clientsWifiCount uint32 - var clientsWifi24Count uint32 - var clientsWifi5Count uint32 - +// Returns global statistics for InfluxDB +func (nodes *Nodes) GlobalStats() (result GlobalStats) { nodes.Lock() for _, node := range nodes.List { if node.Flags.Online { - nodesCount += 1 + result.Nodes += 1 if stats := node.Statistics; stats != nil { - clientsCount += stats.Clients.Total - clientsWifi24Count += stats.Clients.Wifi24 - clientsWifi5Count += stats.Clients.Wifi5 - clientsWifiCount += stats.Clients.Wifi + result.Clients += stats.Clients.Total + result.ClientsWifi24 += stats.Clients.Wifi24 + result.ClientsWifi5 += stats.Clients.Wifi5 + result.ClientsWifi += stats.Clients.Wifi } } } nodes.Unlock() + return +} +// Returns fields for InfluxDB +func (stats *GlobalStats) Fields() map[string]interface{} { return map[string]interface{}{ - "nodes": nodesCount, - "clients.total": clientsCount, - "clients.wifi": clientsWifiCount, - "clients.wifi24": clientsWifi24Count, - "clients.wifi5": clientsWifi5Count, + "nodes": stats.Nodes, + "clients.total": stats.Clients, + "clients.wifi": stats.ClientsWifi, + "clients.wifi24": stats.ClientsWifi24, + "clients.wifi5": stats.ClientsWifi5, } } diff --git a/respond/collector.go b/respond/collector.go index 4b93035..ce9ae34 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -10,6 +10,8 @@ import ( "time" "github.com/FreifunkBremen/respond-collector/data" + "github.com/FreifunkBremen/respond-collector/database" + "github.com/FreifunkBremen/respond-collector/models" ) //Collector for a specificle respond messages @@ -17,18 +19,17 @@ type Collector struct { CollectType string connection *net.UDPConn // UDP socket queue chan *Response // received responses - onReceive OnReceive msgType reflect.Type - intface string + iface string // interface name for the multicast binding + db *database.DB + nodes *models.Nodes // Ticker and stopper ticker *time.Ticker stop chan interface{} } -type OnReceive func(net.UDPAddr, *data.ResponseData) - -//NewCollector creates a Collector struct -func NewCollector(CollectType string, interval time.Duration, onReceive OnReceive, intface string) *Collector { +// Creates a Collector struct +func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, iface string) *Collector { // Parse address addr, err := net.ResolveUDPAddr("udp", "[::]:0") if err != nil { @@ -43,13 +44,13 @@ func NewCollector(CollectType string, interval time.Duration, onReceive OnReceiv conn.SetReadBuffer(maxDataGramSize) collector := &Collector{ - CollectType: CollectType, + CollectType: "nodeinfo statistics neighbours", connection: conn, + nodes: nodes, + iface: iface, queue: make(chan *Response, 400), ticker: time.NewTicker(interval), stop: make(chan interface{}, 1), - onReceive: onReceive, - intface: intface, } go collector.receiver() @@ -75,7 +76,7 @@ func (coll *Collector) Close() { } func (coll *Collector) sendOnce() { - coll.sendPacket(net.JoinHostPort(multiCastGroup+"%"+coll.intface, port)) + coll.sendPacket(net.JoinHostPort(multiCastGroup+"%"+coll.iface, port)) log.Println("request", coll.CollectType) } @@ -104,25 +105,47 @@ func (coll *Collector) sender() { func (coll *Collector) parser() { for obj := range coll.queue { - if err := coll.parse(obj); err != nil { + if data, err := obj.parse(); err != nil { log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw)) + } else { + coll.saveResponse(obj.Address, data) } } } -func (coll *Collector) parse(response *Response) (err error) { - +func (res *Response) parse() (*data.ResponseData, error) { // Deflate - deflater := flate.NewReader(bytes.NewReader(response.Raw)) + deflater := flate.NewReader(bytes.NewReader(res.Raw)) defer deflater.Close() // Unmarshal - res := &data.ResponseData{} - if err = json.NewDecoder(deflater).Decode(res); err == nil { - coll.onReceive(response.Address, res) + rdata := &data.ResponseData{} + err := json.NewDecoder(deflater).Decode(rdata) + + return rdata, err +} + +func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { + // Search for NodeID + var nodeId string + if val := res.NodeInfo; val != nil { + nodeId = val.NodeId + } else if val := res.Neighbours; val != nil { + nodeId = val.NodeId + } else if val := res.Statistics; val != nil { + nodeId = val.NodeId } - return + // Updates nodes if NodeID found + if len(nodeId) != 12 { + log.Printf("invalid NodeID '%s' from %s", nodeId, addr.String()) + return + } + node := coll.nodes.Update(nodeId, res) + + if coll.db != nil && node.Statistics != nil { + coll.db.Add(nodeId, node) + } } func (coll *Collector) receiver() { diff --git a/respond/collector_test.go b/respond/collector_test.go index 7837566..beffc93 100644 --- a/respond/collector_test.go +++ b/respond/collector_test.go @@ -2,37 +2,26 @@ package respond import ( "io/ioutil" - "net" - "reflect" "testing" - "github.com/FreifunkBremen/respond-collector/data" "github.com/stretchr/testify/assert" ) func TestParse(t *testing.T) { assert := assert.New(t) - var decompressed *data.ResponseData - - // callback function - onReceive := func(addr net.UDPAddr, res *data.ResponseData) { - decompressed = res - } - - collector := &Collector{ - msgType: reflect.TypeOf(data.NodeInfo{}), - onReceive: onReceive, - } // read testdata compressed, err := ioutil.ReadFile("testdata/nodeinfo.flated") assert.Nil(err) - collector.parse(&Response{ + res := &Response{ Raw: compressed, - }) + } - assert.NotNil(decompressed) - assert.NotNil(decompressed.NodeInfo) - assert.Equal("f81a67a5e9c1", decompressed.NodeInfo.NodeId) + data, err := res.parse() + + assert.NoError(err) + assert.NotNil(data) + + assert.Equal("f81a67a5e9c1", data.NodeInfo.NodeId) } diff --git a/stats_db.go b/stats_db.go deleted file mode 100644 index 662c35e..0000000 --- a/stats_db.go +++ /dev/null @@ -1,198 +0,0 @@ -package main - -import ( - "log" - "strconv" - "sync" - "time" - - "github.com/FreifunkBremen/respond-collector/data" - "github.com/FreifunkBremen/respond-collector/models" - "github.com/influxdata/influxdb/client/v2" -) - -const ( - batchDuration = time.Second * 5 -) - -type StatsDb struct { - points chan *client.Point - wg sync.WaitGroup - nodes *models.Nodes - client client.Client -} - -func NewStatsDb() *StatsDb { - // Make client - c, err := client.NewHTTPClient(client.HTTPConfig{ - Addr: config.Influxdb.Addr, - Username: config.Influxdb.Username, - Password: config.Influxdb.Password, - }) - - if err != nil { - panic(err) - } - - db := &StatsDb{ - client: c, - points: make(chan *client.Point, 500), - nodes: nodes, - } - - // start worker - db.wg.Add(1) - go db.worker() - - return db -} - -func (c *StatsDb) Add(nodeId string, node *models.Node) { - stats := node.Statistics - - tags := map[string]string{ - "nodeid": nodeId, - } - - fields := map[string]interface{}{ - "load": stats.LoadAverage, - "idletime": int64(stats.Idletime), - "uptime": int64(stats.Uptime), - "processes.running": stats.Processes.Running, - "clients.wifi": stats.Clients.Wifi, - "clients.wifi24": stats.Clients.Wifi24, - "clients.wifi5": stats.Clients.Wifi5, - "clients.total": stats.Clients.Total, - "memory.buffers": stats.Memory.Buffers, - "memory.cached": stats.Memory.Cached, - "memory.free": stats.Memory.Free, - "memory.total": stats.Memory.Total, - } - - if nodeinfo := node.Nodeinfo; nodeinfo != nil { - if owner := nodeinfo.Owner; owner != nil { - tags["owner"] = owner.Contact - } - if wireless := nodeinfo.Wireless; wireless != nil { - fields["wireless.txpower24"] = wireless.TxPower24 - fields["wireless.txpower5"] = wireless.TxPower5 - } - // morpheus needs - tags["hostname"] = nodeinfo.Hostname - } - - if t := stats.Traffic.Rx; t != nil { - fields["traffic.rx.bytes"] = int64(t.Bytes) - fields["traffic.rx.packets"] = t.Packets - } - if t := stats.Traffic.Tx; t != nil { - fields["traffic.tx.bytes"] = int64(t.Bytes) - fields["traffic.tx.packets"] = t.Packets - fields["traffic.tx.dropped"] = t.Dropped - } - if t := stats.Traffic.Forward; t != nil { - fields["traffic.forward.bytes"] = int64(t.Bytes) - fields["traffic.forward.packets"] = t.Packets - } - if t := stats.Traffic.MgmtRx; t != nil { - fields["traffic.mgmt_rx.bytes"] = int64(t.Bytes) - fields["traffic.mgmt_rx.packets"] = t.Packets - } - if t := stats.Traffic.MgmtTx; t != nil { - fields["traffic.mgmt_tx.bytes"] = int64(t.Bytes) - fields["traffic.mgmt_tx.packets"] = t.Packets - } - if w := stats.Wireless; w != nil { - addAirtime := func(suffix string, time *data.WirelessAirtime) { - fields["airtime"+suffix+".chan_util"] = time.ChanUtil - fields["airtime"+suffix+".rx_util"] = time.RxUtil - fields["airtime"+suffix+".tx_util"] = time.TxUtil - fields["airtime"+suffix+".noise"] = time.Noise - fields["airtime"+suffix+".frequency"] = time.Frequency - tags["frequency"+suffix+""] = strconv.Itoa(int(time.Frequency)) - } - - if time := w.Airtime24; time != nil { - addAirtime("24", w.Airtime24) - - } - if time := w.Airtime5; time != nil { - addAirtime("5", w.Airtime5) - } - } - - point, err := client.NewPoint("node", tags, fields, time.Now()) - if err != nil { - panic(err) - } - c.points <- point -} - -func (c *StatsDb) Close() { - close(c.points) - c.wg.Wait() - c.client.Close() -} - -// stores data points in batches into the influxdb -func (c *StatsDb) worker() { - bpConfig := client.BatchPointsConfig{ - Database: config.Influxdb.Database, - Precision: "m", - } - - var bp client.BatchPoints - var err error - var writeNow, closed bool - timer := time.NewTimer(batchDuration) - globalDuration := time.Second * time.Duration(config.Nodes.SaveInterval) - globalTimer := time.NewTimer(globalDuration) - - for !closed { - - // wait for new points - select { - case <-globalTimer.C: - point, err := client.NewPoint("global", nil, nodes.GetStats(), time.Now()) - if err != nil { - panic(err) - } - c.points <- point - globalTimer.Reset(globalDuration) - log.Print("saving global point") - case point, ok := <-c.points: - if ok { - if bp == nil { - // create new batch - timer.Reset(batchDuration) - if bp, err = client.NewBatchPoints(bpConfig); err != nil { - log.Fatal(err) - } - } - bp.AddPoint(point) - } else { - closed = true - } - case <-timer.C: - if bp == nil { - timer.Reset(batchDuration) - } else { - writeNow = true - } - } - - // write batch now? - if bp != nil && (writeNow || closed) { - log.Println("saving", len(bp.Points()), "points") - - if err = c.client.Write(bp); err != nil { - log.Fatal(err) - } - writeNow = false - bp = nil - } - } - globalTimer.Stop() - timer.Stop() - c.wg.Done() -}