From f4650213b84011cba9fa3d1e3d9315a12a830e5d Mon Sep 17 00:00:00 2001 From: Julian K Date: Wed, 27 Sep 2017 13:55:02 +0200 Subject: [PATCH] [TASK] add per-link statistics (#57) ATM: support only influxdb --- cmd/yanic/main.go | 3 +- data/nodeinfo.go | 5 ++ database/all/internal.go | 6 ++ database/database.go | 5 +- database/graphite/database.go | 5 +- database/graphite/link.go | 11 ++++ database/influxdb/database.go | 1 + database/influxdb/link.go | 19 ++++++ database/influxdb/node.go | 28 ++++---- database/influxdb/node_test.go | 117 ++++++++++++++++++++++++++++----- database/logging/file.go | 4 ++ meshviewer/graph.go | 3 +- meshviewer/graph_test.go | 4 +- respond/collector.go | 23 ++++++- runtime/node.go | 9 +++ runtime/nodes.go | 105 ++++++++++++++++++++++------- runtime/nodes_test.go | 10 ++- 17 files changed, 291 insertions(+), 67 deletions(-) create mode 100644 database/graphite/link.go create mode 100644 database/influxdb/link.go diff --git a/cmd/yanic/main.go b/cmd/yanic/main.go index 115cb48..1f90e90 100644 --- a/cmd/yanic/main.go +++ b/cmd/yanic/main.go @@ -43,6 +43,8 @@ func main() { panic(err) } + nodes = runtime.NewNodes(config) + connections, err = all.Connect(config.Database.Connection) if err != nil { panic(err) @@ -55,7 +57,6 @@ func main() { return } - nodes = runtime.NewNodes(config) nodes.Start() meshviewer.Start(config, nodes) diff --git a/data/nodeinfo.go b/data/nodeinfo.go index 8a3b96f..204b044 100644 --- a/data/nodeinfo.go +++ b/data/nodeinfo.go @@ -23,6 +23,11 @@ type BatInterface struct { } `json:"interfaces"` } +// Addresses returns a flat list of all MAC addresses +func (iface *BatInterface) Addresses() []string { + return append(append(iface.Interfaces.Other, iface.Interfaces.Tunnel...), iface.Interfaces.Wireless...) +} + // Network struct type Network struct { Mac string `json:"mac"` diff --git a/database/all/internal.go b/database/all/internal.go index ff1b816..721e09e 100644 --- a/database/all/internal.go +++ b/database/all/internal.go @@ -37,6 +37,12 @@ func (conn *Connection) InsertNode(node *runtime.Node) { } } +func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) { + for _, item := range conn.list { + item.InsertLink(link, time) + } +} + func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { for _, item := range conn.list { item.InsertGlobals(stats, time) diff --git a/database/database.go b/database/database.go index 01621e7..3a40edc 100644 --- a/database/database.go +++ b/database/database.go @@ -11,8 +11,11 @@ type Connection interface { // InsertNode stores statistics per node InsertNode(node *runtime.Node) + // InsertLink stores statistics per link + InsertLink(*runtime.Link, time.Time) + // InsertGlobals stores global statistics - InsertGlobals(stats *runtime.GlobalStats, time time.Time) + InsertGlobals(*runtime.GlobalStats, time.Time) // PruneNodes prunes historical per-node data PruneNodes(deleteAfter time.Duration) diff --git a/database/graphite/database.go b/database/graphite/database.go index 2cf8d97..a0bf9b4 100644 --- a/database/graphite/database.go +++ b/database/graphite/database.go @@ -1,10 +1,11 @@ package graphite import ( - "github.com/FreifunkBremen/yanic/database" - "github.com/fgrosse/graphigo" "log" "sync" + + "github.com/FreifunkBremen/yanic/database" + "github.com/fgrosse/graphigo" ) const ( diff --git a/database/graphite/link.go b/database/graphite/link.go new file mode 100644 index 0000000..3b8f97d --- /dev/null +++ b/database/graphite/link.go @@ -0,0 +1,11 @@ +package graphite + +import ( + "time" + + "github.com/FreifunkBremen/yanic/runtime" +) + +// InsertLink stores per link statistics +func (c *Connection) InsertLink(link *runtime.Link, time time.Time) { +} diff --git a/database/influxdb/database.go b/database/influxdb/database.go index ae0c1d4..6239916 100644 --- a/database/influxdb/database.go +++ b/database/influxdb/database.go @@ -12,6 +12,7 @@ import ( ) const ( + MeasurementLink = "link" // Measurement for per-link statistics MeasurementNode = "node" // Measurement for per-node statistics MeasurementGlobal = "global" // Measurement for summarized global statistics CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics diff --git a/database/influxdb/link.go b/database/influxdb/link.go new file mode 100644 index 0000000..3d50468 --- /dev/null +++ b/database/influxdb/link.go @@ -0,0 +1,19 @@ +package influxdb + +import ( + "time" + + "github.com/FreifunkBremen/yanic/runtime" + models "github.com/influxdata/influxdb/models" +) + +// InsertLink adds a link data point +func (conn *Connection) InsertLink(link *runtime.Link, t time.Time) { + tags := models.Tags{} + tags.SetString("source.id", link.SourceID) + tags.SetString("source.mac", link.SourceMAC) + tags.SetString("target.id", link.TargetID) + tags.SetString("target.mac", link.TargetMAC) + + conn.addPoint(MeasurementLink, tags, models.Fields{"tq": float32(link.TQ) / 2.55}, t) +} diff --git a/database/influxdb/node.go b/database/influxdb/node.go index 5f23368..d00f558 100644 --- a/database/influxdb/node.go +++ b/database/influxdb/node.go @@ -11,24 +11,28 @@ import ( "github.com/FreifunkBremen/yanic/runtime" ) -// InsertNode implementation of database -func (conn *Connection) InsertNode(node *runtime.Node) { - tags, fields := buildNodeStats(node) - conn.addPoint(MeasurementNode, tags, fields, time.Now()) -} - +// PruneNodes prunes historical per-node data func (conn *Connection) PruneNodes(deleteAfter time.Duration) { - query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second) - conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) + for _, measurement := range []string{MeasurementNode, MeasurementLink} { + query := fmt.Sprintf("delete from %s where time < now() - %ds", measurement, deleteAfter/time.Second) + conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) + } + } -// returns tags and fields for InfluxDB -func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) { +// InsertNode stores statistics and neighbours in the database +func (conn *Connection) InsertNode(node *runtime.Node) { stats := node.Statistics + time := node.Lastseen.GetTime() + if stats == nil || stats.NodeID == "" { + return + } + + tags := models.Tags{} tags.SetString("nodeid", stats.NodeID) - fields = map[string]interface{}{ + fields := models.Fields{ "load": stats.LoadAverage, "time.up": int64(stats.Uptime), "time.idle": int64(stats.Idletime), @@ -123,5 +127,7 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) tags.SetString("frequency"+suffix, strconv.Itoa(int(airtime.Frequency))) } + conn.addPoint(MeasurementNode, tags, fields, time) + return } diff --git a/database/influxdb/node_test.go b/database/influxdb/node_test.go index 3fa7f32..6b594a3 100644 --- a/database/influxdb/node_test.go +++ b/database/influxdb/node_test.go @@ -3,6 +3,7 @@ package influxdb import ( "testing" + "github.com/influxdata/influxdb/client/v2" "github.com/stretchr/testify/assert" "github.com/FreifunkBremen/yanic/data" @@ -14,7 +15,7 @@ func TestToInflux(t *testing.T) { node := &runtime.Node{ Statistics: &data.Statistics{ - NodeID: "foobar", + NodeID: "deadbeef", LoadAverage: 0.5, Wireless: data.WirelessStatistics{ &data.WirelessAirtime{Frequency: 5500}, @@ -46,6 +47,7 @@ func TestToInflux(t *testing.T) { }, }, Nodeinfo: &data.NodeInfo{ + NodeID: "deadbeef", Owner: &data.Owner{ Contact: "nobody", }, @@ -53,12 +55,18 @@ func TestToInflux(t *testing.T) { TxPower24: 3, Channel24: 4, }, + Network: data.Network{ + Mac: "DEADMAC", + }, }, Neighbours: &data.Neighbours{ + NodeID: "deadbeef", Batadv: map[string]data.BatadvNeighbours{ "a-interface": data.BatadvNeighbours{ Neighbours: map[string]data.BatmanLink{ - "b-neigbourinterface": data.BatmanLink{}, + "BAFF1E5": data.BatmanLink{ + Tq: 204, + }, }, }, }, @@ -66,23 +74,96 @@ func TestToInflux(t *testing.T) { }, } - tags, fields := buildNodeStats(node) + neigbour := &runtime.Node{ + Nodeinfo: &data.NodeInfo{ + NodeID: "foobar", + Network: data.Network{ + Mac: "BAFF1E5", + }, + }, + Statistics: &data.Statistics{}, + } - assert.Equal("foobar", tags.GetString("nodeid")) - assert.Equal("nobody", tags.GetString("owner")) - assert.Equal(0.5, fields["load"]) - assert.Equal(0, fields["neighbours.lldp"]) - assert.Equal(1, fields["neighbours.batadv"]) - assert.Equal(1, fields["neighbours.vpn"]) - assert.Equal(1, fields["neighbours.total"]) + points := testPoints(node, neigbour) + var fields map[string]interface{} + var tags map[string]string - assert.Equal(uint32(3), fields["wireless.txpower24"]) - assert.Equal(uint32(5500), fields["airtime11a.frequency"]) - assert.Equal("", tags.GetString("frequency5500")) + assert.Len(points, 2) - assert.Equal(int64(1213), fields["traffic.rx.bytes"]) - assert.Equal(float64(1321), fields["traffic.tx.dropped"]) - assert.Equal(int64(1322), fields["traffic.forward.bytes"]) - assert.Equal(int64(2331), fields["traffic.mgmt_rx.bytes"]) - assert.Equal(float64(2327), fields["traffic.mgmt_tx.packets"]) + // first point contains the neighbour + sPoint := points[0] + tags = sPoint.Tags() + fields, _ = sPoint.Fields() + + assert.EqualValues("deadbeef", tags["nodeid"]) + assert.EqualValues("nobody", tags["owner"]) + assert.EqualValues(0.5, fields["load"]) + assert.EqualValues(0, fields["neighbours.lldp"]) + assert.EqualValues(1, fields["neighbours.batadv"]) + assert.EqualValues(1, fields["neighbours.vpn"]) + assert.EqualValues(1, fields["neighbours.total"]) + + assert.EqualValues(uint32(3), fields["wireless.txpower24"]) + assert.EqualValues(uint32(5500), fields["airtime11a.frequency"]) + assert.EqualValues("", tags["frequency5500"]) + + assert.EqualValues(int64(1213), fields["traffic.rx.bytes"]) + assert.EqualValues(float64(1321), fields["traffic.tx.dropped"]) + assert.EqualValues(int64(1322), fields["traffic.forward.bytes"]) + assert.EqualValues(int64(2331), fields["traffic.mgmt_rx.bytes"]) + assert.EqualValues(float64(2327), fields["traffic.mgmt_tx.packets"]) + + // second point contains the neighbour + nPoint := points[1] + tags = nPoint.Tags() + fields, _ = nPoint.Fields() + assert.EqualValues("link", nPoint.Name()) + assert.EqualValues(map[string]string{ + "source.id": "deadbeef", + "source.mac": "a-interface", + "target.id": "foobar", + "target.mac": "BAFF1E5", + }, tags) + assert.EqualValues(80, fields["tq"]) +} + +// Processes data and returns the InfluxDB points +func testPoints(nodes ...*runtime.Node) (points []*client.Point) { + // Create dummy client + influxClient, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://127.0.0.1"}) + if err != nil { + panic(err) + } + + nodesList := runtime.NewNodes(&runtime.Config{}) + + // Create dummy connection + conn := &Connection{ + points: make(chan *client.Point), + client: influxClient, + } + + for _, node := range nodes { + nodesList.Update(node.Nodeinfo.NodeID, &data.ResponseData{NodeInfo: node.Nodeinfo}) + } + + // Process data + go func() { + for _, node := range nodes { + conn.InsertNode(node) + if node.Neighbours != nil { + for _, link := range nodesList.NodeLinks(node) { + conn.InsertLink(&link, node.Lastseen.GetTime()) + } + } + } + conn.Close() + }() + + // Read points + for point := range conn.points { + points = append(points, point) + } + + return } diff --git a/database/logging/file.go b/database/logging/file.go index 31863b7..97908b1 100644 --- a/database/logging/file.go +++ b/database/logging/file.go @@ -52,6 +52,10 @@ func (conn *Connection) InsertNode(node *runtime.Node) { conn.log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total) } +func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) { + conn.log("InsertLink: ", link) +} + func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { conn.log("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models)) } diff --git a/meshviewer/graph.go b/meshviewer/graph.go index dda0d3a..0b18b2f 100644 --- a/meshviewer/graph.go +++ b/meshviewer/graph.go @@ -73,8 +73,7 @@ func (builder *graphBuilder) readNodes(nodes map[string]*runtime.Node) { // Batman neighbours for _, batinterface := range nodeinfo.Network.Mesh { - interfaces := batinterface.Interfaces - addresses := append(append(interfaces.Other, interfaces.Tunnel...), interfaces.Wireless...) + addresses := batinterface.Addresses() for _, sourceAddress := range addresses { builder.macToID[sourceAddress] = sourceID diff --git a/meshviewer/graph_test.go b/meshviewer/graph_test.go index 39cc9fa..ceb6430 100644 --- a/meshviewer/graph_test.go +++ b/meshviewer/graph_test.go @@ -33,9 +33,7 @@ func TestGenerateGraph(t *testing.T) { func testGetNodesByFile(files ...string) *runtime.Nodes { - nodes := &runtime.Nodes{ - List: make(map[string]*runtime.Node), - } + nodes := runtime.NewNodes(&runtime.Config{}) for _, file := range files { node := testGetNodeByFile(file) diff --git a/respond/collector.go b/respond/collector.go index b5406c1..ee2c568 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -204,14 +204,31 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { return } + // Set fields to nil if nodeID is inconsistent + if res.Statistics != nil && res.Statistics.NodeID != nodeID { + res.Statistics = nil + } + if res.Neighbours != nil && res.Neighbours.NodeID != nodeID { + res.Neighbours = nil + } + if res.NodeInfo != nil && res.NodeInfo.NodeID != nodeID { + res.NodeInfo = nil + } + // Process the data and update IP address node := coll.nodes.Update(nodeID, res) node.Address = addr.IP // Store statistics in database - if coll.db != nil && node.Statistics != nil { - node.Statistics.NodeID = nodeID - coll.db.InsertNode(node) + if db := coll.db; db != nil { + db.InsertNode(node) + + // Store link data + if neighbours := node.Neighbours; neighbours != nil { + for _, link := range coll.nodes.NodeLinks(node) { + db.InsertLink(&link, node.Lastseen.GetTime()) + } + } } } diff --git a/runtime/node.go b/runtime/node.go index cfd0ee7..f30fe1f 100644 --- a/runtime/node.go +++ b/runtime/node.go @@ -18,6 +18,15 @@ type Node struct { Neighbours *data.Neighbours `json:"-"` } +// Link represents a link between two nodes +type Link struct { + SourceID string + SourceMAC string + TargetID string + TargetMAC string + TQ int +} + // IsGateway returns whether the node is a gateway func (node *Node) IsGateway() bool { if info := node.Nodeinfo; info != nil { diff --git a/runtime/nodes.go b/runtime/nodes.go index 012c109..51f27df 100644 --- a/runtime/nodes.go +++ b/runtime/nodes.go @@ -13,16 +13,18 @@ import ( // Nodes struct: cache DB of Node's structs type Nodes struct { - List map[string]*Node `json:"nodes"` // the current nodemap, indexed by node ID - config *Config + List map[string]*Node `json:"nodes"` // the current nodemap, indexed by node ID + ifaceToNodeID map[string]string // mapping from MAC address to NodeID + config *Config sync.RWMutex } // NewNodes create Nodes structs func NewNodes(config *Config) *Nodes { nodes := &Nodes{ - List: make(map[string]*Node), - config: config, + List: make(map[string]*Node), + ifaceToNodeID: make(map[string]string), + config: config, } if config.Nodes.StatePath != "" { @@ -52,28 +54,23 @@ func (nodes *Nodes) Update(nodeID string, res *data.ResponseData) *Node { } nodes.Unlock() + // Update wireless statistics + if statistics := res.Statistics; statistics != nil { + // Update channel utilization if previous statistics are present + if node.Statistics != nil && node.Statistics.Wireless != nil && statistics.Wireless != nil { + statistics.Wireless.SetUtilization(node.Statistics.Wireless) + } + } + + // Update fields node.Lastseen = now node.Online = true + node.Neighbours = res.Neighbours + node.Nodeinfo = res.NodeInfo + node.Statistics = res.Statistics - // Update neighbours - if val := res.Neighbours; val != nil { - node.Neighbours = val - } - - // Update nodeinfo - if val := res.NodeInfo; val != nil { - node.Nodeinfo = val - } - - // Update statistics - if val := res.Statistics; val != nil { - - // Update channel utilization if previous statistics are present - if node.Statistics != nil && node.Statistics.Wireless != nil && val.Wireless != nil { - val.Wireless.SetUtilization(node.Statistics.Wireless) - } - - node.Statistics = val + if node.Nodeinfo != nil { + nodes.readIfaces(node.Nodeinfo) } return node @@ -93,6 +90,33 @@ func (nodes *Nodes) Select(f func(*Node) bool) []*Node { return result } +// NodeLinks returns a list of links to known neighbours +func (nodes *Nodes) NodeLinks(node *Node) (result []Link) { + // Store link data + neighbours := node.Neighbours + if neighbours == nil || neighbours.NodeID == "" { + return + } + + nodes.RLock() + defer nodes.RUnlock() + + for sourceMAC, batadv := range neighbours.Batadv { + for neighbourMAC, link := range batadv.Neighbours { + if neighbourID := nodes.ifaceToNodeID[neighbourMAC]; neighbourID != "" { + result = append(result, Link{ + SourceID: neighbours.NodeID, + SourceMAC: sourceMAC, + TargetID: neighbourID, + TargetMAC: neighbourMAC, + TQ: link.Tq, + }) + } + } + } + return result +} + // Periodically saves the cached DB to json file func (nodes *Nodes) worker() { c := time.Tick(nodes.config.Nodes.SaveInterval.Duration) @@ -132,12 +156,47 @@ func (nodes *Nodes) expire() { } } +// adds the nodes interface addresses to the internal map +func (nodes *Nodes) readIfaces(nodeinfo *data.NodeInfo) { + nodeID := nodeinfo.NodeID + network := nodeinfo.Network + + if nodeID == "" { + log.Println("nodeID missing in nodeinfo") + return + } + nodes.Lock() + defer nodes.Unlock() + + addresses := []string{network.Mac} + + for _, batinterface := range network.Mesh { + addresses = append(addresses, batinterface.Addresses()...) + } + + for _, mac := range addresses { + if oldNodeID, _ := nodes.ifaceToNodeID[mac]; oldNodeID != nodeID { + if oldNodeID != "" { + log.Printf("override nodeID from %s to %s on MAC address %s", oldNodeID, nodeID, mac) + } + nodes.ifaceToNodeID[mac] = nodeID + } + } +} + func (nodes *Nodes) load() { path := nodes.config.Nodes.StatePath if f, err := os.Open(path); err == nil { // transform data to legacy meshviewer if err = json.NewDecoder(f).Decode(nodes); err == nil { log.Println("loaded", len(nodes.List), "nodes") + + for _, node := range nodes.List { + if node.Nodeinfo != nil { + nodes.readIfaces(node.Nodeinfo) + } + } + } else { log.Println("failed to unmarshal nodes:", err) } diff --git a/runtime/nodes_test.go b/runtime/nodes_test.go index b517c69..d8c583e 100644 --- a/runtime/nodes_test.go +++ b/runtime/nodes_test.go @@ -17,8 +17,9 @@ func TestExpire(t *testing.T) { config.Nodes.OfflineAfter.Duration = time.Minute * 10 config.Nodes.PruneAfter.Duration = time.Hour * 24 * 6 nodes := &Nodes{ - config: config, - List: make(map[string]*Node), + config: config, + List: make(map[string]*Node), + ifaceToNodeID: make(map[string]string), } nodes.Update("expire", &data.ResponseData{}) // should expire @@ -63,7 +64,10 @@ func TestLoadAndSave(t *testing.T) { func TestUpdateNodes(t *testing.T) { assert := assert.New(t) - nodes := &Nodes{List: make(map[string]*Node)} + nodes := &Nodes{ + List: make(map[string]*Node), + ifaceToNodeID: make(map[string]string), + } assert.Len(nodes.List, 0) res := &data.ResponseData{