add websocket

This commit is contained in:
Martin Geno 2017-10-25 18:42:44 +02:00
parent fc00fdb7a3
commit 0d64ba751e
No known key found for this signature in database
GPG Key ID: F0D39A37E925E941
9 changed files with 367 additions and 2 deletions

View File

@ -26,6 +26,7 @@ done
# Failures have incomplete results, so don't send # Failures have incomplete results, so don't send
if [ "$FAIL" -eq 0 ]; then if [ "$FAIL" -eq 0 ]; then
bash <(curl -s https://codecov.io/bash) -t $CODECOV_TOKEN -f profile.cov
goveralls -v -coverprofile=profile.cov -service=$CI -repotoken=$COVERALLS_REPO_TOKEN goveralls -v -coverprofile=profile.cov -service=$CI -repotoken=$COVERALLS_REPO_TOKEN
fi fi

116
websocket/client.go Normal file
View File

@ -0,0 +1,116 @@
package websocket
import (
"io"
log "github.com/sirupsen/logrus"
"github.com/gorilla/websocket"
)
const channelBufSize = 100
type Client struct {
server *Server
ws *websocket.Conn
out chan *Message
writeQuit chan bool
readQuit chan bool
}
func NewClient(s *Server, ws *websocket.Conn) *Client {
if ws == nil {
log.Panic("ws cannot be nil")
}
return &Client{
server: s,
ws: ws,
out: make(chan *Message, channelBufSize),
writeQuit: make(chan bool),
readQuit: make(chan bool),
}
}
func (c *Client) GetID() string {
return c.ws.RemoteAddr().String()
}
func (c *Client) Write(msg *Message) {
select {
case c.out <- msg:
default:
c.server.SessionManager.Remove(c)
c.server.DelClient(c)
c.Close()
}
}
func (c *Client) Close() {
c.writeQuit <- true
c.readQuit <- true
log.Info("client disconnecting...", c.GetID())
}
// Listen Write and Read request via channel
func (c *Client) Listen() {
go c.listenWrite()
c.server.AddClient(c)
c.server.SessionManager.Init(c)
c.listenRead()
}
func (c *Client) handleInput(msg *Message) {
msg.From = c
if c.server.SessionManager.HandleMessage(msg) {
return
}
if ok, err := msg.Validate(); ok {
c.server.msgChanIn <- msg
} else {
log.Println("no valid msg for:", c.GetID(), "error:", err, "\nmessage:", msg)
}
}
// Listen write request via channel
func (c *Client) listenWrite() {
for {
select {
case msg := <-c.out:
websocket.WriteJSON(c.ws, msg)
case <-c.writeQuit:
c.server.SessionManager.Remove(c)
c.server.DelClient(c)
close(c.out)
close(c.writeQuit)
return
}
}
}
// Listen read request via channel
func (c *Client) listenRead() {
for {
select {
case <-c.readQuit:
c.server.SessionManager.Remove(c)
c.server.DelClient(c)
close(c.readQuit)
return
default:
var msg Message
err := websocket.ReadJSON(c.ws, &msg)
if err == io.EOF {
return
} else if err != nil {
log.Println(err, c.GetID())
} else {
c.handleInput(&msg)
}
}
}
}

35
websocket/msg.go Normal file
View File

@ -0,0 +1,35 @@
package websocket
import (
"errors"
"github.com/google/uuid"
)
type Message struct {
ID uuid.UUID `json:"id,omitempty"`
Session uuid.UUID `json:"-"`
From *Client `json:"-"`
Subject string `json:"subject,omitempty"`
Body interface{} `json:"body,omitempty"`
}
func (msg *Message) Validate() (bool, error) {
if msg.Subject == "" {
return false, errors.New("no subject definied")
}
if msg.From == nil {
return false, errors.New("no sender definied")
}
return true, nil
}
func (msg *Message) Answer(subject string, body interface{}) {
msg.From.Write(&Message{
ID: msg.ID,
Session: msg.Session,
From: msg.From,
Subject: subject,
Body: body,
})
}

24
websocket/msg_test.go Normal file
View File

@ -0,0 +1,24 @@
package websocket
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMSGValidate(t *testing.T) {
assert := assert.New(t)
msg := &Message{}
assert.False(msg.Validate())
msg.Subject = "login"
assert.False(msg.Validate())
msg.From = &Client{}
assert.True(msg.Validate())
msg.Subject = ""
assert.False(msg.Validate())
}

63
websocket/server.go Normal file
View File

@ -0,0 +1,63 @@
package websocket
import (
"net/http"
"sync"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
type Server struct {
msgChanIn chan *Message
clients map[string]*Client
clientsMutex sync.Mutex
SessionManager *SessionManager
upgrader websocket.Upgrader
}
func NewServer(msgChanIn chan *Message) *Server {
return &Server{
clients: make(map[string]*Client),
msgChanIn: msgChanIn,
SessionManager: NewSessionManager(),
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
},
}
}
func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(s, conn)
defer client.Close()
client.Listen()
}
func (s *Server) AddClient(c *Client) {
if c == nil {
return
}
if id := c.GetID(); id != "" {
s.clientsMutex.Lock()
defer s.clientsMutex.Unlock()
s.clients[id] = c
}
}
func (s *Server) DelClient(c *Client) {
if c == nil {
return
}
if id := c.GetID(); id != "" {
s.clientsMutex.Lock()
delete(s.clients, id)
s.clientsMutex.Unlock()
s.SessionManager.Remove(c)
}
}

77
websocket/session.go Normal file
View File

@ -0,0 +1,77 @@
package websocket
import (
"sync"
"github.com/google/uuid"
)
const SessionMessageInit = "session_init"
type SessionManager struct {
sessionToClient map[uuid.UUID]map[string]*Client
clientToSession map[string]uuid.UUID
sync.Mutex
}
func NewSessionManager() *SessionManager {
return &SessionManager{
sessionToClient: make(map[uuid.UUID]map[string]*Client),
clientToSession: make(map[string]uuid.UUID),
}
}
func (s *SessionManager) Init(c *Client) {
c.Write(&Message{Subject: SessionMessageInit})
}
func (s *SessionManager) HandleMessage(msg *Message) bool {
if msg == nil {
return false
}
if msg.ID != uuid.Nil && msg.Subject == SessionMessageInit && msg.From != nil {
s.Lock()
defer s.Unlock()
list := s.sessionToClient[msg.ID]
if list == nil {
list = make(map[string]*Client)
}
id := msg.From.GetID()
list[id] = msg.From
s.clientToSession[id] = msg.ID
s.sessionToClient[msg.ID] = list
return true
} else {
id := msg.From.GetID()
msg.Session = s.clientToSession[id]
}
return false
}
func (s *SessionManager) Remove(c *Client) {
if c == nil {
return
}
if id := c.GetID(); id != "" {
session := s.clientToSession[id]
if session != uuid.Nil {
s.Lock()
defer s.Unlock()
list := s.sessionToClient[session]
delete(list, id)
if len(list) > 0 {
s.sessionToClient[session] = list
} else {
delete(s.sessionToClient, session)
}
}
delete(s.clientToSession, id)
}
}
func (s *SessionManager) Send(id uuid.UUID, msg *Message) {
session := s.sessionToClient[id]
for _, c := range session {
c.Write(msg)
}
}

43
websocket/session_test.go Normal file
View File

@ -0,0 +1,43 @@
package websocket
import (
"testing"
"github.com/gorilla/websocket"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestSessionManager(t *testing.T) {
assert := assert.New(t)
session := NewSessionManager()
assert.NotNil(session)
out := make(chan *Message, channelBufSize)
client := &Client{
out: out,
writeQuit: make(chan bool),
readQuit: make(chan bool),
ws: &websocket.Conn{},
}
session.Init(client)
msg := <-out
assert.Equal(SessionMessageInit, msg.Subject)
result := session.HandleMessage(nil)
assert.False(result)
msgFillSession := &Message{}
result = session.HandleMessage(msgFillSession)
assert.False(result)
result = session.HandleMessage(&Message{
ID: uuid.New(),
From: client,
Subject: SessionMessageInit,
})
assert.True(result)
}

View File

@ -1,13 +1,17 @@
// Package with a lib for cronjobs to run in background // Package with a lib for cronjobs to run in background
package worker package worker
import "time" import (
"sync"
"time"
)
// Struct which handles the job // Struct which handles the job
type Worker struct { type Worker struct {
every time.Duration every time.Duration
run func() run func()
quit chan struct{} quit chan struct{}
wg sync.WaitGroup
} }
// Function to create a new Worker with a timestamp, run, every and it's function // Function to create a new Worker with a timestamp, run, every and it's function
@ -23,6 +27,7 @@ func NewWorker(every time.Duration, f func()) (w *Worker) {
// Function to start the Worker // Function to start the Worker
// (please us it as a go routine with go w.Start()) // (please us it as a go routine with go w.Start())
func (w *Worker) Start() { func (w *Worker) Start() {
w.wg.Add(1)
ticker := time.NewTicker(w.every) ticker := time.NewTicker(w.every)
for { for {
select { select {
@ -30,6 +35,7 @@ func (w *Worker) Start() {
w.run() w.run()
case <-w.quit: case <-w.quit:
ticker.Stop() ticker.Stop()
w.wg.Done()
return return
} }
} }
@ -38,4 +44,5 @@ func (w *Worker) Start() {
// Function to stop the Worker // Function to stop the Worker
func (w *Worker) Close() { func (w *Worker) Close() {
close(w.quit) close(w.quit)
w.wg.Wait()
} }

View File

@ -22,5 +22,4 @@ func TestWorker(t *testing.T) {
w.Close() w.Close()
assert.Equal(3, runtime) assert.Equal(3, runtime)
time.Sleep(time.Duration(8) * time.Millisecond)
} }