diff --git a/cmd/respond-query/main.go b/cmd/respond-query/main.go index 15382b6..5f45f98 100644 --- a/cmd/respond-query/main.go +++ b/cmd/respond-query/main.go @@ -2,11 +2,13 @@ package main import ( "log" + "net" "os" + "time" + "github.com/FreifunkBremen/respond-collector/models" "github.com/FreifunkBremen/respond-collector/respond" - "time" ) // Usage: respond-query wlp4s0 "[fe80::eade:27ff:dead:beef%wlp4s0]:1001" @@ -19,7 +21,7 @@ func main() { nodes := models.NewNodes(&models.Config{}) collector := respond.NewCollector(nil, nodes, iface) - collector.SendPacket(dstAddress) + collector.SendPacket(net.ParseIP(dstAddress)) time.Sleep(time.Second) diff --git a/models/node.go b/models/node.go index 0e5d4a8..851562f 100644 --- a/models/node.go +++ b/models/node.go @@ -1,15 +1,18 @@ package models import ( + "net" + "strconv" + "github.com/FreifunkBremen/respond-collector/data" "github.com/FreifunkBremen/respond-collector/jsontime" "github.com/FreifunkBremen/respond-collector/meshviewer" imodels "github.com/influxdata/influxdb/models" - "strconv" ) // Node struct type Node struct { + Address net.IP `json:"address"` // the last known IP address Firstseen jsontime.Time `json:"firstseen"` Lastseen jsontime.Time `json:"lastseen"` Flags meshviewer.Flags `json:"flags"` diff --git a/models/nodes.go b/models/nodes.go index 0a504bf..3881d73 100644 --- a/models/nodes.go +++ b/models/nodes.go @@ -33,7 +33,7 @@ func NewNodes(config *Config) *Nodes { return nodes } -//Start all services to manage Nodes +// Start all services to manage Nodes func (nodes *Nodes) Start() { go nodes.worker() } @@ -114,8 +114,8 @@ func (nodes *Nodes) GetNodesV2() *meshviewer.NodesV2 { Version: 2, Timestamp: jsontime.Now(), } - for nodeID := range nodes.List { + for nodeID := range nodes.List { nodeOrigin := nodes.List[nodeID] if nodeOrigin.Statistics == nil { continue @@ -132,6 +132,20 @@ func (nodes *Nodes) GetNodesV2() *meshviewer.NodesV2 { return meshviewerNodes } +// Select selects a list of nodes to be returned +func (nodes *Nodes) Select(f func(*Node) bool) []*Node { + nodes.RLock() + defer nodes.RUnlock() + + result := make([]*Node, 0, len(nodes.List)) + for _, node := range nodes.List { + if f(node) { + result = append(result, node) + } + } + return result +} + // Periodically saves the cached DB to json file func (nodes *Nodes) worker() { c := time.Tick(nodes.config.Nodes.SaveInterval.Duration) diff --git a/respond/collector.go b/respond/collector.go index 1fcf0e0..f8a22c6 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -10,18 +10,19 @@ import ( "github.com/FreifunkBremen/respond-collector/data" "github.com/FreifunkBremen/respond-collector/database" + "github.com/FreifunkBremen/respond-collector/jsontime" "github.com/FreifunkBremen/respond-collector/models" ) -//Collector for a specificle respond messages +// Collector for a specificle respond messages type Collector struct { - connection *net.UDPConn // UDP socket - queue chan *Response // received responses - multicastAddr string - db *database.DB - nodes *models.Nodes - interval time.Duration // Interval for multicast packets - stop chan interface{} + connection *net.UDPConn // UDP socket + queue chan *Response // received responses + iface string + db *database.DB + nodes *models.Nodes + interval time.Duration // Interval for multicast packets + stop chan interface{} } // NewCollector creates a Collector struct @@ -40,12 +41,12 @@ func NewCollector(db *database.DB, nodes *models.Nodes, iface string) *Collector conn.SetReadBuffer(maxDataGramSize) collector := &Collector{ - connection: conn, - db: db, - nodes: nodes, - multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port), - queue: make(chan *Response, 400), - stop: make(chan interface{}), + connection: conn, + db: db, + nodes: nodes, + iface: iface, + queue: make(chan *Response, 400), + stop: make(chan interface{}), } go collector.receiver() @@ -82,17 +83,45 @@ func (coll *Collector) Close() { } func (coll *Collector) sendOnce() { - coll.SendPacket(coll.multicastAddr) + now := jsontime.Now() + coll.sendMulticast() + + // Wait for the multicast responses to be processed and send unicasts + time.Sleep(coll.interval / 2) + coll.sendUnicasts(now) } -// SendPacket send a UDP request to the given unicast or multicast address -func (coll *Collector) SendPacket(address string) { - addr, err := net.ResolveUDPAddr("udp", address) - if err != nil { - log.Panic(err) +func (coll *Collector) sendMulticast() { + log.Println("sending multicast") + coll.SendPacket(net.ParseIP(multiCastGroup)) +} + +// Send unicast packets to nodes that did not answer the multicast +func (coll *Collector) sendUnicasts(seenBefore jsontime.Time) { + seenAfter := seenBefore.Add(-time.Minute * 10) + + // Select online nodes that has not been seen recently + nodes := coll.nodes.Select(func(n *models.Node) bool { + return n.Lastseen.After(seenAfter) && n.Lastseen.Before(seenBefore) && n.Address != nil + }) + + // Send unicast packets + log.Printf("sending unicast to %d nodes", len(nodes)) + for _, node := range nodes { + coll.SendPacket(node.Address) + time.Sleep(10 * time.Millisecond) + } +} + +// SendPacket sends a UDP request to the given unicast or multicast address +func (coll *Collector) SendPacket(address net.IP) { + addr := net.UDPAddr{ + IP: address, + Port: port, + Zone: coll.iface, } - if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), addr); err != nil { + if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil { log.Println("WriteToUDP failed:", err) } } @@ -151,8 +180,9 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { return } - // Process the data + // Process the data and update IP address node := coll.nodes.Update(nodeID, res) + node.Address = addr.IP // Store statistics in InfluxDB if coll.db != nil && node.Statistics != nil { diff --git a/respond/respond.go b/respond/respond.go index fd360a4..a7987b8 100644 --- a/respond/respond.go +++ b/respond/respond.go @@ -9,7 +9,7 @@ const ( multiCastGroup = "ff02:0:0:0:0:0:2:1001" // default udp port used by announced - port = "1001" + port = 1001 // maximum receivable size maxDataGramSize = 8192