enable updater + some mutex

This commit is contained in:
Martin Geno 2017-07-07 08:12:25 +02:00
parent 87a2f08c85
commit c28a14010d
No known key found for this signature in database
GPG Key ID: F0D39A37E925E941
7 changed files with 36 additions and 76 deletions

View File

@ -8,5 +8,5 @@ ssh_interface = "wlp4s0"
[yanic] [yanic]
enable = true enable = true
type = "unix" type = "tcp"
address = "/tmp/yanic-database.socket" address = "localhost:8081"

View File

@ -63,24 +63,24 @@ func NewNode(nodeOrigin *yanicRuntime.Node) *Node {
func (n *Node) SSHUpdate(ssh *ssh.Manager, iface string, oldnode *Node) { func (n *Node) SSHUpdate(ssh *ssh.Manager, iface string, oldnode *Node) {
addr := n.GetAddress(iface) addr := n.GetAddress(iface)
if oldnode == nil || n.Hostname != oldnode.Hostname { if oldnode == nil || n.Hostname != oldnode.Hostname {
ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateHostname, n.Hostname)) ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateHostname, n.Hostname))
} }
if oldnode == nil || n.Owner != oldnode.Owner { if oldnode == nil || n.Owner != oldnode.Owner {
ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateOwner, n.Owner)) ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateOwner, n.Owner))
} }
if oldnode == nil || !locationEqual(&n.Location, &oldnode.Location) { if oldnode == nil || !locationEqual(n.Location, oldnode.Location) {
ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateLocation, n.Location.Latitude, n.Location.Longtitude)) ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateLocation, n.Location.Latitude, n.Location.Longtitude))
} }
if oldnode == nil || !wirelessEqual(&n.Wireless, &oldnode.Wireless) { if oldnode == nil || !wirelessEqual(n.Wireless, oldnode.Wireless) {
ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateWifiFreq24, n.Wireless.Channel24, n.Wireless.TxPower24, n.Wireless.Channel24, n.Wireless.TxPower24)) ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateWifiFreq24, n.Wireless.Channel24, n.Wireless.TxPower24, n.Wireless.Channel24, n.Wireless.TxPower24))
ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateWifiFreq5, n.Wireless.Channel5, n.Wireless.TxPower5, n.Wireless.Channel5, n.Wireless.TxPower5)) ssh.ExecuteOn(addr, fmt.Sprintf(SSHUpdateWifiFreq5, n.Wireless.Channel5, n.Wireless.TxPower5, n.Wireless.Channel5, n.Wireless.TxPower5))
ssh.ExecuteOn(addr, "wifi") ssh.ExecuteOn(addr, "wifi")
log.Log.Info("[cmd] wifi", n.NodeID) // send warning for running wifi, because it kicks clients from node
if oldnode != nil { log.Log.Warn("[cmd] wifi ", n.NodeID)
oldnode.Wireless = n.Wireless
}
} }
oldnode = n
} }
func (n *Node) GetAddress(iface string) net.TCPAddr { func (n *Node) GetAddress(iface string) net.TCPAddr {
return net.TCPAddr{IP: n.Address, Port: 22, Zone: iface} return net.TCPAddr{IP: n.Address, Port: 22, Zone: iface}
@ -115,48 +115,16 @@ func (n *Node) IsEqual(node *Node) bool {
if n.Owner != node.Owner { if n.Owner != node.Owner {
return false return false
} }
if !locationEqual(&n.Location, &node.Location) { if !locationEqual(n.Location, node.Location) {
return false return false
} }
if !wirelessEqual(&n.Wireless, &node.Wireless) { if !wirelessEqual(n.Wireless, node.Wireless) {
return false
}
return true
}
func (n *Node) IsEqualNode(node *yanicRuntime.Node) bool {
nodeinfo := node.Nodeinfo
if nodeinfo == nil {
return false
}
owner := nodeinfo.Owner
if owner == nil {
return false
}
if n.NodeID != nodeinfo.NodeID {
return false
}
if !bytes.Equal(n.Address, node.Address) {
return false
}
if n.Hostname != nodeinfo.Hostname {
return false
}
if n.Owner != owner.Contact {
return false
}
if !locationEqual(&n.Location, nodeinfo.Location) {
return false
}
if !wirelessEqual(&n.Wireless, nodeinfo.Wireless) {
return false return false
} }
return true return true
} }
func locationEqual(a, b *data.Location) bool { func locationEqual(a, b data.Location) bool {
if a == nil || b == nil {
return false
}
if a.Latitude != b.Latitude { if a.Latitude != b.Latitude {
return false return false
} }
@ -169,10 +137,7 @@ func locationEqual(a, b *data.Location) bool {
return true return true
} }
func wirelessEqual(a, b *data.Wireless) bool { func wirelessEqual(a, b data.Wireless) bool {
if a == nil || b == nil {
return false
}
if a.Channel24 != b.Channel24 { if a.Channel24 != b.Channel24 {
return false return false
} }

View File

@ -26,10 +26,8 @@ func TestNode(t *testing.T) {
n2 := NewNode(node1) n2 := NewNode(node1)
assert.True(n2.IsEqual(n1)) assert.True(n2.IsEqual(n1))
assert.True(n2.IsEqualNode(node1))
node1.Nodeinfo.Owner.Contact = "blub2" node1.Nodeinfo.Owner.Contact = "blub2"
assert.False(n2.IsEqualNode(node1))
n2.Update(node1) n2.Update(node1)
assert.False(n2.IsEqual(n1)) assert.False(n2.IsEqual(n1))
} }

View File

@ -58,7 +58,7 @@ func (nodes *Nodes) LearnNode(n *yanic.Node) {
// session := nodes.ssh.ConnectTo(node.Address) // session := nodes.ssh.ConnectTo(node.Address)
result, err := nodes.ssh.RunOn(node.GetAddress(nodes.iface), "uptime") result, err := nodes.ssh.RunOn(node.GetAddress(nodes.iface), "uptime")
if err != nil { if err != nil {
logger.Error("init ssh command not run", err) logger.Debug("init ssh command not run", err)
return return
} }
uptime := ssh.SSHResultToString(result) uptime := ssh.SSHResultToString(result)
@ -109,7 +109,7 @@ func (nodes *Nodes) Updater() {
} }
func (nodes *Nodes) load() { func (nodes *Nodes) load() {
if f, err := os.Open(nodes.statePath); err == nil { // transform data to legacy meshviewer if f, err := os.Open(nodes.statePath); err == nil {
if err = json.NewDecoder(f).Decode(nodes); err == nil { if err = json.NewDecoder(f).Decode(nodes); err == nil {
log.Log.Infof("loaded %d nodes", len(nodes.List)) log.Log.Infof("loaded %d nodes", len(nodes.List))
} else { } else {

View File

@ -10,6 +10,7 @@ type List struct {
Command string `json:"cmd"` Command string `json:"cmd"`
Clients map[string]*ListResult `json:"clients"` Clients map[string]*ListResult `json:"clients"`
sshManager *Manager sshManager *Manager
sync.Mutex
} }
type ListResult struct { type ListResult struct {
ssh *ssh.Client ssh *ssh.Client
@ -24,6 +25,8 @@ func (m *Manager) CreateList(cmd string) *List {
sshManager: m, sshManager: m,
Clients: make(map[string]*ListResult), Clients: make(map[string]*ListResult),
} }
m.clientsMUX.Lock()
defer m.clientsMUX.Unlock()
for host, client := range m.clients { for host, client := range m.clients {
list.Clients[host] = &ListResult{Running: true, ssh: client} list.Clients[host] = &ListResult{Running: true, ssh: client}
} }
@ -43,6 +46,10 @@ func (l List) Run() {
func (l List) runlistelement(host string, client *ListResult, wg *sync.WaitGroup) { func (l List) runlistelement(host string, client *ListResult, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
result, err := l.sshManager.run(host, client.ssh, l.Command) result, err := l.sshManager.run(host, client.ssh, l.Command)
l.Lock()
defer l.Unlock()
client.Running = false client.Running = false
if err != nil { if err != nil {
client.WithError = true client.WithError = true

View File

@ -2,26 +2,23 @@ package ssh
import ( import (
"bytes" "bytes"
"io"
"net" "net"
"github.com/genofire/golang-lib/log" "github.com/genofire/golang-lib/log"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
type SSHResultHandler func([]byte, error) type SSHResultHandler func(string, error)
type SSHResultStringHandler func(string, error) func SSHResultToString(result string) string {
func SSHResultToString(result []byte) string {
if len(result) > 0 { if len(result) > 0 {
result = result[:len(result)-1] result = result[:len(result)-1]
} }
return string(result) return result
} }
func SSHResultToStringHandler(handler SSHResultStringHandler) SSHResultHandler { func SSHResultToStringHandler(handler SSHResultHandler) SSHResultHandler {
return func(result []byte, err error) { return func(result string, err error) {
handler(SSHResultToString(result), err) handler(SSHResultToString(result), err)
} }
} }
@ -33,15 +30,15 @@ func (m *Manager) RunEverywhere(cmd string, handler SSHResultHandler) {
} }
} }
func (m *Manager) RunOn(addr net.TCPAddr, cmd string) ([]byte, error) { func (m *Manager) RunOn(addr net.TCPAddr, cmd string) (string, error) {
client, err := m.ConnectTo(addr) client, err := m.ConnectTo(addr)
if err != nil { if err != nil {
return nil, err return "", err
} }
return m.run(addr.IP.String(), client, cmd) return m.run(addr.IP.String(), client, cmd)
} }
func (m *Manager) run(host string, client *ssh.Client, cmd string) ([]byte, error) { func (m *Manager) run(host string, client *ssh.Client, cmd string) (string, error) {
session, err := client.NewSession() session, err := client.NewSession()
defer session.Close() defer session.Close()
@ -50,28 +47,19 @@ func (m *Manager) run(host string, client *ssh.Client, cmd string) ([]byte, erro
m.clientsMUX.Lock() m.clientsMUX.Lock()
delete(m.clients, host) delete(m.clients, host)
m.clientsMUX.Unlock() m.clientsMUX.Unlock()
return nil, err return "", err
} }
stdout, err := session.StdoutPipe()
buffer := &bytes.Buffer{} buffer := &bytes.Buffer{}
go io.Copy(buffer, stdout) session.Stdout = buffer
if err != nil { if err != nil {
log.Log.Warnf("can not create pipe for run on %s: %s", host, err) log.Log.Warnf("can not create pipe for run on %s: %s", host, err)
delete(m.clients, host) delete(m.clients, host)
return nil, err return "", err
} }
err = session.Run(cmd) err = session.Run(cmd)
if err != nil { if err != nil {
log.Log.Warnf("could not run %s on %s: %s", cmd, host, err) log.Log.Warnf("could not run %s on %s: %s", cmd, host, err)
return nil, err return "", err
} }
var result []byte return buffer.String(), nil
for {
b, err := buffer.ReadByte()
if err != nil {
break
}
result = append(result, b)
}
return result, nil
} }

View File

@ -86,7 +86,9 @@ func (c *Client) handleMessage(msg *Message) {
} }
cmd := commands.AddCommand(msg.Command) cmd := commands.AddCommand(msg.Command)
w := worker.NewWorker(time.Millisecond*300, func() { w := worker.NewWorker(time.Millisecond*300, func() {
cmd.Lock()
SendAll(Message{Type: MessageTypeCommand, Command: cmd}) SendAll(Message{Type: MessageTypeCommand, Command: cmd})
cmd.Unlock()
}) })
go w.Start() go w.Start()
go cmd.Run(func() { go cmd.Run(func() {