2016-02-26 09:28:31 +01:00
|
|
|
package respond
|
2015-12-29 04:08:03 +01:00
|
|
|
|
|
|
|
import (
|
2016-03-19 01:50:23 +01:00
|
|
|
"bytes"
|
|
|
|
"compress/flate"
|
2016-03-12 00:59:36 +01:00
|
|
|
"encoding/json"
|
2015-12-29 04:08:03 +01:00
|
|
|
"log"
|
|
|
|
"net"
|
2016-03-12 00:59:36 +01:00
|
|
|
"reflect"
|
2015-12-29 04:08:03 +01:00
|
|
|
"time"
|
2016-03-19 23:18:26 +01:00
|
|
|
|
|
|
|
"github.com/FreifunkBremen/respond-collector/data"
|
2016-10-03 19:55:37 +02:00
|
|
|
"github.com/FreifunkBremen/respond-collector/database"
|
|
|
|
"github.com/FreifunkBremen/respond-collector/models"
|
2015-12-29 04:08:03 +01:00
|
|
|
)
|
|
|
|
|
2016-02-26 09:28:31 +01:00
|
|
|
//Collector for a specificle respond messages
|
2015-12-29 04:08:03 +01:00
|
|
|
type Collector struct {
|
2016-10-04 01:05:18 +02:00
|
|
|
CollectType string
|
|
|
|
connection *net.UDPConn // UDP socket
|
|
|
|
queue chan *Response // received responses
|
|
|
|
msgType reflect.Type
|
|
|
|
multicastAddr string
|
|
|
|
db *database.DB
|
|
|
|
nodes *models.Nodes
|
2016-03-11 23:56:23 +01:00
|
|
|
// Ticker and stopper
|
|
|
|
ticker *time.Ticker
|
|
|
|
stop chan interface{}
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
|
|
|
|
2016-10-03 19:55:37 +02:00
|
|
|
// Creates a Collector struct
|
|
|
|
func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, iface string) *Collector {
|
2015-12-29 04:08:03 +01:00
|
|
|
// Parse address
|
2016-02-25 21:06:37 +01:00
|
|
|
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
|
2015-12-29 04:08:03 +01:00
|
|
|
if err != nil {
|
|
|
|
log.Panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Open socket
|
|
|
|
conn, err := net.ListenUDP("udp", addr)
|
|
|
|
if err != nil {
|
|
|
|
log.Panic(err)
|
|
|
|
}
|
2016-02-25 21:06:37 +01:00
|
|
|
conn.SetReadBuffer(maxDataGramSize)
|
2015-12-29 04:08:03 +01:00
|
|
|
|
|
|
|
collector := &Collector{
|
2016-10-04 01:05:18 +02:00
|
|
|
connection: conn,
|
2016-10-04 14:54:19 +02:00
|
|
|
db: db,
|
2016-10-04 01:05:18 +02:00
|
|
|
nodes: nodes,
|
|
|
|
multicastAddr: net.JoinHostPort(multiCastGroup+"%"+iface, port),
|
|
|
|
queue: make(chan *Response, 400),
|
|
|
|
ticker: time.NewTicker(interval),
|
|
|
|
stop: make(chan interface{}, 1),
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
go collector.receiver()
|
|
|
|
go collector.parser()
|
2016-03-12 01:04:22 +01:00
|
|
|
|
|
|
|
// Run senders
|
2016-03-12 18:26:51 +01:00
|
|
|
go func() {
|
|
|
|
collector.sendOnce() // immediately
|
|
|
|
collector.sender() // periodically
|
|
|
|
}()
|
2016-02-19 17:14:14 +01:00
|
|
|
|
2015-12-29 04:08:03 +01:00
|
|
|
return collector
|
|
|
|
}
|
|
|
|
|
2016-03-11 23:56:23 +01:00
|
|
|
// Close Collector
|
2015-12-29 04:08:03 +01:00
|
|
|
func (coll *Collector) Close() {
|
2016-03-11 23:56:23 +01:00
|
|
|
// stop ticker
|
|
|
|
coll.ticker.Stop()
|
2016-10-04 01:05:18 +02:00
|
|
|
close(coll.stop)
|
2016-03-11 23:56:23 +01:00
|
|
|
|
2015-12-29 04:08:03 +01:00
|
|
|
coll.connection.Close()
|
|
|
|
close(coll.queue)
|
|
|
|
}
|
|
|
|
|
2015-12-29 14:05:47 +01:00
|
|
|
func (coll *Collector) sendOnce() {
|
2016-10-04 01:05:18 +02:00
|
|
|
coll.sendPacket(coll.multicastAddr)
|
2015-12-29 14:05:47 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (coll *Collector) sendPacket(address string) {
|
2015-12-29 04:08:03 +01:00
|
|
|
addr, err := net.ResolveUDPAddr("udp", address)
|
2016-02-19 11:13:30 +01:00
|
|
|
if err != nil {
|
|
|
|
log.Panic(err)
|
|
|
|
}
|
2015-12-29 14:05:47 +01:00
|
|
|
|
2016-10-04 01:05:18 +02:00
|
|
|
if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), addr); err != nil {
|
2016-03-07 01:37:07 +01:00
|
|
|
log.Println("WriteToUDP failed:", err)
|
|
|
|
}
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
|
|
|
|
2016-03-11 23:56:23 +01:00
|
|
|
// send packets continously
|
|
|
|
func (coll *Collector) sender() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-coll.stop:
|
|
|
|
return
|
|
|
|
case <-coll.ticker.C:
|
2016-10-04 01:05:18 +02:00
|
|
|
// save global statistics
|
|
|
|
coll.db.AddPoint(database.MeasurementGlobal, nil, coll.nodes.GlobalStats().Fields(), time.Now())
|
|
|
|
|
|
|
|
// send the multicast packet to request per-node statistics
|
2016-03-11 23:56:23 +01:00
|
|
|
coll.sendOnce()
|
|
|
|
}
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (coll *Collector) parser() {
|
2016-01-04 02:07:09 +01:00
|
|
|
for obj := range coll.queue {
|
2016-10-03 19:55:37 +02:00
|
|
|
if data, err := obj.parse(); err != nil {
|
2016-03-12 13:32:55 +01:00
|
|
|
log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw))
|
2016-10-03 19:55:37 +02:00
|
|
|
} else {
|
|
|
|
coll.saveResponse(obj.Address, data)
|
2016-03-12 00:59:36 +01:00
|
|
|
}
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-03 19:55:37 +02:00
|
|
|
func (res *Response) parse() (*data.ResponseData, error) {
|
2016-03-19 23:27:07 +01:00
|
|
|
// Deflate
|
2016-10-03 19:55:37 +02:00
|
|
|
deflater := flate.NewReader(bytes.NewReader(res.Raw))
|
2016-03-19 23:27:07 +01:00
|
|
|
defer deflater.Close()
|
2016-03-19 15:07:44 +01:00
|
|
|
|
2016-03-19 23:27:07 +01:00
|
|
|
// Unmarshal
|
2016-10-03 19:55:37 +02:00
|
|
|
rdata := &data.ResponseData{}
|
|
|
|
err := json.NewDecoder(deflater).Decode(rdata)
|
|
|
|
|
|
|
|
return rdata, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
|
|
|
|
// Search for NodeID
|
|
|
|
var nodeId string
|
|
|
|
if val := res.NodeInfo; val != nil {
|
|
|
|
nodeId = val.NodeId
|
|
|
|
} else if val := res.Neighbours; val != nil {
|
|
|
|
nodeId = val.NodeId
|
|
|
|
} else if val := res.Statistics; val != nil {
|
|
|
|
nodeId = val.NodeId
|
|
|
|
}
|
|
|
|
|
|
|
|
// Updates nodes if NodeID found
|
|
|
|
if len(nodeId) != 12 {
|
|
|
|
log.Printf("invalid NodeID '%s' from %s", nodeId, addr.String())
|
|
|
|
return
|
2016-03-19 15:07:44 +01:00
|
|
|
}
|
2016-10-03 19:55:37 +02:00
|
|
|
node := coll.nodes.Update(nodeId, res)
|
2016-03-19 15:07:44 +01:00
|
|
|
|
2016-10-03 19:55:37 +02:00
|
|
|
if coll.db != nil && node.Statistics != nil {
|
|
|
|
coll.db.Add(nodeId, node)
|
|
|
|
}
|
2016-03-19 01:50:23 +01:00
|
|
|
}
|
|
|
|
|
2015-12-29 04:08:03 +01:00
|
|
|
func (coll *Collector) receiver() {
|
2016-02-25 21:06:37 +01:00
|
|
|
buf := make([]byte, maxDataGramSize)
|
2015-12-29 04:08:03 +01:00
|
|
|
for {
|
2016-01-04 02:07:09 +01:00
|
|
|
n, src, err := coll.connection.ReadFromUDP(buf)
|
2015-12-29 04:08:03 +01:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
log.Println("ReadFromUDP failed:", err)
|
2015-12-29 14:05:47 +01:00
|
|
|
return
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
2016-01-04 02:07:09 +01:00
|
|
|
|
|
|
|
raw := make([]byte, n)
|
|
|
|
copy(raw, buf)
|
|
|
|
|
|
|
|
coll.queue <- &Response{
|
|
|
|
Address: *src,
|
|
|
|
Raw: raw,
|
|
|
|
}
|
2015-12-29 04:08:03 +01:00
|
|
|
}
|
|
|
|
}
|