From 665c55da41dfdb4a0081941860eb65957a38fd90 Mon Sep 17 00:00:00 2001 From: Martin Geno Date: Fri, 28 Jul 2017 08:55:11 +0200 Subject: [PATCH] add respond injector bridge to paint links --- cmd/respondd-bridge/main.go | 216 ++++++++++++++++++++++++++++++++++++ respond/collector.go | 4 +- respond/collector_test.go | 2 +- 3 files changed, 219 insertions(+), 3 deletions(-) create mode 100644 cmd/respondd-bridge/main.go diff --git a/cmd/respondd-bridge/main.go b/cmd/respondd-bridge/main.go new file mode 100644 index 0000000..b01d06f --- /dev/null +++ b/cmd/respondd-bridge/main.go @@ -0,0 +1,216 @@ +package main + +import ( + "bufio" + "bytes" + "compress/flate" + "encoding/json" + "fmt" + "log" + "net" + "os" + "time" + + "github.com/FreifunkBremen/yanic/data" + "github.com/FreifunkBremen/yanic/respond" +) + +const maxDataGramSize = 8192 + +// Collector for a specificle respond messages +type Collector struct { + connection *net.UDPConn // UDP socket + queue chan *respond.Response // received responses + interval time.Duration // Interval for multicast packets + stop chan interface{} + nodes map[string]*data.ResponseData + interMac map[string]string + addrFrom net.UDPAddr + addrTo net.UDPAddr +} + +func main() { + iface := os.Args[1] + addrFrom := os.Args[2] + addrTo := os.Args[3] + linkLocalAddr, err := getLinkLocalAddr(iface) + if err != nil { + log.Panic(err) + } + conn, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: linkLocalAddr, + Zone: iface, + }) + if err != nil { + log.Panic(err) + } + conn.SetReadBuffer(maxDataGramSize) + collector := &Collector{ + connection: conn, + queue: make(chan *respond.Response, 400), + stop: make(chan interface{}), + addrFrom: net.UDPAddr{IP: net.ParseIP(addrFrom)}, + addrTo: net.UDPAddr{IP: net.ParseIP(addrTo)}, + interval: time.Second * 10, + nodes: make(map[string]*data.ResponseData), + interMac: make(map[string]string), + } + go collector.receiver(conn) + go collector.parser() + collector.sendOnce() + collector.sender() + collector.Close() +} + +// Returns the first link local unicast address for the given interface name +func getLinkLocalAddr(ifname string) (net.IP, error) { + iface, err := net.InterfaceByName(ifname) + if err != nil { + return nil, err + } + + addresses, err := iface.Addrs() + if err != nil { + return nil, err + } + + for _, addr := range addresses { + if ipnet := addr.(*net.IPNet); ipnet.IP.IsLinkLocalUnicast() { + return ipnet.IP, nil + } + } + return nil, fmt.Errorf("unable to find link local unicast address for %s", ifname) +} + +// SendPacket sends a UDP request to the given unicast or multicast address +func (coll *Collector) SendRequestPacket(addr net.UDPAddr) { + addr.Port = 1001 + if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil { + log.Println("WriteToUDP failed:", err) + } +} + +func (coll *Collector) saveResponse(addr net.UDPAddr, node *data.ResponseData) { + if val := node.NodeInfo; val == nil { + log.Printf("no nodeinfo from %s", addr.String()) + return + } + // save current node + coll.nodes[addr.IP.String()] = node + + // Process the data and update IP address + var otherIP string + if addr.IP.Equal(coll.addrFrom.IP) { + otherIP = coll.addrTo.IP.String() + } else { + otherIP = coll.addrFrom.IP.String() + } + + otherNode := coll.nodes[otherIP] + if otherIP == "" || otherNode == nil { + log.Print("othernode not found") + return + } + + if node.Neighbours == nil { + node.Neighbours = &data.Neighbours{ + Batadv: make(map[string]data.BatadvNeighbours), + NodeID: node.NodeInfo.NodeID, + } + } + interMac := node.NodeInfo.Network.Mesh["bat0"].Interfaces.Other[0] + if newMac, ok := coll.interMac[addr.IP.String()]; ok { + interMac = newMac + } else { + coll.interMac[addr.IP.String()] = interMac + } + if _, ok := node.Neighbours.Batadv[interMac]; !ok { + node.Neighbours.Batadv[interMac] = data.BatadvNeighbours{ + Neighbours: make(map[string]data.BatmanLink), + } + } + interOtherMac := otherNode.NodeInfo.Network.Mesh["bat0"].Interfaces.Other[0] + if newMac, ok := coll.interMac[coll.addrTo.IP.String()]; ok { + interOtherMac = newMac + } else { + coll.interMac[otherIP] = interMac + } + node.Neighbours.Batadv[interMac].Neighbours[interOtherMac] = data.BatmanLink{ + Tq: 253, + Lastseen: 0.2, + } + buf := bytes.Buffer{} + writer := bufio.NewWriter(&buf) + deflater, err := flate.NewWriter(writer, flate.DefaultCompression) + + err = json.NewEncoder(deflater).Encode(node) + if err != nil { + panic(err) + } + deflater.Close() + writer.Flush() + + coll.connection.WriteToUDP(buf.Bytes(), &net.UDPAddr{ + IP: net.ParseIP("fe80::de:faff:fe9f:2414"), + Port: 12345, + }) + log.Print("send response from: ", addr.IP.String()) +} + +func (coll *Collector) receiver(conn *net.UDPConn) { + buf := make([]byte, maxDataGramSize) + for { + n, src, err := conn.ReadFromUDP(buf) + + if err != nil { + log.Println("ReadFromUDP failed:", err) + return + } + + raw := make([]byte, n) + copy(raw, buf) + + coll.queue <- &respond.Response{ + Address: *src, + Raw: raw, + } + } +} + +func (coll *Collector) parser() { + for obj := range coll.queue { + if data, err := obj.Parse(); err != nil { + log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw)) + } else { + coll.saveResponse(obj.Address, data) + } + } +} + +func (coll *Collector) sendOnce() { + coll.SendRequestPacket(coll.addrFrom) + coll.SendRequestPacket(coll.addrTo) + log.Print("send request") +} + +// send packets continously +func (coll *Collector) sender() { + ticker := time.NewTicker(coll.interval) + for { + select { + case <-coll.stop: + ticker.Stop() + return + case <-ticker.C: + // send the multicast packet to request per-node statistics + coll.sendOnce() + } + } +} + +// Close Collector +func (coll *Collector) Close() { + close(coll.stop) + coll.connection.Close() + close(coll.queue) +} diff --git a/respond/collector.go b/respond/collector.go index e6c31f1..851abe0 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -184,7 +184,7 @@ func (coll *Collector) sender() { func (coll *Collector) parser() { for obj := range coll.queue { - if data, err := obj.parse(); err != nil { + if data, err := obj.Parse(); err != nil { log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw)) } else { coll.saveResponse(obj.Address, data) @@ -192,7 +192,7 @@ func (coll *Collector) parser() { } } -func (res *Response) parse() (*data.ResponseData, error) { +func (res *Response) Parse() (*data.ResponseData, error) { // Deflate deflater := flate.NewReader(bytes.NewReader(res.Raw)) defer deflater.Close() diff --git a/respond/collector_test.go b/respond/collector_test.go index f95f13d..c7ea57b 100644 --- a/respond/collector_test.go +++ b/respond/collector_test.go @@ -18,7 +18,7 @@ func TestParse(t *testing.T) { Raw: compressed, } - data, err := res.parse() + data, err := res.Parse() assert.NoError(err) assert.NotNil(data)