diff --git a/cmd/respond-collector/main.go b/cmd/respond-collector/main.go index 726d35e..7d2d33b 100644 --- a/cmd/respond-collector/main.go +++ b/cmd/respond-collector/main.go @@ -50,7 +50,8 @@ func main() { if config.Respondd.Enable { collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval) - collector = respond.NewCollector(db, nodes, collectInterval, config.Respondd.Interface) + collector = respond.NewCollector(db, nodes, config.Respondd.Interface) + collector.Start(collectInterval) defer collector.Close() } diff --git a/respond/collector.go b/respond/collector.go index d206f46..61aa0ee 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/flate" "encoding/json" + "io/ioutil" "log" "net" "reflect" @@ -29,7 +30,7 @@ type Collector struct { } // Creates a Collector struct -func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, iface string) *Collector { +func NewCollector(db *database.DB, nodes *models.Nodes, iface string) *Collector { // Parse address addr, err := net.ResolveUDPAddr("udp", "[::]:0") if err != nil { @@ -49,7 +50,6 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, nodes: nodes, multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port), queue: make(chan *Response, 400), - ticker: time.NewTicker(interval), stop: make(chan interface{}, 1), } @@ -60,30 +60,40 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, go collector.globalStatsWorker() } - // Run senders - go func() { - collector.sendOnce() // immediately - collector.sender() // periodically - }() - return collector } +// Start Collector +func (coll *Collector) Start(interval time.Duration) { + if coll.ticker != nil { + panic("already started") + } + + coll.ticker = time.NewTicker(interval) + go func() { + coll.sendOnce() // immediately + coll.sender() // periodically + }() +} + // Close Collector func (coll *Collector) Close() { // stop ticker - coll.ticker.Stop() - close(coll.stop) + if coll.ticker != nil { + coll.ticker.Stop() + close(coll.stop) + } coll.connection.Close() close(coll.queue) } func (coll *Collector) sendOnce() { - coll.sendPacket(coll.multicastAddr) + coll.SendPacket(coll.multicastAddr) } -func (coll *Collector) sendPacket(address string) { +// Sends a UDP request to the given unicast or multicast address +func (coll *Collector) SendPacket(address string) { addr, err := net.ResolveUDPAddr("udp", address) if err != nil { log.Panic(err)