From de11bae83563dd70743bd2c0019048c67ccc60d3 Mon Sep 17 00:00:00 2001 From: Geno Date: Tue, 1 Jun 2021 18:08:29 +0200 Subject: [PATCH] docs: improve of websocket --- web/ws/const.go | 7 +- web/ws/main.go | 245 +++++++++++++++++++++++++++--------------------- 2 files changed, 142 insertions(+), 110 deletions(-) diff --git a/web/ws/const.go b/web/ws/const.go index f4309d3..7f159e3 100644 --- a/web/ws/const.go +++ b/web/ws/const.go @@ -1,7 +1,10 @@ package ws const ( + // BodyError Message Body map typ for errors BodyError = "error" - BodySet = "set" - BodyGet = "get" + // BodySet Message Body map typ for set values + BodySet = "set" + // BodyGet Message Body map typ for get values + BodyGet = "get" ) diff --git a/web/ws/main.go b/web/ws/main.go index a1e30e0..3bc3ec2 100644 --- a/web/ws/main.go +++ b/web/ws/main.go @@ -8,12 +8,14 @@ import ( "github.com/bdlm/log" "github.com/gin-gonic/gin" + "github.com/google/uuid" "golang.org/x/time/rate" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" ) +// WebsocketEndpoint to handle Request type WebsocketEndpoint struct { // publishLimiter controls the rate limit applied to the publish endpoint. // @@ -24,25 +26,52 @@ type WebsocketEndpoint struct { Subscribers map[*Subscriber]struct{} // Message Handler - handlers map[string]MessageHandleFunc + handlers map[string]MessageHandleFunc + // DefaultMessageHandler if no other handler for MessageType found DefaultMessageHandler MessageHandleFunc - OnOpen SubscriberEventFunc - OnClose SubscriberEventFunc + // Run Function on open connection by subscriper + OnOpen SubscriberEventFunc + // Run Function on close connection to subscriper + OnClose SubscriberEventFunc } -// MessageHandleFunc for handling messages -type MessageHandleFunc func(ctx context.Context, msg *Message) +// Subscriber of websocket endpoint +type Subscriber struct { + out chan *Message + closeSlow func() +} +// SubscriberEventFunc for handling connection state of Subsriber type SubscriberEventFunc func(s *Subscriber, msg chan<- *Message) // Message on websocket type Message struct { Type string `json:"type"` + ID *uuid.UUID `json:"id,omitempty"` + ReplyID *uuid.UUID `json:"reply_id,omitempty"` Body map[string]interface{} `json:"body"` - Reply chan<- *Message `json:"-"` Subscriber *Subscriber `json:"-"` } +// Reply to Message +func (m *Message) Reply(msg *Message) { + if m == nil || m.Subscriber == nil { + return + } + if m.ID != nil { + msg.ReplyID = m.ID + if msg.ID == nil { + id := uuid.New() + msg.ID = &id + } + } + m.Subscriber.out <- msg +} + +// MessageHandleFunc for handling messages +type MessageHandleFunc func(ctx context.Context, msg *Message) + +// NewEndpoint - create an empty websocket func NewEndpoint() *WebsocketEndpoint { return &WebsocketEndpoint{ publishLimiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 8), @@ -50,109 +79,8 @@ func NewEndpoint() *WebsocketEndpoint { handlers: make(map[string]MessageHandleFunc), } } -func (this *WebsocketEndpoint) Handler(ctx *gin.Context) { - c, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{ - InsecureSkipVerify: true, - }) - if err != nil { - ctx.JSON(http.StatusBadRequest, false) - return - } - defer c.Close(websocket.StatusInternalError, "") - - err = this.addSubscriber(ctx, c) - - if websocket.CloseStatus(err) == websocket.StatusNormalClosure || - websocket.CloseStatus(err) == websocket.StatusGoingAway { - return - } - log.Errorf("subscriber stopped: %s", err) -} - -func (this *WebsocketEndpoint) AddMessageHandler(typ string, f MessageHandleFunc) { - this.handlers[typ] = f -} - -type Subscriber struct { - out chan *Message - closeSlow func() -} - -func (this *WebsocketEndpoint) readWorker(ctx context.Context, c *websocket.Conn, s *Subscriber) error { - for { - var msg Message - err := wsjson.Read(ctx, c, &msg) - if err != nil { - return err - } - log.WithField("type", msg.Type).Debug("receive") - msg.Subscriber = s - msg.Reply = s.out - if handler, ok := this.handlers[msg.Type]; ok { - handler(ctx, &msg) - } else if this.DefaultMessageHandler != nil { - this.DefaultMessageHandler(ctx, &msg) - } - } -} - -func (this *WebsocketEndpoint) addSubscriber(ctxGin *gin.Context, c *websocket.Conn) error { - s := &Subscriber{ - out: make(chan *Message, 10), - closeSlow: func() { - c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") - }, - } - - this.subscribersMu.Lock() - this.Subscribers[s] = struct{}{} - this.subscribersMu.Unlock() - defer func() { - this.subscribersMu.Lock() - delete(this.Subscribers, s) - this.subscribersMu.Unlock() - if this.OnClose != nil { - this.OnClose(s, s.out) - } - log.Debug("websocket closed") - }() - - if this.OnOpen != nil { - this.OnOpen(s, s.out) - } - - ctx := ctxGin.Request.Context() - - go func() { - err := this.readWorker(ctx, c, s) - if websocket.CloseStatus(err) == websocket.StatusNormalClosure || - websocket.CloseStatus(err) == websocket.StatusGoingAway { - return - } - log.Errorf("websocket reading error: %s", err) - }() - log.Debug("websocket started") - - for { - select { - case msg := <-s.out: - err := writeTimeout(ctx, time.Second*5, c, msg) - if err != nil { - return err - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg *Message) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - return wsjson.Write(ctx, c, msg) -} +// Broadcast Message to all subscriber (exclude sender of Message) func (this *WebsocketEndpoint) Broadcast(msg *Message) { this.subscribersMu.Lock() defer this.subscribersMu.Unlock() @@ -170,3 +98,104 @@ func (this *WebsocketEndpoint) Broadcast(msg *Message) { } } } + +// AddMessageHandler - add websocket message handler +func (we *WebsocketEndpoint) AddMessageHandler(typ string, f MessageHandleFunc) { + we.handlers[typ] = f +} + +// Handler - to register in gin webservice +func (we *WebsocketEndpoint) Handler(ctx *gin.Context) { + c, err := websocket.Accept(ctx.Writer, ctx.Request, nil) + if err != nil { + ctx.JSON(http.StatusBadRequest, false) + return + } + defer c.Close(websocket.StatusInternalError, "") + + err = we.addSubscriber(ctx, c) + + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || + websocket.CloseStatus(err) == websocket.StatusGoingAway { + return + } + log.Errorf("subscriber stopped: %s", err) +} + +// addSubscriber and startup of websocket endpoint +func (we *WebsocketEndpoint) addSubscriber(ctxGin *gin.Context, c *websocket.Conn) error { + s := &Subscriber{ + out: make(chan *Message, 10), + closeSlow: func() { + c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") + }, + } + + we.subscribersMu.Lock() + we.Subscribers[s] = struct{}{} + we.subscribersMu.Unlock() + defer func() { + we.subscribersMu.Lock() + delete(we.Subscribers, s) + we.subscribersMu.Unlock() + if we.OnClose != nil { + we.OnClose(s, s.out) + } + log.Debug("websocket closed") + }() + + if we.OnOpen != nil { + we.OnOpen(s, s.out) + } + + ctx := ctxGin.Request.Context() + + go func() { + err := we.readWorker(ctx, c, s) + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || + websocket.CloseStatus(err) == websocket.StatusGoingAway { + return + } + log.Errorf("websocket reading error: %s", err) + }() + + log.Debug("websocket started") + + for { + select { + case msg := <-s.out: + err := writeTimeout(ctx, time.Second*5, c, msg) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// readWorker of subscriber +func (we *WebsocketEndpoint) readWorker(ctx context.Context, c *websocket.Conn, s *Subscriber) error { + for { + var msg Message + err := wsjson.Read(ctx, c, &msg) + if err != nil { + return err + } + log.WithField("type", msg.Type).Debug("receive") + msg.Subscriber = s + if handler, ok := we.handlers[msg.Type]; ok { + handler(ctx, &msg) + } else if we.DefaultMessageHandler != nil { + we.DefaultMessageHandler(ctx, &msg) + } + } +} + +// writeTimeout send message to subscriber with timeout +func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg *Message) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + return wsjson.Write(ctx, c, msg) +}