resturctur everything -> commit with a bug

This commit is contained in:
Martin Geno 2016-02-25 21:06:37 +01:00
parent 46d11abe45
commit dc5569cea5
8 changed files with 225 additions and 216 deletions

View File

@ -1,32 +0,0 @@
package main
import (
"time"
)
type Announced struct {
NodeServer *NodeServer
nodes *Nodes
outputFile string
collectInterval time.Duration
saveInterval time.Duration
collectors []*Collector
}
func NewAnnounced(ns *NodeServer) *Announced {
collects := []*Collector{
NewCollector("statistics"),
NewCollector("nodeinfo"),
NewCollector("neighbours"),
}
return &Announced{
ns,
NewNodes(),
"webroot/nodes.json",
time.Second * time.Duration(15),
time.Second * time.Duration(15),
collects,
}
}
func (announced *Announced) Run() {
}

67
main.go
View File

@ -1,18 +1,31 @@
package main package main
import ( import (
"encoding/json"
"flag" "flag"
"log" "log"
"time"
"net/http" "net/http"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"time"
"github.com/monitormap/micro-daemon/models"
"github.com/monitormap/micro-daemon/responed"
"github.com/monitormap/micro-daemon/websocketserver"
) )
var ( var (
nodeserver = NewNodeServer("/nodes") wsserverForNodes = websocketserver.NewServer("/nodes")
nodes = NewNodes() responedDaemon *responed.Daemon
nodes = models.NewNodes()
outputFile string outputFile string
collectInterval time.Duration collectInterval time.Duration
saveInterval time.Duration saveInterval time.Duration
) )
func main() { func main() {
var collectSeconds, saveSeconds int var collectSeconds, saveSeconds int
@ -24,19 +37,47 @@ func main(){
collectInterval = time.Second * time.Duration(collectSeconds) collectInterval = time.Second * time.Duration(collectSeconds)
saveInterval = time.Second * time.Duration(saveSeconds) saveInterval = time.Second * time.Duration(saveSeconds)
collectors := []*Collector{ go wsserverForNodes.Listen()
NewCollector("statistics"), go nodes.Saver(outputFile, saveInterval)
NewCollector("nodeinfo"), responedDaemon = responed.NewDaemon(func(coll *responed.Collector, res *responed.Response) {
NewCollector("neighbours"), var result map[string]interface{}
json.Unmarshal(res.Raw, &result)
nodeID, _ := result["node_id"].(string)
if nodeID == "" {
log.Println("unable to parse node_id")
return
} }
go nodeserver.Listen() node := nodes.Get(nodeID)
// Set result
elem := reflect.ValueOf(node).Elem()
field := elem.FieldByName(strings.Title(coll.CollectType))
log.Println(field)
log.Println(result)
if !reflect.DeepEqual(field, result) {
wsserverForNodes.SendAll(node)
}
field.Set(reflect.ValueOf(result))
})
go responedDaemon.ListenAndSend(collectInterval)
// static files
http.Handle("/", http.FileServer(http.Dir("webroot"))) http.Handle("/", http.FileServer(http.Dir("webroot")))
//TODO bad
log.Fatal(http.ListenAndServe(":8080", nil)) log.Fatal(http.ListenAndServe(":8080", nil))
for _, c := range collectors {
c.Close() // Wait for End
} sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
log.Println("received", sig)
// Close everything at the end
wsserverForNodes.Close()
responedDaemon.Close()
} }

View File

@ -1,4 +1,4 @@
package main package models
import ( import (
"encoding/json" "encoding/json"
@ -30,12 +30,10 @@ func NewNodes() *Nodes {
List: make(map[string]*Node), List: make(map[string]*Node),
} }
go nodes.saver()
return nodes return nodes
} }
func (nodes *Nodes) get(nodeId string) *Node { func (nodes *Nodes) Get(nodeId string) *Node {
now := time.Now() now := time.Now()
nodes.Lock() nodes.Lock()
@ -54,15 +52,15 @@ func (nodes *Nodes) get(nodeId string) *Node {
return node return node
} }
func (nodes *Nodes) saver() { func (nodes *Nodes) Saver(outputFile string, saveInterval time.Duration) {
c := time.Tick(saveInterval) c := time.Tick(saveInterval)
for range c { for range c {
nodes.save() nodes.save(outputFile)
} }
} }
func (nodes *Nodes) save() { func (nodes *Nodes) save(outputFile string) {
nodes.Timestamp = time.Now() nodes.Timestamp = time.Now()
nodes.Lock() nodes.Lock()

View File

@ -1,79 +0,0 @@
package main
import (
"fmt"
"golang.org/x/net/websocket"
)
const channelBufSize = 100
var maxId int = 0
// Node client.
type NodeClient struct {
id int
ws *websocket.Conn
server *NodeServer
ch chan *Node
doneCh chan bool
}
// Create new node client.
func NewNodeClient(ws *websocket.Conn, server *NodeServer) *NodeClient {
if ws == nil {
panic("ws cannot be nil")
}
if server == nil {
panic("server cannot be nil")
}
maxId++
ch := make(chan *Node, channelBufSize)
doneCh := make(chan bool)
return &NodeClient{maxId, ws, server, ch, doneCh}
}
func (c *NodeClient) Conn() *websocket.Conn {
return c.ws
}
func (c *NodeClient) Write(node *Node) {
select {
case c.ch <- node:
default:
c.server.Del(c)
err := fmt.Errorf("NodeClient %d is disconnected.", c.id)
c.server.Err(err)
}
}
func (c *NodeClient) Done() {
c.doneCh <- true
}
// Listen Write and Read request via chanel
func (c *NodeClient) Listen() {
c.listenWrite()
}
// Listen write request via chanel
func (c *NodeClient) listenWrite() {
for {
select {
// send message to the client
case node := <-c.ch:
websocket.JSON.Send(c.ws, node)
// receive done request
case <-c.doneCh:
c.server.Del(c)
c.doneCh <- true // for listenRead method
return
}
}
}

View File

@ -1,39 +1,40 @@
package main package responed
import ( import (
"encoding/json"
"log" "log"
"net" "net"
"reflect"
"strings"
"time" "time"
) )
const ( const (
// default multicast group used by announced // default multicast group used by announced
MultiCastGroup string = "ff02:0:0:0:0:0:2:1001" multiCastGroup string = "ff02:0:0:0:0:0:2:1001"
// default udp port used by announced // default udp port used by announced
Port string = "1001" port string = "1001"
// maximum receivable size // maximum receivable size
MaxDataGramSize int = 8192 maxDataGramSize int = 8192
) )
//Response of the
type Response struct { type Response struct {
Address net.UDPAddr Address net.UDPAddr
Raw []byte Raw []byte
} }
//Collector for a specificle responed messages
type Collector struct { type Collector struct {
collectType string CollectType string
connection *net.UDPConn // UDP socket connection *net.UDPConn // UDP socket
queue chan *Response // received responses queue chan *Response // received responses
parse func(coll *Collector, res *Response)
} }
func NewCollector(collectType string) *Collector { //NewCollector creates a Collector struct
func NewCollector(CollectType string, parseFunc func(coll *Collector, res *Response)) *Collector {
// Parse address // Parse address
addr, err := net.ResolveUDPAddr("udp", "[::%wlp3s0]:0") addr, err := net.ResolveUDPAddr("udp", "[::]:0")
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
@ -43,16 +44,15 @@ func NewCollector(collectType string) *Collector {
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
conn.SetReadBuffer(MaxDataGramSize) conn.SetReadBuffer(maxDataGramSize)
collector := &Collector{ collector := &Collector{
collectType: collectType, CollectType: CollectType,
connection: conn, connection: conn,
queue: make(chan *Response, 400), queue: make(chan *Response, 400),
parse: parseFunc,
} }
go collector.sender()
go collector.receiver() go collector.receiver()
go collector.parser() go collector.parser()
@ -61,13 +61,14 @@ func NewCollector(collectType string) *Collector {
return collector return collector
} }
//Close Collector
func (coll *Collector) Close() { func (coll *Collector) Close() {
coll.connection.Close() coll.connection.Close()
close(coll.queue) close(coll.queue)
} }
func (coll *Collector) sendOnce() { func (coll *Collector) sendOnce() {
coll.sendPacket(net.JoinHostPort(MultiCastGroup,Port)) coll.sendPacket(net.JoinHostPort(multiCastGroup, port))
log.Println("send request") log.Println("send request")
} }
@ -77,10 +78,10 @@ func (coll *Collector) sendPacket(address string) {
log.Panic(err) log.Panic(err)
} }
coll.connection.WriteToUDP([]byte(coll.collectType), addr) coll.connection.WriteToUDP([]byte(coll.CollectType), addr)
} }
func (coll *Collector) sender() { func (coll *Collector) sender(collectInterval time.Duration) {
c := time.Tick(collectInterval) c := time.Tick(collectInterval)
for range c { for range c {
@ -91,40 +92,12 @@ func (coll *Collector) sender() {
func (coll *Collector) parser() { func (coll *Collector) parser() {
for obj := range coll.queue { for obj := range coll.queue {
coll.parse(obj) coll.parse(coll, obj)
} }
} }
// Parses a response
func (coll *Collector) parse(res *Response) {
var result map[string]interface{}
json.Unmarshal(res.Raw, &result)
nodeId, _ := result["node_id"].(string)
if nodeId == "" {
log.Println("unable to parse node_id")
return
}
node := nodes.get(nodeId)
// Set result
elem := reflect.ValueOf(node).Elem()
field := elem.FieldByName(strings.Title(coll.collectType))
log.Println(field)
log.Println(result)
if !reflect.DeepEqual(field,result){
nodeserver.SendAll(node)
}
field.Set(reflect.ValueOf(result))
}
func (coll *Collector) receiver() { func (coll *Collector) receiver() {
buf := make([]byte, MaxDataGramSize) buf := make([]byte, maxDataGramSize)
for { for {
n, src, err := coll.connection.ReadFromUDP(buf) n, src, err := coll.connection.ReadFromUDP(buf)
@ -140,6 +113,6 @@ func (coll *Collector) receiver() {
Address: *src, Address: *src,
Raw: raw, Raw: raw,
} }
log.Println("received", coll.collectType, "from", src) log.Println("received", coll.CollectType, "from", src)
} }
} }

34
responed/daemon.go Normal file
View File

@ -0,0 +1,34 @@
package responed
import "time"
//Daemon struct
type Daemon struct {
collectors []*Collector
}
//NewDaemon create a list of collectors
func NewDaemon(parseFunc func(coll *Collector, res *Response)) *Daemon {
collectors := []*Collector{
NewCollector("statistics", parseFunc),
NewCollector("nodeinfo", parseFunc),
NewCollector("neighbours", parseFunc),
}
return &Daemon{
collectors,
}
}
//ListenAndSend on Collection
func (daemon *Daemon) ListenAndSend(collectInterval time.Duration) {
for _, col := range daemon.collectors {
col.sender(collectInterval)
}
}
//Close all Collections
func (daemon *Daemon) Close() {
for _, col := range daemon.collectors {
col.Close()
}
}

69
websocketserver/client.go Normal file
View File

@ -0,0 +1,69 @@
package websocketserver
import (
"fmt"
"golang.org/x/net/websocket"
)
const channelBufSize = 100
var maxID = 0
//Client struct
type Client struct {
id int
ws *websocket.Conn
server *Server
ch chan *struct{}
doneCh chan bool
}
//NewClient creates a new Client
func NewClient(ws *websocket.Conn, server *Server) *Client {
if ws == nil {
panic("ws cannot be nil")
}
if server == nil {
panic("server cannot be nil")
}
maxID++
ch := make(chan *struct{}, channelBufSize)
doneCh := make(chan bool)
return &Client{maxID, ws, server, ch, doneCh}
}
//GetConnection the websocket connection of a listen client
func (c *Client) GetConnection() *websocket.Conn {
return c.ws
}
//Write send the msg informations to the clients
func (c *Client) Write(msg *struct{}) {
select {
case c.ch <- msg:
default:
c.server.Del(c)
err := fmt.Errorf("Client %d is disconnected.", c.id)
c.server.Err(err)
}
}
// Listen Write and Read request via chanel
func (c *Client) Listen() {
c.listen()
}
// listen for new msg informations
func (c *Client) listen() {
for {
select {
case msg := <-c.ch:
websocket.JSON.Send(c.ws, msg)
}
}
}

View File

@ -1,4 +1,4 @@
package main package websocketserver
import ( import (
"log" "log"
@ -7,58 +7,63 @@ import (
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
// Node server. //Server struct
type NodeServer struct { type Server struct {
pattern string pattern string
clients map[int]*NodeClient clients map[int]*Client
addCh chan *NodeClient addCh chan *Client
delCh chan *NodeClient delCh chan *Client
sendAllCh chan *Node sendAllCh chan *struct{}
doneCh chan bool closeCh chan bool
errCh chan error errCh chan error
} }
// Create new node server. //NewServer creates a new node server
func NewNodeServer(pattern string) *NodeServer { func NewServer(pattern string) *Server {
clients := make(map[int]*NodeClient) clients := make(map[int]*Client)
addCh := make(chan *NodeClient) addCh := make(chan *Client)
delCh := make(chan *NodeClient) delCh := make(chan *Client)
sendAllCh := make(chan *Node) sendAllCh := make(chan *struct{})
doneCh := make(chan bool) closeCh := make(chan bool)
errCh := make(chan error) errCh := make(chan error)
return &NodeServer{ return &Server{
pattern, pattern,
clients, clients,
addCh, addCh,
delCh, delCh,
sendAllCh, sendAllCh,
doneCh, closeCh,
errCh, errCh,
} }
} }
func (s *NodeServer) Add(c *NodeClient) { //Add a node listen client
func (s *Server) Add(c *Client) {
s.addCh <- c s.addCh <- c
} }
func (s *NodeServer) Del(c *NodeClient) { //Del a node listen client
func (s *Server) Del(c *Client) {
s.delCh <- c s.delCh <- c
} }
func (s *NodeServer) SendAll(node *Node) { //SendAll to all listen clients refreshed information of a node
func (s *Server) SendAll(node *struct{}) {
s.sendAllCh <- node s.sendAllCh <- node
} }
func (s *NodeServer) Done() { //Close stops node server
s.doneCh <- true func (s *Server) Close() {
s.closeCh <- true
} }
func (s *NodeServer) Err(err error) { //Err send to server
func (s *Server) Err(err error) {
s.errCh <- err s.errCh <- err
} }
func (s *NodeServer) sendAll(node *Node) { func (s *Server) sendAll(node *struct{}) {
for _, c := range s.clients { for _, c := range s.clients {
c.Write(node) c.Write(node)
} }
@ -66,9 +71,9 @@ func (s *NodeServer) sendAll(node *Node) {
// Listen and serve. // Listen and serve.
// It serves client connection and broadcast request. // It serves client connection and broadcast request.
func (s *NodeServer) Listen() { func (s *Server) Listen() {
log.Println("Listening NodeServer...") log.Println("Listening Server...")
// websocket handler // websocket handler
onConnected := func(ws *websocket.Conn) { onConnected := func(ws *websocket.Conn) {
@ -79,7 +84,7 @@ func (s *NodeServer) Listen() {
} }
}() }()
client := NewNodeClient(ws, s) client := NewClient(ws, s)
s.Add(client) s.Add(client)
client.Listen() client.Listen()
} }
@ -107,7 +112,7 @@ func (s *NodeServer) Listen() {
case err := <-s.errCh: case err := <-s.errCh:
log.Println("Error:", err.Error()) log.Println("Error:", err.Error())
case <-s.doneCh: case <-s.closeCh:
return return
} }
} }