diff --git a/data/response.go b/data/response.go new file mode 100644 index 0000000..be9231d --- /dev/null +++ b/data/response.go @@ -0,0 +1,7 @@ +package data + +type ResponseData struct { + Neighbours *Neighbours `json:"neighbours"` + NodeInfo *NodeInfo `json:"nodeinfo"` + Statistics *Statistics `json:"statistics"` +} diff --git a/main.go b/main.go index 9105a28..aa8ee0b 100644 --- a/main.go +++ b/main.go @@ -20,7 +20,7 @@ var ( configFile string config *models.Config wsserverForNodes *websocketserver.Server - multiCollector *respond.MultiCollector + collector *respond.Collector statsDb *StatsDb nodes = models.NewNodes() //aliases = models.NewNodes() @@ -49,7 +49,7 @@ func main() { if config.Respondd.Enable { collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval) - multiCollector = respond.NewMultiCollector(collectInterval, onReceive) + collector = respond.NewCollector("nodeinfo statistics neighbours", collectInterval, onReceive) } // TODO bad @@ -67,8 +67,8 @@ func main() { if wsserverForNodes != nil { wsserverForNodes.Close() } - if multiCollector != nil { - multiCollector.Close() + if collector != nil { + collector.Close() } if statsDb != nil { statsDb.Close() @@ -76,24 +76,21 @@ func main() { } // called for every parsed announced-message -func onReceive(addr net.UDPAddr, msg interface{}) { - switch msg := msg.(type) { +func onReceive(addr net.UDPAddr, res *data.ResponseData) { - case *data.NodeInfo: - nodes.Get(msg.NodeId).Nodeinfo = msg + if val := res.Neighbours; val != nil { + nodes.Get(val.NodeId).Neighbours = val + } - case *data.Neighbours: - nodes.Get(msg.NodeId).Neighbours = msg + if val := res.NodeInfo; val != nil { + nodes.Get(val.NodeId).Nodeinfo = val + } - case *data.Statistics: - nodes.Get(msg.NodeId).Statistics = msg + if val := res.Statistics; val != nil { + nodes.Get(val.NodeId).Statistics = val - // store data? if statsDb != nil { - statsDb.Add(msg) + statsDb.Add(val) } - - default: - log.Println("unknown message:", msg) } } diff --git a/respond/collector.go b/respond/collector.go index 86eea69..d440a7a 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -9,6 +9,8 @@ import ( "net" "reflect" "time" + + "github.com/FreifunkBremen/respond-collector/data" ) //Collector for a specificle respond messages @@ -24,10 +26,10 @@ type Collector struct { stop chan interface{} } -type OnReceive func(net.UDPAddr, interface{}) +type OnReceive func(net.UDPAddr, *data.ResponseData) //NewCollector creates a Collector struct -func NewCollector(CollectType string, initialDelay time.Duration, interval time.Duration, msgStruct interface{}, onReceive OnReceive) *Collector { +func NewCollector(CollectType string, interval time.Duration, onReceive OnReceive) *Collector { // Parse address addr, err := net.ResolveUDPAddr("udp", "[::]:0") if err != nil { @@ -47,7 +49,6 @@ func NewCollector(CollectType string, initialDelay time.Duration, interval time. queue: make(chan *Response, 400), ticker: time.NewTicker(interval), stop: make(chan interface{}, 1), - msgType: reflect.TypeOf(msgStruct), onReceive: onReceive, } @@ -56,7 +57,6 @@ func NewCollector(CollectType string, initialDelay time.Duration, interval time. // Run senders go func() { - time.Sleep(initialDelay) collector.sendOnce() // immediately collector.sender() // periodically }() @@ -111,8 +111,6 @@ func (coll *Collector) parser() { } func (coll *Collector) parse(response *Response) (err error) { - // create new struct instance - data := reflect.New(coll.msgType).Interface() // deflater reader := flate.NewReader(bytes.NewReader(response.Raw)) @@ -123,21 +121,13 @@ func (coll *Collector) parse(response *Response) (err error) { return } - // Remove useless wrapper element that only exists in compressed data. - // Who introduced this !? - if bytes.HasPrefix(decompressed, []byte(`{"neighbours":`)) || - bytes.HasPrefix(decompressed, []byte(`{"statistics":`)) { - decompressed = decompressed[14 : len(decompressed)-1] - } else if bytes.HasPrefix(decompressed, []byte(`{"nodeinfo":`)) { - decompressed = decompressed[12 : len(decompressed)-1] - } - - err = json.Unmarshal(decompressed, data) + res := &data.ResponseData{} + err = json.Unmarshal(decompressed, res) if err != nil { return } - coll.onReceive(response.Address, data) + coll.onReceive(response.Address, res) return } diff --git a/respond/collector_test.go b/respond/collector_test.go index 7ae2812..7837566 100644 --- a/respond/collector_test.go +++ b/respond/collector_test.go @@ -12,16 +12,11 @@ import ( func TestParse(t *testing.T) { assert := assert.New(t) - var decompressed *data.NodeInfo + var decompressed *data.ResponseData // callback function - onReceive := func(addr net.UDPAddr, msg interface{}) { - switch msg := msg.(type) { - case *data.NodeInfo: - decompressed = msg - default: - t.Error("unexpected message:", msg) - } + onReceive := func(addr net.UDPAddr, res *data.ResponseData) { + decompressed = res } collector := &Collector{ @@ -38,5 +33,6 @@ func TestParse(t *testing.T) { }) assert.NotNil(decompressed) - assert.Equal("f81a67a5e9c1", decompressed.NodeId) + assert.NotNil(decompressed.NodeInfo) + assert.Equal("f81a67a5e9c1", decompressed.NodeInfo.NodeId) } diff --git a/respond/multi_collector.go b/respond/multi_collector.go deleted file mode 100644 index 5bf8d63..0000000 --- a/respond/multi_collector.go +++ /dev/null @@ -1,29 +0,0 @@ -package respond - -import ( - "github.com/FreifunkBremen/respond-collector/data" - "time" -) - -//MultiCollector struct -type MultiCollector struct { - collectors []*Collector -} - -//NewMultiCollector create a list of collectors -func NewMultiCollector(interval time.Duration, onReceive OnReceive) *MultiCollector { - return &MultiCollector{ - collectors: []*Collector{ - NewCollector("statistics", 0, interval, data.Statistics{}, onReceive), - NewCollector("nodeinfo", time.Second*3, interval, data.NodeInfo{}, onReceive), - NewCollector("neighbours", time.Second*6, interval, data.Neighbours{}, onReceive), - }, - } -} - -//Close all Collections -func (multi *MultiCollector) Close() { - for _, col := range multi.collectors { - col.Close() - } -}