Merge pull request #110 from FreifunkBremen/fix-sleep

[TASK] refactoring
This commit is contained in:
Julian K 2018-01-13 15:29:07 +01:00 committed by GitHub
commit 873f469c63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 418 additions and 631 deletions

View File

@ -2,13 +2,24 @@ package cmd
import ( import (
"fmt" "fmt"
"io/ioutil"
"os" "os"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/respond"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
"github.com/FreifunkBremen/yanic/webserver"
"github.com/naoina/toml"
) )
// Config represents the whole configuration
type Config struct {
Respondd respond.Config
Webserver webserver.Config
Nodes runtime.NodesConfig
Database database.Config
}
var ( var (
configPath string configPath string
collector *respond.Collector collector *respond.Collector
@ -16,11 +27,28 @@ var (
nodes *runtime.Nodes nodes *runtime.Nodes
) )
func loadConfig() *runtime.Config { func loadConfig() *Config {
config, err := runtime.ReadConfigFile(configPath) config, err := ReadConfigFile(configPath)
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, "unable to load config file:", err) fmt.Fprintln(os.Stderr, "unable to load config file:", err)
os.Exit(2) os.Exit(2)
} }
return config return config
} }
// ReadConfigFile reads a config model from path of a yml file
func ReadConfigFile(path string) (config *Config, err error) {
config = &Config{}
file, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
err = toml.Unmarshal(file, config)
if err != nil {
return nil, err
}
return
}

43
cmd/config_test.go Normal file
View File

@ -0,0 +1,43 @@
package cmd
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestReadConfig(t *testing.T) {
assert := assert.New(t)
config, err := ReadConfigFile("../config_example.toml")
assert.NoError(err)
assert.NotNil(config)
assert.True(config.Respondd.Enable)
assert.Equal([]string{"br-ffhb"}, config.Respondd.Interfaces)
assert.Equal(time.Minute, config.Respondd.CollectInterval.Duration)
assert.Equal(time.Hour*24*7, config.Nodes.PruneAfter.Duration)
assert.Equal(time.Hour*24*7, config.Database.DeleteAfter.Duration)
// Test output plugins
assert.Len(config.Nodes.Output, 3)
outputs := config.Nodes.Output["meshviewer"].([]interface{})
assert.Len(outputs, 1)
meshviewer := outputs[0]
assert.EqualValues(map[string]interface{}{
"version": int64(2),
"enable": false,
"nodes_path": "/var/www/html/meshviewer/data/nodes.json",
"graph_path": "/var/www/html/meshviewer/data/graph.json",
}, meshviewer)
_, err = ReadConfigFile("testdata/config_invalid.toml")
assert.Error(err, "not unmarshalable")
assert.Contains(err.Error(), "invalid TOML syntax")
_, err = ReadConfigFile("testdata/adsa.toml")
assert.Error(err, "not found able")
assert.Contains(err.Error(), "no such file or directory")
}

View File

@ -3,8 +3,7 @@ package cmd
import ( import (
"log" "log"
"github.com/FreifunkBremen/yanic/database" allDatabase "github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/rrd" "github.com/FreifunkBremen/yanic/rrd"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -21,12 +20,11 @@ var importCmd = &cobra.Command{
site := args[1] site := args[1]
config := loadConfig() config := loadConfig()
connections, err := all.Connect(config.Database.Connection) err := allDatabase.Start(config.Database)
if err != nil { if err != nil {
panic(err) panic(err)
} }
database.Start(connections, config) defer allDatabase.Close()
defer database.Close(connections)
log.Println("importing RRD from", path) log.Println("importing RRD from", path)

View File

@ -24,7 +24,7 @@ var queryCmd = &cobra.Command{
log.Printf("Sending request address=%s iface=%s", dstAddress, iface) log.Printf("Sending request address=%s iface=%s", dstAddress, iface)
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
collector := respond.NewCollector(nil, nodes, []string{}, []string{iface}, 0) collector := respond.NewCollector(nil, nodes, []string{}, []string{iface}, 0)
defer collector.Close() defer collector.Close()

View File

@ -7,9 +7,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/FreifunkBremen/yanic/database"
allDatabase "github.com/FreifunkBremen/yanic/database/all" allDatabase "github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/output"
allOutput "github.com/FreifunkBremen/yanic/output/all" allOutput "github.com/FreifunkBremen/yanic/output/all"
"github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/respond"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
@ -25,22 +23,20 @@ var serveCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
config := loadConfig() config := loadConfig()
connections, err := allDatabase.Connect(config.Database.Connection) err := allDatabase.Start(config.Database)
if err != nil { if err != nil {
panic(err) panic(err)
} }
database.Start(connections, config) defer allDatabase.Close()
defer database.Close(connections)
nodes = runtime.NewNodes(config) nodes = runtime.NewNodes(&config.Nodes)
nodes.Start() nodes.Start()
outputs, err := allOutput.Register(config.Nodes.Output) err = allOutput.Start(nodes, config.Nodes)
if err != nil { if err != nil {
panic(err) panic(err)
} }
output.Start(outputs, nodes, config) defer allOutput.Close()
defer output.Close()
if config.Webserver.Enable { if config.Webserver.Enable {
log.Println("starting webserver on", config.Webserver.Bind) log.Println("starting webserver on", config.Webserver.Bind)

1
cmd/testdata/config_invalid.toml vendored Normal file
View File

@ -0,0 +1 @@
foobar

View File

@ -0,0 +1,75 @@
package all
import (
"fmt"
"log"
"time"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)
type Connection struct {
database.Connection
list []database.Connection
}
func Connect(allConnection map[string]interface{}) (database.Connection, error) {
var list []database.Connection
for dbType, conn := range database.Adapters {
configForType := allConnection[dbType]
if configForType == nil {
log.Printf("the output type '%s' has no configuration", dbType)
continue
}
dbConfigs, ok := configForType.([]map[string]interface{})
if !ok {
return nil, fmt.Errorf("the output type '%s' has the wrong format", dbType)
}
for _, config := range dbConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
connected, err := conn(config)
if err != nil {
return nil, err
}
if connected == nil {
continue
}
list = append(list, connected)
}
}
return &Connection{list: list}, nil
}
func (conn *Connection) InsertNode(node *runtime.Node) {
for _, item := range conn.list {
item.InsertNode(node)
}
}
func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) {
for _, item := range conn.list {
item.InsertLink(link, time)
}
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string) {
for _, item := range conn.list {
item.InsertGlobals(stats, time, site)
}
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
for _, item := range conn.list {
item.PruneNodes(deleteAfter)
}
}
func (conn *Connection) Close() {
for _, item := range conn.list {
item.Close()
}
}

View File

@ -1,74 +1,45 @@
package all package all
import ( import (
"log" "sync"
"time" "time"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
) )
type Connection struct { var conn database.Connection
database.Connection var wg = sync.WaitGroup{}
list []database.Connection var quit chan struct{}
}
func Connect(allConnection map[string]interface{}) (database.Connection, error) { func Start(config database.Config) (err error) {
var list []database.Connection conn, err = Connect(config.Connection)
for dbType, conn := range database.Adapters {
configForType := allConnection[dbType]
if configForType == nil {
log.Printf("the output type '%s' has no configuration\n", dbType)
continue
}
dbConfigs, ok := configForType.([]map[string]interface{})
if !ok {
log.Panicf("the output type '%s' has the wrong format\n", dbType)
}
for _, config := range dbConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
connected, err := conn(config)
if err != nil { if err != nil {
return nil, err return
} }
if connected == nil { quit = make(chan struct{})
continue wg.Add(1)
} go deleteWorker(config.DeleteInterval.Duration, config.DeleteAfter.Duration)
list = append(list, connected) return
}
}
return &Connection{list: list}, nil
} }
func (conn *Connection) InsertNode(node *runtime.Node) { func Close() {
for _, item := range conn.list { close(quit)
item.InsertNode(node) wg.Wait()
} conn.Close()
quit = nil
} }
func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) { // prunes node-specific data periodically
for _, item := range conn.list { func deleteWorker(deleteInterval time.Duration, deleteAfter time.Duration) {
item.InsertLink(link, time) ticker := time.NewTicker(deleteInterval)
} for {
} select {
case <-ticker.C:
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string) { conn.PruneNodes(deleteAfter)
for _, item := range conn.list { case <-quit:
item.InsertGlobals(stats, time, site) ticker.Stop()
} wg.Done()
} return
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
for _, item := range conn.list {
item.PruneNodes(deleteAfter)
}
}
func (conn *Connection) Close() {
for _, item := range conn.list {
item.Close()
} }
} }

View File

@ -2,96 +2,28 @@ package all
import ( import (
"errors" "errors"
"sync"
"testing" "testing"
"time" "time"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/lib/duration"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
type testConn struct {
database.Connection
countNode int
countLink int
countGlobals int
countPrune int
countClose int
sync.Mutex
}
func (c *testConn) InsertNode(node *runtime.Node) {
c.Lock()
c.countNode++
c.Unlock()
}
func (c *testConn) GetNode() int {
c.Lock()
defer c.Unlock()
return c.countNode
}
func (c *testConn) InsertLink(link *runtime.Link, time time.Time) {
c.Lock()
c.countLink++
c.Unlock()
}
func (c *testConn) GetLink() int {
c.Lock()
defer c.Unlock()
return c.countLink
}
func (c *testConn) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string) {
c.Lock()
c.countGlobals++
c.Unlock()
}
func (c *testConn) GetGlobal() int {
c.Lock()
defer c.Unlock()
return c.countGlobals
}
func (c *testConn) PruneNodes(time.Duration) {
c.Lock()
c.countPrune++
c.Unlock()
}
func (c *testConn) GetPrune() int {
c.Lock()
defer c.Unlock()
return c.countPrune
}
func (c *testConn) Close() {
c.Lock()
c.countClose++
c.Unlock()
}
func (c *testConn) GetClose() int {
c.Lock()
defer c.Unlock()
return c.countClose
}
func TestStart(t *testing.T) { func TestStart(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
globalConn := &testConn{}
database.RegisterAdapter("a", func(config map[string]interface{}) (database.Connection, error) {
return globalConn, nil
})
database.RegisterAdapter("b", func(config map[string]interface{}) (database.Connection, error) {
return globalConn, nil
})
database.RegisterAdapter("c", func(config map[string]interface{}) (database.Connection, error) {
return globalConn, nil
})
database.RegisterAdapter("d", func(config map[string]interface{}) (database.Connection, error) { database.RegisterAdapter("d", func(config map[string]interface{}) (database.Connection, error) {
return nil, nil return nil, nil
}) })
database.RegisterAdapter("e", func(config map[string]interface{}) (database.Connection, error) { database.RegisterAdapter("e", func(config map[string]interface{}) (database.Connection, error) {
return nil, errors.New("blub") return nil, errors.New("blub")
}) })
allConn, err := Connect(map[string]interface{}{ // Test for PruneNodes (by start)
assert.Nil(quit)
err := Start(database.Config{
DeleteInterval: duration.Duration{Duration: time.Millisecond},
Connection: map[string]interface{}{
"a": []map[string]interface{}{ "a": []map[string]interface{}{
map[string]interface{}{ map[string]interface{}{
"enable": false, "enable": false,
@ -117,29 +49,12 @@ func TestStart(t *testing.T) {
"path": "d0", "path": "d0",
}, },
}, },
},
}) })
assert.NoError(err) assert.NoError(err)
assert.NotNil(quit)
assert.Equal(0, globalConn.GetNode()) // connection type not found
allConn.InsertNode(nil)
assert.Equal(3, globalConn.GetNode())
assert.Equal(0, globalConn.GetLink())
allConn.InsertLink(nil, time.Now())
assert.Equal(3, globalConn.GetLink())
assert.Equal(0, globalConn.GetGlobal())
allConn.InsertGlobals(nil, time.Now(), runtime.GLOBAL_SITE)
assert.Equal(3, globalConn.GetGlobal())
assert.Equal(0, globalConn.GetPrune())
allConn.PruneNodes(time.Second)
assert.Equal(3, globalConn.GetPrune())
assert.Equal(0, globalConn.GetClose())
allConn.Close()
assert.Equal(3, globalConn.GetClose())
_, err = Connect(map[string]interface{}{ _, err = Connect(map[string]interface{}{
"e": []map[string]interface{}{ "e": []map[string]interface{}{
map[string]interface{}{}, map[string]interface{}{},
@ -147,10 +62,14 @@ func TestStart(t *testing.T) {
}) })
assert.Error(err) assert.Error(err)
// wrong format -> the only panic in Register // test close
assert.Panics(func() { Close()
Connect(map[string]interface{}{
// wrong format
err = Start(database.Config{
Connection: map[string]interface{}{
"e": true, "e": true,
},
}) })
}) assert.Error(err)
} }

9
database/config.go Normal file
View File

@ -0,0 +1,9 @@
package database
import "github.com/FreifunkBremen/yanic/lib/duration"
type Config struct {
DeleteInterval duration.Duration `toml:"delete_interval"` // Delete stats of nodes every n minutes
DeleteAfter duration.Duration `toml:"delete_after"` // Delete stats of nodes till now-deletetill n minutes
Connection map[string]interface{}
}

View File

@ -88,7 +88,7 @@ func TestGlobalStats(t *testing.T) {
} }
func createTestNodes() *runtime.Nodes { func createTestNodes() *runtime.Nodes {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
nodeData := &runtime.Node{ nodeData := &runtime.Node{
Online: true, Online: true,

View File

@ -177,7 +177,7 @@ func testPoints(nodes ...*runtime.Node) (points []*client.Point) {
panic(err) panic(err)
} }
nodesList := runtime.NewNodes(&runtime.Config{}) nodesList := runtime.NewNodes(&runtime.NodesConfig{})
// Create dummy connection // Create dummy connection
conn := &Connection{ conn := &Connection{

View File

@ -1,40 +0,0 @@
package database
import (
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
var quit chan struct{}
// Start workers of database
// WARNING: Do not override this function
// you should use New()
func Start(conn Connection, config *runtime.Config) {
quit = make(chan struct{})
go deleteWorker(conn, config.Database.DeleteInterval.Duration, config.Database.DeleteAfter.Duration)
}
func Close(conn Connection) {
if quit != nil {
close(quit)
}
if conn != nil {
conn.Close()
}
}
// prunes node-specific data periodically
func deleteWorker(conn Connection, deleteInterval time.Duration, deleteAfter time.Duration) {
ticker := time.NewTicker(deleteInterval)
for {
select {
case <-ticker.C:
conn.PruneNodes(deleteAfter)
case <-quit:
ticker.Stop()
return
}
}
}

View File

@ -1,69 +0,0 @@
package database
import (
"sync"
"testing"
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
type testConn struct {
Connection
countClose int
countPrune int
sync.Mutex
}
func (c *testConn) Close() {
c.Lock()
c.countClose++
c.Unlock()
}
func (c *testConn) GetClose() int {
c.Lock()
defer c.Unlock()
return c.countClose
}
func (c *testConn) PruneNodes(time.Duration) {
c.Lock()
c.countPrune++
c.Unlock()
}
func (c *testConn) GetPruneNodes() int {
c.Lock()
defer c.Unlock()
return c.countPrune
}
func TestStart(t *testing.T) {
assert := assert.New(t)
conn := &testConn{}
config := &runtime.Config{
Database: struct {
DeleteInterval runtime.Duration `toml:"delete_interval"`
DeleteAfter runtime.Duration `toml:"delete_after"`
Connection map[string]interface{}
}{
DeleteInterval: runtime.Duration{Duration: time.Millisecond * 10},
},
}
assert.Nil(quit)
Start(conn, config)
assert.NotNil(quit)
assert.Equal(0, conn.GetPruneNodes())
time.Sleep(time.Millisecond * 12)
assert.Equal(1, conn.GetPruneNodes())
assert.Equal(0, conn.GetClose())
Close(conn)
assert.NotNil(quit)
assert.Equal(1, conn.GetClose())
time.Sleep(time.Millisecond * 12) // to reach timer.Stop() line
}

View File

@ -1,4 +1,4 @@
package runtime package duration
import ( import (
"fmt" "fmt"
@ -18,14 +18,8 @@ type Duration struct {
} }
// UnmarshalTOML parses a duration string. // UnmarshalTOML parses a duration string.
func (d *Duration) UnmarshalTOML(dataInterface interface{}) error { func (d *Duration) UnmarshalText(data []byte) error {
var data string
switch dataInterface.(type) {
case string:
data = dataInterface.(string)
default:
return fmt.Errorf("invalid duration: \"%s\"", dataInterface)
}
// " + int + unit + " // " + int + unit + "
if len(data) < 2 { if len(data) < 2 {
return fmt.Errorf("invalid duration: \"%s\"", data) return fmt.Errorf("invalid duration: \"%s\"", data)

View File

@ -1,4 +1,4 @@
package runtime package duration
import ( import (
"testing" "testing"
@ -16,6 +16,8 @@ func TestDuration(t *testing.T) {
duration time.Duration duration time.Duration
}{ }{
{"", "invalid duration: \"\"", 0}, {"", "invalid duration: \"\"", 0},
{"3", "invalid duration: \"3\"", 0},
{"am", "unable to parse duration \"am\": strconv.Atoi: parsing \"a\": invalid syntax", 0},
{"1x", "invalid duration unit \"x\"", 0}, {"1x", "invalid duration unit \"x\"", 0},
{"1s", "", time.Second}, {"1s", "", time.Second},
{"73s", "", time.Second * 73}, {"73s", "", time.Second * 73},
@ -34,7 +36,7 @@ func TestDuration(t *testing.T) {
for _, test := range tests { for _, test := range tests {
d := Duration{} d := Duration{}
err := d.UnmarshalTOML(test.input) err := d.UnmarshalText([]byte(test.input))
duration := d.Duration duration := d.Duration
if test.err == "" { if test.err == "" {
@ -44,13 +46,4 @@ func TestDuration(t *testing.T) {
assert.EqualError(err, test.err) assert.EqualError(err, test.err)
} }
} }
d := Duration{}
err := d.UnmarshalTOML(3)
assert.Error(err)
assert.Contains(err.Error(), "invalid duration")
err = d.UnmarshalTOML("am")
assert.Error(err)
assert.EqualError(err, "unable to parse duration \"am\": strconv.Atoi: parsing \"a\": invalid syntax")
} }

View File

@ -1,6 +1,8 @@
package all package all
import "github.com/FreifunkBremen/yanic/runtime" import (
"github.com/FreifunkBremen/yanic/runtime"
)
// Config Filter // Config Filter
type filterConfig map[string]interface{} type filterConfig map[string]interface{}
@ -13,7 +15,7 @@ func noFilter(node *runtime.Node) *runtime.Node {
// Create Filter // Create Filter
func (f filterConfig) filtering(nodesOrigin *runtime.Nodes) *runtime.Nodes { func (f filterConfig) filtering(nodesOrigin *runtime.Nodes) *runtime.Nodes {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
filterfuncs := []filterFunc{ filterfuncs := []filterFunc{
f.HasLocation(), f.HasLocation(),
f.Blacklist(), f.Blacklist(),

View File

@ -1,63 +1,45 @@
package all package all
import ( import (
"log" "sync"
"time"
"github.com/FreifunkBremen/yanic/output" "github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )
type Output struct { var quit chan struct{}
output.Output var wg = sync.WaitGroup{}
list map[int]output.Output var outputA output.Output
filter map[int]filterConfig
}
func Register(configuration map[string]interface{}) (output.Output, error) { func Start(nodes *runtime.Nodes, config runtime.NodesConfig) (err error) {
list := make(map[int]output.Output) outputA, err = Register(config.Output)
filter := make(map[int]filterConfig)
i := 1
allOutputs := configuration
for outputType, outputRegister := range output.Adapters {
configForOutput := allOutputs[outputType]
if configForOutput == nil {
log.Printf("the output type '%s' has no configuration\n", outputType)
continue
}
outputConfigs, ok := configForOutput.([]map[string]interface{})
if !ok {
log.Panicf("the output type '%s' has the wrong format\n", outputType)
}
for _, config := range outputConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
output, err := outputRegister(config)
if err != nil { if err != nil {
return nil, err return
} }
if output == nil { quit = make(chan struct{})
continue wg.Add(1)
} go saveWorker(nodes, config.SaveInterval.Duration)
list[i] = output return
if c := config["filter"]; c != nil {
filter[i] = config["filter"].(map[string]interface{})
}
i++
}
}
return &Output{list: list, filter: filter}, nil
} }
func (o *Output) Save(nodes *runtime.Nodes) { func Close() {
for i, item := range o.list { close(quit)
var filteredNodes *runtime.Nodes wg.Wait()
if config := o.filter[i]; config != nil { quit = nil
filteredNodes = config.filtering(nodes) }
} else {
filteredNodes = filterConfig{}.filtering(nodes)
}
item.Save(filteredNodes) // save periodically to output
func saveWorker(nodes *runtime.Nodes, saveInterval time.Duration) {
ticker := time.NewTicker(saveInterval)
for {
select {
case <-ticker.C:
outputA.Save(nodes)
case <-quit:
ticker.Stop()
wg.Done()
return
}
} }
} }

64
output/all/output.go Normal file
View File

@ -0,0 +1,64 @@
package all
import (
"fmt"
"log"
"github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime"
)
type Output struct {
output.Output
list map[int]output.Output
filter map[int]filterConfig
}
func Register(configuration map[string]interface{}) (output.Output, error) {
list := make(map[int]output.Output)
filter := make(map[int]filterConfig)
i := 1
allOutputs := configuration
for outputType, outputRegister := range output.Adapters {
configForOutput := allOutputs[outputType]
if configForOutput == nil {
log.Printf("the output type '%s' has no configuration\n", outputType)
continue
}
outputConfigs, ok := configForOutput.([]map[string]interface{})
if !ok {
return nil, fmt.Errorf("the output type '%s' has the wrong format", outputType)
}
for _, config := range outputConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
output, err := outputRegister(config)
if err != nil {
return nil, err
}
if output == nil {
continue
}
list[i] = output
if c := config["filter"]; c != nil {
filter[i] = config["filter"].(map[string]interface{})
}
i++
}
}
return &Output{list: list, filter: filter}, nil
}
func (o *Output) Save(nodes *runtime.Nodes) {
for i, item := range o.list {
var filteredNodes *runtime.Nodes
if config := o.filter[i]; config != nil {
filteredNodes = config.filtering(nodes)
} else {
filteredNodes = filterConfig{}.filtering(nodes)
}
item.Save(filteredNodes)
}
}

View File

@ -89,10 +89,9 @@ func TestStart(t *testing.T) {
}) })
assert.Error(err) assert.Error(err)
// wrong format -> the only panic in Register // wrong format
assert.Panics(func() { _, err = Register(map[string]interface{}{
Register(map[string]interface{}{
"e": true, "e": true,
}) })
}) assert.Error(err)
} }

View File

@ -1,40 +0,0 @@
package output
import (
"sync"
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
var quit chan struct{}
var wg = sync.WaitGroup{}
// Start workers of database
// WARNING: Do not override this function
// you should use New()
func Start(output Output, nodes *runtime.Nodes, config *runtime.Config) {
quit = make(chan struct{})
wg.Add(1)
go saveWorker(output, nodes, config.Nodes.SaveInterval.Duration)
}
func Close() {
close(quit)
wg.Wait()
}
// save periodically to output
func saveWorker(output Output, nodes *runtime.Nodes, saveInterval time.Duration) {
ticker := time.NewTicker(saveInterval)
for {
select {
case <-ticker.C:
output.Save(nodes)
case <-quit:
wg.Done()
ticker.Stop()
return
}
}
}

View File

@ -1,57 +0,0 @@
package output
import (
"sync"
"testing"
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
type testConn struct {
Output
countSave int
sync.Mutex
}
func (c *testConn) Save(nodes *runtime.Nodes) {
c.Lock()
c.countSave++
c.Unlock()
}
func (c *testConn) Get() int {
c.Lock()
defer c.Unlock()
return c.countSave
}
func TestStart(t *testing.T) {
assert := assert.New(t)
conn := &testConn{}
config := &runtime.Config{
Nodes: struct {
StatePath string `toml:"state_path"`
SaveInterval runtime.Duration `toml:"save_interval"`
OfflineAfter runtime.Duration `toml:"offline_after"`
PruneAfter runtime.Duration `toml:"prune_after"`
Output map[string]interface{}
}{
SaveInterval: runtime.Duration{Duration: time.Millisecond * 10},
},
}
assert.Nil(quit)
Start(conn, nil, config)
assert.NotNil(quit)
assert.Equal(0, conn.Get())
time.Sleep(time.Millisecond * 12)
assert.Equal(1, conn.Get())
time.Sleep(time.Millisecond * 12)
Close()
assert.Equal(2, conn.Get())
}

View File

@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )

View File

@ -11,7 +11,7 @@ import (
func TestTransform(t *testing.T) { func TestTransform(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
nodes.AddNode(&runtime.Node{ nodes.AddNode(&runtime.Node{
Online: true, Online: true,
Nodeinfo: &data.NodeInfo{ Nodeinfo: &data.NodeInfo{

View File

@ -3,7 +3,7 @@ package meshviewerFFRGB
import ( import (
"time" "time"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )

View File

@ -10,7 +10,7 @@ import (
func TestRegister(t *testing.T) { func TestRegister(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
node := NewNode(nodes, &runtime.Node{ node := NewNode(nodes, &runtime.Node{
Nodeinfo: &data.NodeInfo{ Nodeinfo: &data.NodeInfo{
Owner: &data.Owner{ Owner: &data.Owner{

View File

@ -33,7 +33,7 @@ func TestGenerateGraph(t *testing.T) {
func testGetNodesByFile(files ...string) *runtime.Nodes { func testGetNodesByFile(files ...string) *runtime.Nodes {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
for _, file := range files { for _, file := range files {
node := testGetNodeByFile(file) node := testGetNodeByFile(file)

View File

@ -2,7 +2,7 @@ package meshviewer
import ( import (
"github.com/FreifunkBremen/yanic/data" "github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
) )
// Node struct // Node struct

View File

@ -23,7 +23,7 @@ func TestNodesV2(t *testing.T) {
} }
func createTestNodes() *runtime.Nodes { func createTestNodes() *runtime.Nodes {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
nodeData := &runtime.Node{ nodeData := &runtime.Node{
Statistics: &data.Statistics{ Statistics: &data.Statistics{

View File

@ -1,7 +1,7 @@
package meshviewer package meshviewer
import ( import (
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )

View File

@ -1,7 +1,7 @@
package meshviewer package meshviewer
import ( import (
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )

View File

@ -1,7 +1,7 @@
package nodelist package nodelist
import ( import (
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )

View File

@ -17,7 +17,7 @@ func TestTransform(t *testing.T) {
} }
func createTestNodes() *runtime.Nodes { func createTestNodes() *runtime.Nodes {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
nodeData := &runtime.Node{ nodeData := &runtime.Node{
Statistics: &data.Statistics{ Statistics: &data.Statistics{

View File

@ -11,7 +11,7 @@ import (
"github.com/FreifunkBremen/yanic/data" "github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )
@ -30,7 +30,7 @@ type Collector struct {
} }
// NewCollector creates a Collector struct // NewCollector creates a Collector struct
func NewCollector(db database.Connection, nodes *runtime.Nodes, sites []string , ifaces []string, port int) *Collector { func NewCollector(db database.Connection, nodes *runtime.Nodes, sites []string, ifaces []string, port int) *Collector {
coll := &Collector{ coll := &Collector{
db: db, db: db,

View File

@ -12,7 +12,7 @@ import (
const SITE_TEST = "ffxx" const SITE_TEST = "ffxx"
func TestCollector(t *testing.T) { func TestCollector(t *testing.T) {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.NodesConfig{})
collector := NewCollector(nil, nodes, []string{SITE_TEST}, []string{}, 10001) collector := NewCollector(nil, nodes, []string{SITE_TEST}, []string{}, 10001)
collector.Start(time.Millisecond) collector.Start(time.Millisecond)

12
respond/config.go Normal file
View File

@ -0,0 +1,12 @@
package respond
import "github.com/FreifunkBremen/yanic/lib/duration"
type Config struct {
Enable bool `toml:"enable"`
Synchronize duration.Duration `toml:"synchronize"`
Interfaces []string `toml:"interfaces"`
Sites []string `toml:"sites"`
Port int `toml:"port"`
CollectInterval duration.Duration `toml:"collect_interval"`
}

View File

@ -1,58 +0,0 @@
package runtime
import (
"io/ioutil"
"github.com/BurntSushi/toml"
)
//Config the config File of this daemon
type Config struct {
Respondd struct {
Enable bool `toml:"enable"`
Synchronize Duration `toml:"synchronize"`
Interfaces []string `toml:"interfaces"`
Sites []string `toml:"sites"`
Port int `toml:"port"`
CollectInterval Duration `toml:"collect_interval"`
}
Webserver struct {
Enable bool `toml:"enable"`
Bind string `toml:"bind"`
Webroot string `toml:"webroot"`
}
Nodes struct {
StatePath string `toml:"state_path"`
SaveInterval Duration `toml:"save_interval"` // Save nodes periodically
OfflineAfter Duration `toml:"offline_after"` // Set node to offline if not seen within this period
PruneAfter Duration `toml:"prune_after"` // Remove nodes after n days of inactivity
Output map[string]interface{}
}
Meshviewer struct {
Version int `toml:"version"`
NodesPath string `toml:"nodes_path"`
GraphPath string `toml:"graph_path"`
}
Database struct {
DeleteInterval Duration `toml:"delete_interval"` // Delete stats of nodes every n minutes
DeleteAfter Duration `toml:"delete_after"` // Delete stats of nodes till now-deletetill n minutes
Connection map[string]interface{}
}
}
// ReadConfigFile reads a config model from path of a yml file
func ReadConfigFile(path string) (config *Config, err error) {
config = &Config{}
file, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
err = toml.Unmarshal(file, config)
if err != nil {
return nil, err
}
return
}

View File

@ -1,52 +0,0 @@
package runtime
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestReadConfig(t *testing.T) {
assert := assert.New(t)
config, err := ReadConfigFile("../config_example.toml")
assert.NoError(err, "no error during reading")
assert.NotNil(config)
assert.True(config.Respondd.Enable)
assert.Equal([]string{"br-ffhb"}, config.Respondd.Interfaces)
assert.Equal(time.Minute, config.Respondd.CollectInterval.Duration)
assert.Equal(time.Hour*24*7, config.Nodes.PruneAfter.Duration)
assert.Equal(time.Hour*24*7, config.Database.DeleteAfter.Duration)
var meshviewer map[string]interface{}
var outputs []map[string]interface{}
outputs = config.Nodes.Output["meshviewer"].([]map[string]interface{})
assert.Len(outputs, 1, "more outputs are given")
meshviewer = outputs[0]
assert.Equal(int64(2), meshviewer["version"])
assert.Equal("/var/www/html/meshviewer/data/nodes.json", meshviewer["nodes_path"])
var influxdb map[string]interface{}
dbs := config.Database.Connection["influxdb"].([]map[string]interface{})
assert.Len(dbs, 1, "more influxdb are given")
influxdb = dbs[0]
assert.Equal(influxdb["database"], "ffhb")
var graphitedb map[string]interface{}
dbs = config.Database.Connection["graphite"].([]map[string]interface{})
assert.Len(dbs, 1, "more graphitedb are given")
graphitedb = dbs[0]
assert.Equal(graphitedb["address"], "localhost:2003")
_, err = ReadConfigFile("testdata/config_failed.toml")
assert.Error(err, "not unmarshalable")
assert.Contains(err.Error(), "Near line ")
_, err = ReadConfigFile("testdata/adsa.toml")
assert.Error(err, "not found able")
assert.Contains(err.Error(), "no such file or directory")
}

View File

@ -4,7 +4,7 @@ import (
"net" "net"
"github.com/FreifunkBremen/yanic/data" "github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
) )
// Node struct // Node struct

View File

@ -8,26 +8,26 @@ import (
"time" "time"
"github.com/FreifunkBremen/yanic/data" "github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
) )
// Nodes struct: cache DB of Node's structs // Nodes struct: cache DB of Node's structs
type Nodes struct { type Nodes struct {
List map[string]*Node `json:"nodes"` // the current nodemap, indexed by node ID List map[string]*Node `json:"nodes"` // the current nodemap, indexed by node ID
ifaceToNodeID map[string]string // mapping from MAC address to NodeID ifaceToNodeID map[string]string // mapping from MAC address to NodeID
config *Config config *NodesConfig
sync.RWMutex sync.RWMutex
} }
// NewNodes create Nodes structs // NewNodes create Nodes structs
func NewNodes(config *Config) *Nodes { func NewNodes(config *NodesConfig) *Nodes {
nodes := &Nodes{ nodes := &Nodes{
List: make(map[string]*Node), List: make(map[string]*Node),
ifaceToNodeID: make(map[string]string), ifaceToNodeID: make(map[string]string),
config: config, config: config,
} }
if config.Nodes.StatePath != "" { if config.StatePath != "" {
nodes.load() nodes.load()
} }
@ -130,7 +130,7 @@ func (nodes *Nodes) NodeLinks(node *Node) (result []Link) {
// Periodically saves the cached DB to json file // Periodically saves the cached DB to json file
func (nodes *Nodes) worker() { func (nodes *Nodes) worker() {
c := time.Tick(nodes.config.Nodes.SaveInterval.Duration) c := time.Tick(nodes.config.SaveInterval.Duration)
for range c { for range c {
nodes.expire() nodes.expire()
@ -143,14 +143,14 @@ func (nodes *Nodes) expire() {
now := jsontime.Now() now := jsontime.Now()
// Nodes last seen before expireAfter will be removed // Nodes last seen before expireAfter will be removed
prunePeriod := nodes.config.Nodes.PruneAfter.Duration prunePeriod := nodes.config.PruneAfter.Duration
if prunePeriod == 0 { if prunePeriod == 0 {
prunePeriod = time.Hour * 24 * 7 // our default prunePeriod = time.Hour * 24 * 7 // our default
} }
pruneAfter := now.Add(-prunePeriod) pruneAfter := now.Add(-prunePeriod)
// Nodes last seen within OfflineAfter are changed to 'offline' // Nodes last seen within OfflineAfter are changed to 'offline'
offlineAfter := now.Add(-nodes.config.Nodes.OfflineAfter.Duration) offlineAfter := now.Add(-nodes.config.OfflineAfter.Duration)
// Locking foo // Locking foo
nodes.Lock() nodes.Lock()
@ -194,7 +194,7 @@ func (nodes *Nodes) readIfaces(nodeinfo *data.NodeInfo) {
} }
func (nodes *Nodes) load() { func (nodes *Nodes) load() {
path := nodes.config.Nodes.StatePath path := nodes.config.StatePath
if f, err := os.Open(path); err == nil { // transform data to legacy meshviewer if f, err := os.Open(path); err == nil { // transform data to legacy meshviewer
if err = json.NewDecoder(f).Decode(nodes); err == nil { if err = json.NewDecoder(f).Decode(nodes); err == nil {
@ -222,7 +222,7 @@ func (nodes *Nodes) save() {
defer nodes.RUnlock() defer nodes.RUnlock()
// serialize nodes // serialize nodes
SaveJSON(nodes, nodes.config.Nodes.StatePath) SaveJSON(nodes, nodes.config.StatePath)
} }
// SaveJSON to path // SaveJSON to path

11
runtime/nodes_config.go Normal file
View File

@ -0,0 +1,11 @@
package runtime
import "github.com/FreifunkBremen/yanic/lib/duration"
type NodesConfig struct {
StatePath string `toml:"state_path"`
SaveInterval duration.Duration `toml:"save_interval"` // Save nodes periodically
OfflineAfter duration.Duration `toml:"offline_after"` // Set node to offline if not seen within this period
PruneAfter duration.Duration `toml:"prune_after"` // Remove nodes after n days of inactivity
Output map[string]interface{}
}

View File

@ -9,15 +9,15 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/FreifunkBremen/yanic/data" "github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/jsontime" "github.com/FreifunkBremen/yanic/lib/jsontime"
) )
func TestExpire(t *testing.T) { func TestExpire(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
config := &Config{} config := &NodesConfig{}
config.Nodes.OfflineAfter.Duration = time.Minute * 10 config.OfflineAfter.Duration = time.Minute * 10
// to get default (100%) path of testing // to get default (100%) path of testing
// config.Nodes.PruneAfter.Duration = time.Hour * 24 * 6 // config.PruneAfter.Duration = time.Hour * 24 * 6
nodes := &Nodes{ nodes := &Nodes{
config: config, config: config,
List: make(map[string]*Node), List: make(map[string]*Node),
@ -51,22 +51,22 @@ func TestExpire(t *testing.T) {
func TestLoadAndSave(t *testing.T) { func TestLoadAndSave(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
config := &Config{} config := &NodesConfig{}
// not autoload without StatePath // not autoload without StatePath
NewNodes(config) NewNodes(config)
// Test unmarshalable /dev/null - autolead with StatePath // Test unmarshalable /dev/null - autolead with StatePath
config.Nodes.StatePath = "/dev/null" config.StatePath = "/dev/null"
nodes := NewNodes(config) nodes := NewNodes(config)
// Test unopen able // Test unopen able
config.Nodes.StatePath = "/root/nodes.json" config.StatePath = "/root/nodes.json"
nodes.load() nodes.load()
// works ;) // works ;)
config.Nodes.StatePath = "testdata/nodes.json" config.StatePath = "testdata/nodes.json"
nodes.load() nodes.load()
tmpfile, _ := ioutil.TempFile("/tmp", "nodes") tmpfile, _ := ioutil.TempFile("/tmp", "nodes")
config.Nodes.StatePath = tmpfile.Name() config.StatePath = tmpfile.Name()
nodes.save() nodes.save()
os.Remove(tmpfile.Name()) os.Remove(tmpfile.Name())
@ -113,8 +113,8 @@ func TestUpdateNodes(t *testing.T) {
func TestSelectNodes(t *testing.T) { func TestSelectNodes(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
config := &Config{} config := &NodesConfig{}
config.Nodes.StatePath = "testdata/nodes.json" config.StatePath = "testdata/nodes.json"
nodes := NewNodes(config) nodes := NewNodes(config)
@ -139,7 +139,7 @@ func TestSelectNodes(t *testing.T) {
func TestAddNode(t *testing.T) { func TestAddNode(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
nodes := NewNodes(&Config{}) nodes := NewNodes(&NodesConfig{})
nodes.AddNode(&Node{}) nodes.AddNode(&Node{})
assert.Len(nodes.List, 0) assert.Len(nodes.List, 0)

View File

@ -54,7 +54,7 @@ func TestGlobalStats(t *testing.T) {
} }
func createTestNodes() *Nodes { func createTestNodes() *Nodes {
nodes := NewNodes(&Config{}) nodes := NewNodes(&NodesConfig{})
nodeData := &Node{ nodeData := &Node{
Online: true, Online: true,

View File

@ -1 +0,0 @@
asdas

7
webserver/config.go Normal file
View File

@ -0,0 +1,7 @@
package webserver
type Config struct {
Enable bool `toml:"enable"`
Bind string `toml:"bind"`
Webroot string `toml:"webroot"`
}