diff --git a/.drone.yml b/.drone.yml index 41a90a5..1b06a57 100644 --- a/.drone.yml +++ b/.drone.yml @@ -15,7 +15,6 @@ pipeline: codestyle: image: golang:latest commands: - - go get -d -t ./... - ./contrib/ci/check-testfiles - ./contrib/ci/check-gofmt - go get github.com/client9/misspell/cmd/misspell diff --git a/file/main.go b/file/main.go index 7e75867..cd4ecd5 100644 --- a/file/main.go +++ b/file/main.go @@ -1,3 +1,4 @@ +// Package file provides functionality to load and save marshal files package file import ( diff --git a/websocket/client.go b/websocket/client.go index 4e1782b..cfe1c6e 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -19,12 +19,11 @@ type Client struct { readQuit chan bool } +// NewClient by websocket func NewClient(s *Server, ws *websocket.Conn) *Client { - if ws == nil { log.Panic("ws cannot be nil") } - return &Client{ server: s, ws: ws, @@ -35,6 +34,7 @@ func NewClient(s *Server, ws *websocket.Conn) *Client { } } +// GetID of Client ( UUID or Address to Client) func (c *Client) GetID() string { if c.ws != nil { return c.ws.RemoteAddr().String() @@ -42,41 +42,45 @@ func (c *Client) GetID() string { return c.id.String() } +// Write Message to Client func (c *Client) Write(msg *Message) { select { case c.out <- msg: default: - c.server.DelClient(c) + c.server.delClient(c) c.Close() } } +// Close Client func (c *Client) Close() { c.writeQuit <- true c.readQuit <- true log.Info("client disconnecting...", c.GetID()) } -// Listen Write and Read request via channel +// Listen write and read request via channel func (c *Client) Listen() { go c.listenWrite() - c.server.AddClient(c) + c.server.addClient(c) c.listenRead() } +// handleInput manage session and valide message before send to server func (c *Client) handleInput(msg *Message) { msg.From = c if sm := c.server.sessionManager; sm != nil && sm.HandleMessage(msg) { return } if ok, err := msg.Validate(); ok { + msg.server = c.server c.server.msgChanIn <- msg } else { log.Println("no valid msg for:", c.GetID(), "error:", err, "\nmessage:", msg) } } -// Listen write request via channel +// listenWrite request via channel func (c *Client) listenWrite() { for { select { @@ -84,7 +88,7 @@ func (c *Client) listenWrite() { websocket.WriteJSON(c.ws, msg) case <-c.writeQuit: - c.server.DelClient(c) + c.server.delClient(c) close(c.out) close(c.writeQuit) return @@ -92,13 +96,13 @@ func (c *Client) listenWrite() { } } -// Listen read request via channel +// listenRead request via channel func (c *Client) listenRead() { for { select { case <-c.readQuit: - c.server.DelClient(c) + c.server.delClient(c) close(c.readQuit) return diff --git a/websocket/doc.go b/websocket/doc.go new file mode 100644 index 0000000..9261ab6 --- /dev/null +++ b/websocket/doc.go @@ -0,0 +1,2 @@ +// Package websocket to handling connection and session by messages and there subject +package websocket diff --git a/websocket/handler.go b/websocket/handler.go new file mode 100644 index 0000000..ca2b0b3 --- /dev/null +++ b/websocket/handler.go @@ -0,0 +1,52 @@ +package websocket + +import ( + "net/http" +) + +// MessageHandleFunc for handling messages +type MessageHandleFunc func(msg *Message) + +// WebsocketHandlerService to handle every Message on there Subject by Handlers +type WebsocketHandlerService struct { + inputMSG chan *Message + server *Server + handlers map[string]MessageHandleFunc + FallbackHandler MessageHandleFunc +} + +// NewWebsocketHandlerService with Websocket Server +func NewWebsocketHandlerService() *WebsocketHandlerService { + ws := WebsocketHandlerService{ + handlers: make(map[string]MessageHandleFunc), + inputMSG: make(chan *Message), + } + ws.server = NewServer(ws.inputMSG, NewSessionManager()) + return &ws +} + +func (ws *WebsocketHandlerService) messageHandler() { + for msg := range ws.inputMSG { + if handler, ok := ws.handlers[msg.Subject]; ok { + handler(msg) + } else if ws.FallbackHandler != nil { + ws.FallbackHandler(msg) + } + } +} + +// SetHandler for a message type by subject +func (ws *WebsocketHandlerService) SetHandler(subject string, f MessageHandleFunc) { + ws.handlers[subject] = f +} + +// Listen on net/http server at `path` and start running handling +func (ws *WebsocketHandlerService) Listen(path string) { + http.HandleFunc(path, ws.server.Handler) + go ws.messageHandler() +} + +// Close webserver +func (ws *WebsocketHandlerService) Close() { + close(ws.inputMSG) +} diff --git a/websocket/handler_test.go b/websocket/handler_test.go new file mode 100644 index 0000000..9474dea --- /dev/null +++ b/websocket/handler_test.go @@ -0,0 +1,42 @@ +package websocket + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHandler(t *testing.T) { + assert := assert.New(t) + + chanMsg := make(chan *Message) + handlerService := NewWebsocketHandlerService() + assert.NotNil(handlerService) + + handlerService.inputMSG = chanMsg + handlerService.server.msgChanIn = chanMsg + + wg := sync.WaitGroup{} + + handlerService.SetHandler("dummy", func(msg *Message) { + assert.Equal("expected", msg.Body) + wg.Done() + }) + wg.Add(1) + + handlerService.Listen("path") + defer handlerService.Close() + + chanMsg <- &Message{Subject: "dummy", Body: "expected"} + + wg.Wait() + + wg.Add(1) + handlerService.FallbackHandler = func(msg *Message) { + assert.Equal("unexpected", msg.Body) + wg.Done() + } + chanMsg <- &Message{Subject: "mist", Body: "unexpected"} + wg.Wait() +} diff --git a/websocket/msg.go b/websocket/msg.go index 272154d..d8aa796 100644 --- a/websocket/msg.go +++ b/websocket/msg.go @@ -6,7 +6,9 @@ import ( "github.com/google/uuid" ) +// Message which send over websocket type Message struct { + server *Server ID uuid.UUID `json:"id,omitempty"` Session uuid.UUID `json:"-"` From *Client `json:"-"` @@ -14,6 +16,7 @@ type Message struct { Body interface{} `json:"body,omitempty"` } +// Validate is it valid message to forward func (msg *Message) Validate() (bool, error) { if msg.Subject == "" { return false, errors.New("no subject definied") @@ -24,7 +27,16 @@ func (msg *Message) Validate() (bool, error) { return true, nil } -func (msg *Message) Answer(subject string, body interface{}) { +// Replay to request +func (msg *Message) Replay(body interface{}) error { + return msg.Answer(msg.Subject, body) +} + +// Answer to replay at a request +func (msg *Message) Answer(subject string, body interface{}) error { + if msg.From == nil { + return errors.New("Message not received by a websocket Server") + } msg.From.Write(&Message{ ID: msg.ID, Session: msg.Session, @@ -32,4 +44,48 @@ func (msg *Message) Answer(subject string, body interface{}) { Subject: subject, Body: body, }) + return nil +} + +// ReplaySession to replay all of current Session +func (msg *Message) ReplaySession(body interface{}) error { + return msg.AnswerSession(msg.Subject, body) +} + +// AnswerSession to replay all of current Session +func (msg *Message) AnswerSession(subject string, body interface{}) error { + if msg.server == nil { + return errors.New("Message not received by a websocket Server") + } + if msg.server.sessionManager == nil { + return errors.New("websocket Server run without SessionManager") + } + msg.server.sessionManager.Send(msg.Session, &Message{ + ID: msg.ID, + Session: msg.Session, + From: msg.From, + Subject: subject, + Body: body, + }) + return nil +} + +// ReplayEverybody to replay all connection on Server +func (msg *Message) ReplayEverybody(body interface{}) error { + return msg.AnswerEverybody(msg.Subject, body) +} + +// AnswerEverybody to replay all connection on Server +func (msg *Message) AnswerEverybody(subject string, body interface{}) error { + if msg.server == nil { + return errors.New("Message not received by a websocket Server") + } + msg.server.SendAll(&Message{ + ID: msg.ID, + Session: msg.Session, + From: msg.From, + Subject: subject, + Body: body, + }) + return nil } diff --git a/websocket/msg_test.go b/websocket/msg_test.go index efdf046..c6320db 100644 --- a/websocket/msg_test.go +++ b/websocket/msg_test.go @@ -1,6 +1,7 @@ package websocket import ( + "sync" "testing" "github.com/google/uuid" @@ -23,7 +24,7 @@ func TestMSGValidate(t *testing.T) { assert.False(msg.Validate()) } -func TestMSGAnswer(t *testing.T) { +func TestMSGReplay(t *testing.T) { assert := assert.New(t) out := make(chan *Message, channelBufSize) @@ -37,16 +38,152 @@ func TestMSGAnswer(t *testing.T) { conversationID := uuid.New() msg := &Message{ - From: client, - ID: conversationID, + ID: conversationID, + Subject: "lola", } + err := msg.Replay(nil) + assert.Error(err) - go msg.Answer("hi", nil) + msg.From = client + + done := make(chan bool) + defer close(done) + + go func() { + err := msg.Replay("hi") + assert.NoError(err) + done <- true + }() msg = <-out + <-done assert.Equal(conversationID, msg.ID) assert.Equal(uuid.Nil, msg.Session) assert.Equal(client, msg.From) - assert.Equal("hi", msg.Subject) - assert.Nil(msg.Body) + assert.Equal("lola", msg.Subject) + assert.Equal("hi", msg.Body) +} + +func TestMSGSession(t *testing.T) { + assert := assert.New(t) + + srv := NewServer(nil, nil) + assert.NotNil(srv) + + sessionID := uuid.New() + + conversationID := uuid.New() + msg := &Message{ + Session: sessionID, + ID: conversationID, + Subject: "lola", + } + + err := msg.ReplaySession("error") + assert.Error(err) + + msg.server = srv + err = msg.ReplaySession("error") + assert.Error(err) + + srv.sessionManager = NewSessionManager() + + out1 := make(chan *Message, 3) + c1 := &Client{ + id: uuid.New(), + out: out1, + server: srv, + } + + out2 := make(chan *Message, 3) + c2 := &Client{ + id: uuid.New(), + out: out2, + server: srv, + } + srv.addClient(c1) + srv.addClient(c2) + + wgSession := sync.WaitGroup{} + wg := sync.WaitGroup{} + client := func(out chan *Message) { + for msg := range out { + if msg.Subject == SessionMessageInit { + msg.ID = sessionID + msg.From.handleInput(msg) + wgSession.Done() + } else { + assert.Equal("lola", msg.Subject) + assert.Equal("hi", msg.Body) + assert.Equal(conversationID, msg.ID) + assert.Equal(sessionID, msg.Session) + wg.Done() + } + } + } + wg.Add(2) + wgSession.Add(2) + go client(out1) + go client(out2) + wgSession.Wait() + + err = msg.ReplaySession("hi") + assert.NoError(err) + wg.Wait() + + srv.delClient(c2) + srv.delClient(c1) +} + +func TestMSGEverbody(t *testing.T) { + assert := assert.New(t) + + srv := NewServer(nil, nil) + assert.NotNil(srv) + + out1 := make(chan *Message, 2) + c1 := &Client{ + id: uuid.New(), + out: out1, + server: srv, + } + + out2 := make(chan *Message, 2) + c2 := &Client{ + id: uuid.New(), + out: out2, + server: srv, + } + srv.addClient(c1) + srv.addClient(c2) + + wg := sync.WaitGroup{} + + conversationID := uuid.New() + msg := &Message{ + ID: conversationID, + Subject: "lola", + } + err := msg.ReplayEverybody("error") + assert.Error(err) + + client := func(out chan *Message) { + msg := <-out + assert.Equal("lola", msg.Subject) + assert.Equal("hi", msg.Body) + assert.Equal(conversationID, msg.ID) + assert.Equal(uuid.Nil, msg.Session) + wg.Done() + } + wg.Add(2) + go client(out1) + go client(out2) + + msg.server = srv + err = msg.ReplayEverybody("hi") + assert.NoError(err) + wg.Wait() + + srv.delClient(c2) + srv.delClient(c1) } diff --git a/websocket/server.go b/websocket/server.go index 51136ca..ec77035 100644 --- a/websocket/server.go +++ b/websocket/server.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" ) +// Server of websocket type Server struct { msgChanIn chan *Message clients map[string]*Client @@ -16,6 +17,7 @@ type Server struct { upgrader websocket.Upgrader } +// NewServer to get a new Server for websockets func NewServer(msgChanIn chan *Message, sessionManager *SessionManager) *Server { return &Server{ clients: make(map[string]*Client), @@ -28,6 +30,7 @@ func NewServer(msgChanIn chan *Message, sessionManager *SessionManager) *Server } } +// Handler of websocket Server for binding on webserver func (s *Server) Handler(w http.ResponseWriter, r *http.Request) { conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { @@ -39,7 +42,7 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) { client.Listen() } -func (s *Server) AddClient(c *Client) { +func (s *Server) addClient(c *Client) { if c == nil { return } @@ -53,7 +56,7 @@ func (s *Server) AddClient(c *Client) { } } -func (s *Server) DelClient(c *Client) { +func (s *Server) delClient(c *Client) { if c == nil { return } @@ -66,6 +69,8 @@ func (s *Server) DelClient(c *Client) { } } } + +// SendAll to Send a message on every Client func (s *Server) SendAll(msg *Message) { s.clientsMutex.Lock() defer s.clientsMutex.Unlock() diff --git a/websocket/server_test.go b/websocket/server_test.go index a1127f8..f0c2322 100644 --- a/websocket/server_test.go +++ b/websocket/server_test.go @@ -25,13 +25,13 @@ func TestServer(t *testing.T) { out: out, server: srv, } - srv.AddClient(nil) - go srv.AddClient(c) + srv.addClient(nil) + go srv.addClient(c) msg := <-out assert.Equal(SessionMessageInit, msg.Subject) - srv.DelClient(nil) - srv.DelClient(c) + srv.delClient(nil) + srv.delClient(c) } func TestServerSendAll(t *testing.T) { @@ -52,8 +52,8 @@ func TestServerSendAll(t *testing.T) { out: out2, server: srv, } - srv.AddClient(c1) - srv.AddClient(c2) + srv.addClient(c1) + srv.addClient(c2) wg := sync.WaitGroup{} @@ -71,6 +71,6 @@ func TestServerSendAll(t *testing.T) { }) wg.Wait() - srv.DelClient(c2) - srv.DelClient(c1) + srv.delClient(c2) + srv.delClient(c1) } diff --git a/websocket/session.go b/websocket/session.go index 7ca05bb..cdd7aeb 100644 --- a/websocket/session.go +++ b/websocket/session.go @@ -6,14 +6,17 @@ import ( "github.com/google/uuid" ) +// SessionMessageInit subject in messages const SessionMessageInit = "session_init" +// SessionManager to handle reconnected websocket type SessionManager struct { sessionToClient map[uuid.UUID]map[string]*Client clientToSession map[string]uuid.UUID sync.Mutex } +// NewSessionManager to get a new SessionManager func NewSessionManager() *SessionManager { return &SessionManager{ sessionToClient: make(map[uuid.UUID]map[string]*Client), @@ -21,9 +24,15 @@ func NewSessionManager() *SessionManager { } } +// Init Session for given Client func (s *SessionManager) Init(c *Client) { - c.Write(&Message{Subject: SessionMessageInit}) + c.Write(&Message{ + From: c, + Subject: SessionMessageInit, + }) } + +// HandleMessage of client for Session func (s *SessionManager) HandleMessage(msg *Message) bool { if msg == nil { return false @@ -40,7 +49,8 @@ func (s *SessionManager) HandleMessage(msg *Message) bool { s.clientToSession[id] = msg.ID s.sessionToClient[msg.ID] = list return true - } else if msg.From != nil { + } + if msg.From != nil { id := msg.From.GetID() msg.Session = s.clientToSession[id] } @@ -66,15 +76,15 @@ func (s *SessionManager) Remove(c *Client) (client bool, session bool) { if len(clients) > 0 { s.sessionToClient[session] = clients return true, false - } else { - delete(s.sessionToClient, session) - return true, true } + delete(s.sessionToClient, session) + return true, true } } return false, false } +// Send a message to a specific Session (and all his Websocket clients) func (s *SessionManager) Send(id uuid.UUID, msg *Message) { s.Lock() defer s.Unlock() diff --git a/worker/main.go b/worker/main.go index df1c9d6..a6789d0 100644 --- a/worker/main.go +++ b/worker/main.go @@ -1,4 +1,4 @@ -// Package worker a lib for cronjobs to run in background +// Package worker to run functions like a cronjob in background package worker import (