From c28a14010d3719aaac16596157645b81c5f48924 Mon Sep 17 00:00:00 2001 From: Martin Geno Date: Fri, 7 Jul 2017 08:12:25 +0200 Subject: [PATCH] enable updater + some mutex --- config_example.conf | 4 ++-- runtime/node.go | 55 ++++++++------------------------------------ runtime/node_test.go | 2 -- runtime/nodes.go | 4 ++-- ssh/list.go | 7 ++++++ ssh/run.go | 38 +++++++++++------------------- websocket/client.go | 2 ++ 7 files changed, 36 insertions(+), 76 deletions(-) diff --git a/config_example.conf b/config_example.conf index 4ae0f2c..b623c57 100644 --- a/config_example.conf +++ b/config_example.conf @@ -8,5 +8,5 @@ ssh_interface = "wlp4s0" [yanic] enable = true -type = "unix" -address = "/tmp/yanic-database.socket" +type = "tcp" +address = "localhost:8081" diff --git a/runtime/node.go b/runtime/node.go index 36f9d10..57d1255 100644 --- a/runtime/node.go +++ b/runtime/node.go @@ -63,24 +63,24 @@ func NewNode(nodeOrigin *yanicRuntime.Node) *Node { func (n *Node) SSHUpdate(ssh *ssh.Manager, iface string, oldnode *Node) { addr := n.GetAddress(iface) + if oldnode == nil || n.Hostname != oldnode.Hostname { ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateHostname, n.Hostname)) } if oldnode == nil || n.Owner != oldnode.Owner { ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateOwner, n.Owner)) } - if oldnode == nil || !locationEqual(&n.Location, &oldnode.Location) { + if oldnode == nil || !locationEqual(n.Location, oldnode.Location) { ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateLocation, n.Location.Latitude, n.Location.Longtitude)) } - if oldnode == nil || !wirelessEqual(&n.Wireless, &oldnode.Wireless) { + if oldnode == nil || !wirelessEqual(n.Wireless, oldnode.Wireless) { ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateWifiFreq24, n.Wireless.Channel24, n.Wireless.TxPower24, n.Wireless.Channel24, n.Wireless.TxPower24)) ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateWifiFreq5, n.Wireless.Channel5, n.Wireless.TxPower5, n.Wireless.Channel5, n.Wireless.TxPower5)) ssh.ExecuteOn(addr, "wifi") - log.Log.Info("[cmd] wifi", n.NodeID) - if oldnode != nil { - oldnode.Wireless = n.Wireless - } + // send warning for running wifi, because it kicks clients from node + log.Log.Warn("[cmd] wifi ", n.NodeID) } + oldnode = n } func (n *Node) GetAddress(iface string) net.TCPAddr { return net.TCPAddr{IP: n.Address, Port: 22, Zone: iface} @@ -115,48 +115,16 @@ func (n *Node) IsEqual(node *Node) bool { if n.Owner != node.Owner { return false } - if !locationEqual(&n.Location, &node.Location) { + if !locationEqual(n.Location, node.Location) { return false } - if !wirelessEqual(&n.Wireless, &node.Wireless) { - return false - } - return true -} -func (n *Node) IsEqualNode(node *yanicRuntime.Node) bool { - nodeinfo := node.Nodeinfo - if nodeinfo == nil { - return false - } - owner := nodeinfo.Owner - if owner == nil { - return false - } - if n.NodeID != nodeinfo.NodeID { - return false - } - if !bytes.Equal(n.Address, node.Address) { - return false - } - if n.Hostname != nodeinfo.Hostname { - return false - } - if n.Owner != owner.Contact { - return false - } - if !locationEqual(&n.Location, nodeinfo.Location) { - return false - } - if !wirelessEqual(&n.Wireless, nodeinfo.Wireless) { + if !wirelessEqual(n.Wireless, node.Wireless) { return false } return true } -func locationEqual(a, b *data.Location) bool { - if a == nil || b == nil { - return false - } +func locationEqual(a, b data.Location) bool { if a.Latitude != b.Latitude { return false } @@ -169,10 +137,7 @@ func locationEqual(a, b *data.Location) bool { return true } -func wirelessEqual(a, b *data.Wireless) bool { - if a == nil || b == nil { - return false - } +func wirelessEqual(a, b data.Wireless) bool { if a.Channel24 != b.Channel24 { return false } diff --git a/runtime/node_test.go b/runtime/node_test.go index ff6c9b9..010ae0f 100644 --- a/runtime/node_test.go +++ b/runtime/node_test.go @@ -26,10 +26,8 @@ func TestNode(t *testing.T) { n2 := NewNode(node1) assert.True(n2.IsEqual(n1)) - assert.True(n2.IsEqualNode(node1)) node1.Nodeinfo.Owner.Contact = "blub2" - assert.False(n2.IsEqualNode(node1)) n2.Update(node1) assert.False(n2.IsEqual(n1)) } diff --git a/runtime/nodes.go b/runtime/nodes.go index 410d17c..df0a4a6 100644 --- a/runtime/nodes.go +++ b/runtime/nodes.go @@ -58,7 +58,7 @@ func (nodes *Nodes) LearnNode(n *yanic.Node) { // session := nodes.ssh.ConnectTo(node.Address) result, err := nodes.ssh.RunOn(node.GetAddress(nodes.iface), "uptime") if err != nil { - logger.Error("init ssh command not run", err) + logger.Debug("init ssh command not run", err) return } uptime := ssh.SSHResultToString(result) @@ -109,7 +109,7 @@ func (nodes *Nodes) Updater() { } func (nodes *Nodes) load() { - if f, err := os.Open(nodes.statePath); err == nil { // transform data to legacy meshviewer + if f, err := os.Open(nodes.statePath); err == nil { if err = json.NewDecoder(f).Decode(nodes); err == nil { log.Log.Infof("loaded %d nodes", len(nodes.List)) } else { diff --git a/ssh/list.go b/ssh/list.go index 9bbf509..7aa0c10 100644 --- a/ssh/list.go +++ b/ssh/list.go @@ -10,6 +10,7 @@ type List struct { Command string `json:"cmd"` Clients map[string]*ListResult `json:"clients"` sshManager *Manager + sync.Mutex } type ListResult struct { ssh *ssh.Client @@ -24,6 +25,8 @@ func (m *Manager) CreateList(cmd string) *List { sshManager: m, Clients: make(map[string]*ListResult), } + m.clientsMUX.Lock() + defer m.clientsMUX.Unlock() for host, client := range m.clients { list.Clients[host] = &ListResult{Running: true, ssh: client} } @@ -43,6 +46,10 @@ func (l List) Run() { func (l List) runlistelement(host string, client *ListResult, wg *sync.WaitGroup) { defer wg.Done() result, err := l.sshManager.run(host, client.ssh, l.Command) + + l.Lock() + defer l.Unlock() + client.Running = false if err != nil { client.WithError = true diff --git a/ssh/run.go b/ssh/run.go index 02c75ad..0cfca34 100644 --- a/ssh/run.go +++ b/ssh/run.go @@ -2,26 +2,23 @@ package ssh import ( "bytes" - "io" "net" "github.com/genofire/golang-lib/log" "golang.org/x/crypto/ssh" ) -type SSHResultHandler func([]byte, error) +type SSHResultHandler func(string, error) -type SSHResultStringHandler func(string, error) - -func SSHResultToString(result []byte) string { +func SSHResultToString(result string) string { if len(result) > 0 { result = result[:len(result)-1] } - return string(result) + return result } -func SSHResultToStringHandler(handler SSHResultStringHandler) SSHResultHandler { - return func(result []byte, err error) { +func SSHResultToStringHandler(handler SSHResultHandler) SSHResultHandler { + return func(result string, err error) { handler(SSHResultToString(result), err) } } @@ -33,15 +30,15 @@ func (m *Manager) RunEverywhere(cmd string, handler SSHResultHandler) { } } -func (m *Manager) RunOn(addr net.TCPAddr, cmd string) ([]byte, error) { +func (m *Manager) RunOn(addr net.TCPAddr, cmd string) (string, error) { client, err := m.ConnectTo(addr) if err != nil { - return nil, err + return "", err } return m.run(addr.IP.String(), client, cmd) } -func (m *Manager) run(host string, client *ssh.Client, cmd string) ([]byte, error) { +func (m *Manager) run(host string, client *ssh.Client, cmd string) (string, error) { session, err := client.NewSession() defer session.Close() @@ -50,28 +47,19 @@ func (m *Manager) run(host string, client *ssh.Client, cmd string) ([]byte, erro m.clientsMUX.Lock() delete(m.clients, host) m.clientsMUX.Unlock() - return nil, err + return "", err } - stdout, err := session.StdoutPipe() buffer := &bytes.Buffer{} - go io.Copy(buffer, stdout) + session.Stdout = buffer if err != nil { log.Log.Warnf("can not create pipe for run on %s: %s", host, err) delete(m.clients, host) - return nil, err + return "", err } err = session.Run(cmd) if err != nil { log.Log.Warnf("could not run %s on %s: %s", cmd, host, err) - return nil, err + return "", err } - var result []byte - for { - b, err := buffer.ReadByte() - if err != nil { - break - } - result = append(result, b) - } - return result, nil + return buffer.String(), nil } diff --git a/websocket/client.go b/websocket/client.go index f6cbd79..b884595 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -86,7 +86,9 @@ func (c *Client) handleMessage(msg *Message) { } cmd := commands.AddCommand(msg.Command) w := worker.NewWorker(time.Millisecond*300, func() { + cmd.Lock() SendAll(Message{Type: MessageTypeCommand, Command: cmd}) + cmd.Unlock() }) go w.Start() go cmd.Run(func() {