improve websocket + add webserver handler

This commit is contained in:
Martin/Geno 2018-08-24 15:36:20 +02:00
parent cca8a0a88c
commit 5c20ee9561
No known key found for this signature in database
GPG Key ID: 9D7D3C6BFF600C6A
12 changed files with 341 additions and 33 deletions

View File

@ -15,7 +15,6 @@ pipeline:
codestyle:
image: golang:latest
commands:
- go get -d -t ./...
- ./contrib/ci/check-testfiles
- ./contrib/ci/check-gofmt
- go get github.com/client9/misspell/cmd/misspell

View File

@ -1,3 +1,4 @@
// Package file provides functionality to load and save marshal files
package file
import (

View File

@ -19,12 +19,11 @@ type Client struct {
readQuit chan bool
}
// NewClient by websocket
func NewClient(s *Server, ws *websocket.Conn) *Client {
if ws == nil {
log.Panic("ws cannot be nil")
}
return &Client{
server: s,
ws: ws,
@ -35,6 +34,7 @@ func NewClient(s *Server, ws *websocket.Conn) *Client {
}
}
// GetID of Client ( UUID or Address to Client)
func (c *Client) GetID() string {
if c.ws != nil {
return c.ws.RemoteAddr().String()
@ -42,41 +42,45 @@ func (c *Client) GetID() string {
return c.id.String()
}
// Write Message to Client
func (c *Client) Write(msg *Message) {
select {
case c.out <- msg:
default:
c.server.DelClient(c)
c.server.delClient(c)
c.Close()
}
}
// Close Client
func (c *Client) Close() {
c.writeQuit <- true
c.readQuit <- true
log.Info("client disconnecting...", c.GetID())
}
// Listen Write and Read request via channel
// Listen write and read request via channel
func (c *Client) Listen() {
go c.listenWrite()
c.server.AddClient(c)
c.server.addClient(c)
c.listenRead()
}
// handleInput manage session and valide message before send to server
func (c *Client) handleInput(msg *Message) {
msg.From = c
if sm := c.server.sessionManager; sm != nil && sm.HandleMessage(msg) {
return
}
if ok, err := msg.Validate(); ok {
msg.server = c.server
c.server.msgChanIn <- msg
} else {
log.Println("no valid msg for:", c.GetID(), "error:", err, "\nmessage:", msg)
}
}
// Listen write request via channel
// listenWrite request via channel
func (c *Client) listenWrite() {
for {
select {
@ -84,7 +88,7 @@ func (c *Client) listenWrite() {
websocket.WriteJSON(c.ws, msg)
case <-c.writeQuit:
c.server.DelClient(c)
c.server.delClient(c)
close(c.out)
close(c.writeQuit)
return
@ -92,13 +96,13 @@ func (c *Client) listenWrite() {
}
}
// Listen read request via channel
// listenRead request via channel
func (c *Client) listenRead() {
for {
select {
case <-c.readQuit:
c.server.DelClient(c)
c.server.delClient(c)
close(c.readQuit)
return

2
websocket/doc.go Normal file
View File

@ -0,0 +1,2 @@
// Package websocket to handling connection and session by messages and there subject
package websocket

52
websocket/handler.go Normal file
View File

@ -0,0 +1,52 @@
package websocket
import (
"net/http"
)
// MessageHandleFunc for handling messages
type MessageHandleFunc func(msg *Message)
// WebsocketHandlerService to handle every Message on there Subject by Handlers
type WebsocketHandlerService struct {
inputMSG chan *Message
server *Server
handlers map[string]MessageHandleFunc
FallbackHandler MessageHandleFunc
}
// NewWebsocketHandlerService with Websocket Server
func NewWebsocketHandlerService() *WebsocketHandlerService {
ws := WebsocketHandlerService{
handlers: make(map[string]MessageHandleFunc),
inputMSG: make(chan *Message),
}
ws.server = NewServer(ws.inputMSG, NewSessionManager())
return &ws
}
func (ws *WebsocketHandlerService) messageHandler() {
for msg := range ws.inputMSG {
if handler, ok := ws.handlers[msg.Subject]; ok {
handler(msg)
} else if ws.FallbackHandler != nil {
ws.FallbackHandler(msg)
}
}
}
// SetHandler for a message type by subject
func (ws *WebsocketHandlerService) SetHandler(subject string, f MessageHandleFunc) {
ws.handlers[subject] = f
}
// Listen on net/http server at `path` and start running handling
func (ws *WebsocketHandlerService) Listen(path string) {
http.HandleFunc(path, ws.server.Handler)
go ws.messageHandler()
}
// Close webserver
func (ws *WebsocketHandlerService) Close() {
close(ws.inputMSG)
}

42
websocket/handler_test.go Normal file
View File

@ -0,0 +1,42 @@
package websocket
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
func TestHandler(t *testing.T) {
assert := assert.New(t)
chanMsg := make(chan *Message)
handlerService := NewWebsocketHandlerService()
assert.NotNil(handlerService)
handlerService.inputMSG = chanMsg
handlerService.server.msgChanIn = chanMsg
wg := sync.WaitGroup{}
handlerService.SetHandler("dummy", func(msg *Message) {
assert.Equal("expected", msg.Body)
wg.Done()
})
wg.Add(1)
handlerService.Listen("path")
defer handlerService.Close()
chanMsg <- &Message{Subject: "dummy", Body: "expected"}
wg.Wait()
wg.Add(1)
handlerService.FallbackHandler = func(msg *Message) {
assert.Equal("unexpected", msg.Body)
wg.Done()
}
chanMsg <- &Message{Subject: "mist", Body: "unexpected"}
wg.Wait()
}

View File

@ -6,7 +6,9 @@ import (
"github.com/google/uuid"
)
// Message which send over websocket
type Message struct {
server *Server
ID uuid.UUID `json:"id,omitempty"`
Session uuid.UUID `json:"-"`
From *Client `json:"-"`
@ -14,6 +16,7 @@ type Message struct {
Body interface{} `json:"body,omitempty"`
}
// Validate is it valid message to forward
func (msg *Message) Validate() (bool, error) {
if msg.Subject == "" {
return false, errors.New("no subject definied")
@ -24,7 +27,16 @@ func (msg *Message) Validate() (bool, error) {
return true, nil
}
func (msg *Message) Answer(subject string, body interface{}) {
// Replay to request
func (msg *Message) Replay(body interface{}) error {
return msg.Answer(msg.Subject, body)
}
// Answer to replay at a request
func (msg *Message) Answer(subject string, body interface{}) error {
if msg.From == nil {
return errors.New("Message not received by a websocket Server")
}
msg.From.Write(&Message{
ID: msg.ID,
Session: msg.Session,
@ -32,4 +44,48 @@ func (msg *Message) Answer(subject string, body interface{}) {
Subject: subject,
Body: body,
})
return nil
}
// ReplaySession to replay all of current Session
func (msg *Message) ReplaySession(body interface{}) error {
return msg.AnswerSession(msg.Subject, body)
}
// AnswerSession to replay all of current Session
func (msg *Message) AnswerSession(subject string, body interface{}) error {
if msg.server == nil {
return errors.New("Message not received by a websocket Server")
}
if msg.server.sessionManager == nil {
return errors.New("websocket Server run without SessionManager")
}
msg.server.sessionManager.Send(msg.Session, &Message{
ID: msg.ID,
Session: msg.Session,
From: msg.From,
Subject: subject,
Body: body,
})
return nil
}
// ReplayEverybody to replay all connection on Server
func (msg *Message) ReplayEverybody(body interface{}) error {
return msg.AnswerEverybody(msg.Subject, body)
}
// AnswerEverybody to replay all connection on Server
func (msg *Message) AnswerEverybody(subject string, body interface{}) error {
if msg.server == nil {
return errors.New("Message not received by a websocket Server")
}
msg.server.SendAll(&Message{
ID: msg.ID,
Session: msg.Session,
From: msg.From,
Subject: subject,
Body: body,
})
return nil
}

View File

@ -1,6 +1,7 @@
package websocket
import (
"sync"
"testing"
"github.com/google/uuid"
@ -23,7 +24,7 @@ func TestMSGValidate(t *testing.T) {
assert.False(msg.Validate())
}
func TestMSGAnswer(t *testing.T) {
func TestMSGReplay(t *testing.T) {
assert := assert.New(t)
out := make(chan *Message, channelBufSize)
@ -37,16 +38,152 @@ func TestMSGAnswer(t *testing.T) {
conversationID := uuid.New()
msg := &Message{
From: client,
ID: conversationID,
ID: conversationID,
Subject: "lola",
}
err := msg.Replay(nil)
assert.Error(err)
go msg.Answer("hi", nil)
msg.From = client
done := make(chan bool)
defer close(done)
go func() {
err := msg.Replay("hi")
assert.NoError(err)
done <- true
}()
msg = <-out
<-done
assert.Equal(conversationID, msg.ID)
assert.Equal(uuid.Nil, msg.Session)
assert.Equal(client, msg.From)
assert.Equal("hi", msg.Subject)
assert.Nil(msg.Body)
assert.Equal("lola", msg.Subject)
assert.Equal("hi", msg.Body)
}
func TestMSGSession(t *testing.T) {
assert := assert.New(t)
srv := NewServer(nil, nil)
assert.NotNil(srv)
sessionID := uuid.New()
conversationID := uuid.New()
msg := &Message{
Session: sessionID,
ID: conversationID,
Subject: "lola",
}
err := msg.ReplaySession("error")
assert.Error(err)
msg.server = srv
err = msg.ReplaySession("error")
assert.Error(err)
srv.sessionManager = NewSessionManager()
out1 := make(chan *Message, 3)
c1 := &Client{
id: uuid.New(),
out: out1,
server: srv,
}
out2 := make(chan *Message, 3)
c2 := &Client{
id: uuid.New(),
out: out2,
server: srv,
}
srv.addClient(c1)
srv.addClient(c2)
wgSession := sync.WaitGroup{}
wg := sync.WaitGroup{}
client := func(out chan *Message) {
for msg := range out {
if msg.Subject == SessionMessageInit {
msg.ID = sessionID
msg.From.handleInput(msg)
wgSession.Done()
} else {
assert.Equal("lola", msg.Subject)
assert.Equal("hi", msg.Body)
assert.Equal(conversationID, msg.ID)
assert.Equal(sessionID, msg.Session)
wg.Done()
}
}
}
wg.Add(2)
wgSession.Add(2)
go client(out1)
go client(out2)
wgSession.Wait()
err = msg.ReplaySession("hi")
assert.NoError(err)
wg.Wait()
srv.delClient(c2)
srv.delClient(c1)
}
func TestMSGEverbody(t *testing.T) {
assert := assert.New(t)
srv := NewServer(nil, nil)
assert.NotNil(srv)
out1 := make(chan *Message, 2)
c1 := &Client{
id: uuid.New(),
out: out1,
server: srv,
}
out2 := make(chan *Message, 2)
c2 := &Client{
id: uuid.New(),
out: out2,
server: srv,
}
srv.addClient(c1)
srv.addClient(c2)
wg := sync.WaitGroup{}
conversationID := uuid.New()
msg := &Message{
ID: conversationID,
Subject: "lola",
}
err := msg.ReplayEverybody("error")
assert.Error(err)
client := func(out chan *Message) {
msg := <-out
assert.Equal("lola", msg.Subject)
assert.Equal("hi", msg.Body)
assert.Equal(conversationID, msg.ID)
assert.Equal(uuid.Nil, msg.Session)
wg.Done()
}
wg.Add(2)
go client(out1)
go client(out2)
msg.server = srv
err = msg.ReplayEverybody("hi")
assert.NoError(err)
wg.Wait()
srv.delClient(c2)
srv.delClient(c1)
}

View File

@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
)
// Server of websocket
type Server struct {
msgChanIn chan *Message
clients map[string]*Client
@ -16,6 +17,7 @@ type Server struct {
upgrader websocket.Upgrader
}
// NewServer to get a new Server for websockets
func NewServer(msgChanIn chan *Message, sessionManager *SessionManager) *Server {
return &Server{
clients: make(map[string]*Client),
@ -28,6 +30,7 @@ func NewServer(msgChanIn chan *Message, sessionManager *SessionManager) *Server
}
}
// Handler of websocket Server for binding on webserver
func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
@ -39,7 +42,7 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
client.Listen()
}
func (s *Server) AddClient(c *Client) {
func (s *Server) addClient(c *Client) {
if c == nil {
return
}
@ -53,7 +56,7 @@ func (s *Server) AddClient(c *Client) {
}
}
func (s *Server) DelClient(c *Client) {
func (s *Server) delClient(c *Client) {
if c == nil {
return
}
@ -66,6 +69,8 @@ func (s *Server) DelClient(c *Client) {
}
}
}
// SendAll to Send a message on every Client
func (s *Server) SendAll(msg *Message) {
s.clientsMutex.Lock()
defer s.clientsMutex.Unlock()

View File

@ -25,13 +25,13 @@ func TestServer(t *testing.T) {
out: out,
server: srv,
}
srv.AddClient(nil)
go srv.AddClient(c)
srv.addClient(nil)
go srv.addClient(c)
msg := <-out
assert.Equal(SessionMessageInit, msg.Subject)
srv.DelClient(nil)
srv.DelClient(c)
srv.delClient(nil)
srv.delClient(c)
}
func TestServerSendAll(t *testing.T) {
@ -52,8 +52,8 @@ func TestServerSendAll(t *testing.T) {
out: out2,
server: srv,
}
srv.AddClient(c1)
srv.AddClient(c2)
srv.addClient(c1)
srv.addClient(c2)
wg := sync.WaitGroup{}
@ -71,6 +71,6 @@ func TestServerSendAll(t *testing.T) {
})
wg.Wait()
srv.DelClient(c2)
srv.DelClient(c1)
srv.delClient(c2)
srv.delClient(c1)
}

View File

@ -6,14 +6,17 @@ import (
"github.com/google/uuid"
)
// SessionMessageInit subject in messages
const SessionMessageInit = "session_init"
// SessionManager to handle reconnected websocket
type SessionManager struct {
sessionToClient map[uuid.UUID]map[string]*Client
clientToSession map[string]uuid.UUID
sync.Mutex
}
// NewSessionManager to get a new SessionManager
func NewSessionManager() *SessionManager {
return &SessionManager{
sessionToClient: make(map[uuid.UUID]map[string]*Client),
@ -21,9 +24,15 @@ func NewSessionManager() *SessionManager {
}
}
// Init Session for given Client
func (s *SessionManager) Init(c *Client) {
c.Write(&Message{Subject: SessionMessageInit})
c.Write(&Message{
From: c,
Subject: SessionMessageInit,
})
}
// HandleMessage of client for Session
func (s *SessionManager) HandleMessage(msg *Message) bool {
if msg == nil {
return false
@ -40,7 +49,8 @@ func (s *SessionManager) HandleMessage(msg *Message) bool {
s.clientToSession[id] = msg.ID
s.sessionToClient[msg.ID] = list
return true
} else if msg.From != nil {
}
if msg.From != nil {
id := msg.From.GetID()
msg.Session = s.clientToSession[id]
}
@ -66,15 +76,15 @@ func (s *SessionManager) Remove(c *Client) (client bool, session bool) {
if len(clients) > 0 {
s.sessionToClient[session] = clients
return true, false
} else {
delete(s.sessionToClient, session)
return true, true
}
delete(s.sessionToClient, session)
return true, true
}
}
return false, false
}
// Send a message to a specific Session (and all his Websocket clients)
func (s *SessionManager) Send(id uuid.UUID, msg *Message) {
s.Lock()
defer s.Unlock()

View File

@ -1,4 +1,4 @@
// Package worker a lib for cronjobs to run in background
// Package worker to run functions like a cronjob in background
package worker
import (