[TASK] learn nodes by yanic socket-database

This commit is contained in:
Martin Geno 2017-05-06 19:44:37 +02:00
parent f27a058c5a
commit 34ee3c5e62
No known key found for this signature in database
GPG Key ID: F0D39A37E925E941
12 changed files with 200 additions and 24 deletions

View File

@ -13,12 +13,16 @@ import (
configPackage "github.com/FreifunkBremen/freifunkmanager/config"
"github.com/FreifunkBremen/freifunkmanager/lib/log"
"github.com/FreifunkBremen/freifunkmanager/runtime"
"github.com/FreifunkBremen/freifunkmanager/ssh"
"github.com/FreifunkBremen/freifunkmanager/yanic"
)
var (
configFile string
config *configPackage.Config
nodes *runtime.Nodes
yanicDialer *yanic.Dialer
)
func main() {
@ -30,6 +34,13 @@ func main() {
log.Log.Info("starting...")
sshmanager := ssh.NewManager(config.SSHPrivateKey)
nodes := runtime.NewNodes(config.SSHInterface, sshmanager)
if config.Yanic.Enable {
yanicDialer := yanic.Dial(config.Yanic.Type, config.Yanic.Address)
yanicDialer.NodeHandler = nodes.AddNode
yanicDialer.Start()
}
// Startwebserver
router := goji.NewMux()
@ -55,6 +66,9 @@ func main() {
// Stop services
srv.Close()
if config.Yanic.Enable {
yanicDialer.Close()
}
sshmanager.Close()
log.Log.Info("stop recieve:", sig)

View File

@ -15,11 +15,17 @@ type Config struct {
// path to deliver static content
Webroot string `toml:"webroot"`
// yanic socket
YanicSocket string `toml:"yanic_socket"`
// SSH private key
SSHPrivateKey string `toml:"ssh_key"`
SSHInterface string `toml:"ssh_interface"`
// yanic socket
Yanic struct {
Enable bool `toml:"enable"`
Type string `toml:"type"`
Address string `toml:"address"`
} `toml:"yanic"`
}
//reads a config model from path of a yml file

View File

@ -1,4 +1,10 @@
webserver_bind = ":8080"
webroot = "webroot"
yanic_socket = ""
ssh_key = "/etc/id_rsa"
ssh_interface = "wlp4s0"
[yanic]
enable = true
type = "unix"
address = "/tmp/yanic-database.socket"

42
runtime/nodes.go Normal file
View File

@ -0,0 +1,42 @@
package runtime
import (
"net"
yanic "github.com/FreifunkBremen/yanic/runtime"
"github.com/FreifunkBremen/freifunkmanager/lib/log"
"github.com/FreifunkBremen/freifunkmanager/ssh"
)
type Nodes struct {
node map[string]struct{}
ssh *ssh.Manager
iface string
}
func NewNodes(iface string, mgmt *ssh.Manager) *Nodes {
return &Nodes{
node: make(map[string]struct{}),
ssh: mgmt,
iface: iface,
}
}
func (nodes *Nodes) AddNode(node *yanic.Node) {
logger := log.Log.WithField("method", "AddNode").WithField("node_id", node.Nodeinfo.NodeID)
// session := nodes.ssh.ConnectTo(node.Address)
if _, ok := nodes.node[node.Address.String()]; ok {
logger.Debugf("know already these node")
return
}
address := net.TCPAddr{IP: node.Address, Port: 22, Zone: nodes.iface}
result, err := nodes.ssh.RunOn(address, "uptime")
if err != nil {
logger.Error("init ssh command not run")
return
}
uptime := ssh.SSHResultToString(result)
logger.Infof("new node with uptime: %s", uptime)
nodes.node[node.Address.String()] = struct{}{}
}

View File

@ -14,9 +14,11 @@ func (m *Manager) ExecuteEverywhere(cmd string) {
}
}
func (m *Manager) ExecuteOn(host net.IP, cmd string) {
client := m.ConnectTo(host)
m.execute(host.String(), client, cmd)
func (m *Manager) ExecuteOn(addr net.TCPAddr, cmd string) {
client := m.ConnectTo(addr)
if client != nil {
m.execute(addr.IP.String(), client, cmd)
}
}
func (m *Manager) execute(host string, client *ssh.Client, cmd string) {

View File

@ -13,11 +13,11 @@ func TestExecute(t *testing.T) {
mgmt := NewManager("~/.ssh/id_rsa")
assert.NotNil(mgmt, "no new manager created")
mgmt.ConnectTo(net.ParseIP("2a06:8782:ffbb:1337::127"))
mgmt.ConnectTo(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22})
mgmt.ExecuteEverywhere("echo $HOSTNAME")
mgmt.ExecuteOn(net.ParseIP("2a06:8782:ffbb:1337::127"), "uptime")
mgmt.ExecuteOn(net.ParseIP("2a06:8782:ffbb:1337::127"), "echo $HOSTNAME")
mgmt.ExecuteOn(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}, "uptime")
mgmt.ExecuteOn(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}, "echo $HOSTNAME")
mgmt.Close()
}

View File

@ -37,24 +37,21 @@ func NewManager(file string) *Manager {
}
}
func (m *Manager) ConnectTo(host net.IP) *ssh.Client {
func (m *Manager) ConnectTo(addr net.TCPAddr) *ssh.Client {
m.clientsMUX.Lock()
defer m.clientsMUX.Unlock()
if client, ok := m.clients[host.String()]; ok {
if client, ok := m.clients[addr.IP.String()]; ok {
return client
}
addr := net.TCPAddr{
IP: host,
Port: 22,
}
client, err := ssh.Dial("tcp", addr.String(), m.config)
if err != nil {
log.Log.Error(err)
return nil
}
m.clients[host.String()] = client
m.clients[addr.IP.String()] = client
return client
}

View File

@ -13,7 +13,7 @@ func TestManager(t *testing.T) {
mgmt := NewManager("~/.ssh/id_rsa")
assert.NotNil(mgmt, "no new manager created")
mgmt.ConnectTo(net.ParseIP("2a06:8782:ffbb:1337::127"))
mgmt.ConnectTo(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22})
mgmt.Close()
}

View File

@ -2,6 +2,7 @@ package ssh
import (
"bytes"
"errors"
"io"
"net"
@ -34,9 +35,12 @@ func (m *Manager) RunEverywhere(cmd string, handler SSHResultHandler) {
}
}
func (m *Manager) RunOn(host net.IP, cmd string) ([]byte, error) {
client := m.ConnectTo(host)
return m.run(host.String(), client, cmd)
func (m *Manager) RunOn(addr net.TCPAddr, cmd string) ([]byte, error) {
client := m.ConnectTo(addr)
if client != nil {
return m.run(addr.IP.String(), client, cmd)
}
return nil, errors.New("no connection for runOn")
}
func (m *Manager) run(host string, client *ssh.Client, cmd string) ([]byte, error) {

View File

@ -14,14 +14,14 @@ func TestRun(t *testing.T) {
mgmt := NewManager("~/.ssh/id_rsa")
assert.NotNil(mgmt, "no new manager created")
mgmt.ConnectTo(net.ParseIP("2a06:8782:ffbb:1337::127"))
mgmt.ConnectTo(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22})
mgmt.RunEverywhere("echo 13", SSHResultToStringHandler(func(result string, err error) {
assert.NoError(err)
assert.Equal("13", result)
}))
result, err := mgmt.RunOn(net.ParseIP("2a06:8782:ffbb:1337::127"), "echo 16")
result, err := mgmt.RunOn(net.TCPAddr{IP: net.ParseIP("2a06:8782:ffbb:1337::127"), Port: 22}, "echo 16")
assert.NoError(err)
str := SSHResultToString(result)

87
yanic/dial.go Normal file
View File

@ -0,0 +1,87 @@
package yanic
import (
"encoding/json"
"net"
yanicSocket "github.com/FreifunkBremen/yanic/database/socket"
yanic "github.com/FreifunkBremen/yanic/runtime"
"github.com/FreifunkBremen/freifunkmanager/lib/log"
)
type Dialer struct {
conn net.Conn
queue chan yanicSocket.EventMessage
quit chan struct{}
NodeHandler func(*yanic.Node)
GlobalsHandler func(*yanic.GlobalStats)
PruneNodesHandler func()
}
func Dial(ctype, addr string) *Dialer {
conn, err := net.Dial(ctype, addr)
if err != nil {
log.Log.Panic("yanic dial failed")
}
dialer := &Dialer{
conn: conn,
queue: make(chan yanicSocket.EventMessage),
quit: make(chan struct{}),
}
return dialer
}
func (d *Dialer) Start() {
go d.reciever()
go d.parser()
}
func (d *Dialer) Close() {
close(d.quit)
d.conn.Close()
close(d.queue)
}
func (d *Dialer) reciever() {
decoder := json.NewDecoder(d.conn)
var msg yanicSocket.EventMessage
for {
select {
case <-d.quit:
return
default:
decoder.Decode(&msg)
d.queue <- msg
}
}
}
func (d *Dialer) parser() {
for msg := range d.queue {
switch msg.Event {
case "insert_node":
if d.NodeHandler != nil {
var node yanic.Node
obj, _ := json.Marshal(msg.Body)
json.Unmarshal(obj, &node)
d.NodeHandler(&node)
}
case "insert_globals":
if d.GlobalsHandler != nil {
var globals yanic.GlobalStats
obj, _ := json.Marshal(msg.Body)
json.Unmarshal(obj, &globals)
d.GlobalsHandler(&globals)
}
case "prune_nodes":
if d.PruneNodesHandler != nil {
d.PruneNodesHandler()
}
}
}
}

18
yanic/dial_test.go Normal file
View File

@ -0,0 +1,18 @@
package yanic
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestLog(t *testing.T) {
assert := assert.New(t)
d := Dial("unix", "/tmp/yanic-database.socket")
assert.NotNil(d)
d.Start()
time.Sleep(time.Duration(3) * time.Minute)
d.Close()
}