yanic/cmd/respondd-bridge/main.go

217 lines
5.1 KiB
Go

package main
import (
"bufio"
"bytes"
"compress/flate"
"encoding/json"
"fmt"
"log"
"net"
"os"
"time"
"github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/respond"
)
const maxDataGramSize = 8192
// Collector for a specificle respond messages
type Collector struct {
connection *net.UDPConn // UDP socket
queue chan *respond.Response // received responses
interval time.Duration // Interval for multicast packets
stop chan interface{}
nodes map[string]*data.ResponseData
interMac map[string]string
addrFrom net.UDPAddr
addrTo net.UDPAddr
}
func main() {
iface := os.Args[1]
addrFrom := os.Args[2]
addrTo := os.Args[3]
linkLocalAddr, err := getLinkLocalAddr(iface)
if err != nil {
log.Panic(err)
}
conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: linkLocalAddr,
Zone: iface,
})
if err != nil {
log.Panic(err)
}
conn.SetReadBuffer(maxDataGramSize)
collector := &Collector{
connection: conn,
queue: make(chan *respond.Response, 400),
stop: make(chan interface{}),
addrFrom: net.UDPAddr{IP: net.ParseIP(addrFrom)},
addrTo: net.UDPAddr{IP: net.ParseIP(addrTo)},
interval: time.Second * 10,
nodes: make(map[string]*data.ResponseData),
interMac: make(map[string]string),
}
go collector.receiver(conn)
go collector.parser()
collector.sendOnce()
collector.sender()
collector.Close()
}
// Returns the first link local unicast address for the given interface name
func getLinkLocalAddr(ifname string) (net.IP, error) {
iface, err := net.InterfaceByName(ifname)
if err != nil {
return nil, err
}
addresses, err := iface.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addresses {
if ipnet := addr.(*net.IPNet); ipnet.IP.IsLinkLocalUnicast() {
return ipnet.IP, nil
}
}
return nil, fmt.Errorf("unable to find link local unicast address for %s", ifname)
}
// SendPacket sends a UDP request to the given unicast or multicast address
func (coll *Collector) SendRequestPacket(addr net.UDPAddr) {
addr.Port = 1001
if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil {
log.Println("WriteToUDP failed:", err)
}
}
func (coll *Collector) saveResponse(addr net.UDPAddr, node *data.ResponseData) {
if val := node.NodeInfo; val == nil {
log.Printf("no nodeinfo from %s", addr.String())
return
}
// save current node
coll.nodes[addr.IP.String()] = node
// Process the data and update IP address
var otherIP string
if addr.IP.Equal(coll.addrFrom.IP) {
otherIP = coll.addrTo.IP.String()
} else {
otherIP = coll.addrFrom.IP.String()
}
otherNode := coll.nodes[otherIP]
if otherIP == "" || otherNode == nil {
log.Print("othernode not found")
return
}
if node.Neighbours == nil {
node.Neighbours = &data.Neighbours{
Batadv: make(map[string]data.BatadvNeighbours),
NodeID: node.NodeInfo.NodeID,
}
}
interMac := node.NodeInfo.Network.Mesh["bat0"].Interfaces.Other[0]
if newMac, ok := coll.interMac[addr.IP.String()]; ok {
interMac = newMac
} else {
coll.interMac[addr.IP.String()] = interMac
}
if _, ok := node.Neighbours.Batadv[interMac]; !ok {
node.Neighbours.Batadv[interMac] = data.BatadvNeighbours{
Neighbours: make(map[string]data.BatmanLink),
}
}
interOtherMac := otherNode.NodeInfo.Network.Mesh["bat0"].Interfaces.Other[0]
if newMac, ok := coll.interMac[coll.addrTo.IP.String()]; ok {
interOtherMac = newMac
} else {
coll.interMac[otherIP] = interMac
}
node.Neighbours.Batadv[interMac].Neighbours[interOtherMac] = data.BatmanLink{
Tq: 253,
Lastseen: 0.2,
}
buf := bytes.Buffer{}
writer := bufio.NewWriter(&buf)
deflater, err := flate.NewWriter(writer, flate.DefaultCompression)
err = json.NewEncoder(deflater).Encode(node)
if err != nil {
panic(err)
}
deflater.Close()
writer.Flush()
coll.connection.WriteToUDP(buf.Bytes(), &net.UDPAddr{
IP: net.ParseIP("fe80::de:faff:fe9f:2414"),
Port: 12345,
})
log.Print("send response from: ", addr.IP.String())
}
func (coll *Collector) receiver(conn *net.UDPConn) {
buf := make([]byte, maxDataGramSize)
for {
n, src, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println("ReadFromUDP failed:", err)
return
}
raw := make([]byte, n)
copy(raw, buf)
coll.queue <- &respond.Response{
Address: *src,
Raw: raw,
}
}
}
func (coll *Collector) parser() {
for obj := range coll.queue {
if data, err := obj.Parse(); err != nil {
log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw))
} else {
coll.saveResponse(obj.Address, data)
}
}
}
func (coll *Collector) sendOnce() {
coll.SendRequestPacket(coll.addrFrom)
coll.SendRequestPacket(coll.addrTo)
log.Print("send request")
}
// send packets continously
func (coll *Collector) sender() {
ticker := time.NewTicker(coll.interval)
for {
select {
case <-coll.stop:
ticker.Stop()
return
case <-ticker.C:
// send the multicast packet to request per-node statistics
coll.sendOnce()
}
}
}
// Close Collector
func (coll *Collector) Close() {
close(coll.stop)
coll.connection.Close()
close(coll.queue)
}