yanic/collector.go

146 lines
2.5 KiB
Go
Raw Normal View History

2015-12-29 04:08:03 +01:00
package main
import (
"encoding/json"
"log"
"net"
2015-12-29 14:05:47 +01:00
"reflect"
"strings"
2015-12-29 04:08:03 +01:00
"time"
)
// Defaults used when requesting data from announced.
const (
	// MultiCastGroup is the default multicast group used by announced.
	MultiCastGroup string = "ff02:0:0:0:0:0:2:1001"

	// Port is the default udp port used by announced.
	Port string = "1001"

	// MaxDataGramSize is the maximum receivable size in bytes; it is used
	// both for the socket read buffer and for the receive buffer.
	MaxDataGramSize int = 8192
)
2016-01-04 02:07:09 +01:00
// Response is one datagram read from the UDP socket, paired with the
// address it was received from.
type Response struct {
	// Address is the source address of the datagram.
	Address net.UDPAddr

	// Raw holds the payload bytes exactly as they were received.
	Raw []byte
}
2015-12-29 04:08:03 +01:00
type Collector struct {
2015-12-29 14:05:47 +01:00
collectType string
2016-01-04 02:07:09 +01:00
connection *net.UDPConn // UDP socket
queue chan *Response // received responses
2015-12-29 04:08:03 +01:00
}
2015-12-29 14:05:47 +01:00
func NewCollector(collectType string) *Collector {
2015-12-29 04:08:03 +01:00
// Parse address
2015-12-29 14:05:47 +01:00
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
2015-12-29 04:08:03 +01:00
if err != nil {
log.Panic(err)
}
// Open socket
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Panic(err)
}
2016-01-04 02:07:09 +01:00
conn.SetReadBuffer(MaxDataGramSize)
2015-12-29 04:08:03 +01:00
collector := &Collector{
2015-12-29 14:05:47 +01:00
collectType: collectType,
connection: conn,
2016-02-19 14:25:11 +01:00
queue: make(chan *Response, 400),
2015-12-29 04:08:03 +01:00
}
2015-12-29 14:05:47 +01:00
go collector.sender()
2015-12-29 04:08:03 +01:00
go collector.receiver()
go collector.parser()
2016-02-19 17:14:14 +01:00
collector.sendOnce()
2015-12-29 04:08:03 +01:00
return collector
}
// Close shuts down the UDP socket first and then closes the response queue,
// which ends the range loop in parser.
//
// NOTE(review): receiver sends on queue from its own goroutine; closing the
// channel here may race with an in-flight send — confirm callers tolerate
// that or coordinate the shutdown.
func (coll *Collector) Close() {
	_ = coll.connection.Close() // the close error is intentionally discarded
	close(coll.queue)
}
2015-12-29 14:05:47 +01:00
func (coll *Collector) sendOnce() {
2016-02-19 13:47:05 +01:00
coll.sendPacket(net.JoinHostPort(MultiCastGroup,Port))
2016-02-19 17:14:14 +01:00
log.Println("send request")
2015-12-29 14:05:47 +01:00
}
func (coll *Collector) sendPacket(address string) {
2015-12-29 04:08:03 +01:00
addr, err := net.ResolveUDPAddr("udp", address)
2016-02-19 11:13:30 +01:00
if err != nil {
log.Panic(err)
}
2015-12-29 14:05:47 +01:00
coll.connection.WriteToUDP([]byte(coll.collectType), addr)
2015-12-29 04:08:03 +01:00
}
2015-12-29 14:05:47 +01:00
func (coll *Collector) sender() {
c := time.Tick(collectInterval)
for range c {
2016-01-04 02:07:09 +01:00
// TODO break condition
2015-12-29 14:05:47 +01:00
coll.sendOnce()
2015-12-29 04:08:03 +01:00
}
}
func (coll *Collector) parser() {
2016-01-04 02:07:09 +01:00
for obj := range coll.queue {
coll.parse(obj)
2015-12-29 04:08:03 +01:00
}
}
2016-01-04 02:07:09 +01:00
// Parses a response
func (coll *Collector) parse(res *Response) {
2015-12-29 04:08:03 +01:00
var result map[string]interface{}
2016-01-04 02:07:09 +01:00
json.Unmarshal(res.Raw, &result)
2015-12-29 04:08:03 +01:00
nodeId, _ := result["node_id"].(string)
if nodeId == "" {
log.Println("unable to parse node_id")
return
}
2015-12-29 14:05:47 +01:00
node := nodes.get(nodeId)
2015-12-29 04:08:03 +01:00
2015-12-29 14:05:47 +01:00
// Set result
elem := reflect.ValueOf(node).Elem()
field := elem.FieldByName(strings.Title(coll.collectType))
2016-02-19 11:41:39 +01:00
2016-02-19 13:28:06 +01:00
log.Println(field)
log.Println(result)
if !reflect.DeepEqual(field,result){
nodeserver.SendAll(node)
}
field.Set(reflect.ValueOf(result))
2015-12-29 04:08:03 +01:00
}
func (coll *Collector) receiver() {
2016-01-04 02:07:09 +01:00
buf := make([]byte, MaxDataGramSize)
2015-12-29 04:08:03 +01:00
for {
2016-01-04 02:07:09 +01:00
n, src, err := coll.connection.ReadFromUDP(buf)
2015-12-29 04:08:03 +01:00
if err != nil {
log.Println("ReadFromUDP failed:", err)
2015-12-29 14:05:47 +01:00
return
2015-12-29 04:08:03 +01:00
}
2016-01-04 02:07:09 +01:00
raw := make([]byte, n)
copy(raw, buf)
coll.queue <- &Response{
Address: *src,
Raw: raw,
}
2015-12-29 14:05:47 +01:00
log.Println("received", coll.collectType, "from", src)
2015-12-29 04:08:03 +01:00
}
}