Add support for listening on multiple interfaces (#80)

closes #79
This commit is contained in:
Julian K 2017-10-18 18:22:14 +02:00 committed by GitHub
parent 4de49c6d16
commit dbc1353154
9 changed files with 89 additions and 48 deletions

View File

@ -26,7 +26,7 @@ var queryCmd = &cobra.Command{
nodes := runtime.NewNodes(&runtime.Config{})
collector := respond.NewCollector(nil, nodes, iface, 0)
collector := respond.NewCollector(nil, nodes, []string{iface}, 0)
defer collector.Close()
collector.SendPacket(dstAddress)

View File

@ -51,7 +51,7 @@ var serveCmd = &cobra.Command{
time.Sleep(delay)
}
collector = respond.NewCollector(connections, nodes, config.Respondd.Interface, config.Respondd.Port)
collector = respond.NewCollector(connections, nodes, config.Respondd.Interfaces, config.Respondd.Port)
collector.Start(config.Respondd.CollectInterval.Duration)
defer collector.Close()
}

View File

@ -6,7 +6,7 @@ synchronize = "1m"
# how oftern request per multicast
collect_interval = "1m"
# on which interface
interface = "eth0"
interfaces = ["eth0"]
# define a port to listen
# (no or 0 would choose at port at his own)
#port = 10001

View File

@ -17,17 +17,44 @@ import (
// Collector for a specificle respond messages
type Collector struct {
connection *net.UDPConn // UDP socket
queue chan *Response // received responses
iface string
db database.Connection
nodes *runtime.Nodes
interval time.Duration // Interval for multicast packets
stop chan interface{}
connections []*net.UDPConn // UDP sockets
ifaceToConn map[string]*net.UDPConn // map from interface name to UDP socket
queue chan *Response // received responses
db database.Connection
nodes *runtime.Nodes
interval time.Duration // Interval for multicast packets
stop chan interface{}
}
// NewCollector creates a Collector struct
func NewCollector(db database.Connection, nodes *runtime.Nodes, iface string, port int) *Collector {
func NewCollector(db database.Connection, nodes *runtime.Nodes, ifaces []string, port int) *Collector {
coll := &Collector{
db: db,
nodes: nodes,
queue: make(chan *Response, 400),
stop: make(chan interface{}),
ifaceToConn: make(map[string]*net.UDPConn),
}
for _, iface := range ifaces {
coll.listenUDP(iface)
}
go coll.parser()
if coll.db != nil {
go coll.globalStatsWorker()
}
return coll
}
func (coll *Collector) listenUDP(iface string) {
if _, found := coll.ifaceToConn[iface]; found {
log.Panicf("can not listen twice on %s", iface)
}
linkLocalAddr, err := getLinkLocalAddr(iface)
if err != nil {
log.Panic(err)
@ -44,23 +71,11 @@ func NewCollector(db database.Connection, nodes *runtime.Nodes, iface string, po
}
conn.SetReadBuffer(maxDataGramSize)
collector := &Collector{
connection: conn,
db: db,
nodes: nodes,
iface: iface,
queue: make(chan *Response, 400),
stop: make(chan interface{}),
}
coll.ifaceToConn[iface] = conn
coll.connections = append(coll.connections, conn)
go collector.receiver()
go collector.parser()
if collector.db != nil {
go collector.globalStatsWorker()
}
return collector
// Start receiver
go coll.receiver(conn)
}
// Returns the first link local unicast address for the given interface name
@ -102,7 +117,9 @@ func (coll *Collector) Start(interval time.Duration) {
// Close Collector
func (coll *Collector) Close() {
close(coll.stop)
coll.connection.Close()
for _, conn := range coll.connections {
conn.Close()
}
close(coll.queue)
}
@ -116,8 +133,10 @@ func (coll *Collector) sendOnce() {
}
func (coll *Collector) sendMulticast() {
log.Println("sending multicast")
coll.SendPacket(net.ParseIP(multiCastGroup))
log.Println("sending multicasts")
for _, conn := range coll.connections {
coll.sendPacket(conn, multiCastGroup)
}
}
// Send unicast packets to nodes that did not answer the multicast
@ -132,20 +151,30 @@ func (coll *Collector) sendUnicasts(seenBefore jsontime.Time) {
// Send unicast packets
log.Printf("sending unicast to %d nodes", len(nodes))
for _, node := range nodes {
coll.SendPacket(node.Address)
conn := coll.ifaceToConn[node.Address.Zone]
if conn == nil {
log.Printf("unable to find connection for %s", node.Address.Zone)
continue
}
coll.sendPacket(conn, node.Address.IP)
time.Sleep(10 * time.Millisecond)
}
}
// SendPacket sends a UDP request to the given unicast or multicast address
func (coll *Collector) SendPacket(address net.IP) {
// SendPacket sends a UDP request to the given unicast or multicast address on the first UDP socket
func (coll *Collector) SendPacket(destination net.IP) {
coll.sendPacket(coll.connections[0], destination)
}
// sendPacket sends a UDP request to the given unicast or multicast address on the given UDP socket
func (coll *Collector) sendPacket(conn *net.UDPConn, destination net.IP) {
addr := net.UDPAddr{
IP: address,
IP: destination,
Port: port,
Zone: coll.iface,
Zone: conn.LocalAddr().(*net.UDPAddr).Zone,
}
if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil {
if _, err := conn.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil {
log.Println("WriteToUDP failed:", err)
}
}
@ -187,7 +216,7 @@ func (res *Response) parse() (*data.ResponseData, error) {
return rdata, err
}
func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) {
// Search for NodeID
var nodeID string
if val := res.NodeInfo; val != nil {
@ -217,7 +246,7 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
// Process the data and update IP address
node := coll.nodes.Update(nodeID, res)
node.Address = addr.IP
node.Address = addr
// Store statistics in database
if db := coll.db; db != nil {
@ -232,10 +261,10 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
}
}
func (coll *Collector) receiver() {
func (coll *Collector) receiver(conn *net.UDPConn) {
buf := make([]byte, maxDataGramSize)
for {
n, src, err := coll.connection.ReadFromUDP(buf)
n, src, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println("ReadFromUDP failed:", err)
@ -246,7 +275,7 @@ func (coll *Collector) receiver() {
copy(raw, buf)
coll.queue <- &Response{
Address: *src,
Address: src,
Raw: raw,
}
}

View File

@ -3,10 +3,21 @@ package respond
import (
"io/ioutil"
"testing"
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
func TestCollector(t *testing.T) {
nodes := runtime.NewNodes(&runtime.Config{})
collector := NewCollector(nil, nodes, []string{}, 10001)
collector.Start(time.Millisecond)
time.Sleep(time.Millisecond * 10)
collector.Close()
}
func TestParse(t *testing.T) {
assert := assert.New(t)

View File

@ -4,9 +4,10 @@ import (
"net"
)
// default multicast group used by announced
var multiCastGroup = net.ParseIP("ff02:0:0:0:0:0:2:1001")
const (
// default multicast group used by announced
multiCastGroup = "ff02:0:0:0:0:0:2:1001"
// default udp port used by announced
port = 1001
@ -15,8 +16,8 @@ const (
maxDataGramSize = 8192
)
//Response of the respond request
// Response of the respond request
type Response struct {
Address net.UDPAddr
Address *net.UDPAddr
Raw []byte
}

View File

@ -11,7 +11,7 @@ type Config struct {
Respondd struct {
Enable bool `toml:"enable"`
Synchronize Duration `toml:"synchronize"`
Interface string `toml:"interface"`
Interfaces []string `toml:"interfaces"`
Port int `toml:"port"`
CollectInterval Duration `toml:"collect_interval"`
}

View File

@ -15,7 +15,7 @@ func TestReadConfig(t *testing.T) {
assert.NotNil(config)
assert.True(config.Respondd.Enable)
assert.Equal("eth0", config.Respondd.Interface)
assert.Equal([]string{"eth0"}, config.Respondd.Interfaces)
assert.Equal(time.Minute, config.Respondd.CollectInterval.Duration)
assert.Equal(2, config.Meshviewer.Version)

View File

@ -9,7 +9,7 @@ import (
// Node struct
type Node struct {
Address net.IP `json:"address"` // the last known IP address
Address *net.UDPAddr `json:"-"` // the last known address
Firstseen jsontime.Time `json:"firstseen"`
Lastseen jsontime.Time `json:"lastseen"`
Online bool `json:"online"`