diff --git a/announced.go b/announced.go new file mode 100644 index 0000000..c57534c --- /dev/null +++ b/announced.go @@ -0,0 +1,32 @@ +package main +import ( + "time" +) + +type Announced struct { + NodeServer *NodeServer + nodes *Nodes + outputFile string + collectInterval time.Duration + saveInterval time.Duration + collectors []*Collector +} + +func NewAnnounced(ns NodeServer) *Announced { + collects := []*Collector{ + NewCollector("statistics"), + NewCollector("nodeinfo"), + NewCollector("neighbours"), + } + return &Announced{ + ns, + NewNodes(), + output, + time.Second * time.Duration(15), + time.Second * time.Duration(15), + collects, + } +} +func (announced *Announced) Run() { + +} diff --git a/collector.go b/collector.go index 56dd7e3..3e9d085 100644 --- a/collector.go +++ b/collector.go @@ -73,7 +73,9 @@ func (coll *Collector) sendOnce() { func (coll *Collector) sendPacket(address string) { addr, err := net.ResolveUDPAddr("udp", address) - check(err) + if err != nil { + log.Panic(err) + } coll.connection.WriteToUDP([]byte(coll.collectType), addr) } diff --git a/main.go b/main.go index a9bbc95..ecb7d95 100644 --- a/main.go +++ b/main.go @@ -1,51 +1,19 @@ package main import ( - "flag" "log" - "os" - "os/signal" - "syscall" - "time" + "net/http" ) -var ( - nodes = NewNodes() - outputFile string - collectInterval time.Duration - saveInterval time.Duration -) +func main(){ + node := NewNodeServer("/nodes") + go node.Listen() + + annouced := NewAnnouced(node) + go annouced.Run() -func main() { - var collectSeconds, saveSeconds int + // static files + http.Handle("/", http.FileServer(http.Dir("webroot"))) - flag.StringVar(&outputFile, "output", "nodes.json", "path output file") - flag.IntVar(&collectSeconds, "collectInterval", 15, "interval for data collections") - flag.IntVar(&saveSeconds, "saveInterval", 5, "interval for data saving") - flag.Parse() - - collectInterval = time.Second * time.Duration(collectSeconds) - saveInterval = time.Second * time.Duration(saveSeconds) - - collectors := []*Collector{ - NewCollector("statistics"), - NewCollector("nodeinfo"), - NewCollector("neighbours"), - } - - // Wait for SIGINT or SIGTERM - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigs - log.Println("received", sig) - - for _, c := range collectors { - c.Close() - } -} - -func check(e error) { - if e != nil { - log.Panic(e) - } + log.Fatal(http.ListenAndServe(":8080", nil)) } diff --git a/nodeclient.go b/nodeclient.go new file mode 100644 index 0000000..addbf8e --- /dev/null +++ b/nodeclient.go @@ -0,0 +1,82 @@ +package main + +import ( + "fmt" + "log" + + "golang.org/x/net/websocket" +) + +const channelBufSize = 100 + +var maxId int = 0 + +// Node client. +type NodeClient struct { + id int + ws *websocket.Conn + server *NodeServer + ch chan *Node + doneCh chan bool +} + +// Create new node client. +func NewNodeClient(ws *websocket.Conn, server *NodeServer) *NodeClient { + + if ws == nil { + panic("ws cannot be nil") + } + + if server == nil { + panic("server cannot be nil") + } + + maxId++ + ch := make(chan *Node, channelBufSize) + doneCh := make(chan bool) + + return &NodeClient{maxId, ws, server, ch, doneCh} +} + +func (c *NodeClient) Conn() *websocket.Conn { + return c.ws +} + +func (c *NodeClient) Write(node *Node) { + select { + case c.ch <- node: + default: + c.server.Del(c) + err := fmt.Errorf("NodeClient %d is disconnected.", c.id) + c.server.Err(err) + } +} + +func (c *NodeClient) Done() { + c.doneCh <- true +} + +// Listen Write and Read request via chanel +func (c *NodeClient) Listen() { + go c.listenWrite() +} + +// Listen write request via chanel +func (c *NodeClient) listenWrite() { + log.Println("Listening write to NodeClient") + for { + select { + + // send message to the client + case node := <-c.ch: + log.Println("Send:", node) + websocket.JSON.Send(c.ws, node) + + // receive done request + case <-c.doneCh: + c.server.Del(c) + c.doneCh <- true // for listenRead method + return + } + } +} diff --git a/nodes.go b/nodes.go index 48ae29b..2f16f07 100644 --- a/nodes.go +++ b/nodes.go @@ -69,7 +69,9 @@ func (nodes *Nodes) save() { data, err := json.Marshal(nodes) nodes.Unlock() - check(err) + if err !=nil{ + log.Panic(e) + } log.Println("saving", len(nodes.List), "nodes") tmpFile := outputFile + ".tmp" diff --git a/nodeserver.go b/nodeserver.go new file mode 100644 index 0000000..ed69862 --- /dev/null +++ b/nodeserver.go @@ -0,0 +1,115 @@ +package main + +import ( + "log" + "net/http" + + "golang.org/x/net/websocket" +) + +// Node server. +type NodeServer struct { + pattern string + clients map[int]*NodeClient + addCh chan *NodeClient + delCh chan *NodeClient + sendAllCh chan *Node + doneCh chan bool + errCh chan error +} + +// Create new node server. +func NewNodeServer(pattern string) *NodeServer { + clients := make(map[int]*NodeClient) + addCh := make(chan *NodeClient) + delCh := make(chan *NodeClient) + sendAllCh := make(chan *Node) + doneCh := make(chan bool) + errCh := make(chan error) + + return &NodeServer{ + pattern, + clients, + addCh, + delCh, + sendAllCh, + doneCh, + errCh, + } +} + +func (s *NodeServer) Add(c *NodeClient) { + s.addCh <- c +} + +func (s *NodeServer) Del(c *NodeClient) { + s.delCh <- c +} + +func (s *NodeServer) SendAll(node *Node) { + s.sendAllCh <- node +} + +func (s *NodeServer) Done() { + s.doneCh <- true +} + +func (s *NodeServer) Err(err error) { + s.errCh <- err +} + +func (s *NodeServer) sendAll(node *Node) { + for _, c := range s.clients { + c.Write(node) + } +} + +// Listen and serve. +// It serves client connection and broadcast request. +func (s *NodeServer) Listen() { + + log.Println("Listening NodeServer...") + + // websocket handler + onConnected := func(ws *websocket.Conn) { + defer func() { + err := ws.Close() + if err != nil { + s.errCh <- err + } + }() + + client := NewNodeClient(ws, s) + s.Add(client) + client.Listen() + } + http.Handle(s.pattern, websocket.Handler(onConnected)) + log.Println("Created handler") + + for { + select { + + // Add new a client + case c := <-s.addCh: + log.Println("Added new client") + s.clients[c.id] = c + log.Println("Now", len(s.clients), "clients connected.") + + // del a client + case c := <-s.delCh: + log.Println("Delete client") + delete(s.clients, c.id) + + // broadcast message for all clients + case node := <-s.sendAllCh: + log.Println("Send all:", node) + s.sendAll(node) + + case err := <-s.errCh: + log.Println("Error:", err.Error()) + + case <-s.doneCh: + return + } + } +}