From dc5569cea58430dcdea011e18c49f79dea0ccda8 Mon Sep 17 00:00:00 2001 From: Martin Geno Date: Thu, 25 Feb 2016 21:06:37 +0100 Subject: [PATCH] resturctur everything -> commit with a bug --- announced.go | 32 --------- main.go | 79 ++++++++++++++++------ nodes.go => models/nodes.go | 18 +++-- nodeclient.go | 79 ---------------------- collector.go => responed/collector.go | 69 ++++++------------- responed/daemon.go | 34 ++++++++++ websocketserver/client.go | 69 +++++++++++++++++++ nodeserver.go => websocketserver/server.go | 61 +++++++++-------- 8 files changed, 225 insertions(+), 216 deletions(-) delete mode 100644 announced.go rename nodes.go => models/nodes.go (82%) delete mode 100644 nodeclient.go rename collector.go => responed/collector.go (54%) create mode 100644 responed/daemon.go create mode 100644 websocketserver/client.go rename nodeserver.go => websocketserver/server.go (54%) diff --git a/announced.go b/announced.go deleted file mode 100644 index 1c3eb29..0000000 --- a/announced.go +++ /dev/null @@ -1,32 +0,0 @@ -package main -import ( - "time" -) - -type Announced struct { - NodeServer *NodeServer - nodes *Nodes - outputFile string - collectInterval time.Duration - saveInterval time.Duration - collectors []*Collector -} - -func NewAnnounced(ns *NodeServer) *Announced { - collects := []*Collector{ - NewCollector("statistics"), - NewCollector("nodeinfo"), - NewCollector("neighbours"), - } - return &Announced{ - ns, - NewNodes(), - "webroot/nodes.json", - time.Second * time.Duration(15), - time.Second * time.Duration(15), - collects, - } -} -func (announced *Announced) Run() { - -} diff --git a/main.go b/main.go index 42ba6ea..c519dca 100644 --- a/main.go +++ b/main.go @@ -1,19 +1,32 @@ package main import ( + "encoding/json" "flag" "log" - "time" "net/http" + "os" + "os/signal" + "reflect" + "strings" + "syscall" + "time" + + "github.com/monitormap/micro-daemon/models" + "github.com/monitormap/micro-daemon/responed" + "github.com/monitormap/micro-daemon/websocketserver" ) + var ( - nodeserver = NewNodeServer("/nodes") - nodes = NewNodes() - outputFile string - collectInterval time.Duration - saveInterval time.Duration + wsserverForNodes = websocketserver.NewServer("/nodes") + responedDaemon *responed.Daemon + nodes = models.NewNodes() + outputFile string + collectInterval time.Duration + saveInterval time.Duration ) -func main(){ + +func main() { var collectSeconds, saveSeconds int flag.StringVar(&outputFile, "output", "webroot/nodes.json", "path output file") @@ -24,19 +37,47 @@ func main(){ collectInterval = time.Second * time.Duration(collectSeconds) saveInterval = time.Second * time.Duration(saveSeconds) - collectors := []*Collector{ - NewCollector("statistics"), - NewCollector("nodeinfo"), - NewCollector("neighbours"), - } + go wsserverForNodes.Listen() + go nodes.Saver(outputFile, saveInterval) + responedDaemon = responed.NewDaemon(func(coll *responed.Collector, res *responed.Response) { + var result map[string]interface{} + json.Unmarshal(res.Raw, &result) + + nodeID, _ := result["node_id"].(string) + + if nodeID == "" { + log.Println("unable to parse node_id") + return + } + + node := nodes.Get(nodeID) + + // Set result + elem := reflect.ValueOf(node).Elem() + field := elem.FieldByName(strings.Title(coll.CollectType)) + + log.Println(field) + log.Println(result) + + if !reflect.DeepEqual(field, result) { + wsserverForNodes.SendAll(node) + } + + field.Set(reflect.ValueOf(result)) + }) + go responedDaemon.ListenAndSend(collectInterval) - go nodeserver.Listen() - - // static files http.Handle("/", http.FileServer(http.Dir("webroot"))) - + //TODO bad log.Fatal(http.ListenAndServe(":8080", nil)) - for _, c := range collectors { - c.Close() - } + + // Wait for End + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.Println("received", sig) + + // Close everything at the end + wsserverForNodes.Close() + responedDaemon.Close() } diff --git a/nodes.go b/models/nodes.go similarity index 82% rename from nodes.go rename to models/nodes.go index 0069569..86d936a 100644 --- a/nodes.go +++ b/models/nodes.go @@ -1,4 +1,4 @@ -package main +package models import ( "encoding/json" @@ -30,12 +30,10 @@ func NewNodes() *Nodes { List: make(map[string]*Node), } - go nodes.saver() - return nodes } -func (nodes *Nodes) get(nodeId string) *Node { +func (nodes *Nodes) Get(nodeId string) *Node { now := time.Now() nodes.Lock() @@ -54,22 +52,22 @@ func (nodes *Nodes) get(nodeId string) *Node { return node } -func (nodes *Nodes) saver() { +func (nodes *Nodes) Saver(outputFile string, saveInterval time.Duration) { c := time.Tick(saveInterval) for range c { - nodes.save() + nodes.save(outputFile) } } -func (nodes *Nodes) save() { +func (nodes *Nodes) save(outputFile string) { nodes.Timestamp = time.Now() nodes.Lock() data, err := json.Marshal(nodes) nodes.Unlock() - if err !=nil{ + if err != nil { log.Panic(err) } log.Println("saving", len(nodes.List), "nodes") @@ -77,11 +75,11 @@ func (nodes *Nodes) save() { tmpFile := outputFile + ".tmp" err = ioutil.WriteFile(tmpFile, data, 0644) - if err !=nil{ + if err != nil { log.Panic(err) } err = os.Rename(tmpFile, outputFile) - if err !=nil{ + if err != nil { log.Panic(err) } } diff --git a/nodeclient.go b/nodeclient.go deleted file mode 100644 index 92dbd80..0000000 --- a/nodeclient.go +++ /dev/null @@ -1,79 +0,0 @@ -package main - -import ( - "fmt" - - "golang.org/x/net/websocket" -) - -const channelBufSize = 100 - -var maxId int = 0 - -// Node client. -type NodeClient struct { - id int - ws *websocket.Conn - server *NodeServer - ch chan *Node - doneCh chan bool -} - -// Create new node client. -func NewNodeClient(ws *websocket.Conn, server *NodeServer) *NodeClient { - - if ws == nil { - panic("ws cannot be nil") - } - - if server == nil { - panic("server cannot be nil") - } - - maxId++ - ch := make(chan *Node, channelBufSize) - doneCh := make(chan bool) - - return &NodeClient{maxId, ws, server, ch, doneCh} -} - -func (c *NodeClient) Conn() *websocket.Conn { - return c.ws -} - -func (c *NodeClient) Write(node *Node) { - select { - case c.ch <- node: - default: - c.server.Del(c) - err := fmt.Errorf("NodeClient %d is disconnected.", c.id) - c.server.Err(err) - } -} - -func (c *NodeClient) Done() { - c.doneCh <- true -} - -// Listen Write and Read request via chanel -func (c *NodeClient) Listen() { - c.listenWrite() -} - -// Listen write request via chanel -func (c *NodeClient) listenWrite() { - for { - select { - - // send message to the client - case node := <-c.ch: - websocket.JSON.Send(c.ws, node) - - // receive done request - case <-c.doneCh: - c.server.Del(c) - c.doneCh <- true // for listenRead method - return - } - } -} diff --git a/collector.go b/responed/collector.go similarity index 54% rename from collector.go rename to responed/collector.go index 07ebfd6..c4905b0 100644 --- a/collector.go +++ b/responed/collector.go @@ -1,39 +1,40 @@ -package main +package responed import ( - "encoding/json" "log" "net" - "reflect" - "strings" "time" ) const ( // default multicast group used by announced - MultiCastGroup string = "ff02:0:0:0:0:0:2:1001" + multiCastGroup string = "ff02:0:0:0:0:0:2:1001" // default udp port used by announced - Port string = "1001" + port string = "1001" // maximum receivable size - MaxDataGramSize int = 8192 + maxDataGramSize int = 8192 ) +//Response of the type Response struct { Address net.UDPAddr Raw []byte } +//Collector for a specificle responed messages type Collector struct { - collectType string + CollectType string connection *net.UDPConn // UDP socket queue chan *Response // received responses + parse func(coll *Collector, res *Response) } -func NewCollector(collectType string) *Collector { +//NewCollector creates a Collector struct +func NewCollector(CollectType string, parseFunc func(coll *Collector, res *Response)) *Collector { // Parse address - addr, err := net.ResolveUDPAddr("udp", "[::%wlp3s0]:0") + addr, err := net.ResolveUDPAddr("udp", "[::]:0") if err != nil { log.Panic(err) } @@ -43,16 +44,15 @@ func NewCollector(collectType string) *Collector { if err != nil { log.Panic(err) } - conn.SetReadBuffer(MaxDataGramSize) + conn.SetReadBuffer(maxDataGramSize) collector := &Collector{ - collectType: collectType, + CollectType: CollectType, connection: conn, queue: make(chan *Response, 400), + parse: parseFunc, } - go collector.sender() - go collector.receiver() go collector.parser() @@ -61,13 +61,14 @@ func NewCollector(collectType string) *Collector { return collector } +//Close Collector func (coll *Collector) Close() { coll.connection.Close() close(coll.queue) } func (coll *Collector) sendOnce() { - coll.sendPacket(net.JoinHostPort(MultiCastGroup,Port)) + coll.sendPacket(net.JoinHostPort(multiCastGroup, port)) log.Println("send request") } @@ -77,10 +78,10 @@ func (coll *Collector) sendPacket(address string) { log.Panic(err) } - coll.connection.WriteToUDP([]byte(coll.collectType), addr) + coll.connection.WriteToUDP([]byte(coll.CollectType), addr) } -func (coll *Collector) sender() { +func (coll *Collector) sender(collectInterval time.Duration) { c := time.Tick(collectInterval) for range c { @@ -91,40 +92,12 @@ func (coll *Collector) sender() { func (coll *Collector) parser() { for obj := range coll.queue { - coll.parse(obj) + coll.parse(coll, obj) } } -// Parses a response -func (coll *Collector) parse(res *Response) { - var result map[string]interface{} - json.Unmarshal(res.Raw, &result) - - nodeId, _ := result["node_id"].(string) - - if nodeId == "" { - log.Println("unable to parse node_id") - return - } - - node := nodes.get(nodeId) - - // Set result - elem := reflect.ValueOf(node).Elem() - field := elem.FieldByName(strings.Title(coll.collectType)) - - log.Println(field) - log.Println(result) - - if !reflect.DeepEqual(field,result){ - nodeserver.SendAll(node) - } - - field.Set(reflect.ValueOf(result)) -} - func (coll *Collector) receiver() { - buf := make([]byte, MaxDataGramSize) + buf := make([]byte, maxDataGramSize) for { n, src, err := coll.connection.ReadFromUDP(buf) @@ -140,6 +113,6 @@ func (coll *Collector) receiver() { Address: *src, Raw: raw, } - log.Println("received", coll.collectType, "from", src) + log.Println("received", coll.CollectType, "from", src) } } diff --git a/responed/daemon.go b/responed/daemon.go new file mode 100644 index 0000000..6348f62 --- /dev/null +++ b/responed/daemon.go @@ -0,0 +1,34 @@ +package responed + +import "time" + +//Daemon struct +type Daemon struct { + collectors []*Collector +} + +//NewDaemon create a list of collectors +func NewDaemon(parseFunc func(coll *Collector, res *Response)) *Daemon { + collectors := []*Collector{ + NewCollector("statistics", parseFunc), + NewCollector("nodeinfo", parseFunc), + NewCollector("neighbours", parseFunc), + } + return &Daemon{ + collectors, + } +} + +//ListenAndSend on Collection +func (daemon *Daemon) ListenAndSend(collectInterval time.Duration) { + for _, col := range daemon.collectors { + col.sender(collectInterval) + } +} + +//Close all Collections +func (daemon *Daemon) Close() { + for _, col := range daemon.collectors { + col.Close() + } +} diff --git a/websocketserver/client.go b/websocketserver/client.go new file mode 100644 index 0000000..c5de2b8 --- /dev/null +++ b/websocketserver/client.go @@ -0,0 +1,69 @@ +package websocketserver + +import ( + "fmt" + + "golang.org/x/net/websocket" +) + +const channelBufSize = 100 + +var maxID = 0 + +//Client struct +type Client struct { + id int + ws *websocket.Conn + server *Server + ch chan *struct{} + doneCh chan bool +} + +//NewClient creates a new Client +func NewClient(ws *websocket.Conn, server *Server) *Client { + + if ws == nil { + panic("ws cannot be nil") + } + + if server == nil { + panic("server cannot be nil") + } + + maxID++ + ch := make(chan *struct{}, channelBufSize) + doneCh := make(chan bool) + + return &Client{maxID, ws, server, ch, doneCh} +} + +//GetConnection the websocket connection of a listen client +func (c *Client) GetConnection() *websocket.Conn { + return c.ws +} + +//Write send the msg informations to the clients +func (c *Client) Write(msg *struct{}) { + select { + case c.ch <- msg: + default: + c.server.Del(c) + err := fmt.Errorf("Client %d is disconnected.", c.id) + c.server.Err(err) + } +} + +// Listen Write and Read request via chanel +func (c *Client) Listen() { + c.listen() +} + +// listen for new msg informations +func (c *Client) listen() { + for { + select { + case msg := <-c.ch: + websocket.JSON.Send(c.ws, msg) + } + } +} diff --git a/nodeserver.go b/websocketserver/server.go similarity index 54% rename from nodeserver.go rename to websocketserver/server.go index 9906ed0..6c27751 100644 --- a/nodeserver.go +++ b/websocketserver/server.go @@ -1,4 +1,4 @@ -package main +package websocketserver import ( "log" @@ -7,58 +7,63 @@ import ( "golang.org/x/net/websocket" ) -// Node server. -type NodeServer struct { +//Server struct +type Server struct { pattern string - clients map[int]*NodeClient - addCh chan *NodeClient - delCh chan *NodeClient - sendAllCh chan *Node - doneCh chan bool + clients map[int]*Client + addCh chan *Client + delCh chan *Client + sendAllCh chan *struct{} + closeCh chan bool errCh chan error } -// Create new node server. -func NewNodeServer(pattern string) *NodeServer { - clients := make(map[int]*NodeClient) - addCh := make(chan *NodeClient) - delCh := make(chan *NodeClient) - sendAllCh := make(chan *Node) - doneCh := make(chan bool) +//NewServer creates a new node server +func NewServer(pattern string) *Server { + clients := make(map[int]*Client) + addCh := make(chan *Client) + delCh := make(chan *Client) + sendAllCh := make(chan *struct{}) + closeCh := make(chan bool) errCh := make(chan error) - return &NodeServer{ + return &Server{ pattern, clients, addCh, delCh, sendAllCh, - doneCh, + closeCh, errCh, } } -func (s *NodeServer) Add(c *NodeClient) { +//Add a node listen client +func (s *Server) Add(c *Client) { s.addCh <- c } -func (s *NodeServer) Del(c *NodeClient) { +//Del a node listen client +func (s *Server) Del(c *Client) { s.delCh <- c } -func (s *NodeServer) SendAll(node *Node) { +//SendAll to all listen clients refreshed information of a node +func (s *Server) SendAll(node *struct{}) { s.sendAllCh <- node } -func (s *NodeServer) Done() { - s.doneCh <- true +//Close stops node server +func (s *Server) Close() { + s.closeCh <- true } -func (s *NodeServer) Err(err error) { +//Err send to server +func (s *Server) Err(err error) { s.errCh <- err } -func (s *NodeServer) sendAll(node *Node) { +func (s *Server) sendAll(node *struct{}) { for _, c := range s.clients { c.Write(node) } @@ -66,9 +71,9 @@ func (s *NodeServer) sendAll(node *Node) { // Listen and serve. // It serves client connection and broadcast request. -func (s *NodeServer) Listen() { +func (s *Server) Listen() { - log.Println("Listening NodeServer...") + log.Println("Listening Server...") // websocket handler onConnected := func(ws *websocket.Conn) { @@ -79,7 +84,7 @@ func (s *NodeServer) Listen() { } }() - client := NewNodeClient(ws, s) + client := NewClient(ws, s) s.Add(client) client.Listen() } @@ -107,7 +112,7 @@ func (s *NodeServer) Listen() { case err := <-s.errCh: log.Println("Error:", err.Error()) - case <-s.doneCh: + case <-s.closeCh: return } }