Send packets continuously

This commit is contained in:
Julian Kornberger 2016-03-11 23:56:23 +01:00
parent 7d66206815
commit 14ad523b7f
3 changed files with 29 additions and 24 deletions

View File

@ -50,7 +50,7 @@ func main() {
} }
if config.Respondd.Enable { if config.Respondd.Enable {
multiCollector = respond.NewMultiCollector(func(coll *respond.Collector, res *respond.Response) { multiCollector = respond.NewMultiCollector(collectInterval, func(coll *respond.Collector, res *respond.Response) {
switch coll.CollectType { switch coll.CollectType {
case "neighbours": case "neighbours":
@ -75,7 +75,6 @@ func main() {
log.Println("unknown CollectType:", coll.CollectType) log.Println("unknown CollectType:", coll.CollectType)
} }
}) })
go multiCollector.ListenAndSend(collectInterval)
} }
//TODO bad //TODO bad

View File

@ -12,10 +12,16 @@ type Collector struct {
connection *net.UDPConn // UDP socket connection *net.UDPConn // UDP socket
queue chan *Response // received responses queue chan *Response // received responses
parse func(coll *Collector, res *Response) parse func(coll *Collector, res *Response)
// Ticker and stopper
ticker *time.Ticker
stop chan interface{}
} }
type ParseFunc func(coll *Collector, res *Response)
//NewCollector creates a Collector struct //NewCollector creates a Collector struct
func NewCollector(CollectType string, parseFunc func(coll *Collector, res *Response)) *Collector { func NewCollector(CollectType string, interval time.Duration, parseFunc ParseFunc) *Collector {
// Parse address // Parse address
addr, err := net.ResolveUDPAddr("udp", "[::]:0") addr, err := net.ResolveUDPAddr("udp", "[::]:0")
if err != nil { if err != nil {
@ -34,18 +40,23 @@ func NewCollector(CollectType string, parseFunc func(coll *Collector, res *Respo
connection: conn, connection: conn,
queue: make(chan *Response, 400), queue: make(chan *Response, 400),
parse: parseFunc, parse: parseFunc,
ticker: time.NewTicker(interval),
stop: make(chan interface{}, 1),
} }
go collector.receiver() go collector.receiver()
go collector.parser() go collector.parser()
go collector.sender()
collector.sendOnce()
return collector return collector
} }
//Close Collector // Close Collector
func (coll *Collector) Close() { func (coll *Collector) Close() {
// stop ticker
coll.ticker.Stop()
coll.stop <- nil
coll.connection.Close() coll.connection.Close()
close(coll.queue) close(coll.queue)
} }
@ -66,12 +77,15 @@ func (coll *Collector) sendPacket(address string) {
} }
} }
func (coll *Collector) sender(collectInterval time.Duration) { // send packets continously
c := time.Tick(collectInterval) func (coll *Collector) sender() {
for {
for range c { select {
// TODO break condition case <-coll.stop:
coll.sendOnce() return
case <-coll.ticker.C:
coll.sendOnce()
}
} }
} }
@ -98,6 +112,5 @@ func (coll *Collector) receiver() {
Address: *src, Address: *src,
Raw: raw, Raw: raw,
} }
log.Println("received", coll.CollectType, "from", src)
} }
} }

View File

@ -8,23 +8,16 @@ type MultiCollector struct {
} }
//NewMultiCollector create a list of collectors //NewMultiCollector create a list of collectors
func NewMultiCollector(parseFunc func(coll *Collector, res *Response)) *MultiCollector { func NewMultiCollector(interval time.Duration, parseFunc ParseFunc) *MultiCollector {
return &MultiCollector{ return &MultiCollector{
collectors: []*Collector{ collectors: []*Collector{
NewCollector("statistics", parseFunc), NewCollector("statistics", interval, parseFunc),
NewCollector("nodeinfo", parseFunc), NewCollector("nodeinfo", interval, parseFunc),
NewCollector("neighbours", parseFunc), NewCollector("neighbours", interval, parseFunc),
}, },
} }
} }
//ListenAndSend on Collection
func (multi *MultiCollector) ListenAndSend(collectInterval time.Duration) {
for _, col := range multi.collectors {
col.sender(collectInterval)
}
}
//Close all Collections //Close all Collections
func (multi *MultiCollector) Close() { func (multi *MultiCollector) Close() {
for _, col := range multi.collectors { for _, col := range multi.collectors {