Extract Start() function from Collector

Makes it easier to test.
This commit is contained in:
Julian Kornberger 2016-12-22 03:06:46 +01:00
parent 252ce89fc0
commit a9cdc623ca
2 changed files with 24 additions and 13 deletions

View File

@ -50,7 +50,8 @@ func main() {
if config.Respondd.Enable { if config.Respondd.Enable {
collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval) collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval)
collector = respond.NewCollector(db, nodes, collectInterval, config.Respondd.Interface) collector = respond.NewCollector(db, nodes, config.Respondd.Interface)
collector.Start(collectInterval)
defer collector.Close() defer collector.Close()
} }

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"compress/flate" "compress/flate"
"encoding/json" "encoding/json"
"io/ioutil"
"log" "log"
"net" "net"
"reflect" "reflect"
@ -29,7 +30,7 @@ type Collector struct {
} }
// Creates a Collector struct // Creates a Collector struct
func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, iface string) *Collector { func NewCollector(db *database.DB, nodes *models.Nodes, iface string) *Collector {
// Parse address // Parse address
addr, err := net.ResolveUDPAddr("udp", "[::]:0") addr, err := net.ResolveUDPAddr("udp", "[::]:0")
if err != nil { if err != nil {
@ -49,7 +50,6 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration,
nodes: nodes, nodes: nodes,
multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port), multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port),
queue: make(chan *Response, 400), queue: make(chan *Response, 400),
ticker: time.NewTicker(interval),
stop: make(chan interface{}, 1), stop: make(chan interface{}, 1),
} }
@ -60,30 +60,40 @@ func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration,
go collector.globalStatsWorker() go collector.globalStatsWorker()
} }
// Run senders
go func() {
collector.sendOnce() // immediately
collector.sender() // periodically
}()
return collector return collector
} }
// Start Collector
func (coll *Collector) Start(interval time.Duration) {
if coll.ticker != nil {
panic("already started")
}
coll.ticker = time.NewTicker(interval)
go func() {
coll.sendOnce() // immediately
coll.sender() // periodically
}()
}
// Close Collector // Close Collector
func (coll *Collector) Close() { func (coll *Collector) Close() {
// stop ticker // stop ticker
coll.ticker.Stop() if coll.ticker != nil {
close(coll.stop) coll.ticker.Stop()
close(coll.stop)
}
coll.connection.Close() coll.connection.Close()
close(coll.queue) close(coll.queue)
} }
func (coll *Collector) sendOnce() { func (coll *Collector) sendOnce() {
coll.sendPacket(coll.multicastAddr) coll.SendPacket(coll.multicastAddr)
} }
func (coll *Collector) sendPacket(address string) { // Sends a UDP request to the given unicast or multicast address
func (coll *Collector) SendPacket(address string) {
addr, err := net.ResolveUDPAddr("udp", address) addr, err := net.ResolveUDPAddr("udp", address)
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)