diff --git a/collector.go b/collector.go index 3e800ed..56dd7e3 100644 --- a/collector.go +++ b/collector.go @@ -10,13 +10,25 @@ import ( ) const ( - maxDatagramSize = 8192 + // default multicast group used by announced + MultiCastGroup string = "ff02:0:0:0:0:0:2:1001" + + // default udp port used by announced + Port int = 1001 + + // maximum receivable size + MaxDataGramSize int = 8192 ) +type Response struct { + Address net.UDPAddr + Raw []byte +} + type Collector struct { collectType string - connection *net.UDPConn // UDP socket - queue chan string // received responses + connection *net.UDPConn // UDP socket + queue chan *Response // received responses } func NewCollector(collectType string) *Collector { @@ -31,12 +43,12 @@ func NewCollector(collectType string) *Collector { if err != nil { log.Panic(err) } - conn.SetReadBuffer(maxDatagramSize) + conn.SetReadBuffer(MaxDataGramSize) collector := &Collector{ collectType: collectType, connection: conn, - queue: make(chan string, 100), + queue: make(chan *Response, 100), } go collector.sendOnce() @@ -54,6 +66,7 @@ func (coll *Collector) Close() { } func (coll *Collector) sendOnce() { + // TODO coll.sendPacket("[2a06:8782:ffbb:1337:c24a:ff:fe2c:c7ac]:1001") coll.sendPacket("[2001:bf7:540:0:32b5:c2ff:fe6e:99d5]:1001") } @@ -69,20 +82,21 @@ func (coll *Collector) sender() { c := time.Tick(collectInterval) for range c { + // TODO break condition coll.sendOnce() } } func (coll *Collector) parser() { - for str := range coll.queue { - coll.parseSingle(str) + for obj := range coll.queue { + coll.parse(obj) } } -// Parst die Rückgabe -func (coll *Collector) parseSingle(str string) { +// Parses a response +func (coll *Collector) parse(res *Response) { var result map[string]interface{} - json.Unmarshal([]byte(str), &result) + json.Unmarshal(res.Raw, &result) nodeId, _ := result["node_id"].(string) @@ -100,15 +114,22 @@ func (coll *Collector) parseSingle(str string) { } func (coll *Collector) receiver() { - b := make([]byte, maxDatagramSize) + buf := make([]byte, MaxDataGramSize) for { - n, src, err := coll.connection.ReadFromUDP(b) + n, src, err := coll.connection.ReadFromUDP(buf) if err != nil { log.Println("ReadFromUDP failed:", err) return } - coll.queue <- string(b[:n]) + + raw := make([]byte, n) + copy(raw, buf) + + coll.queue <- &Response{ + Address: *src, + Raw: raw, + } log.Println("received", coll.collectType, "from", src) } }