This commit is contained in:
Martin Geno 2016-02-19 11:13:30 +01:00
parent 66fbac4c2c
commit 708e8c4563
6 changed files with 245 additions and 44 deletions

32
announced.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"time"
)
type Announced struct {
NodeServer *NodeServer
nodes *Nodes
outputFile string
collectInterval time.Duration
saveInterval time.Duration
collectors []*Collector
}
func NewAnnounced(ns NodeServer) *Announced {
collects := []*Collector{
NewCollector("statistics"),
NewCollector("nodeinfo"),
NewCollector("neighbours"),
}
return &Announced{
ns,
NewNodes(),
output,
time.Second * time.Duration(15),
time.Second * time.Duration(15),
collects,
}
}
func (announced *Announced) Run() {
}

View File

@ -73,7 +73,9 @@ func (coll *Collector) sendOnce() {
func (coll *Collector) sendPacket(address string) {
addr, err := net.ResolveUDPAddr("udp", address)
check(err)
if err != nil {
log.Panic(err)
}
coll.connection.WriteToUDP([]byte(coll.collectType), addr)
}

48
main.go
View File

@ -1,51 +1,19 @@
package main
import (
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"
)
var (
nodes = NewNodes()
outputFile string
collectInterval time.Duration
saveInterval time.Duration
"net/http"
)
func main(){
var collectSeconds, saveSeconds int
node := NewNodeServer("/nodes")
go node.Listen()
flag.StringVar(&outputFile, "output", "nodes.json", "path output file")
flag.IntVar(&collectSeconds, "collectInterval", 15, "interval for data collections")
flag.IntVar(&saveSeconds, "saveInterval", 5, "interval for data saving")
flag.Parse()
annouced := NewAnnouced(node)
go annouced.Run()
collectInterval = time.Second * time.Duration(collectSeconds)
saveInterval = time.Second * time.Duration(saveSeconds)
// static files
http.Handle("/", http.FileServer(http.Dir("webroot")))
collectors := []*Collector{
NewCollector("statistics"),
NewCollector("nodeinfo"),
NewCollector("neighbours"),
}
// Wait for SIGINT or SIGTERM
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
log.Println("received", sig)
for _, c := range collectors {
c.Close()
}
}
func check(e error) {
if e != nil {
log.Panic(e)
}
log.Fatal(http.ListenAndServe(":8080", nil))
}

82
nodeclient.go Normal file
View File

@ -0,0 +1,82 @@
package main
import (
"fmt"
"log"
"golang.org/x/net/websocket"
)
const channelBufSize = 100
var maxId int = 0
// Node client.
type NodeClient struct {
id int
ws *websocket.Conn
server *NodeServer
ch chan *Node
doneCh chan bool
}
// Create new node client.
func NewNodeClient(ws *websocket.Conn, server *NodeServer) *NodeClient {
if ws == nil {
panic("ws cannot be nil")
}
if server == nil {
panic("server cannot be nil")
}
maxId++
ch := make(chan *Node, channelBufSize)
doneCh := make(chan bool)
return &NodeClient{maxId, ws, server, ch, doneCh}
}
func (c *NodeClient) Conn() *websocket.Conn {
return c.ws
}
func (c *NodeClient) Write(node *Node) {
select {
case c.ch <- node:
default:
c.server.Del(c)
err := fmt.Errorf("NodeClient %d is disconnected.", c.id)
c.server.Err(err)
}
}
func (c *NodeClient) Done() {
c.doneCh <- true
}
// Listen Write and Read request via chanel
func (c *NodeClient) Listen() {
go c.listenWrite()
}
// Listen write request via chanel
func (c *NodeClient) listenWrite() {
log.Println("Listening write to NodeClient")
for {
select {
// send message to the client
case node := <-c.ch:
log.Println("Send:", node)
websocket.JSON.Send(c.ws, node)
// receive done request
case <-c.doneCh:
c.server.Del(c)
c.doneCh <- true // for listenRead method
return
}
}
}

View File

@ -69,7 +69,9 @@ func (nodes *Nodes) save() {
data, err := json.Marshal(nodes)
nodes.Unlock()
check(err)
if err !=nil{
log.Panic(e)
}
log.Println("saving", len(nodes.List), "nodes")
tmpFile := outputFile + ".tmp"

115
nodeserver.go Normal file
View File

@ -0,0 +1,115 @@
package main
import (
"log"
"net/http"
"golang.org/x/net/websocket"
)
// Node server.
type NodeServer struct {
pattern string
clients map[int]*NodeClient
addCh chan *NodeClient
delCh chan *NodeClient
sendAllCh chan *Node
doneCh chan bool
errCh chan error
}
// Create new node server.
func NewNodeServer(pattern string) *NodeServer {
clients := make(map[int]*NodeClient)
addCh := make(chan *NodeClient)
delCh := make(chan *NodeClient)
sendAllCh := make(chan *Node)
doneCh := make(chan bool)
errCh := make(chan error)
return &NodeServer{
pattern,
clients,
addCh,
delCh,
sendAllCh,
doneCh,
errCh,
}
}
func (s *NodeServer) Add(c *NodeClient) {
s.addCh <- c
}
func (s *NodeServer) Del(c *NodeClient) {
s.delCh <- c
}
func (s *NodeServer) SendAll(node *Node) {
s.sendAllCh <- node
}
func (s *NodeServer) Done() {
s.doneCh <- true
}
func (s *NodeServer) Err(err error) {
s.errCh <- err
}
func (s *NodeServer) sendAll(node *Node) {
for _, c := range s.clients {
c.Write(node)
}
}
// Listen and serve.
// It serves client connection and broadcast request.
func (s *NodeServer) Listen() {
log.Println("Listening NodeServer...")
// websocket handler
onConnected := func(ws *websocket.Conn) {
defer func() {
err := ws.Close()
if err != nil {
s.errCh <- err
}
}()
client := NewNodeClient(ws, s)
s.Add(client)
client.Listen()
}
http.Handle(s.pattern, websocket.Handler(onConnected))
log.Println("Created handler")
for {
select {
// Add new a client
case c := <-s.addCh:
log.Println("Added new client")
s.clients[c.id] = c
log.Println("Now", len(s.clients), "clients connected.")
// del a client
case c := <-s.delCh:
log.Println("Delete client")
delete(s.clients, c.id)
// broadcast message for all clients
case node := <-s.sendAllCh:
log.Println("Send all:", node)
s.sendAll(node)
case err := <-s.errCh:
log.Println("Error:", err.Error())
case <-s.doneCh:
return
}
}
}