diff --git a/cmd/freifunkmanager/main.go b/cmd/freifunkmanager/main.go index d1d64f5..b24799e 100644 --- a/cmd/freifunkmanager/main.go +++ b/cmd/freifunkmanager/main.go @@ -13,12 +13,16 @@ import ( configPackage "github.com/FreifunkBremen/freifunkmanager/config" "github.com/FreifunkBremen/freifunkmanager/lib/log" + "github.com/FreifunkBremen/freifunkmanager/runtime" "github.com/FreifunkBremen/freifunkmanager/ssh" + "github.com/FreifunkBremen/freifunkmanager/yanic" ) var ( - configFile string - config *configPackage.Config + configFile string + config *configPackage.Config + nodes *runtime.Nodes + yanicDialer *yanic.Dialer ) func main() { @@ -30,6 +34,13 @@ func main() { log.Log.Info("starting...") sshmanager := ssh.NewManager(config.SSHPrivateKey) + nodes := runtime.NewNodes(config.SSHInterface, sshmanager) + + if config.Yanic.Enable { + yanicDialer := yanic.Dial(config.Yanic.Type, config.Yanic.Address) + yanicDialer.NodeHandler = nodes.AddNode + yanicDialer.Start() + } // Startwebserver router := goji.NewMux() @@ -55,6 +66,9 @@ func main() { // Stop services srv.Close() + if config.Yanic.Enable { + yanicDialer.Close() + } sshmanager.Close() log.Log.Info("stop recieve:", sig) diff --git a/config/main.go b/config/main.go index 8af9fb4..ad70c1b 100644 --- a/config/main.go +++ b/config/main.go @@ -15,11 +15,17 @@ type Config struct { // path to deliver static content Webroot string `toml:"webroot"` - // yanic socket - YanicSocket string `toml:"yanic_socket"` // SSH private key SSHPrivateKey string `toml:"ssh_key"` + SSHInterface string `toml:"ssh_interface"` + + // yanic socket + Yanic struct { + Enable bool `toml:"enable"` + Type string `toml:"type"` + Address string `toml:"address"` + } `toml:"yanic"` } //reads a config model from path of a yml file diff --git a/config_example.conf b/config_example.conf index 93e3296..caaa25d 100644 --- a/config_example.conf +++ b/config_example.conf @@ -1,4 +1,10 @@ webserver_bind = ":8080" webroot = "webroot" -yanic_socket = "" + ssh_key = "/etc/id_rsa" +ssh_interface = "wlp4s0" + +[yanic] +enable = true +type = "unix" +address = "/tmp/yanic-database.socket" diff --git a/runtime/nodes.go b/runtime/nodes.go new file mode 100644 index 0000000..b8efc2c --- /dev/null +++ b/runtime/nodes.go @@ -0,0 +1,42 @@ +package runtime + +import ( + "net" + + yanic "github.com/FreifunkBremen/yanic/runtime" + + "github.com/FreifunkBremen/freifunkmanager/lib/log" + "github.com/FreifunkBremen/freifunkmanager/ssh" +) + +type Nodes struct { + node map[string]struct{} + ssh *ssh.Manager + iface string +} + +func NewNodes(iface string, mgmt *ssh.Manager) *Nodes { + return &Nodes{ + node: make(map[string]struct{}), + ssh: mgmt, + iface: iface, + } +} + +func (nodes *Nodes) AddNode(node *yanic.Node) { + logger := log.Log.WithField("method", "AddNode").WithField("node_id", node.Nodeinfo.NodeID) + // session := nodes.ssh.ConnectTo(node.Address) + if _, ok := nodes.node[node.Address.String()]; ok { + logger.Debugf("know already these node") + return + } + address := net.TCPAddr{IP: node.Address, Port: 22, Zone: nodes.iface} + result, err := nodes.ssh.RunOn(address, "uptime") + if err != nil { + logger.Error("init ssh command not run") + return + } + uptime := ssh.SSHResultToString(result) + logger.Infof("new node with uptime: %s", uptime) + nodes.node[node.Address.String()] = struct{}{} +} diff --git a/ssh/execute.go b/ssh/execute.go index ba6df0d..310137c 100644 --- a/ssh/execute.go +++ b/ssh/execute.go @@ -14,9 +14,11 @@ func (m *Manager) ExecuteEverywhere(cmd string) { } } -func (m *Manager) ExecuteOn(host net.IP, cmd string) { - client := m.ConnectTo(host) - m.execute(host.String(), client, cmd) +func (m *Manager) ExecuteOn(addr net.TCPAddr, cmd string) { + client := m.ConnectTo(addr) + if client != nil { + m.execute(addr.IP.String(), client, cmd) + } } func (m *Manager) execute(host string, client *ssh.Client, cmd string) { diff --git a/ssh/execute_test.go b/ssh/execute_test.go index 73784c2..b47ed7c 100644 --- a/ssh/execute_test.go +++ b/ssh/execute_test.go @@ -13,11 +13,11 @@ func TestExecute(t *testing.T) { mgmt := NewManager("~/.ssh/id_rsa") assert.NotNil(mgmt, "no new manager created") - mgmt.ConnectTo(net.ParseIP("2a06:8782:ffbb:1337::127")) + mgmt.ConnectTo(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}) mgmt.ExecuteEverywhere("echo $HOSTNAME") - mgmt.ExecuteOn(net.ParseIP("2a06:8782:ffbb:1337::127"), "uptime") - mgmt.ExecuteOn(net.ParseIP("2a06:8782:ffbb:1337::127"), "echo $HOSTNAME") + mgmt.ExecuteOn(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}, "uptime") + mgmt.ExecuteOn(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}, "echo $HOSTNAME") mgmt.Close() } diff --git a/ssh/manager.go b/ssh/manager.go index de6f158..c9c8bf4 100644 --- a/ssh/manager.go +++ b/ssh/manager.go @@ -37,24 +37,21 @@ func NewManager(file string) *Manager { } } -func (m *Manager) ConnectTo(host net.IP) *ssh.Client { +func (m *Manager) ConnectTo(addr net.TCPAddr) *ssh.Client { m.clientsMUX.Lock() defer m.clientsMUX.Unlock() - if client, ok := m.clients[host.String()]; ok { + if client, ok := m.clients[addr.IP.String()]; ok { return client } - addr := net.TCPAddr{ - IP: host, - Port: 22, - } + client, err := ssh.Dial("tcp", addr.String(), m.config) if err != nil { log.Log.Error(err) return nil } - m.clients[host.String()] = client + m.clients[addr.IP.String()] = client return client } diff --git a/ssh/manager_test.go b/ssh/manager_test.go index 38fe7b0..85be983 100644 --- a/ssh/manager_test.go +++ b/ssh/manager_test.go @@ -13,7 +13,7 @@ func TestManager(t *testing.T) { mgmt := NewManager("~/.ssh/id_rsa") assert.NotNil(mgmt, "no new manager created") - mgmt.ConnectTo(net.ParseIP("2a06:8782:ffbb:1337::127")) + mgmt.ConnectTo(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}) mgmt.Close() } diff --git a/ssh/run.go b/ssh/run.go index 44bb8a9..b5f430c 100644 --- a/ssh/run.go +++ b/ssh/run.go @@ -2,6 +2,7 @@ package ssh import ( "bytes" + "errors" "io" "net" @@ -34,9 +35,12 @@ func (m *Manager) RunEverywhere(cmd string, handler SSHResultHandler) { } } -func (m *Manager) RunOn(host net.IP, cmd string) ([]byte, error) { - client := m.ConnectTo(host) - return m.run(host.String(), client, cmd) +func (m *Manager) RunOn(addr net.TCPAddr, cmd string) ([]byte, error) { + client := m.ConnectTo(addr) + if client != nil { + return m.run(addr.IP.String(), client, cmd) + } + return nil, errors.New("no connection for runOn") } func (m *Manager) run(host string, client *ssh.Client, cmd string) ([]byte, error) { diff --git a/ssh/run_test.go b/ssh/run_test.go index 0ec0609..185baf1 100644 --- a/ssh/run_test.go +++ b/ssh/run_test.go @@ -14,14 +14,14 @@ func TestRun(t *testing.T) { mgmt := NewManager("~/.ssh/id_rsa") assert.NotNil(mgmt, "no new manager created") - mgmt.ConnectTo(net.ParseIP("2a06:8782:ffbb:1337::127")) + mgmt.ConnectTo(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}) mgmt.RunEverywhere("echo 13", SSHResultToStringHandler(func(result string, err error) { assert.NoError(err) assert.Equal("13", result) })) - result, err := mgmt.RunOn(net.ParseIP("2a06:8782:ffbb:1337::127"), "echo 16") + result, err := mgmt.RunOn(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}, "echo 16") assert.NoError(err) str := SSHResultToString(result) diff --git a/yanic/dial.go b/yanic/dial.go new file mode 100644 index 0000000..75d6b95 --- /dev/null +++ b/yanic/dial.go @@ -0,0 +1,87 @@ +package yanic + +import ( + "encoding/json" + "net" + + yanicSocket "github.com/FreifunkBremen/yanic/database/socket" + yanic "github.com/FreifunkBremen/yanic/runtime" + + "github.com/FreifunkBremen/freifunkmanager/lib/log" +) + +type Dialer struct { + conn net.Conn + queue chan yanicSocket.EventMessage + quit chan struct{} + NodeHandler func(*yanic.Node) + GlobalsHandler func(*yanic.GlobalStats) + PruneNodesHandler func() +} + +func Dial(ctype, addr string) *Dialer { + conn, err := net.Dial(ctype, addr) + if err != nil { + log.Log.Panic("yanic dial failed") + } + dialer := &Dialer{ + conn: conn, + queue: make(chan yanicSocket.EventMessage), + quit: make(chan struct{}), + } + + return dialer +} + +func (d *Dialer) Start() { + go d.reciever() + go d.parser() +} +func (d *Dialer) Close() { + close(d.quit) + d.conn.Close() + close(d.queue) +} + +func (d *Dialer) reciever() { + decoder := json.NewDecoder(d.conn) + var msg yanicSocket.EventMessage + + for { + select { + case <-d.quit: + return + default: + decoder.Decode(&msg) + d.queue <- msg + } + } +} + +func (d *Dialer) parser() { + for msg := range d.queue { + switch msg.Event { + case "insert_node": + if d.NodeHandler != nil { + var node yanic.Node + + obj, _ := json.Marshal(msg.Body) + json.Unmarshal(obj, &node) + d.NodeHandler(&node) + } + case "insert_globals": + if d.GlobalsHandler != nil { + var globals yanic.GlobalStats + + obj, _ := json.Marshal(msg.Body) + json.Unmarshal(obj, &globals) + + d.GlobalsHandler(&globals) + } + case "prune_nodes": + if d.PruneNodesHandler != nil { + d.PruneNodesHandler() + } + } + } +} diff --git a/yanic/dial_test.go b/yanic/dial_test.go new file mode 100644 index 0000000..66da933 --- /dev/null +++ b/yanic/dial_test.go @@ -0,0 +1,18 @@ +package yanic + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLog(t *testing.T) { + assert := assert.New(t) + + d := Dial("unix", "/tmp/yanic-database.socket") + assert.NotNil(d) + d.Start() + time.Sleep(time.Duration(3) * time.Minute) + d.Close() +}