[BUGFIX] tests without sleep

This commit is contained in:
Martin Geno 2018-01-03 15:41:40 +01:00 committed by Julian Kornberger
parent b560e275db
commit cf2d2c3209
13 changed files with 256 additions and 464 deletions

View File

@ -3,8 +3,7 @@ package cmd
import ( import (
"log" "log"
"github.com/FreifunkBremen/yanic/database" allDatabase "github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/rrd" "github.com/FreifunkBremen/yanic/rrd"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -21,12 +20,11 @@ var importCmd = &cobra.Command{
site := args[1] site := args[1]
config := loadConfig() config := loadConfig()
connections, err := all.Connect(config.Database.Connection) err := allDatabase.Start(config.Database)
if err != nil { if err != nil {
panic(err) panic(err)
} }
database.Start(connections, config) defer allDatabase.Close()
defer database.Close(connections)
log.Println("importing RRD from", path) log.Println("importing RRD from", path)

View File

@ -7,9 +7,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/FreifunkBremen/yanic/database"
allDatabase "github.com/FreifunkBremen/yanic/database/all" allDatabase "github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/output"
allOutput "github.com/FreifunkBremen/yanic/output/all" allOutput "github.com/FreifunkBremen/yanic/output/all"
"github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/respond"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
@ -25,22 +23,20 @@ var serveCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
config := loadConfig() config := loadConfig()
connections, err := allDatabase.Connect(config.Database.Connection) err := allDatabase.Start(config.Database)
if err != nil { if err != nil {
panic(err) panic(err)
} }
database.Start(connections, config) defer allDatabase.Close()
defer database.Close(connections)
nodes = runtime.NewNodes(config) nodes = runtime.NewNodes(config)
nodes.Start() nodes.Start()
outputs, err := allOutput.Register(config.Nodes.Output) err = allOutput.Start(nodes, config.Nodes)
if err != nil { if err != nil {
panic(err) panic(err)
} }
output.Start(outputs, nodes, config) defer allOutput.Close()
defer output.Close()
if config.Webserver.Enable { if config.Webserver.Enable {
log.Println("starting webserver on", config.Webserver.Bind) log.Println("starting webserver on", config.Webserver.Bind)

View File

@ -0,0 +1,75 @@
package all
import (
"fmt"
"log"
"time"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)
type Connection struct {
database.Connection
list []database.Connection
}
func Connect(allConnection map[string]interface{}) (database.Connection, error) {
var list []database.Connection
for dbType, conn := range database.Adapters {
configForType := allConnection[dbType]
if configForType == nil {
log.Printf("the output type '%s' has no configuration", dbType)
continue
}
dbConfigs, ok := configForType.([]map[string]interface{})
if !ok {
return nil, fmt.Errorf("the output type '%s' has the wrong format", dbType)
}
for _, config := range dbConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
connected, err := conn(config)
if err != nil {
return nil, err
}
if connected == nil {
continue
}
list = append(list, connected)
}
}
return &Connection{list: list}, nil
}
func (conn *Connection) InsertNode(node *runtime.Node) {
for _, item := range conn.list {
item.InsertNode(node)
}
}
func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) {
for _, item := range conn.list {
item.InsertLink(link, time)
}
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string) {
for _, item := range conn.list {
item.InsertGlobals(stats, time, site)
}
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
for _, item := range conn.list {
item.PruneNodes(deleteAfter)
}
}
func (conn *Connection) Close() {
for _, item := range conn.list {
item.Close()
}
}

View File

@ -1,74 +1,41 @@
package all package all
import ( import (
"log"
"time" "time"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )
type Connection struct { var conn database.Connection
database.Connection var quit chan struct{}
list []database.Connection
}
func Connect(allConnection map[string]interface{}) (database.Connection, error) { func Start(config runtime.DatabaseConfig) (err error) {
var list []database.Connection conn, err = Connect(config.Connection)
for dbType, conn := range database.Adapters {
configForType := allConnection[dbType]
if configForType == nil {
log.Printf("the output type '%s' has no configuration\n", dbType)
continue
}
dbConfigs, ok := configForType.([]map[string]interface{})
if !ok {
log.Panicf("the output type '%s' has the wrong format\n", dbType)
}
for _, config := range dbConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
connected, err := conn(config)
if err != nil { if err != nil {
return nil, err return
} }
if connected == nil { quit = make(chan struct{})
continue go deleteWorker(config.DeleteInterval.Duration, config.DeleteAfter.Duration)
} return
list = append(list, connected)
}
}
return &Connection{list: list}, nil
} }
func (conn *Connection) InsertNode(node *runtime.Node) { func Close() {
for _, item := range conn.list { close(quit)
item.InsertNode(node) conn.Close()
} quit = nil
} }
func (conn *Connection) InsertLink(link *runtime.Link, time time.Time) { // prunes node-specific data periodically
for _, item := range conn.list { func deleteWorker(deleteInterval time.Duration, deleteAfter time.Duration) {
item.InsertLink(link, time) ticker := time.NewTicker(deleteInterval)
for {
select {
case <-ticker.C:
conn.PruneNodes(deleteAfter)
case <-quit:
ticker.Stop()
return
} }
} }
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string) {
for _, item := range conn.list {
item.InsertGlobals(stats, time, site)
}
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
for _, item := range conn.list {
item.PruneNodes(deleteAfter)
}
}
func (conn *Connection) Close() {
for _, item := range conn.list {
item.Close()
}
} }

View File

@ -2,7 +2,6 @@ package all
import ( import (
"errors" "errors"
"sync"
"testing" "testing"
"time" "time"
@ -11,87 +10,20 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
type testConn struct {
database.Connection
countNode int
countLink int
countGlobals int
countPrune int
countClose int
sync.Mutex
}
func (c *testConn) InsertNode(node *runtime.Node) {
c.Lock()
c.countNode++
c.Unlock()
}
func (c *testConn) GetNode() int {
c.Lock()
defer c.Unlock()
return c.countNode
}
func (c *testConn) InsertLink(link *runtime.Link, time time.Time) {
c.Lock()
c.countLink++
c.Unlock()
}
func (c *testConn) GetLink() int {
c.Lock()
defer c.Unlock()
return c.countLink
}
func (c *testConn) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string) {
c.Lock()
c.countGlobals++
c.Unlock()
}
func (c *testConn) GetGlobal() int {
c.Lock()
defer c.Unlock()
return c.countGlobals
}
func (c *testConn) PruneNodes(time.Duration) {
c.Lock()
c.countPrune++
c.Unlock()
}
func (c *testConn) GetPrune() int {
c.Lock()
defer c.Unlock()
return c.countPrune
}
func (c *testConn) Close() {
c.Lock()
c.countClose++
c.Unlock()
}
func (c *testConn) GetClose() int {
c.Lock()
defer c.Unlock()
return c.countClose
}
func TestStart(t *testing.T) { func TestStart(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
globalConn := &testConn{}
database.RegisterAdapter("a", func(config map[string]interface{}) (database.Connection, error) {
return globalConn, nil
})
database.RegisterAdapter("b", func(config map[string]interface{}) (database.Connection, error) {
return globalConn, nil
})
database.RegisterAdapter("c", func(config map[string]interface{}) (database.Connection, error) {
return globalConn, nil
})
database.RegisterAdapter("d", func(config map[string]interface{}) (database.Connection, error) { database.RegisterAdapter("d", func(config map[string]interface{}) (database.Connection, error) {
return nil, nil return nil, nil
}) })
database.RegisterAdapter("e", func(config map[string]interface{}) (database.Connection, error) { database.RegisterAdapter("e", func(config map[string]interface{}) (database.Connection, error) {
return nil, errors.New("blub") return nil, errors.New("blub")
}) })
allConn, err := Connect(map[string]interface{}{ // Test for PruneNodes (by start)
assert.Nil(quit)
err := Start(runtime.DatabaseConfig{
DeleteInterval: runtime.Duration{Duration: time.Millisecond},
Connection: map[string]interface{}{
"a": []map[string]interface{}{ "a": []map[string]interface{}{
map[string]interface{}{ map[string]interface{}{
"enable": false, "enable": false,
@ -117,28 +49,10 @@ func TestStart(t *testing.T) {
"path": "d0", "path": "d0",
}, },
}, },
},
}) })
assert.NoError(err) assert.NoError(err)
assert.NotNil(quit)
assert.Equal(0, globalConn.GetNode())
allConn.InsertNode(nil)
assert.Equal(3, globalConn.GetNode())
assert.Equal(0, globalConn.GetLink())
allConn.InsertLink(nil, time.Now())
assert.Equal(3, globalConn.GetLink())
assert.Equal(0, globalConn.GetGlobal())
allConn.InsertGlobals(nil, time.Now(), runtime.GLOBAL_SITE)
assert.Equal(3, globalConn.GetGlobal())
assert.Equal(0, globalConn.GetPrune())
allConn.PruneNodes(time.Second)
assert.Equal(3, globalConn.GetPrune())
assert.Equal(0, globalConn.GetClose())
allConn.Close()
assert.Equal(3, globalConn.GetClose())
_, err = Connect(map[string]interface{}{ _, err = Connect(map[string]interface{}{
"e": []map[string]interface{}{ "e": []map[string]interface{}{
@ -147,10 +61,9 @@ func TestStart(t *testing.T) {
}) })
assert.Error(err) assert.Error(err)
// wrong format -> the only panic in Register // wrong format
assert.Panics(func() { _, err = Connect(map[string]interface{}{
Connect(map[string]interface{}{
"e": true, "e": true,
}) })
}) assert.Error(err)
} }

View File

@ -1,40 +0,0 @@
package database
import (
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
var quit chan struct{}
// Start workers of database
// WARNING: Do not override this function
// you should use New()
func Start(conn Connection, config *runtime.Config) {
quit = make(chan struct{})
go deleteWorker(conn, config.Database.DeleteInterval.Duration, config.Database.DeleteAfter.Duration)
}
func Close(conn Connection) {
if quit != nil {
close(quit)
}
if conn != nil {
conn.Close()
}
}
// prunes node-specific data periodically
func deleteWorker(conn Connection, deleteInterval time.Duration, deleteAfter time.Duration) {
ticker := time.NewTicker(deleteInterval)
for {
select {
case <-ticker.C:
conn.PruneNodes(deleteAfter)
case <-quit:
ticker.Stop()
return
}
}
}

View File

@ -1,69 +0,0 @@
package database
import (
"sync"
"testing"
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
type testConn struct {
Connection
countClose int
countPrune int
sync.Mutex
}
func (c *testConn) Close() {
c.Lock()
c.countClose++
c.Unlock()
}
func (c *testConn) GetClose() int {
c.Lock()
defer c.Unlock()
return c.countClose
}
func (c *testConn) PruneNodes(time.Duration) {
c.Lock()
c.countPrune++
c.Unlock()
}
func (c *testConn) GetPruneNodes() int {
c.Lock()
defer c.Unlock()
return c.countPrune
}
func TestStart(t *testing.T) {
assert := assert.New(t)
conn := &testConn{}
config := &runtime.Config{
Database: struct {
DeleteInterval runtime.Duration `toml:"delete_interval"`
DeleteAfter runtime.Duration `toml:"delete_after"`
Connection map[string]interface{}
}{
DeleteInterval: runtime.Duration{Duration: time.Millisecond * 10},
},
}
assert.Nil(quit)
Start(conn, config)
assert.NotNil(quit)
assert.Equal(0, conn.GetPruneNodes())
time.Sleep(time.Millisecond * 12)
assert.Equal(1, conn.GetPruneNodes())
assert.Equal(0, conn.GetClose())
Close(conn)
assert.NotNil(quit)
assert.Equal(1, conn.GetClose())
time.Sleep(time.Millisecond * 12) // to reach timer.Stop() line
}

View File

@ -1,63 +1,45 @@
package all package all
import ( import (
"log" "sync"
"time"
"github.com/FreifunkBremen/yanic/output" "github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
) )
type Output struct { var quit chan struct{}
output.Output var wg = sync.WaitGroup{}
list map[int]output.Output var outputA output.Output
filter map[int]filterConfig
}
func Register(configuration map[string]interface{}) (output.Output, error) { func Start(nodes *runtime.Nodes, config runtime.NodesConfig) (err error) {
list := make(map[int]output.Output) outputA, err = Register(config.Output)
filter := make(map[int]filterConfig)
i := 1
allOutputs := configuration
for outputType, outputRegister := range output.Adapters {
configForOutput := allOutputs[outputType]
if configForOutput == nil {
log.Printf("the output type '%s' has no configuration\n", outputType)
continue
}
outputConfigs, ok := configForOutput.([]map[string]interface{})
if !ok {
log.Panicf("the output type '%s' has the wrong format\n", outputType)
}
for _, config := range outputConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
output, err := outputRegister(config)
if err != nil { if err != nil {
return nil, err return
} }
if output == nil { quit = make(chan struct{})
continue wg.Add(1)
} go saveWorker(nodes, config.SaveInterval.Duration)
list[i] = output return
if c := config["filter"]; c != nil {
filter[i] = config["filter"].(map[string]interface{})
}
i++
}
}
return &Output{list: list, filter: filter}, nil
} }
func (o *Output) Save(nodes *runtime.Nodes) { func Close() {
for i, item := range o.list { close(quit)
var filteredNodes *runtime.Nodes wg.Wait()
if config := o.filter[i]; config != nil { quit = nil
filteredNodes = config.filtering(nodes)
} else {
filteredNodes = filterConfig{}.filtering(nodes)
} }
item.Save(filteredNodes) // save periodically to output
func saveWorker(nodes *runtime.Nodes, saveInterval time.Duration) {
ticker := time.NewTicker(saveInterval)
for {
select {
case <-ticker.C:
outputA.Save(nodes)
case <-quit:
ticker.Stop()
wg.Done()
return
}
} }
} }

64
output/all/output.go Normal file
View File

@ -0,0 +1,64 @@
package all
import (
"fmt"
"log"
"github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime"
)
type Output struct {
output.Output
list map[int]output.Output
filter map[int]filterConfig
}
func Register(configuration map[string]interface{}) (output.Output, error) {
list := make(map[int]output.Output)
filter := make(map[int]filterConfig)
i := 1
allOutputs := configuration
for outputType, outputRegister := range output.Adapters {
configForOutput := allOutputs[outputType]
if configForOutput == nil {
log.Printf("the output type '%s' has no configuration\n", outputType)
continue
}
outputConfigs, ok := configForOutput.([]map[string]interface{})
if !ok {
return nil, fmt.Errorf("the output type '%s' has the wrong format", outputType)
}
for _, config := range outputConfigs {
if c, ok := config["enable"].(bool); ok && !c {
continue
}
output, err := outputRegister(config)
if err != nil {
return nil, err
}
if output == nil {
continue
}
list[i] = output
if c := config["filter"]; c != nil {
filter[i] = config["filter"].(map[string]interface{})
}
i++
}
}
return &Output{list: list, filter: filter}, nil
}
func (o *Output) Save(nodes *runtime.Nodes) {
for i, item := range o.list {
var filteredNodes *runtime.Nodes
if config := o.filter[i]; config != nil {
filteredNodes = config.filtering(nodes)
} else {
filteredNodes = filterConfig{}.filtering(nodes)
}
item.Save(filteredNodes)
}
}

View File

@ -89,10 +89,9 @@ func TestStart(t *testing.T) {
}) })
assert.Error(err) assert.Error(err)
// wrong format -> the only panic in Register // wrong format
assert.Panics(func() { _, err = Register(map[string]interface{}{
Register(map[string]interface{}{
"e": true, "e": true,
}) })
}) assert.Error(err)
} }

View File

@ -1,40 +0,0 @@
package output
import (
"sync"
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
var quit chan struct{}
var wg = sync.WaitGroup{}
// Start workers of database
// WARNING: Do not override this function
// you should use New()
func Start(output Output, nodes *runtime.Nodes, config *runtime.Config) {
quit = make(chan struct{})
wg.Add(1)
go saveWorker(output, nodes, config.Nodes.SaveInterval.Duration)
}
func Close() {
close(quit)
wg.Wait()
}
// save periodically to output
func saveWorker(output Output, nodes *runtime.Nodes, saveInterval time.Duration) {
ticker := time.NewTicker(saveInterval)
for {
select {
case <-ticker.C:
output.Save(nodes)
case <-quit:
wg.Done()
ticker.Stop()
return
}
}
}

View File

@ -1,57 +0,0 @@
package output
import (
"sync"
"testing"
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
type testConn struct {
Output
countSave int
sync.Mutex
}
func (c *testConn) Save(nodes *runtime.Nodes) {
c.Lock()
c.countSave++
c.Unlock()
}
func (c *testConn) Get() int {
c.Lock()
defer c.Unlock()
return c.countSave
}
func TestStart(t *testing.T) {
assert := assert.New(t)
conn := &testConn{}
config := &runtime.Config{
Nodes: struct {
StatePath string `toml:"state_path"`
SaveInterval runtime.Duration `toml:"save_interval"`
OfflineAfter runtime.Duration `toml:"offline_after"`
PruneAfter runtime.Duration `toml:"prune_after"`
Output map[string]interface{}
}{
SaveInterval: runtime.Duration{Duration: time.Millisecond * 10},
},
}
assert.Nil(quit)
Start(conn, nil, config)
assert.NotNil(quit)
assert.Equal(0, conn.Get())
time.Sleep(time.Millisecond * 12)
assert.Equal(1, conn.Get())
time.Sleep(time.Millisecond * 12)
Close()
assert.Equal(2, conn.Get())
}

View File

@ -21,24 +21,28 @@ type Config struct {
Bind string `toml:"bind"` Bind string `toml:"bind"`
Webroot string `toml:"webroot"` Webroot string `toml:"webroot"`
} }
Nodes struct { Nodes NodesConfig
Meshviewer struct {
Version int `toml:"version"`
NodesPath string `toml:"nodes_path"`
GraphPath string `toml:"graph_path"`
}
Database DatabaseConfig
}
type NodesConfig struct {
StatePath string `toml:"state_path"` StatePath string `toml:"state_path"`
SaveInterval Duration `toml:"save_interval"` // Save nodes periodically SaveInterval Duration `toml:"save_interval"` // Save nodes periodically
OfflineAfter Duration `toml:"offline_after"` // Set node to offline if not seen within this period OfflineAfter Duration `toml:"offline_after"` // Set node to offline if not seen within this period
PruneAfter Duration `toml:"prune_after"` // Remove nodes after n days of inactivity PruneAfter Duration `toml:"prune_after"` // Remove nodes after n days of inactivity
Output map[string]interface{} Output map[string]interface{}
} }
Meshviewer struct {
Version int `toml:"version"` type DatabaseConfig struct {
NodesPath string `toml:"nodes_path"`
GraphPath string `toml:"graph_path"`
}
Database struct {
DeleteInterval Duration `toml:"delete_interval"` // Delete stats of nodes every n minutes DeleteInterval Duration `toml:"delete_interval"` // Delete stats of nodes every n minutes
DeleteAfter Duration `toml:"delete_after"` // Delete stats of nodes till now-deletetill n minutes DeleteAfter Duration `toml:"delete_after"` // Delete stats of nodes till now-deletetill n minutes
Connection map[string]interface{} Connection map[string]interface{}
} }
}
// ReadConfigFile reads a config model from path of a yml file // ReadConfigFile reads a config model from path of a yml file
func ReadConfigFile(path string) (config *Config, err error) { func ReadConfigFile(path string) (config *Config, err error) {