From dbc1353154a262398f2f463e8483f8ecf61da45c Mon Sep 17 00:00:00 2001 From: Julian K Date: Wed, 18 Oct 2017 18:22:14 +0200 Subject: [PATCH] Add support for listening on multiple interfaces (#80) closes #79 --- cmd/query.go | 2 +- cmd/serve.go | 2 +- config_example.toml | 2 +- respond/collector.go | 105 ++++++++++++++++++++++++-------------- respond/collector_test.go | 11 ++++ respond/respond.go | 9 ++-- runtime/config.go | 2 +- runtime/config_test.go | 2 +- runtime/node.go | 2 +- 9 files changed, 89 insertions(+), 48 deletions(-) diff --git a/cmd/query.go b/cmd/query.go index 79ffa20..0f20a1e 100644 --- a/cmd/query.go +++ b/cmd/query.go @@ -26,7 +26,7 @@ var queryCmd = &cobra.Command{ nodes := runtime.NewNodes(&runtime.Config{}) - collector := respond.NewCollector(nil, nodes, iface, 0) + collector := respond.NewCollector(nil, nodes, []string{iface}, 0) defer collector.Close() collector.SendPacket(dstAddress) diff --git a/cmd/serve.go b/cmd/serve.go index 42a6a9b..82341ae 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -51,7 +51,7 @@ var serveCmd = &cobra.Command{ time.Sleep(delay) } - collector = respond.NewCollector(connections, nodes, config.Respondd.Interface, config.Respondd.Port) + collector = respond.NewCollector(connections, nodes, config.Respondd.Interfaces, config.Respondd.Port) collector.Start(config.Respondd.CollectInterval.Duration) defer collector.Close() } diff --git a/config_example.toml b/config_example.toml index 931fb84..a31d5d0 100644 --- a/config_example.toml +++ b/config_example.toml @@ -6,7 +6,7 @@ synchronize = "1m" # how oftern request per multicast collect_interval = "1m" # on which interface -interface = "eth0" +interfaces = ["eth0"] # define a port to listen # (no or 0 would choose at port at his own) #port = 10001 diff --git a/respond/collector.go b/respond/collector.go index ee2c568..982d002 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -17,17 +17,44 @@ import ( // Collector for a specificle respond messages type Collector struct { - connection *net.UDPConn // UDP socket - queue chan *Response // received responses - iface string - db database.Connection - nodes *runtime.Nodes - interval time.Duration // Interval for multicast packets - stop chan interface{} + connections []*net.UDPConn // UDP sockets + ifaceToConn map[string]*net.UDPConn // map from interface name to UDP socket + + queue chan *Response // received responses + db database.Connection + nodes *runtime.Nodes + interval time.Duration // Interval for multicast packets + stop chan interface{} } // NewCollector creates a Collector struct -func NewCollector(db database.Connection, nodes *runtime.Nodes, iface string, port int) *Collector { +func NewCollector(db database.Connection, nodes *runtime.Nodes, ifaces []string, port int) *Collector { + + coll := &Collector{ + db: db, + nodes: nodes, + queue: make(chan *Response, 400), + stop: make(chan interface{}), + ifaceToConn: make(map[string]*net.UDPConn), + } + + for _, iface := range ifaces { + coll.listenUDP(iface) + } + + go coll.parser() + + if coll.db != nil { + go coll.globalStatsWorker() + } + + return coll +} + +func (coll *Collector) listenUDP(iface string) { + if _, found := coll.ifaceToConn[iface]; found { + log.Panicf("can not listen twice on %s", iface) + } linkLocalAddr, err := getLinkLocalAddr(iface) if err != nil { log.Panic(err) @@ -44,23 +71,11 @@ func NewCollector(db database.Connection, nodes *runtime.Nodes, iface string, po } conn.SetReadBuffer(maxDataGramSize) - collector := &Collector{ - connection: conn, - db: db, - nodes: nodes, - iface: iface, - queue: make(chan *Response, 400), - stop: make(chan interface{}), - } + coll.ifaceToConn[iface] = conn + coll.connections = append(coll.connections, conn) - go collector.receiver() - go collector.parser() - - if collector.db != nil { - go collector.globalStatsWorker() - } - - return collector + // Start receiver + go coll.receiver(conn) } // Returns the first link local unicast address for the given interface name @@ -102,7 +117,9 @@ func (coll *Collector) Start(interval time.Duration) { // Close Collector func (coll *Collector) Close() { close(coll.stop) - coll.connection.Close() + for _, conn := range coll.connections { + conn.Close() + } close(coll.queue) } @@ -116,8 +133,10 @@ func (coll *Collector) sendOnce() { } func (coll *Collector) sendMulticast() { - log.Println("sending multicast") - coll.SendPacket(net.ParseIP(multiCastGroup)) + log.Println("sending multicasts") + for _, conn := range coll.connections { + coll.sendPacket(conn, multiCastGroup) + } } // Send unicast packets to nodes that did not answer the multicast @@ -132,20 +151,30 @@ func (coll *Collector) sendUnicasts(seenBefore jsontime.Time) { // Send unicast packets log.Printf("sending unicast to %d nodes", len(nodes)) for _, node := range nodes { - coll.SendPacket(node.Address) + conn := coll.ifaceToConn[node.Address.Zone] + if conn == nil { + log.Printf("unable to find connection for %s", node.Address.Zone) + continue + } + coll.sendPacket(conn, node.Address.IP) time.Sleep(10 * time.Millisecond) } } -// SendPacket sends a UDP request to the given unicast or multicast address -func (coll *Collector) SendPacket(address net.IP) { +// SendPacket sends a UDP request to the given unicast or multicast address on the first UDP socket +func (coll *Collector) SendPacket(destination net.IP) { + coll.sendPacket(coll.connections[0], destination) +} + +// sendPacket sends a UDP request to the given unicast or multicast address on the given UDP socket +func (coll *Collector) sendPacket(conn *net.UDPConn, destination net.IP) { addr := net.UDPAddr{ - IP: address, + IP: destination, Port: port, - Zone: coll.iface, + Zone: conn.LocalAddr().(*net.UDPAddr).Zone, } - if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil { + if _, err := conn.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil { log.Println("WriteToUDP failed:", err) } } @@ -187,7 +216,7 @@ func (res *Response) parse() (*data.ResponseData, error) { return rdata, err } -func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { +func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) { // Search for NodeID var nodeID string if val := res.NodeInfo; val != nil { @@ -217,7 +246,7 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { // Process the data and update IP address node := coll.nodes.Update(nodeID, res) - node.Address = addr.IP + node.Address = addr // Store statistics in database if db := coll.db; db != nil { @@ -232,10 +261,10 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { } } -func (coll *Collector) receiver() { +func (coll *Collector) receiver(conn *net.UDPConn) { buf := make([]byte, maxDataGramSize) for { - n, src, err := coll.connection.ReadFromUDP(buf) + n, src, err := conn.ReadFromUDP(buf) if err != nil { log.Println("ReadFromUDP failed:", err) @@ -246,7 +275,7 @@ func (coll *Collector) receiver() { copy(raw, buf) coll.queue <- &Response{ - Address: *src, + Address: src, Raw: raw, } } diff --git a/respond/collector_test.go b/respond/collector_test.go index f95f13d..e73b835 100644 --- a/respond/collector_test.go +++ b/respond/collector_test.go @@ -3,10 +3,21 @@ package respond import ( "io/ioutil" "testing" + "time" + "github.com/FreifunkBremen/yanic/runtime" "github.com/stretchr/testify/assert" ) +func TestCollector(t *testing.T) { + nodes := runtime.NewNodes(&runtime.Config{}) + + collector := NewCollector(nil, nodes, []string{}, 10001) + collector.Start(time.Millisecond) + time.Sleep(time.Millisecond * 10) + collector.Close() +} + func TestParse(t *testing.T) { assert := assert.New(t) diff --git a/respond/respond.go b/respond/respond.go index a7987b8..4ad3aa2 100644 --- a/respond/respond.go +++ b/respond/respond.go @@ -4,9 +4,10 @@ import ( "net" ) +// default multicast group used by announced +var multiCastGroup = net.ParseIP("ff02:0:0:0:0:0:2:1001") + const ( - // default multicast group used by announced - multiCastGroup = "ff02:0:0:0:0:0:2:1001" // default udp port used by announced port = 1001 @@ -15,8 +16,8 @@ const ( maxDataGramSize = 8192 ) -//Response of the respond request +// Response of the respond request type Response struct { - Address net.UDPAddr + Address *net.UDPAddr Raw []byte } diff --git a/runtime/config.go b/runtime/config.go index 4e446fc..45c258f 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -11,7 +11,7 @@ type Config struct { Respondd struct { Enable bool `toml:"enable"` Synchronize Duration `toml:"synchronize"` - Interface string `toml:"interface"` + Interfaces []string `toml:"interfaces"` Port int `toml:"port"` CollectInterval Duration `toml:"collect_interval"` } diff --git a/runtime/config_test.go b/runtime/config_test.go index 9183b7c..9450186 100644 --- a/runtime/config_test.go +++ b/runtime/config_test.go @@ -15,7 +15,7 @@ func TestReadConfig(t *testing.T) { assert.NotNil(config) assert.True(config.Respondd.Enable) - assert.Equal("eth0", config.Respondd.Interface) + assert.Equal([]string{"eth0"}, config.Respondd.Interfaces) assert.Equal(time.Minute, config.Respondd.CollectInterval.Duration) assert.Equal(2, config.Meshviewer.Version) diff --git a/runtime/node.go b/runtime/node.go index f30fe1f..76c45d0 100644 --- a/runtime/node.go +++ b/runtime/node.go @@ -9,7 +9,7 @@ import ( // Node struct type Node struct { - Address net.IP `json:"address"` // the last known IP address + Address *net.UDPAddr `json:"-"` // the last known address Firstseen jsontime.Time `json:"firstseen"` Lastseen jsontime.Time `json:"lastseen"` Online bool `json:"online"`