Refactoring

This commit is contained in:
Julian Kornberger 2016-10-03 19:55:37 +02:00
parent 959521b209
commit d57d864ab0
7 changed files with 295 additions and 295 deletions

120
database/database.go Normal file
View File

@ -0,0 +1,120 @@
package database
import (
"log"
"sync"
"time"
"github.com/FreifunkBremen/respond-collector/models"
"github.com/influxdata/influxdb/client/v2"
imodels "github.com/influxdata/influxdb/models"
)
const (
MeasurementNode = "node" // Measurement for per-node statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics
batchDuration = time.Second * 5
batchMaxSize = 500
)
type DB struct {
config *models.Config
client client.Client
points chan *client.Point
wg sync.WaitGroup
}
func New(config *models.Config) *DB {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: config.Influxdb.Addr,
Username: config.Influxdb.Username,
Password: config.Influxdb.Password,
})
if err != nil {
panic(err)
}
db := &DB{
config: config,
client: c,
points: make(chan *client.Point, 1000),
}
db.wg.Add(1)
go db.worker()
return db
}
func (db *DB) AddPoint(name string, tags imodels.Tags, fields imodels.Fields, time time.Time) {
point, err := client.NewPoint(name, tags.Map(), fields, time)
if err != nil {
panic(err)
}
db.points <- point
}
// Add data for a single node
func (db *DB) Add(nodeId string, node *models.Node) {
tags, fields := node.ToInflux()
db.AddPoint(MeasurementNode, tags, fields, time.Now())
}
func (db *DB) Close() {
close(db.points)
db.wg.Wait()
db.client.Close()
}
// stores data points in batches into the influxdb
func (db *DB) worker() {
bpConfig := client.BatchPointsConfig{
Database: db.config.Influxdb.Database,
Precision: "m",
}
var bp client.BatchPoints
var err error
var writeNow, closed bool
timer := time.NewTimer(batchDuration)
for !closed {
// wait for new points
select {
case point, ok := <-db.points:
if ok {
if bp == nil {
// create new batch
timer.Reset(batchDuration)
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
log.Fatal(err)
}
}
bp.AddPoint(point)
} else {
closed = true
}
case <-timer.C:
if bp == nil {
timer.Reset(batchDuration)
} else {
writeNow = true
}
}
// write batch now?
if bp != nil && (writeNow || closed) || len(bp.Points()) >= batchMaxSize {
log.Println("saving", len(bp.Points()), "points")
if err = db.client.Write(bp); err != nil {
log.Fatal(err)
}
writeNow = false
bp = nil
}
}
timer.Stop()
db.wg.Done()
}

40
main.go
View File

@ -14,7 +14,7 @@ import (
"github.com/julienschmidt/httprouter"
"github.com/FreifunkBremen/respond-collector/api"
"github.com/FreifunkBremen/respond-collector/data"
"github.com/FreifunkBremen/respond-collector/database"
"github.com/FreifunkBremen/respond-collector/models"
"github.com/FreifunkBremen/respond-collector/respond"
)
@ -23,7 +23,7 @@ var (
configFile string
config *models.Config
collector *respond.Collector
statsDb *StatsDb
db *database.DB
nodes *models.Nodes
)
@ -31,15 +31,15 @@ func main() {
flag.StringVar(&configFile, "config", "config.yml", "path of configuration file (default:config.yaml)")
flag.Parse()
config = models.ReadConfigFile(configFile)
nodes = models.NewNodes(config)
if config.Influxdb.Enable {
statsDb = NewStatsDb()
db = database.New(config)
}
nodes = models.NewNodes(config)
if config.Respondd.Enable {
collectInterval := time.Second * time.Duration(config.Respondd.CollectInterval)
collector = respond.NewCollector("nodeinfo statistics neighbours", collectInterval, onReceive, config.Respondd.Interface)
collector = respond.NewCollector(db, nodes, collectInterval, config.Respondd.Interface)
}
if config.Webserver.Enable {
@ -70,33 +70,7 @@ func main() {
if collector != nil {
collector.Close()
}
if statsDb != nil {
statsDb.Close()
}
}
// called for every parsed announced-message
func onReceive(addr net.UDPAddr, res *data.ResponseData) {
// Search for NodeID
var nodeId string
if val := res.NodeInfo; val != nil {
nodeId = val.NodeId
} else if val := res.Neighbours; val != nil {
nodeId = val.NodeId
} else if val := res.Statistics; val != nil {
nodeId = val.NodeId
}
// Updates nodes if NodeID found
if len(nodeId) != 12 {
log.Printf("invalid NodeID '%s' from %s", nodeId, addr.String())
return
}
node := nodes.Update(nodeId, res)
if statsDb != nil && node.Statistics != nil {
statsDb.Add(nodeId, node)
if db != nil {
db.Close()
}
}

95
models/node.go Normal file
View File

@ -0,0 +1,95 @@
package models
import (
"strconv"
"github.com/FreifunkBremen/respond-collector/data"
"github.com/FreifunkBremen/respond-collector/jsontime"
"github.com/FreifunkBremen/respond-collector/meshviewer"
imodels "github.com/influxdata/influxdb/models"
)
// Node struct
type Node struct {
Firstseen jsontime.Time `json:"firstseen"`
Lastseen jsontime.Time `json:"lastseen"`
Flags *meshviewer.Flags `json:"flags,omitempty"`
Statistics *data.Statistics `json:"statistics"`
Nodeinfo *data.NodeInfo `json:"nodeinfo"`
Neighbours *data.Neighbours `json:"-"`
}
// Returns tags and fields for InfluxDB
func (node *Node) ToInflux() (tags imodels.Tags, fields imodels.Fields) {
stats := node.Statistics
tags.SetString("nodeid", stats.NodeId)
fields = map[string]interface{}{
"load": stats.LoadAverage,
"time.up": int64(stats.Uptime),
"time.idle": int64(stats.Idletime),
"proc.running": stats.Processes.Running,
"clients.wifi": stats.Clients.Wifi,
"clients.wifi24": stats.Clients.Wifi24,
"clients.wifi5": stats.Clients.Wifi5,
"clients.total": stats.Clients.Total,
"memory.buffers": stats.Memory.Buffers,
"memory.cached": stats.Memory.Cached,
"memory.free": stats.Memory.Free,
"memory.total": stats.Memory.Total,
}
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
if owner := nodeinfo.Owner; owner != nil {
tags.SetString("owner", owner.Contact)
}
if wireless := nodeinfo.Wireless; wireless != nil {
fields["wireless.txpower24"] = wireless.TxPower24
fields["wireless.txpower5"] = wireless.TxPower5
}
// morpheus needs
tags.SetString("hostname", nodeinfo.Hostname)
}
if t := stats.Traffic.Rx; t != nil {
fields["traffic.rx.bytes"] = int64(t.Bytes)
fields["traffic.rx.packets"] = t.Packets
}
if t := stats.Traffic.Tx; t != nil {
fields["traffic.tx.bytes"] = int64(t.Bytes)
fields["traffic.tx.packets"] = t.Packets
fields["traffic.tx.dropped"] = t.Dropped
}
if t := stats.Traffic.Forward; t != nil {
fields["traffic.forward.bytes"] = int64(t.Bytes)
fields["traffic.forward.packets"] = t.Packets
}
if t := stats.Traffic.MgmtRx; t != nil {
fields["traffic.mgmt_rx.bytes"] = int64(t.Bytes)
fields["traffic.mgmt_rx.packets"] = t.Packets
}
if t := stats.Traffic.MgmtTx; t != nil {
fields["traffic.mgmt_tx.bytes"] = int64(t.Bytes)
fields["traffic.mgmt_tx.packets"] = t.Packets
}
if w := stats.Wireless; w != nil {
addAirtime := func(suffix string, time *data.WirelessAirtime) {
fields["airtime"+suffix+".chan_util"] = time.ChanUtil
fields["airtime"+suffix+".rx_util"] = time.RxUtil
fields["airtime"+suffix+".tx_util"] = time.TxUtil
fields["airtime"+suffix+".noise"] = time.Noise
fields["airtime"+suffix+".frequency"] = time.Frequency
tags.SetString("frequency"+suffix, strconv.Itoa(int(time.Frequency)))
}
if time := w.Airtime24; time != nil {
addAirtime("24", w.Airtime24)
}
if time := w.Airtime5; time != nil {
addAirtime("5", w.Airtime5)
}
}
return
}

View File

@ -13,16 +13,6 @@ import (
"github.com/FreifunkBremen/respond-collector/meshviewer"
)
// Node struct
type Node struct {
Firstseen jsontime.Time `json:"firstseen"`
Lastseen jsontime.Time `json:"lastseen"`
Flags *meshviewer.Flags `json:"flags,omitempty"`
Statistics *data.Statistics `json:"statistics"`
Nodeinfo *data.NodeInfo `json:"nodeinfo"`
Neighbours *data.Neighbours `json:"-"`
}
// Nodes struct: cache DB of Node's structs
type Nodes struct {
Version int `json:"version"`
@ -32,6 +22,14 @@ type Nodes struct {
sync.RWMutex
}
type GlobalStats struct {
Nodes uint32
Clients uint32
ClientsWifi uint32
ClientsWifi24 uint32
ClientsWifi5 uint32
}
// NewNodes create Nodes structs
func NewNodes(config *Config) *Nodes {
nodes := &Nodes{
@ -171,33 +169,32 @@ func (nodes *Nodes) worker() {
}
}
func (nodes *Nodes) GetStats() map[string]interface{} {
var nodesCount uint32
var clientsCount uint32
var clientsWifiCount uint32
var clientsWifi24Count uint32
var clientsWifi5Count uint32
// Returns global statistics for InfluxDB
func (nodes *Nodes) GlobalStats() (result GlobalStats) {
nodes.Lock()
for _, node := range nodes.List {
if node.Flags.Online {
nodesCount += 1
result.Nodes += 1
if stats := node.Statistics; stats != nil {
clientsCount += stats.Clients.Total
clientsWifi24Count += stats.Clients.Wifi24
clientsWifi5Count += stats.Clients.Wifi5
clientsWifiCount += stats.Clients.Wifi
result.Clients += stats.Clients.Total
result.ClientsWifi24 += stats.Clients.Wifi24
result.ClientsWifi5 += stats.Clients.Wifi5
result.ClientsWifi += stats.Clients.Wifi
}
}
}
nodes.Unlock()
return
}
// Returns fields for InfluxDB
func (stats *GlobalStats) Fields() map[string]interface{} {
return map[string]interface{}{
"nodes": nodesCount,
"clients.total": clientsCount,
"clients.wifi": clientsWifiCount,
"clients.wifi24": clientsWifi24Count,
"clients.wifi5": clientsWifi5Count,
"nodes": stats.Nodes,
"clients.total": stats.Clients,
"clients.wifi": stats.ClientsWifi,
"clients.wifi24": stats.ClientsWifi24,
"clients.wifi5": stats.ClientsWifi5,
}
}

View File

@ -10,6 +10,8 @@ import (
"time"
"github.com/FreifunkBremen/respond-collector/data"
"github.com/FreifunkBremen/respond-collector/database"
"github.com/FreifunkBremen/respond-collector/models"
)
//Collector for a specificle respond messages
@ -17,18 +19,17 @@ type Collector struct {
CollectType string
connection *net.UDPConn // UDP socket
queue chan *Response // received responses
onReceive OnReceive
msgType reflect.Type
intface string
iface string // interface name for the multicast binding
db *database.DB
nodes *models.Nodes
// Ticker and stopper
ticker *time.Ticker
stop chan interface{}
}
type OnReceive func(net.UDPAddr, *data.ResponseData)
//NewCollector creates a Collector struct
func NewCollector(CollectType string, interval time.Duration, onReceive OnReceive, intface string) *Collector {
// Creates a Collector struct
func NewCollector(db *database.DB, nodes *models.Nodes, interval time.Duration, iface string) *Collector {
// Parse address
addr, err := net.ResolveUDPAddr("udp", "[::]:0")
if err != nil {
@ -43,13 +44,13 @@ func NewCollector(CollectType string, interval time.Duration, onReceive OnReceiv
conn.SetReadBuffer(maxDataGramSize)
collector := &Collector{
CollectType: CollectType,
CollectType: "nodeinfo statistics neighbours",
connection: conn,
nodes: nodes,
iface: iface,
queue: make(chan *Response, 400),
ticker: time.NewTicker(interval),
stop: make(chan interface{}, 1),
onReceive: onReceive,
intface: intface,
}
go collector.receiver()
@ -75,7 +76,7 @@ func (coll *Collector) Close() {
}
func (coll *Collector) sendOnce() {
coll.sendPacket(net.JoinHostPort(multiCastGroup+"%"+coll.intface, port))
coll.sendPacket(net.JoinHostPort(multiCastGroup+"%"+coll.iface, port))
log.Println("request", coll.CollectType)
}
@ -104,25 +105,47 @@ func (coll *Collector) sender() {
func (coll *Collector) parser() {
for obj := range coll.queue {
if err := coll.parse(obj); err != nil {
if data, err := obj.parse(); err != nil {
log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw))
} else {
coll.saveResponse(obj.Address, data)
}
}
}
func (coll *Collector) parse(response *Response) (err error) {
func (res *Response) parse() (*data.ResponseData, error) {
// Deflate
deflater := flate.NewReader(bytes.NewReader(response.Raw))
deflater := flate.NewReader(bytes.NewReader(res.Raw))
defer deflater.Close()
// Unmarshal
res := &data.ResponseData{}
if err = json.NewDecoder(deflater).Decode(res); err == nil {
coll.onReceive(response.Address, res)
rdata := &data.ResponseData{}
err := json.NewDecoder(deflater).Decode(rdata)
return rdata, err
}
func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
// Search for NodeID
var nodeId string
if val := res.NodeInfo; val != nil {
nodeId = val.NodeId
} else if val := res.Neighbours; val != nil {
nodeId = val.NodeId
} else if val := res.Statistics; val != nil {
nodeId = val.NodeId
}
// Updates nodes if NodeID found
if len(nodeId) != 12 {
log.Printf("invalid NodeID '%s' from %s", nodeId, addr.String())
return
}
node := coll.nodes.Update(nodeId, res)
if coll.db != nil && node.Statistics != nil {
coll.db.Add(nodeId, node)
}
}
func (coll *Collector) receiver() {

View File

@ -2,37 +2,26 @@ package respond
import (
"io/ioutil"
"net"
"reflect"
"testing"
"github.com/FreifunkBremen/respond-collector/data"
"github.com/stretchr/testify/assert"
)
func TestParse(t *testing.T) {
assert := assert.New(t)
var decompressed *data.ResponseData
// callback function
onReceive := func(addr net.UDPAddr, res *data.ResponseData) {
decompressed = res
}
collector := &Collector{
msgType: reflect.TypeOf(data.NodeInfo{}),
onReceive: onReceive,
}
// read testdata
compressed, err := ioutil.ReadFile("testdata/nodeinfo.flated")
assert.Nil(err)
collector.parse(&Response{
res := &Response{
Raw: compressed,
})
}
assert.NotNil(decompressed)
assert.NotNil(decompressed.NodeInfo)
assert.Equal("f81a67a5e9c1", decompressed.NodeInfo.NodeId)
data, err := res.parse()
assert.NoError(err)
assert.NotNil(data)
assert.Equal("f81a67a5e9c1", data.NodeInfo.NodeId)
}

View File

@ -1,198 +0,0 @@
package main
import (
"log"
"strconv"
"sync"
"time"
"github.com/FreifunkBremen/respond-collector/data"
"github.com/FreifunkBremen/respond-collector/models"
"github.com/influxdata/influxdb/client/v2"
)
const (
batchDuration = time.Second * 5
)
type StatsDb struct {
points chan *client.Point
wg sync.WaitGroup
nodes *models.Nodes
client client.Client
}
func NewStatsDb() *StatsDb {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: config.Influxdb.Addr,
Username: config.Influxdb.Username,
Password: config.Influxdb.Password,
})
if err != nil {
panic(err)
}
db := &StatsDb{
client: c,
points: make(chan *client.Point, 500),
nodes: nodes,
}
// start worker
db.wg.Add(1)
go db.worker()
return db
}
func (c *StatsDb) Add(nodeId string, node *models.Node) {
stats := node.Statistics
tags := map[string]string{
"nodeid": nodeId,
}
fields := map[string]interface{}{
"load": stats.LoadAverage,
"idletime": int64(stats.Idletime),
"uptime": int64(stats.Uptime),
"processes.running": stats.Processes.Running,
"clients.wifi": stats.Clients.Wifi,
"clients.wifi24": stats.Clients.Wifi24,
"clients.wifi5": stats.Clients.Wifi5,
"clients.total": stats.Clients.Total,
"memory.buffers": stats.Memory.Buffers,
"memory.cached": stats.Memory.Cached,
"memory.free": stats.Memory.Free,
"memory.total": stats.Memory.Total,
}
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
if owner := nodeinfo.Owner; owner != nil {
tags["owner"] = owner.Contact
}
if wireless := nodeinfo.Wireless; wireless != nil {
fields["wireless.txpower24"] = wireless.TxPower24
fields["wireless.txpower5"] = wireless.TxPower5
}
// morpheus needs
tags["hostname"] = nodeinfo.Hostname
}
if t := stats.Traffic.Rx; t != nil {
fields["traffic.rx.bytes"] = int64(t.Bytes)
fields["traffic.rx.packets"] = t.Packets
}
if t := stats.Traffic.Tx; t != nil {
fields["traffic.tx.bytes"] = int64(t.Bytes)
fields["traffic.tx.packets"] = t.Packets
fields["traffic.tx.dropped"] = t.Dropped
}
if t := stats.Traffic.Forward; t != nil {
fields["traffic.forward.bytes"] = int64(t.Bytes)
fields["traffic.forward.packets"] = t.Packets
}
if t := stats.Traffic.MgmtRx; t != nil {
fields["traffic.mgmt_rx.bytes"] = int64(t.Bytes)
fields["traffic.mgmt_rx.packets"] = t.Packets
}
if t := stats.Traffic.MgmtTx; t != nil {
fields["traffic.mgmt_tx.bytes"] = int64(t.Bytes)
fields["traffic.mgmt_tx.packets"] = t.Packets
}
if w := stats.Wireless; w != nil {
addAirtime := func(suffix string, time *data.WirelessAirtime) {
fields["airtime"+suffix+".chan_util"] = time.ChanUtil
fields["airtime"+suffix+".rx_util"] = time.RxUtil
fields["airtime"+suffix+".tx_util"] = time.TxUtil
fields["airtime"+suffix+".noise"] = time.Noise
fields["airtime"+suffix+".frequency"] = time.Frequency
tags["frequency"+suffix+""] = strconv.Itoa(int(time.Frequency))
}
if time := w.Airtime24; time != nil {
addAirtime("24", w.Airtime24)
}
if time := w.Airtime5; time != nil {
addAirtime("5", w.Airtime5)
}
}
point, err := client.NewPoint("node", tags, fields, time.Now())
if err != nil {
panic(err)
}
c.points <- point
}
func (c *StatsDb) Close() {
close(c.points)
c.wg.Wait()
c.client.Close()
}
// stores data points in batches into the influxdb
func (c *StatsDb) worker() {
bpConfig := client.BatchPointsConfig{
Database: config.Influxdb.Database,
Precision: "m",
}
var bp client.BatchPoints
var err error
var writeNow, closed bool
timer := time.NewTimer(batchDuration)
globalDuration := time.Second * time.Duration(config.Nodes.SaveInterval)
globalTimer := time.NewTimer(globalDuration)
for !closed {
// wait for new points
select {
case <-globalTimer.C:
point, err := client.NewPoint("global", nil, nodes.GetStats(), time.Now())
if err != nil {
panic(err)
}
c.points <- point
globalTimer.Reset(globalDuration)
log.Print("saving global point")
case point, ok := <-c.points:
if ok {
if bp == nil {
// create new batch
timer.Reset(batchDuration)
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
log.Fatal(err)
}
}
bp.AddPoint(point)
} else {
closed = true
}
case <-timer.C:
if bp == nil {
timer.Reset(batchDuration)
} else {
writeNow = true
}
}
// write batch now?
if bp != nil && (writeNow || closed) {
log.Println("saving", len(bp.Points()), "points")
if err = c.client.Write(bp); err != nil {
log.Fatal(err)
}
writeNow = false
bp = nil
}
}
globalTimer.Stop()
timer.Stop()
c.wg.Done()
}