edge/pkg/module/net/module.go
William Petit 32f04af138
All checks were successful
arcad/edge/pipeline/head This commit looks good
feat(storage): improve caching in cache driver
ref #20
2023-11-30 19:09:51 +01:00

156 lines
3.1 KiB
Go

package net
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
// Module is the "net" server module. It exposes the "broadcast" and "send"
// functions to scripts and forwards incoming client messages received on the
// bus to the server's "onClientMessage" function.
type Module struct {
// server is the application server on which "onClientMessage" is invoked.
server *app.Server
// bus carries outgoing message envelopes and delivers incoming ones.
bus bus.Bus
}
// Name returns the constant identifier of this module, "net".
func (m *Module) Name() string {
return "net"
}
// Export attaches the module's script-facing functions ("broadcast" and
// "send") to the given object.
//
// It panics if one of the functions cannot be set on the object.
func (m *Module) Export(export *goja.Object) {
	// A slice (rather than a map) keeps the registration order fixed:
	// "broadcast" first, then "send". The field keeps the plain function
	// type so the values handed to Set are the same as bare method values.
	exported := []struct {
		name string
		fn   func(goja.FunctionCall, *goja.Runtime) goja.Value
	}{
		{name: "broadcast", fn: m.broadcast},
		{name: "send", fn: m.send},
	}

	for _, entry := range exported {
		if err := export.Set(entry.name, entry.fn); err != nil {
			panic(errors.Wrap(err, "could not set '"+entry.name+"' function"))
		}
	}
}
// broadcast is the script-facing "broadcast(data)" function. It publishes the
// exported value of its first argument on the bus inside an outgoing message
// envelope created with an empty session ID.
//
// Failures are raised by panicking with a runtime value wrapping the error:
// when no argument is given, or when publishing on the bus fails.
// The function always returns nil.
func (m *Module) broadcast(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
	if len(call.Arguments) == 0 {
		panic(rt.ToValue(errors.New("invalid number of argument")))
	}

	payload := call.Argument(0).Export()

	// NOTE(review): the empty session ID presumably means "every connected
	// client" — confirm against the envelope consumer.
	envelope := edgehttp.NewOutgoingMessageEnvelope("", payload)

	err := m.bus.Publish(envelope)
	if err != nil {
		panic(rt.ToValue(errors.WithStack(err)))
	}

	return nil
}
// send is the script-facing "send(target, data)" function. It publishes the
// exported value of its second argument on the bus inside an outgoing message
// envelope addressed to a single session.
//
// The first argument is either the session ID as a string, or a value that
// util.AssertContext accepts as a context from which the session ID is read.
//
// Failures are raised by panicking with a runtime value wrapping the error:
// when fewer than two arguments are given, when no session ID is found in the
// context, or when publishing on the bus fails. The function always returns
// nil.
func (m *Module) send(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
	if len(call.Arguments) < 2 {
		panic(rt.ToValue(errors.New("invalid number of argument")))
	}

	target := call.Argument(0)

	var sessionID string

	switch exported := target.Export().(type) {
	case string:
		// The caller passed the session ID directly.
		sessionID = exported

	default:
		// Anything else is treated as a context carrying the session ID.
		var found bool

		sessionID, found = edgehttp.ContextSessionID(util.AssertContext(target, rt))
		if !found {
			panic(rt.ToValue(errors.New("could not find session id in context")))
		}
	}

	payload := call.Argument(1).Export()
	envelope := edgehttp.NewOutgoingMessageEnvelope(sessionID, payload)

	err := m.bus.Publish(envelope)
	if err != nil {
		panic(rt.ToValue(errors.WithStack(err)))
	}

	return nil
}
// handleIncomingMessages subscribes to the incoming-message address of the bus
// and, for every *edgehttp.IncomingMessage received, invokes the server's
// "onClientMessage" function with the message context and payload.
//
// The loop ends when the context is done or when the envelopes channel is
// closed; the subscription is released on exit. Envelopes carrying an
// unexpected message type are logged and skipped. A missing "onClientMessage"
// function is silently ignored; any other execution error is logged.
//
// It panics if the subscription itself fails.
func (m *Module) handleIncomingMessages() {
	// The background context is never cancelled, so in practice the loop
	// only ends when the envelopes channel is closed.
	ctx := context.Background()

	logger.Debug(
		ctx,
		"subscribing to bus envelopes",
	)

	envelopes, err := m.bus.Subscribe(ctx, edgehttp.AddressIncomingMessage)
	if err != nil {
		panic(errors.WithStack(err))
	}

	defer func() {
		logger.Debug(
			ctx,
			"unsubscribing from bus envelopes",
		)

		m.bus.Unsubscribe(edgehttp.AddressIncomingMessage, envelopes)
	}()

	for {
		logger.Debug(
			ctx,
			"waiting for next envelope",
		)

		select {
		case <-ctx.Done():
			logger.Debug(
				ctx,
				"context done",
			)

			return

		case env, open := <-envelopes:
			// A closed channel yields zero values forever; without this
			// check the loop would call Message() on an empty envelope.
			if !open {
				logger.Debug(
					ctx,
					"envelopes channel closed",
				)

				return
			}

			incomingMessage, ok := env.Message().(*edgehttp.IncomingMessage)
			if !ok {
				logger.Warn(
					ctx,
					"unexpected message type",
					logger.F("message", env.Message()),
				)

				continue
			}

			logger.Debug(
				ctx,
				"received incoming message",
				logger.F("message", incomingMessage),
			)

			if _, err := m.server.ExecFuncByName(incomingMessage.Context, "onClientMessage", incomingMessage.Context, incomingMessage.Payload); err != nil {
				// Applications are not required to define the handler.
				if errors.Is(err, app.ErrFuncDoesNotExist) {
					continue
				}

				logger.Error(
					ctx,
					"on client message error",
					logger.CapturedE(errors.WithStack(err)),
				)
			}
		}
	}
}
// ModuleFactory returns a factory that builds the "net" module for a given
// server, wired to the provided bus.
//
// Each invocation of the returned factory also starts a goroutine running the
// module's incoming-message loop; no handle is returned to stop it.
func ModuleFactory(bus bus.Bus) app.ServerModuleFactory {
	return func(server *app.Server) app.ServerModule {
		mod := &Module{server: server, bus: bus}

		// Start consuming incoming client messages in the background.
		go mod.handleIncomingMessages()

		return mod
	}
}