package http

import (
	"context"
	"encoding/json"

	"forge.cadoles.com/arcad/edge/pkg/module"
	"github.com/igm/sockjs-go/v3/sockjs"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

const (
	// statusChannelClosed is the status code passed to sess.Close when the
	// server side terminates a SockJS session.
	statusChannelClosed = iota
)

// handleSockJSSession serves a single SockJS session: it starts a goroutine
// forwarding backend bus messages to the client, then processes the messages
// sent by the client until that loop ends. On return the session is closed if
// it is still active.
func (h *Handler) handleSockJSSession(sess sockjs.Session) {
	// The context is made cancelable so that the backend forwarding goroutine
	// stops as soon as this function returns, instead of lingering until the
	// originating request context ends.
	ctx, cancel := context.WithCancel(logger.With(sess.Request().Context(),
		logger.F("sessionID", sess.ID()),
	))
	defer cancel()

	logger.Debug(ctx, "new sockjs session")

	defer closeSessionIfActive(ctx, sess)

	go h.handleBackendMessages(ctx, sess)

	h.handleClientMessages(ctx, sess)
}

// closeSessionIfActive closes sess with statusChannelClosed when it is still
// in the active state, logging any error returned by the close.
func closeSessionIfActive(ctx context.Context, sess sockjs.Session) {
	if sess.GetSessionState() != sockjs.SessionActive {
		return
	}

	if err := sess.Close(statusChannelClosed, "channel closed"); err != nil {
		logger.Error(ctx, "could not close sockjs session", logger.E(errors.WithStack(err)))
	}
}

// handleBackendMessages subscribes to the backend message namespace of the
// bus and sends every received *module.BackendMessage to the client, wrapped
// in a WebsocketMessage envelope. It returns when ctx is done, when the
// subscription channel is closed, or when the subscription cannot be
// established; in every case the session is closed if still active.
func (h *Handler) handleBackendMessages(ctx context.Context, sess sockjs.Session) {
	messages, err := h.bus.Subscribe(ctx, module.MessageNamespaceBackend)
	if err != nil {
		// This runs in its own goroutine: panicking here would bring the
		// whole process down, so the failure is reported and only this
		// session is terminated.
		logger.Error(
			ctx, "could not subscribe to backend messages",
			logger.E(errors.WithStack(err)),
		)

		closeSessionIfActive(ctx, sess)

		return
	}

	defer func() {
		// Close messages subscriber
		h.bus.Unsubscribe(ctx, module.MessageNamespaceBackend, messages)

		logger.Debug(ctx, "unsubscribed")

		closeSessionIfActive(ctx, sess)
	}()

	for {
		select {
		case <-ctx.Done():
			return

		case msg, open := <-messages:
			if !open {
				// A closed channel yields zero values forever; stop instead
				// of spinning on "unexpected backend message".
				logger.Debug(ctx, "backend messages channel closed")

				return
			}

			backendMessage, ok := msg.(*module.BackendMessage)
			if !ok {
				logger.Error(
					ctx, "unexpected backend message",
					logger.F("message", msg),
				)

				continue
			}

			payload, err := json.Marshal(backendMessage.Data)
			if err != nil {
				logger.Error(
					ctx, "could not encode message",
					logger.E(err),
				)

				continue
			}

			message := NewWebsocketMessage(
				WebsocketMessageTypeMessage,
				json.RawMessage(payload),
			)

			data, err := json.Marshal(message)
			if err != nil {
				logger.Error(
					ctx, "could not encode message",
					logger.E(err),
				)

				continue
			}

			logger.Debug(ctx, "sending message")

			// Send message
			if err := sess.Send(string(data)); err != nil {
				logger.Error(
					ctx, "could not send message",
					logger.E(err),
				)
			}
		}
	}
}

// handleClientMessages reads messages sent by the client over sess, decodes
// them as WebsocketMessage envelopes and publishes the payload of "message"
// envelopes on the bus as frontend messages.
//
// The loop ends when ctx is done, when reading from the session fails, when a
// "message" payload cannot be decoded, or when publishing fails. An envelope
// that cannot be decoded, or whose type is not supported, is logged and
// skipped.
func (h *Handler) handleClientMessages(ctx context.Context, sess sockjs.Session) {
	for {
		// Non-blocking cancellation check before waiting for more data.
		select {
		case <-ctx.Done():
			logger.Debug(ctx, "context done")

			return

		default:
		}

		logger.Debug(ctx, "waiting for websocket data")

		data, err := sess.RecvCtx(ctx)
		if err != nil {
			// A read failure ends the loop. The former `break` here only
			// left the enclosing select, so a closed session was retried
			// over and over until the context ended.
			if !errors.Is(err, sockjs.ErrSessionNotOpen) {
				logger.Error(
					ctx, "could not read message",
					logger.E(errors.WithStack(err)),
				)
			}

			return
		}

		logger.Debug(ctx, "websocket data received", logger.F("data", data))

		message := &WebsocketMessage{}
		if err := json.Unmarshal([]byte(data), message); err != nil {
			logger.Error(
				ctx, "could not decode message",
				logger.E(errors.WithStack(err)),
			)

			// Skip the malformed envelope and keep serving the session.
			continue
		}

		switch message.Type {
		case WebsocketMessageTypeMessage:
			var payload map[string]interface{}
			if err := json.Unmarshal(message.Payload, &payload); err != nil {
				logger.Error(
					ctx, "could not decode payload",
					logger.E(errors.WithStack(err)),
				)

				return
			}

			// Per-message logging context; it does not leak into the next
			// iteration.
			msgCtx := logger.With(ctx, logger.F("payload", payload))

			frontendMessage := module.NewFrontendMessage(payload)

			logger.Debug(msgCtx, "publishing new frontend message", logger.F("message", frontendMessage))

			if err := h.bus.Publish(msgCtx, frontendMessage); err != nil {
				logger.Error(msgCtx, "could not publish message",
					logger.E(errors.WithStack(err)),
					logger.F("message", frontendMessage),
				)

				return
			}

			logger.Debug(msgCtx, "new frontend message published", logger.F("message", frontendMessage))

		default:
			logger.Error(
				ctx, "unsupported message type",
				logger.F("messageType", message.Type),
			)
		}
	}
}

const (
	// WebsocketMessageTypeMessage is the WebsocketMessage type used for
	// envelopes carrying a frontend or backend message payload.
	WebsocketMessageTypeMessage = "message"
)

// WebsocketMessage is the JSON envelope exchanged with the client: "t" holds
// the message type and "p" the raw, not yet decoded, payload.
type WebsocketMessage struct {
	Type    string          `json:"t"`
	Payload json.RawMessage `json:"p"`
}

// WebsocketMessagePayload is a payload whose data is serialized under the
// "d" JSON key.
type WebsocketMessagePayload struct {
	Data map[string]interface{} `json:"d"`
}

// NewWebsocketMessage returns a WebsocketMessage with the given type and raw
// payload.
func NewWebsocketMessage(dataType string, payload json.RawMessage) *WebsocketMessage {
	return &WebsocketMessage{
		Type:    dataType,
		Payload: payload,
	}
}