215 lines
4.3 KiB
Go
215 lines
4.3 KiB
Go
|
package http
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
|
||
|
"forge.cadoles.com/arcad/edge/pkg/module"
|
||
|
"github.com/igm/sockjs-go/v3/sockjs"
|
||
|
"github.com/pkg/errors"
|
||
|
"gitlab.com/wpetit/goweb/logger"
|
||
|
)
|
||
|
|
||
|
// statusChannelClosed is the status code handed to sess.Close when this
// package closes a SockJS session. Being the first iota of its declaration,
// its value is 0.
const statusChannelClosed = iota
|
||
|
|
||
|
func (h *Handler) handleSockJSSession(sess sockjs.Session) {
|
||
|
ctx := logger.With(sess.Request().Context(),
|
||
|
logger.F("sessionID", sess.ID()),
|
||
|
)
|
||
|
|
||
|
logger.Debug(ctx, "new sockjs session")
|
||
|
|
||
|
defer func() {
|
||
|
if sess.GetSessionState() == sockjs.SessionActive {
|
||
|
if err := sess.Close(statusChannelClosed, "channel closed"); err != nil {
|
||
|
logger.Error(ctx, "could not close sockjs session", logger.E(errors.WithStack(err)))
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
go h.handleBackendMessages(ctx, sess)
|
||
|
h.handleClientMessages(ctx, sess)
|
||
|
}
|
||
|
|
||
|
func (h *Handler) handleBackendMessages(ctx context.Context, sess sockjs.Session) {
|
||
|
messages, err := h.bus.Subscribe(ctx, module.MessageNamespaceBackend)
|
||
|
if err != nil {
|
||
|
panic(errors.WithStack(err))
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
// Close messages subscriber
|
||
|
h.bus.Unsubscribe(ctx, module.MessageNamespaceBackend, messages)
|
||
|
|
||
|
logger.Debug(ctx, "unsubscribed")
|
||
|
|
||
|
if sess.GetSessionState() != sockjs.SessionActive {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if err := sess.Close(statusChannelClosed, "channel closed"); err != nil {
|
||
|
logger.Error(ctx, "could not close sockjs session", logger.E(errors.WithStack(err)))
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
|
||
|
case msg := <-messages:
|
||
|
backendMessage, ok := msg.(*module.BackendMessage)
|
||
|
if !ok {
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"unexpected backend message",
|
||
|
logger.F("message", msg),
|
||
|
)
|
||
|
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
payload, err := json.Marshal(backendMessage.Data)
|
||
|
if err != nil {
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"could not encode message",
|
||
|
logger.E(err),
|
||
|
)
|
||
|
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
message := NewWebsocketMessage(
|
||
|
WebsocketMessageTypeMessage,
|
||
|
json.RawMessage(payload),
|
||
|
)
|
||
|
|
||
|
data, err := json.Marshal(message)
|
||
|
if err != nil {
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"could not encode message",
|
||
|
logger.E(err),
|
||
|
)
|
||
|
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
logger.Debug(ctx, "sending message")
|
||
|
|
||
|
// Send message
|
||
|
if err := sess.Send(string(data)); err != nil {
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"could not send message",
|
||
|
logger.E(err),
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (h *Handler) handleClientMessages(ctx context.Context, sess sockjs.Session) {
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
logger.Debug(ctx, "context done")
|
||
|
|
||
|
return
|
||
|
|
||
|
default:
|
||
|
logger.Debug(ctx, "waiting for websocket data")
|
||
|
|
||
|
data, err := sess.RecvCtx(ctx)
|
||
|
if err != nil {
|
||
|
if errors.Is(err, sockjs.ErrSessionNotOpen) {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"could not read message",
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
)
|
||
|
|
||
|
break
|
||
|
}
|
||
|
|
||
|
logger.Debug(ctx, "websocket data received", logger.F("data", data))
|
||
|
|
||
|
message := &WebsocketMessage{}
|
||
|
if err := json.Unmarshal([]byte(data), message); err != nil {
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"could not decode message",
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
)
|
||
|
|
||
|
break
|
||
|
}
|
||
|
|
||
|
switch {
|
||
|
|
||
|
case message.Type == WebsocketMessageTypeMessage:
|
||
|
var payload map[string]interface{}
|
||
|
if err := json.Unmarshal(message.Payload, &payload); err != nil {
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"could not decode payload",
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
ctx := logger.With(ctx, logger.F("payload", payload))
|
||
|
|
||
|
frontendMessage := module.NewFrontendMessage(payload)
|
||
|
|
||
|
logger.Debug(ctx, "publishing new frontend message", logger.F("message", frontendMessage))
|
||
|
|
||
|
if err := h.bus.Publish(ctx, frontendMessage); err != nil {
|
||
|
logger.Error(ctx, "could not publish message",
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
logger.F("message", frontendMessage),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
logger.Debug(ctx, "new frontend message published", logger.F("message", frontendMessage))
|
||
|
|
||
|
default:
|
||
|
logger.Error(
|
||
|
ctx,
|
||
|
"unsupported message type",
|
||
|
logger.F("messageType", message.Type),
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WebsocketMessageTypeMessage is the WebsocketMessage.Type value used for
// messages whose payload is forwarded between the client and the bus.
const WebsocketMessageTypeMessage = "message"
|
||
|
|
||
|
// WebsocketMessage is the JSON envelope exchanged with the client.
type WebsocketMessage struct {
	// Type identifies the kind of message; serialized under the key "t".
	Type string `json:"t"`

	// Payload holds the still-encoded message body; serialized under the
	// key "p".
	Payload json.RawMessage `json:"p"`
}
|
||
|
|
||
|
// WebsocketMessagePayload wraps a generic data map.
type WebsocketMessagePayload struct {
	// Data is the free-form content; serialized under the key "d".
	Data map[string]interface{} `json:"d"`
}
|
||
|
|
||
|
func NewWebsocketMessage(dataType string, payload json.RawMessage) *WebsocketMessage {
|
||
|
return &WebsocketMessage{
|
||
|
Type: dataType,
|
||
|
Payload: payload,
|
||
|
}
|
||
|
}
|