Introduce Response type

This commit is contained in:
Julian Kornberger 2016-01-04 02:07:09 +01:00
parent d4183a466d
commit 66fbac4c2c
1 changed files with 34 additions and 13 deletions

View File

@ -10,13 +10,25 @@ import (
) )
const ( const (
maxDatagramSize = 8192 // default multicast group used by announced
MultiCastGroup string = "ff02:0:0:0:0:0:2:1001"
// default udp port used by announced
Port int = 1001
// maximum receivable size
MaxDataGramSize int = 8192
) )
type Response struct {
Address net.UDPAddr
Raw []byte
}
type Collector struct { type Collector struct {
collectType string collectType string
connection *net.UDPConn // UDP socket connection *net.UDPConn // UDP socket
queue chan string // received responses queue chan *Response // received responses
} }
func NewCollector(collectType string) *Collector { func NewCollector(collectType string) *Collector {
@ -31,12 +43,12 @@ func NewCollector(collectType string) *Collector {
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
conn.SetReadBuffer(maxDatagramSize) conn.SetReadBuffer(MaxDataGramSize)
collector := &Collector{ collector := &Collector{
collectType: collectType, collectType: collectType,
connection: conn, connection: conn,
queue: make(chan string, 100), queue: make(chan *Response, 100),
} }
go collector.sendOnce() go collector.sendOnce()
@ -54,6 +66,7 @@ func (coll *Collector) Close() {
} }
func (coll *Collector) sendOnce() { func (coll *Collector) sendOnce() {
// TODO
coll.sendPacket("[2a06:8782:ffbb:1337:c24a:ff:fe2c:c7ac]:1001") coll.sendPacket("[2a06:8782:ffbb:1337:c24a:ff:fe2c:c7ac]:1001")
coll.sendPacket("[2001:bf7:540:0:32b5:c2ff:fe6e:99d5]:1001") coll.sendPacket("[2001:bf7:540:0:32b5:c2ff:fe6e:99d5]:1001")
} }
@ -69,20 +82,21 @@ func (coll *Collector) sender() {
c := time.Tick(collectInterval) c := time.Tick(collectInterval)
for range c { for range c {
// TODO break condition
coll.sendOnce() coll.sendOnce()
} }
} }
func (coll *Collector) parser() { func (coll *Collector) parser() {
for str := range coll.queue { for obj := range coll.queue {
coll.parseSingle(str) coll.parse(obj)
} }
} }
// Parst die Rückgabe // Parses a response
func (coll *Collector) parseSingle(str string) { func (coll *Collector) parse(res *Response) {
var result map[string]interface{} var result map[string]interface{}
json.Unmarshal([]byte(str), &result) json.Unmarshal(res.Raw, &result)
nodeId, _ := result["node_id"].(string) nodeId, _ := result["node_id"].(string)
@ -100,15 +114,22 @@ func (coll *Collector) parseSingle(str string) {
} }
func (coll *Collector) receiver() { func (coll *Collector) receiver() {
b := make([]byte, maxDatagramSize) buf := make([]byte, MaxDataGramSize)
for { for {
n, src, err := coll.connection.ReadFromUDP(b) n, src, err := coll.connection.ReadFromUDP(buf)
if err != nil { if err != nil {
log.Println("ReadFromUDP failed:", err) log.Println("ReadFromUDP failed:", err)
return return
} }
coll.queue <- string(b[:n])
raw := make([]byte, n)
copy(raw, buf)
coll.queue <- &Response{
Address: *src,
Raw: raw,
}
log.Println("received", coll.collectType, "from", src) log.Println("received", coll.collectType, "from", src)
} }
} }