diff --git a/respond/collector.go b/respond/collector.go index e30422b..1fcf0e0 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -6,7 +6,6 @@ import ( "encoding/json" "log" "net" - "reflect" "time" "github.com/FreifunkBremen/respond-collector/data" @@ -16,16 +15,13 @@ import ( //Collector for a specificle respond messages type Collector struct { - CollectType string connection *net.UDPConn // UDP socket queue chan *Response // received responses - msgType reflect.Type multicastAddr string db *database.DB nodes *models.Nodes - // Ticker and stopper - ticker *time.Ticker - stop chan interface{} + interval time.Duration // Interval for multicast packets + stop chan interface{} } // NewCollector creates a Collector struct @@ -49,7 +45,7 @@ func NewCollector(db *database.DB, nodes *models.Nodes, iface string) *Collector nodes: nodes, multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port), queue: make(chan *Response, 400), - stop: make(chan interface{}, 1), + stop: make(chan interface{}), } go collector.receiver() @@ -64,11 +60,14 @@ func NewCollector(db *database.DB, nodes *models.Nodes, iface string) *Collector // Start Collector func (coll *Collector) Start(interval time.Duration) { - if coll.ticker != nil { + if coll.interval != 0 { panic("already started") } + if interval <= 0 { + panic("invalid collector interval") + } + coll.interval = interval - coll.ticker = time.NewTicker(interval) go func() { coll.sendOnce() // immediately coll.sender() // periodically @@ -77,12 +76,7 @@ func (coll *Collector) Start(interval time.Duration) { // Close Collector func (coll *Collector) Close() { - // stop ticker - if coll.ticker != nil { - coll.ticker.Stop() - close(coll.stop) - } - + close(coll.stop) coll.connection.Close() close(coll.queue) } @@ -91,7 +85,7 @@ func (coll *Collector) sendOnce() { coll.SendPacket(coll.multicastAddr) } -// Sends a UDP request to the given unicast or multicast address +// SendPacket send a UDP request to the given unicast or multicast address func (coll *Collector) SendPacket(address string) { addr, err := net.ResolveUDPAddr("udp", address) if err != nil { @@ -105,11 +99,13 @@ func (coll *Collector) SendPacket(address string) { // send packets continously func (coll *Collector) sender() { + ticker := time.NewTicker(coll.interval) for { select { case <-coll.stop: + ticker.Stop() return - case <-coll.ticker.C: + case <-ticker.C: // send the multicast packet to request per-node statistics coll.sendOnce() } @@ -149,13 +145,16 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) { nodeID = val.NodeID } - // Updates nodes if NodeID found + // Check length of nodeID if len(nodeID) != 12 { log.Printf("invalid NodeID '%s' from %s", nodeID, addr.String()) return } + + // Process the data node := coll.nodes.Update(nodeID, res) + // Store statistics in InfluxDB if coll.db != nil && node.Statistics != nil { coll.db.Add(nodeID, node) } @@ -186,6 +185,7 @@ func (coll *Collector) globalStatsWorker() { for { select { case <-coll.stop: + ticker.Stop() return case <-ticker.C: coll.saveGlobalStats()