Request all data types at once
This commit is contained in:
parent
96d8be417c
commit
c88d13723a
|
@ -0,0 +1,7 @@
|
||||||
|
package data
|
||||||
|
|
||||||
|
type ResponseData struct {
|
||||||
|
Neighbours *Neighbours `json:"neighbours"`
|
||||||
|
NodeInfo *NodeInfo `json:"nodeinfo"`
|
||||||
|
Statistics *Statistics `json:"statistics"`
|
||||||
|
}
|
31
main.go
31
main.go
|
@ -20,7 +20,7 @@ var (
|
||||||
configFile string
|
configFile string
|
||||||
config *models.Config
|
config *models.Config
|
||||||
wsserverForNodes *websocketserver.Server
|
wsserverForNodes *websocketserver.Server
|
||||||
multiCollector *respond.MultiCollector
|
collector *respond.Collector
|
||||||
statsDb *StatsDb
|
statsDb *StatsDb
|
||||||
nodes = models.NewNodes()
|
nodes = models.NewNodes()
|
||||||
//aliases = models.NewNodes()
|
//aliases = models.NewNodes()
|
||||||
|
@ -49,7 +49,7 @@ func main() {
|
||||||
|
|
||||||
if config.Respondd.Enable {
|
if config.Respondd.Enable {
|
||||||
collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval)
|
collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval)
|
||||||
multiCollector = respond.NewMultiCollector(collectInterval, onReceive)
|
collector = respond.NewCollector("nodeinfo statistics neighbours", collectInterval, onReceive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO bad
|
// TODO bad
|
||||||
|
@ -67,8 +67,8 @@ func main() {
|
||||||
if wsserverForNodes != nil {
|
if wsserverForNodes != nil {
|
||||||
wsserverForNodes.Close()
|
wsserverForNodes.Close()
|
||||||
}
|
}
|
||||||
if multiCollector != nil {
|
if collector != nil {
|
||||||
multiCollector.Close()
|
collector.Close()
|
||||||
}
|
}
|
||||||
if statsDb != nil {
|
if statsDb != nil {
|
||||||
statsDb.Close()
|
statsDb.Close()
|
||||||
|
@ -76,24 +76,21 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// called for every parsed announced-message
|
// called for every parsed announced-message
|
||||||
func onReceive(addr net.UDPAddr, msg interface{}) {
|
func onReceive(addr net.UDPAddr, res *data.ResponseData) {
|
||||||
switch msg := msg.(type) {
|
|
||||||
|
|
||||||
case *data.NodeInfo:
|
if val := res.Neighbours; val != nil {
|
||||||
nodes.Get(msg.NodeId).Nodeinfo = msg
|
nodes.Get(val.NodeId).Neighbours = val
|
||||||
|
}
|
||||||
|
|
||||||
case *data.Neighbours:
|
if val := res.NodeInfo; val != nil {
|
||||||
nodes.Get(msg.NodeId).Neighbours = msg
|
nodes.Get(val.NodeId).Nodeinfo = val
|
||||||
|
}
|
||||||
|
|
||||||
case *data.Statistics:
|
if val := res.Statistics; val != nil {
|
||||||
nodes.Get(msg.NodeId).Statistics = msg
|
nodes.Get(val.NodeId).Statistics = val
|
||||||
|
|
||||||
// store data?
|
|
||||||
if statsDb != nil {
|
if statsDb != nil {
|
||||||
statsDb.Add(msg)
|
statsDb.Add(val)
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
|
||||||
log.Println("unknown message:", msg)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,8 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/FreifunkBremen/respond-collector/data"
|
||||||
)
|
)
|
||||||
|
|
||||||
//Collector for a specificle respond messages
|
//Collector for a specificle respond messages
|
||||||
|
@ -24,10 +26,10 @@ type Collector struct {
|
||||||
stop chan interface{}
|
stop chan interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type OnReceive func(net.UDPAddr, interface{})
|
type OnReceive func(net.UDPAddr, *data.ResponseData)
|
||||||
|
|
||||||
//NewCollector creates a Collector struct
|
//NewCollector creates a Collector struct
|
||||||
func NewCollector(CollectType string, initialDelay time.Duration, interval time.Duration, msgStruct interface{}, onReceive OnReceive) *Collector {
|
func NewCollector(CollectType string, interval time.Duration, onReceive OnReceive) *Collector {
|
||||||
// Parse address
|
// Parse address
|
||||||
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
|
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -47,7 +49,6 @@ func NewCollector(CollectType string, initialDelay time.Duration, interval time.
|
||||||
queue: make(chan *Response, 400),
|
queue: make(chan *Response, 400),
|
||||||
ticker: time.NewTicker(interval),
|
ticker: time.NewTicker(interval),
|
||||||
stop: make(chan interface{}, 1),
|
stop: make(chan interface{}, 1),
|
||||||
msgType: reflect.TypeOf(msgStruct),
|
|
||||||
onReceive: onReceive,
|
onReceive: onReceive,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +57,6 @@ func NewCollector(CollectType string, initialDelay time.Duration, interval time.
|
||||||
|
|
||||||
// Run senders
|
// Run senders
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(initialDelay)
|
|
||||||
collector.sendOnce() // immediately
|
collector.sendOnce() // immediately
|
||||||
collector.sender() // periodically
|
collector.sender() // periodically
|
||||||
}()
|
}()
|
||||||
|
@ -111,8 +111,6 @@ func (coll *Collector) parser() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (coll *Collector) parse(response *Response) (err error) {
|
func (coll *Collector) parse(response *Response) (err error) {
|
||||||
// create new struct instance
|
|
||||||
data := reflect.New(coll.msgType).Interface()
|
|
||||||
|
|
||||||
// deflater
|
// deflater
|
||||||
reader := flate.NewReader(bytes.NewReader(response.Raw))
|
reader := flate.NewReader(bytes.NewReader(response.Raw))
|
||||||
|
@ -123,21 +121,13 @@ func (coll *Collector) parse(response *Response) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove useless wrapper element that only exists in compressed data.
|
res := &data.ResponseData{}
|
||||||
// Who introduced this !?
|
err = json.Unmarshal(decompressed, res)
|
||||||
if bytes.HasPrefix(decompressed, []byte(`{"neighbours":`)) ||
|
|
||||||
bytes.HasPrefix(decompressed, []byte(`{"statistics":`)) {
|
|
||||||
decompressed = decompressed[14 : len(decompressed)-1]
|
|
||||||
} else if bytes.HasPrefix(decompressed, []byte(`{"nodeinfo":`)) {
|
|
||||||
decompressed = decompressed[12 : len(decompressed)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
err = json.Unmarshal(decompressed, data)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
coll.onReceive(response.Address, data)
|
coll.onReceive(response.Address, res)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,16 +12,11 @@ import (
|
||||||
|
|
||||||
func TestParse(t *testing.T) {
|
func TestParse(t *testing.T) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
var decompressed *data.NodeInfo
|
var decompressed *data.ResponseData
|
||||||
|
|
||||||
// callback function
|
// callback function
|
||||||
onReceive := func(addr net.UDPAddr, msg interface{}) {
|
onReceive := func(addr net.UDPAddr, res *data.ResponseData) {
|
||||||
switch msg := msg.(type) {
|
decompressed = res
|
||||||
case *data.NodeInfo:
|
|
||||||
decompressed = msg
|
|
||||||
default:
|
|
||||||
t.Error("unexpected message:", msg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
collector := &Collector{
|
collector := &Collector{
|
||||||
|
@ -38,5 +33,6 @@ func TestParse(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
assert.NotNil(decompressed)
|
assert.NotNil(decompressed)
|
||||||
assert.Equal("f81a67a5e9c1", decompressed.NodeId)
|
assert.NotNil(decompressed.NodeInfo)
|
||||||
|
assert.Equal("f81a67a5e9c1", decompressed.NodeInfo.NodeId)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,29 +0,0 @@
|
||||||
package respond
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/FreifunkBremen/respond-collector/data"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
//MultiCollector struct
|
|
||||||
type MultiCollector struct {
|
|
||||||
collectors []*Collector
|
|
||||||
}
|
|
||||||
|
|
||||||
//NewMultiCollector create a list of collectors
|
|
||||||
func NewMultiCollector(interval time.Duration, onReceive OnReceive) *MultiCollector {
|
|
||||||
return &MultiCollector{
|
|
||||||
collectors: []*Collector{
|
|
||||||
NewCollector("statistics", 0, interval, data.Statistics{}, onReceive),
|
|
||||||
NewCollector("nodeinfo", time.Second*3, interval, data.NodeInfo{}, onReceive),
|
|
||||||
NewCollector("neighbours", time.Second*6, interval, data.Neighbours{}, onReceive),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//Close all Collections
|
|
||||||
func (multi *MultiCollector) Close() {
|
|
||||||
for _, col := range multi.collectors {
|
|
||||||
col.Close()
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue