[TASK] add per-link statistics (#57)

ATM: support only influxdb
This commit is contained in:
Julian K 2017-09-27 13:55:02 +02:00 committed by Geno
parent b5a694a7a4
commit f4650213b8
17 changed files with 291 additions and 67 deletions

View File

@ -43,6 +43,8 @@ func main() {
panic(err) panic(err)
} }
nodes = runtime.NewNodes(config)
connections, err = all.Connect(config.Database.Connection) connections, err = all.Connect(config.Database.Connection)
if err != nil { if err != nil {
panic(err) panic(err)
@ -55,7 +57,6 @@ func main() {
return return
} }
nodes = runtime.NewNodes(config)
nodes.Start() nodes.Start()
meshviewer.Start(config, nodes) meshviewer.Start(config, nodes)

View File

@ -23,6 +23,11 @@ type BatInterface struct {
} `json:"interfaces"` } `json:"interfaces"`
} }
// Addresses returns a flat list of all MAC addresses
func (iface *BatInterface) Addresses() []string {
return append(append(iface.Interfaces.Other, iface.Interfaces.Tunnel...), iface.Interfaces.Wireless...)
}
// Network struct // Network struct
type Network struct { type Network struct {
Mac string `json:"mac"` Mac string `json:"mac"`

View File

@ -37,6 +37,12 @@ func (conn *Connection) InsertNode(node *runtime.Node) {
} }
} }
func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) {
for _, item := range conn.list {
item.InsertLink(link, time)
}
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
for _, item := range conn.list { for _, item := range conn.list {
item.InsertGlobals(stats, time) item.InsertGlobals(stats, time)

View File

@ -11,8 +11,11 @@ type Connection interface {
// InsertNode stores statistics per node // InsertNode stores statistics per node
InsertNode(node *runtime.Node) InsertNode(node *runtime.Node)
// InsertLink stores statistics per link
InsertLink(*runtime.Link, time.Time)
// InsertGlobals stores global statistics // InsertGlobals stores global statistics
InsertGlobals(stats *runtime.GlobalStats, time time.Time) InsertGlobals(*runtime.GlobalStats, time.Time)
// PruneNodes prunes historical per-node data // PruneNodes prunes historical per-node data
PruneNodes(deleteAfter time.Duration) PruneNodes(deleteAfter time.Duration)

View File

@ -1,10 +1,11 @@
package graphite package graphite
import ( import (
"github.com/FreifunkBremen/yanic/database"
"github.com/fgrosse/graphigo"
"log" "log"
"sync" "sync"
"github.com/FreifunkBremen/yanic/database"
"github.com/fgrosse/graphigo"
) )
const ( const (

11
database/graphite/link.go Normal file
View File

@ -0,0 +1,11 @@
package graphite
import (
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
// InsertLink stores per link statistics
func (c *Connection) InsertLink(link *runtime.Link, time time.Time) {
}

View File

@ -12,6 +12,7 @@ import (
) )
const ( const (
MeasurementLink = "link" // Measurement for per-link statistics
MeasurementNode = "node" // Measurement for per-node statistics MeasurementNode = "node" // Measurement for per-node statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics MeasurementGlobal = "global" // Measurement for summarized global statistics
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics

19
database/influxdb/link.go Normal file
View File

@ -0,0 +1,19 @@
package influxdb
import (
"time"
"github.com/FreifunkBremen/yanic/runtime"
models "github.com/influxdata/influxdb/models"
)
// InsertLink adds a link data point
func (conn *Connection) InsertLink(link *runtime.Link, t time.Time) {
tags := models.Tags{}
tags.SetString("source.id", link.SourceID)
tags.SetString("source.mac", link.SourceMAC)
tags.SetString("target.id", link.TargetID)
tags.SetString("target.mac", link.TargetMAC)
conn.addPoint(MeasurementLink, tags, models.Fields{"tq": float32(link.TQ) / 2.55}, t)
}

View File

@ -11,24 +11,28 @@ import (
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )
// InsertNode implementation of database // PruneNodes prunes historical per-node data
func (conn *Connection) InsertNode(node *runtime.Node) {
tags, fields := buildNodeStats(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) { func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second) for _, measurement := range []string{MeasurementNode, MeasurementLink} {
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m")) query := fmt.Sprintf("delete from %s where time < now() - %ds", measurement, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}
} }
// returns tags and fields for InfluxDB // InsertNode stores statistics and neighbours in the database
func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) { func (conn *Connection) InsertNode(node *runtime.Node) {
stats := node.Statistics stats := node.Statistics
time := node.Lastseen.GetTime()
if stats == nil || stats.NodeID == "" {
return
}
tags := models.Tags{}
tags.SetString("nodeid", stats.NodeID) tags.SetString("nodeid", stats.NodeID)
fields = map[string]interface{}{ fields := models.Fields{
"load": stats.LoadAverage, "load": stats.LoadAverage,
"time.up": int64(stats.Uptime), "time.up": int64(stats.Uptime),
"time.idle": int64(stats.Idletime), "time.idle": int64(stats.Idletime),
@ -123,5 +127,7 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields)
tags.SetString("frequency"+suffix, strconv.Itoa(int(airtime.Frequency))) tags.SetString("frequency"+suffix, strconv.Itoa(int(airtime.Frequency)))
} }
conn.addPoint(MeasurementNode, tags, fields, time)
return return
} }

View File

@ -3,6 +3,7 @@ package influxdb
import ( import (
"testing" "testing"
"github.com/influxdata/influxdb/client/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/FreifunkBremen/yanic/data" "github.com/FreifunkBremen/yanic/data"
@ -14,7 +15,7 @@ func TestToInflux(t *testing.T) {
node := &runtime.Node{ node := &runtime.Node{
Statistics: &data.Statistics{ Statistics: &data.Statistics{
NodeID: "foobar", NodeID: "deadbeef",
LoadAverage: 0.5, LoadAverage: 0.5,
Wireless: data.WirelessStatistics{ Wireless: data.WirelessStatistics{
&data.WirelessAirtime{Frequency: 5500}, &data.WirelessAirtime{Frequency: 5500},
@ -46,6 +47,7 @@ func TestToInflux(t *testing.T) {
}, },
}, },
Nodeinfo: &data.NodeInfo{ Nodeinfo: &data.NodeInfo{
NodeID: "deadbeef",
Owner: &data.Owner{ Owner: &data.Owner{
Contact: "nobody", Contact: "nobody",
}, },
@ -53,12 +55,18 @@ func TestToInflux(t *testing.T) {
TxPower24: 3, TxPower24: 3,
Channel24: 4, Channel24: 4,
}, },
Network: data.Network{
Mac: "DEADMAC",
},
}, },
Neighbours: &data.Neighbours{ Neighbours: &data.Neighbours{
NodeID: "deadbeef",
Batadv: map[string]data.BatadvNeighbours{ Batadv: map[string]data.BatadvNeighbours{
"a-interface": data.BatadvNeighbours{ "a-interface": data.BatadvNeighbours{
Neighbours: map[string]data.BatmanLink{ Neighbours: map[string]data.BatmanLink{
"b-neigbourinterface": data.BatmanLink{}, "BAFF1E5": data.BatmanLink{
Tq: 204,
},
}, },
}, },
}, },
@ -66,23 +74,96 @@ func TestToInflux(t *testing.T) {
}, },
} }
tags, fields := buildNodeStats(node) neigbour := &runtime.Node{
Nodeinfo: &data.NodeInfo{
NodeID: "foobar",
Network: data.Network{
Mac: "BAFF1E5",
},
},
Statistics: &data.Statistics{},
}
assert.Equal("foobar", tags.GetString("nodeid")) points := testPoints(node, neigbour)
assert.Equal("nobody", tags.GetString("owner")) var fields map[string]interface{}
assert.Equal(0.5, fields["load"]) var tags map[string]string
assert.Equal(0, fields["neighbours.lldp"])
assert.Equal(1, fields["neighbours.batadv"])
assert.Equal(1, fields["neighbours.vpn"])
assert.Equal(1, fields["neighbours.total"])
assert.Equal(uint32(3), fields["wireless.txpower24"]) assert.Len(points, 2)
assert.Equal(uint32(5500), fields["airtime11a.frequency"])
assert.Equal("", tags.GetString("frequency5500"))
assert.Equal(int64(1213), fields["traffic.rx.bytes"]) // first point contains the neighbour
assert.Equal(float64(1321), fields["traffic.tx.dropped"]) sPoint := points[0]
assert.Equal(int64(1322), fields["traffic.forward.bytes"]) tags = sPoint.Tags()
assert.Equal(int64(2331), fields["traffic.mgmt_rx.bytes"]) fields, _ = sPoint.Fields()
assert.Equal(float64(2327), fields["traffic.mgmt_tx.packets"])
assert.EqualValues("deadbeef", tags["nodeid"])
assert.EqualValues("nobody", tags["owner"])
assert.EqualValues(0.5, fields["load"])
assert.EqualValues(0, fields["neighbours.lldp"])
assert.EqualValues(1, fields["neighbours.batadv"])
assert.EqualValues(1, fields["neighbours.vpn"])
assert.EqualValues(1, fields["neighbours.total"])
assert.EqualValues(uint32(3), fields["wireless.txpower24"])
assert.EqualValues(uint32(5500), fields["airtime11a.frequency"])
assert.EqualValues("", tags["frequency5500"])
assert.EqualValues(int64(1213), fields["traffic.rx.bytes"])
assert.EqualValues(float64(1321), fields["traffic.tx.dropped"])
assert.EqualValues(int64(1322), fields["traffic.forward.bytes"])
assert.EqualValues(int64(2331), fields["traffic.mgmt_rx.bytes"])
assert.EqualValues(float64(2327), fields["traffic.mgmt_tx.packets"])
// second point contains the neighbour
nPoint := points[1]
tags = nPoint.Tags()
fields, _ = nPoint.Fields()
assert.EqualValues("link", nPoint.Name())
assert.EqualValues(map[string]string{
"source.id": "deadbeef",
"source.mac": "a-interface",
"target.id": "foobar",
"target.mac": "BAFF1E5",
}, tags)
assert.EqualValues(80, fields["tq"])
}
// Processes data and returns the InfluxDB points
func testPoints(nodes ...*runtime.Node) (points []*client.Point) {
// Create dummy client
influxClient, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://127.0.0.1"})
if err != nil {
panic(err)
}
nodesList := runtime.NewNodes(&runtime.Config{})
// Create dummy connection
conn := &Connection{
points: make(chan *client.Point),
client: influxClient,
}
for _, node := range nodes {
nodesList.Update(node.Nodeinfo.NodeID, &data.ResponseData{NodeInfo: node.Nodeinfo})
}
// Process data
go func() {
for _, node := range nodes {
conn.InsertNode(node)
if node.Neighbours != nil {
for _, link := range nodesList.NodeLinks(node) {
conn.InsertLink(&link, node.Lastseen.GetTime())
}
}
}
conn.Close()
}()
// Read points
for point := range conn.points {
points = append(points, point)
}
return
} }

View File

@ -52,6 +52,10 @@ func (conn *Connection) InsertNode(node *runtime.Node) {
conn.log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total) conn.log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total)
} }
func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) {
conn.log("InsertLink: ", link)
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.log("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models)) conn.log("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models))
} }

View File

@ -73,8 +73,7 @@ func (builder *graphBuilder) readNodes(nodes map[string]*runtime.Node) {
// Batman neighbours // Batman neighbours
for _, batinterface := range nodeinfo.Network.Mesh { for _, batinterface := range nodeinfo.Network.Mesh {
interfaces := batinterface.Interfaces addresses := batinterface.Addresses()
addresses := append(append(interfaces.Other, interfaces.Tunnel...), interfaces.Wireless...)
for _, sourceAddress := range addresses { for _, sourceAddress := range addresses {
builder.macToID[sourceAddress] = sourceID builder.macToID[sourceAddress] = sourceID

View File

@ -33,9 +33,7 @@ func TestGenerateGraph(t *testing.T) {
func testGetNodesByFile(files ...string) *runtime.Nodes { func testGetNodesByFile(files ...string) *runtime.Nodes {
nodes := &runtime.Nodes{ nodes := runtime.NewNodes(&runtime.Config{})
List: make(map[string]*runtime.Node),
}
for _, file := range files { for _, file := range files {
node := testGetNodeByFile(file) node := testGetNodeByFile(file)

View File

@ -204,14 +204,31 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
return return
} }
// Set fields to nil if nodeID is inconsistent
if res.Statistics != nil && res.Statistics.NodeID != nodeID {
res.Statistics = nil
}
if res.Neighbours != nil && res.Neighbours.NodeID != nodeID {
res.Neighbours = nil
}
if res.NodeInfo != nil && res.NodeInfo.NodeID != nodeID {
res.NodeInfo = nil
}
// Process the data and update IP address // Process the data and update IP address
node := coll.nodes.Update(nodeID, res) node := coll.nodes.Update(nodeID, res)
node.Address = addr.IP node.Address = addr.IP
// Store statistics in database // Store statistics in database
if coll.db != nil && node.Statistics != nil { if db := coll.db; db != nil {
node.Statistics.NodeID = nodeID db.InsertNode(node)
coll.db.InsertNode(node)
// Store link data
if neighbours := node.Neighbours; neighbours != nil {
for _, link := range coll.nodes.NodeLinks(node) {
db.InsertLink(&link, node.Lastseen.GetTime())
}
}
} }
} }

View File

@ -18,6 +18,15 @@ type Node struct {
Neighbours *data.Neighbours `json:"-"` Neighbours *data.Neighbours `json:"-"`
} }
// Link represents a link between two nodes
type Link struct {
SourceID string
SourceMAC string
TargetID string
TargetMAC string
TQ int
}
// IsGateway returns whether the node is a gateway // IsGateway returns whether the node is a gateway
func (node *Node) IsGateway() bool { func (node *Node) IsGateway() bool {
if info := node.Nodeinfo; info != nil { if info := node.Nodeinfo; info != nil {

View File

@ -13,16 +13,18 @@ import (
// Nodes struct: cache DB of Node's structs // Nodes struct: cache DB of Node's structs
type Nodes struct { type Nodes struct {
List map[string]*Node `json:"nodes"` // the current nodemap, indexed by node ID List map[string]*Node `json:"nodes"` // the current nodemap, indexed by node ID
config *Config ifaceToNodeID map[string]string // mapping from MAC address to NodeID
config *Config
sync.RWMutex sync.RWMutex
} }
// NewNodes create Nodes structs // NewNodes create Nodes structs
func NewNodes(config *Config) *Nodes { func NewNodes(config *Config) *Nodes {
nodes := &Nodes{ nodes := &Nodes{
List: make(map[string]*Node), List: make(map[string]*Node),
config: config, ifaceToNodeID: make(map[string]string),
config: config,
} }
if config.Nodes.StatePath != "" { if config.Nodes.StatePath != "" {
@ -52,28 +54,23 @@ func (nodes *Nodes) Update(nodeID string, res *data.ResponseData) *Node {
} }
nodes.Unlock() nodes.Unlock()
// Update wireless statistics
if statistics := res.Statistics; statistics != nil {
// Update channel utilization if previous statistics are present
if node.Statistics != nil && node.Statistics.Wireless != nil && statistics.Wireless != nil {
statistics.Wireless.SetUtilization(node.Statistics.Wireless)
}
}
// Update fields
node.Lastseen = now node.Lastseen = now
node.Online = true node.Online = true
node.Neighbours = res.Neighbours
node.Nodeinfo = res.NodeInfo
node.Statistics = res.Statistics
// Update neighbours if node.Nodeinfo != nil {
if val := res.Neighbours; val != nil { nodes.readIfaces(node.Nodeinfo)
node.Neighbours = val
}
// Update nodeinfo
if val := res.NodeInfo; val != nil {
node.Nodeinfo = val
}
// Update statistics
if val := res.Statistics; val != nil {
// Update channel utilization if previous statistics are present
if node.Statistics != nil && node.Statistics.Wireless != nil && val.Wireless != nil {
val.Wireless.SetUtilization(node.Statistics.Wireless)
}
node.Statistics = val
} }
return node return node
@ -93,6 +90,33 @@ func (nodes *Nodes) Select(f func(*Node) bool) []*Node {
return result return result
} }
// NodeLinks returns a list of links to known neighbours
func (nodes *Nodes) NodeLinks(node *Node) (result []Link) {
// Store link data
neighbours := node.Neighbours
if neighbours == nil || neighbours.NodeID == "" {
return
}
nodes.RLock()
defer nodes.RUnlock()
for sourceMAC, batadv := range neighbours.Batadv {
for neighbourMAC, link := range batadv.Neighbours {
if neighbourID := nodes.ifaceToNodeID[neighbourMAC]; neighbourID != "" {
result = append(result, Link{
SourceID: neighbours.NodeID,
SourceMAC: sourceMAC,
TargetID: neighbourID,
TargetMAC: neighbourMAC,
TQ: link.Tq,
})
}
}
}
return result
}
// Periodically saves the cached DB to json file // Periodically saves the cached DB to json file
func (nodes *Nodes) worker() { func (nodes *Nodes) worker() {
c := time.Tick(nodes.config.Nodes.SaveInterval.Duration) c := time.Tick(nodes.config.Nodes.SaveInterval.Duration)
@ -132,12 +156,47 @@ func (nodes *Nodes) expire() {
} }
} }
// adds the nodes interface addresses to the internal map
func (nodes *Nodes) readIfaces(nodeinfo *data.NodeInfo) {
nodeID := nodeinfo.NodeID
network := nodeinfo.Network
if nodeID == "" {
log.Println("nodeID missing in nodeinfo")
return
}
nodes.Lock()
defer nodes.Unlock()
addresses := []string{network.Mac}
for _, batinterface := range network.Mesh {
addresses = append(addresses, batinterface.Addresses()...)
}
for _, mac := range addresses {
if oldNodeID, _ := nodes.ifaceToNodeID[mac]; oldNodeID != nodeID {
if oldNodeID != "" {
log.Printf("override nodeID from %s to %s on MAC address %s", oldNodeID, nodeID, mac)
}
nodes.ifaceToNodeID[mac] = nodeID
}
}
}
func (nodes *Nodes) load() { func (nodes *Nodes) load() {
path := nodes.config.Nodes.StatePath path := nodes.config.Nodes.StatePath
if f, err := os.Open(path); err == nil { // transform data to legacy meshviewer if f, err := os.Open(path); err == nil { // transform data to legacy meshviewer
if err = json.NewDecoder(f).Decode(nodes); err == nil { if err = json.NewDecoder(f).Decode(nodes); err == nil {
log.Println("loaded", len(nodes.List), "nodes") log.Println("loaded", len(nodes.List), "nodes")
for _, node := range nodes.List {
if node.Nodeinfo != nil {
nodes.readIfaces(node.Nodeinfo)
}
}
} else { } else {
log.Println("failed to unmarshal nodes:", err) log.Println("failed to unmarshal nodes:", err)
} }

View File

@ -17,8 +17,9 @@ func TestExpire(t *testing.T) {
config.Nodes.OfflineAfter.Duration = time.Minute * 10 config.Nodes.OfflineAfter.Duration = time.Minute * 10
config.Nodes.PruneAfter.Duration = time.Hour * 24 * 6 config.Nodes.PruneAfter.Duration = time.Hour * 24 * 6
nodes := &Nodes{ nodes := &Nodes{
config: config, config: config,
List: make(map[string]*Node), List: make(map[string]*Node),
ifaceToNodeID: make(map[string]string),
} }
nodes.Update("expire", &data.ResponseData{}) // should expire nodes.Update("expire", &data.ResponseData{}) // should expire
@ -63,7 +64,10 @@ func TestLoadAndSave(t *testing.T) {
func TestUpdateNodes(t *testing.T) { func TestUpdateNodes(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
nodes := &Nodes{List: make(map[string]*Node)} nodes := &Nodes{
List: make(map[string]*Node),
ifaceToNodeID: make(map[string]string),
}
assert.Len(nodes.List, 0) assert.Len(nodes.List, 0)
res := &data.ResponseData{ res := &data.ResponseData{