diff --git a/respond/collector.go b/respond/collector.go index d905921..c48103d 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -1,6 +1,8 @@ package respond import ( + "bytes" + "compress/flate" "encoding/json" "log" "net" @@ -82,7 +84,7 @@ func (coll *Collector) sendPacket(address string) { log.Panic(err) } - if _, err := coll.connection.WriteToUDP([]byte(coll.CollectType), addr); err != nil { + if _, err := coll.connection.WriteToUDP([]byte("GET "+coll.CollectType), addr); err != nil { log.Println("WriteToUDP failed:", err) } } @@ -101,17 +103,28 @@ func (coll *Collector) sender() { func (coll *Collector) parser() { for obj := range coll.queue { - // create new struct instance - data := reflect.New(coll.msgType).Interface() - - if err := json.Unmarshal(obj.Raw, data); err == nil { - coll.onReceive(obj.Address, data) - } else { + if err := coll.parse(obj); err != nil { log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw)) } } } +func (coll *Collector) parse(response *Response) (err error) { + // create new struct instance + data := reflect.New(coll.msgType).Interface() + + // deflater + reader := flate.NewReader(bytes.NewReader(response.Raw)) + defer reader.Close() + + decoder := json.NewDecoder(reader) + if err = decoder.Decode(data); err == nil { + coll.onReceive(response.Address, data) + } + + return +} + func (coll *Collector) receiver() { buf := make([]byte, maxDataGramSize) for {