yanic/respond/collector.go

164 lines
3.4 KiB
Go
Raw Normal View History

2016-02-26 09:28:31 +01:00
package respond
2015-12-29 04:08:03 +01:00
import (
2016-03-19 01:50:23 +01:00
"bytes"
"compress/flate"
2016-03-12 00:59:36 +01:00
"encoding/json"
2016-03-19 15:07:44 +01:00
"io/ioutil"
2015-12-29 04:08:03 +01:00
"log"
"net"
2016-03-12 00:59:36 +01:00
"reflect"
2015-12-29 04:08:03 +01:00
"time"
)
2016-02-26 09:28:31 +01:00
//Collector for a specificle respond messages
2015-12-29 04:08:03 +01:00
type Collector struct {
CollectType string
2016-01-04 02:07:09 +01:00
connection *net.UDPConn // UDP socket
queue chan *Response // received responses
2016-03-12 00:59:36 +01:00
onReceive OnReceive
msgType reflect.Type
2016-03-11 23:56:23 +01:00
// Ticker and stopper
ticker *time.Ticker
stop chan interface{}
2015-12-29 04:08:03 +01:00
}
2016-03-12 00:59:36 +01:00
type OnReceive func(net.UDPAddr, interface{})
2016-03-11 23:56:23 +01:00
//NewCollector creates a Collector struct
2016-03-12 18:26:51 +01:00
func NewCollector(CollectType string, initialDelay time.Duration, interval time.Duration, msgStruct interface{}, onReceive OnReceive) *Collector {
2015-12-29 04:08:03 +01:00
// Parse address
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
2015-12-29 04:08:03 +01:00
if err != nil {
log.Panic(err)
}
// Open socket
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Panic(err)
}
conn.SetReadBuffer(maxDataGramSize)
2015-12-29 04:08:03 +01:00
collector := &Collector{
CollectType: CollectType,
2015-12-29 14:05:47 +01:00
connection: conn,
2016-02-19 14:25:11 +01:00
queue: make(chan *Response, 400),
2016-03-11 23:56:23 +01:00
ticker: time.NewTicker(interval),
stop: make(chan interface{}, 1),
2016-03-12 00:59:36 +01:00
msgType: reflect.TypeOf(msgStruct),
onReceive: onReceive,
2015-12-29 04:08:03 +01:00
}
go collector.receiver()
go collector.parser()
2016-03-12 01:04:22 +01:00
// Run senders
2016-03-12 18:26:51 +01:00
go func() {
time.Sleep(initialDelay)
collector.sendOnce() // immediately
collector.sender() // periodically
}()
2016-02-19 17:14:14 +01:00
2015-12-29 04:08:03 +01:00
return collector
}
2016-03-11 23:56:23 +01:00
// Close Collector
2015-12-29 04:08:03 +01:00
func (coll *Collector) Close() {
2016-03-11 23:56:23 +01:00
// stop ticker
coll.ticker.Stop()
coll.stop <- nil
2015-12-29 04:08:03 +01:00
coll.connection.Close()
close(coll.queue)
}
2015-12-29 14:05:47 +01:00
func (coll *Collector) sendOnce() {
coll.sendPacket(net.JoinHostPort(multiCastGroup, port))
2016-03-12 03:35:35 +01:00
log.Println("request", coll.CollectType)
2015-12-29 14:05:47 +01:00
}
func (coll *Collector) sendPacket(address string) {
2015-12-29 04:08:03 +01:00
addr, err := net.ResolveUDPAddr("udp", address)
2016-02-19 11:13:30 +01:00
if err != nil {
log.Panic(err)
}
2015-12-29 14:05:47 +01:00
2016-03-19 01:50:23 +01:00
if _, err := coll.connection.WriteToUDP([]byte("GET "+coll.CollectType), addr); err != nil {
2016-03-07 01:37:07 +01:00
log.Println("WriteToUDP failed:", err)
}
2015-12-29 04:08:03 +01:00
}
2016-03-11 23:56:23 +01:00
// send packets continously
func (coll *Collector) sender() {
for {
select {
case <-coll.stop:
return
case <-coll.ticker.C:
coll.sendOnce()
}
2015-12-29 04:08:03 +01:00
}
}
func (coll *Collector) parser() {
2016-01-04 02:07:09 +01:00
for obj := range coll.queue {
2016-03-19 01:50:23 +01:00
if err := coll.parse(obj); err != nil {
log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw))
2016-03-12 00:59:36 +01:00
}
2015-12-29 04:08:03 +01:00
}
}
2016-03-19 01:50:23 +01:00
func (coll *Collector) parse(response *Response) (err error) {
// create new struct instance
data := reflect.New(coll.msgType).Interface()
// deflater
reader := flate.NewReader(bytes.NewReader(response.Raw))
defer reader.Close()
2016-03-19 15:07:44 +01:00
decompressed, err := ioutil.ReadAll(reader)
if err != nil {
return
}
// Remove useless wrapper element that only exists in compressed data.
// Who introduced this !?
if bytes.HasPrefix(decompressed, []byte(`{"neighbours":`)) ||
bytes.HasPrefix(decompressed, []byte(`{"statistics":`)) {
decompressed = decompressed[14 : len(decompressed)-1]
} else if bytes.HasPrefix(decompressed, []byte(`{"nodeinfo":`)) {
decompressed = decompressed[12 : len(decompressed)-1]
2016-03-19 01:50:23 +01:00
}
2016-03-19 15:07:44 +01:00
err = json.Unmarshal(decompressed, data)
if err != nil {
return
}
coll.onReceive(response.Address, data)
2016-03-19 01:50:23 +01:00
return
}
2015-12-29 04:08:03 +01:00
func (coll *Collector) receiver() {
buf := make([]byte, maxDataGramSize)
2015-12-29 04:08:03 +01:00
for {
2016-01-04 02:07:09 +01:00
n, src, err := coll.connection.ReadFromUDP(buf)
2015-12-29 04:08:03 +01:00
if err != nil {
log.Println("ReadFromUDP failed:", err)
2015-12-29 14:05:47 +01:00
return
2015-12-29 04:08:03 +01:00
}
2016-01-04 02:07:09 +01:00
raw := make([]byte, n)
copy(raw, buf)
coll.queue <- &Response{
Address: *src,
Raw: raw,
}
2015-12-29 04:08:03 +01:00
}
}