Do some magic reflection stuff
This commit is contained in:
parent
14ad523b7f
commit
9923e0a4f2
32
main.go
32
main.go
|
@ -1,7 +1,6 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
@ -50,29 +49,16 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Respondd.Enable {
|
if config.Respondd.Enable {
|
||||||
multiCollector = respond.NewMultiCollector(collectInterval, func(coll *respond.Collector, res *respond.Response) {
|
multiCollector = respond.NewMultiCollector(collectInterval, func(addr net.UDPAddr, msg interface{}) {
|
||||||
|
switch msg := msg.(type) {
|
||||||
switch coll.CollectType {
|
case *data.NodeInfo:
|
||||||
case "neighbours":
|
nodes.Get(msg.NodeId).Nodeinfo = msg
|
||||||
result := &data.NeighbourStruct{}
|
case *data.NeighbourStruct:
|
||||||
if json.Unmarshal(res.Raw, result) == nil {
|
nodes.Get(msg.NodeId).Neighbours = msg
|
||||||
node := nodes.Get(result.NodeId)
|
case *data.StatisticsStruct:
|
||||||
node.Neighbours = result
|
nodes.Get(msg.NodeId).Statistics = msg
|
||||||
}
|
|
||||||
case "nodeinfo":
|
|
||||||
result := &data.NodeInfo{}
|
|
||||||
if json.Unmarshal(res.Raw, result) == nil {
|
|
||||||
node := nodes.Get(result.NodeId)
|
|
||||||
node.Nodeinfo = result
|
|
||||||
}
|
|
||||||
case "statistics":
|
|
||||||
result := &data.StatisticsStruct{}
|
|
||||||
if json.Unmarshal(res.Raw, result) == nil {
|
|
||||||
node := nodes.Get(result.NodeId)
|
|
||||||
node.Statistics = result
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
log.Println("unknown CollectType:", coll.CollectType)
|
log.Println("unknown message:", msg)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
package respond
|
package respond
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -11,17 +13,18 @@ type Collector struct {
|
||||||
CollectType string
|
CollectType string
|
||||||
connection *net.UDPConn // UDP socket
|
connection *net.UDPConn // UDP socket
|
||||||
queue chan *Response // received responses
|
queue chan *Response // received responses
|
||||||
parse func(coll *Collector, res *Response)
|
onReceive OnReceive
|
||||||
|
msgType reflect.Type
|
||||||
|
|
||||||
// Ticker and stopper
|
// Ticker and stopper
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
stop chan interface{}
|
stop chan interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ParseFunc func(coll *Collector, res *Response)
|
type OnReceive func(net.UDPAddr, interface{})
|
||||||
|
|
||||||
//NewCollector creates a Collector struct
|
//NewCollector creates a Collector struct
|
||||||
func NewCollector(CollectType string, interval time.Duration, parseFunc ParseFunc) *Collector {
|
func NewCollector(CollectType string, interval time.Duration, msgStruct interface{}, onReceive OnReceive) *Collector {
|
||||||
// Parse address
|
// Parse address
|
||||||
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
|
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -39,9 +42,10 @@ func NewCollector(CollectType string, interval time.Duration, parseFunc ParseFun
|
||||||
CollectType: CollectType,
|
CollectType: CollectType,
|
||||||
connection: conn,
|
connection: conn,
|
||||||
queue: make(chan *Response, 400),
|
queue: make(chan *Response, 400),
|
||||||
parse: parseFunc,
|
|
||||||
ticker: time.NewTicker(interval),
|
ticker: time.NewTicker(interval),
|
||||||
stop: make(chan interface{}, 1),
|
stop: make(chan interface{}, 1),
|
||||||
|
msgType: reflect.TypeOf(msgStruct),
|
||||||
|
onReceive: onReceive,
|
||||||
}
|
}
|
||||||
|
|
||||||
go collector.receiver()
|
go collector.receiver()
|
||||||
|
@ -91,7 +95,14 @@ func (coll *Collector) sender() {
|
||||||
|
|
||||||
func (coll *Collector) parser() {
|
func (coll *Collector) parser() {
|
||||||
for obj := range coll.queue {
|
for obj := range coll.queue {
|
||||||
coll.parse(coll, obj)
|
// create new struct instance
|
||||||
|
data := reflect.New(coll.msgType).Interface()
|
||||||
|
|
||||||
|
if err := json.Unmarshal(obj.Raw, data); err == nil {
|
||||||
|
coll.onReceive(obj.Address, data)
|
||||||
|
} else {
|
||||||
|
log.Println("unable to decode response from", obj.Address.String(), err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
package respond
|
package respond
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"github.com/ffdo/node-informant/gluon-collector/data"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
//MultiCollector struct
|
//MultiCollector struct
|
||||||
type MultiCollector struct {
|
type MultiCollector struct {
|
||||||
|
@ -8,12 +11,12 @@ type MultiCollector struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewMultiCollector create a list of collectors
|
//NewMultiCollector create a list of collectors
|
||||||
func NewMultiCollector(interval time.Duration, parseFunc ParseFunc) *MultiCollector {
|
func NewMultiCollector(interval time.Duration, onReceive OnReceive) *MultiCollector {
|
||||||
return &MultiCollector{
|
return &MultiCollector{
|
||||||
collectors: []*Collector{
|
collectors: []*Collector{
|
||||||
NewCollector("statistics", interval, parseFunc),
|
NewCollector("statistics", interval, data.StatisticsStruct{}, onReceive),
|
||||||
NewCollector("nodeinfo", interval, parseFunc),
|
NewCollector("nodeinfo", interval, data.NodeInfo{}, onReceive),
|
||||||
NewCollector("neighbours", interval, parseFunc),
|
NewCollector("neighbours", interval, data.NeighbourStruct{}, onReceive),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue