From 7b61f9f5ab5cabd42c1648b739e07efb6c84eb3f Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Tue, 29 Dec 2015 04:08:03 +0100 Subject: [PATCH] First code --- collector.go | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 22 ++++++++++ 2 files changed, 140 insertions(+) create mode 100644 collector.go create mode 100644 main.go diff --git a/collector.go b/collector.go new file mode 100644 index 0000000..574f57f --- /dev/null +++ b/collector.go @@ -0,0 +1,118 @@ +package main + +import ( + "encoding/json" + "log" + "net" + "time" +) + +const ( + maxDatagramSize = 8192 +) + +type Node struct { + Firstseen time.Time `json:"firstseen"` + Lastseen time.Time `json:"lastseen"` + Statistics interface{} `json:"statistics"` + Nodeinfo interface{} `json:"nodeinfo"` +} + +type Collector struct { + connection *net.UDPConn // UDP socket + queue chan string // received responses + nodes map[string]*Node // the current nodemap +} + +func NewCollector() *Collector { + // Parse address + addr, err := net.ResolveUDPAddr("udp", "[::]:1001") + if err != nil { + log.Panic(err) + } + + // Open socket + conn, err := net.ListenUDP("udp", addr) + if err != nil { + log.Panic(err) + } + conn.SetReadBuffer(maxDatagramSize) + + collector := &Collector{ + connection: conn, + queue: make(chan string, 100), + nodes: make(map[string]*Node), + } + + go collector.receiver() + go collector.parser() + + return collector +} + +func (coll *Collector) Close() { + coll.connection.Close() + close(coll.queue) +} + +func (coll *Collector) send(address string) { + addr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + log.Panic(err) + } + coll.connection.WriteToUDP([]byte("nodeinfo"), addr) +} + +func (coll *Collector) print() { + b, err := json.Marshal(coll.nodes) + if err != nil { + log.Panic(err) + } + log.Println(string(b)) +} + +func (coll *Collector) parser() { + for str := range coll.queue { + coll.parseSingle(str) + coll.print() + } +} + +// Parst die Rückgabe +func (coll *Collector) parseSingle(str string) { + var result map[string]interface{} + json.Unmarshal([]byte(str), &result) + + nodeId, _ := result["node_id"].(string) + + if nodeId == "" { + log.Println("unable to parse node_id") + return + } + + now := time.Now() + node, _ := coll.nodes[nodeId] + + if node == nil { + node = &Node{ + Firstseen: now, + } + coll.nodes[nodeId] = node + } + + node.Lastseen = now + node.Nodeinfo = result +} + +func (coll *Collector) receiver() { + b := make([]byte, maxDatagramSize) + for { + n, _, err := coll.connection.ReadFromUDP(b) + + if err != nil { + log.Println("ReadFromUDP failed:", err) + } else { + coll.queue <- string(b[:n]) + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..71fd2ff --- /dev/null +++ b/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" +) + +func main() { + collector := NewCollector() + defer collector.Close() + collector.send("[2a06:8782:ffbb:1337:c24a:ff:fe2c:c7ac]:1001") + collector.send("[2001:bf7:540:0:32b5:c2ff:fe6e:99d5]:1001") + + // Wait for SIGINT or SIGTERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.Println("received", sig) + +}