From 14ad523b7fb0382cf3dd3f2660024c4474b72128 Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Fri, 11 Mar 2016 23:56:23 +0100 Subject: [PATCH] Send packets continuously --- main.go | 3 +-- respond/collector.go | 35 ++++++++++++++++++++++++----------- respond/multi_collector.go | 15 ++++----------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/main.go b/main.go index e986639..1e25f1a 100644 --- a/main.go +++ b/main.go @@ -50,7 +50,7 @@ func main() { } if config.Respondd.Enable { - multiCollector = respond.NewMultiCollector(func(coll *respond.Collector, res *respond.Response) { + multiCollector = respond.NewMultiCollector(collectInterval, func(coll *respond.Collector, res *respond.Response) { switch coll.CollectType { case "neighbours": @@ -75,7 +75,6 @@ func main() { log.Println("unknown CollectType:", coll.CollectType) } }) - go multiCollector.ListenAndSend(collectInterval) } //TODO bad diff --git a/respond/collector.go b/respond/collector.go index 5358d01..9b68710 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -12,10 +12,16 @@ type Collector struct { connection *net.UDPConn // UDP socket queue chan *Response // received responses parse func(coll *Collector, res *Response) + + // Ticker and stopper + ticker *time.Ticker + stop chan interface{} } +type ParseFunc func(coll *Collector, res *Response) + //NewCollector creates a Collector struct -func NewCollector(CollectType string, parseFunc func(coll *Collector, res *Response)) *Collector { +func NewCollector(CollectType string, interval time.Duration, parseFunc ParseFunc) *Collector { // Parse address addr, err := net.ResolveUDPAddr("udp", "[::]:0") if err != nil { @@ -34,18 +40,23 @@ func NewCollector(CollectType string, parseFunc func(coll *Collector, res *Respo connection: conn, queue: make(chan *Response, 400), parse: parseFunc, + ticker: time.NewTicker(interval), + stop: make(chan interface{}, 1), } go collector.receiver() go collector.parser() - - collector.sendOnce() + go collector.sender() return collector } -//Close Collector +// Close Collector func (coll *Collector) Close() { + // stop ticker + coll.ticker.Stop() + coll.stop <- nil + coll.connection.Close() close(coll.queue) } @@ -66,12 +77,15 @@ func (coll *Collector) sendPacket(address string) { } } -func (coll *Collector) sender(collectInterval time.Duration) { - c := time.Tick(collectInterval) - - for range c { - // TODO break condition - coll.sendOnce() +// send packets continously +func (coll *Collector) sender() { + for { + select { + case <-coll.stop: + return + case <-coll.ticker.C: + coll.sendOnce() + } } } @@ -98,6 +112,5 @@ func (coll *Collector) receiver() { Address: *src, Raw: raw, } - log.Println("received", coll.CollectType, "from", src) } } diff --git a/respond/multi_collector.go b/respond/multi_collector.go index 8f4253a..75a35ae 100644 --- a/respond/multi_collector.go +++ b/respond/multi_collector.go @@ -8,23 +8,16 @@ type MultiCollector struct { } //NewMultiCollector create a list of collectors -func NewMultiCollector(parseFunc func(coll *Collector, res *Response)) *MultiCollector { +func NewMultiCollector(interval time.Duration, parseFunc ParseFunc) *MultiCollector { return &MultiCollector{ collectors: []*Collector{ - NewCollector("statistics", parseFunc), - NewCollector("nodeinfo", parseFunc), - NewCollector("neighbours", parseFunc), + NewCollector("statistics", interval, parseFunc), + NewCollector("nodeinfo", interval, parseFunc), + NewCollector("neighbours", interval, parseFunc), }, } } -//ListenAndSend on Collection -func (multi *MultiCollector) ListenAndSend(collectInterval time.Duration) { - for _, col := range multi.collectors { - col.sender(collectInterval) - } -} - //Close all Collections func (multi *MultiCollector) Close() { for _, col := range multi.collectors {