William Petit
32f04af138
All checks were successful
arcad/edge/pipeline/head This commit looks good
ref #20
261 lines
6.0 KiB
Go
261 lines
6.0 KiB
Go
package rpc
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/app"
|
|
"forge.cadoles.com/arcad/edge/pkg/bus"
|
|
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
|
|
"forge.cadoles.com/arcad/edge/pkg/module/util"
|
|
"github.com/dop251/goja"
|
|
"github.com/pkg/errors"
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
)
|
|
|
|
type Module struct {
|
|
server *app.Server
|
|
bus bus.Bus
|
|
callbacks sync.Map
|
|
}
|
|
|
|
func (m *Module) Name() string {
|
|
return "rpc"
|
|
}
|
|
|
|
func (m *Module) Export(export *goja.Object) {
|
|
if err := export.Set("register", m.register); err != nil {
|
|
panic(errors.Wrap(err, "could not set 'register' function"))
|
|
}
|
|
|
|
if err := export.Set("unregister", m.unregister); err != nil {
|
|
panic(errors.Wrap(err, "could not set 'unregister' function"))
|
|
}
|
|
}
|
|
|
|
func (m *Module) OnInit(ctx context.Context, rt *goja.Runtime) error {
|
|
requestErrs := m.bus.Reply(ctx, Address, m.handleRequest)
|
|
go func() {
|
|
for err := range requestErrs {
|
|
logger.Error(ctx, "error while replying to rpc requests", logger.CapturedE(errors.WithStack(err)))
|
|
}
|
|
}()
|
|
|
|
httpIncomingMessages, err := m.bus.Subscribe(ctx, edgehttp.AddressIncomingMessage)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
go m.handleIncomingHTTPMessages(ctx, httpIncomingMessages)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Module) register(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
|
fnName := util.AssertString(call.Argument(0), rt)
|
|
|
|
var (
|
|
callable goja.Callable
|
|
ok bool
|
|
)
|
|
|
|
if len(call.Arguments) > 1 {
|
|
callable, ok = goja.AssertFunction(call.Argument(1))
|
|
} else {
|
|
callable, ok = goja.AssertFunction(rt.Get(fnName))
|
|
}
|
|
|
|
if !ok {
|
|
panic(rt.NewTypeError("method should be a valid function"))
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
logger.Debug(ctx, "registering method", logger.F("method", fnName))
|
|
|
|
m.callbacks.Store(fnName, callable)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Module) unregister(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
|
fnName := util.AssertString(call.Argument(0), rt)
|
|
|
|
m.callbacks.Delete(fnName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Module) handleRequest(env bus.Envelope) (any, error) {
|
|
request, ok := env.Message().(*Request)
|
|
if !ok {
|
|
logger.Warn(context.Background(), "unexpected bus message", logger.F("message", env.Message()))
|
|
|
|
return nil, errors.WithStack(bus.ErrUnexpectedMessage)
|
|
}
|
|
|
|
ctx := logger.With(request.Context, logger.F("request", request))
|
|
|
|
logger.Debug(ctx, "received rpc request")
|
|
|
|
rawCallable, exists := m.callbacks.Load(request.Method)
|
|
if !exists {
|
|
logger.Debug(ctx, "method not found")
|
|
|
|
return nil, errors.WithStack(ErrMethodNotFound)
|
|
}
|
|
|
|
callable, ok := rawCallable.(goja.Callable)
|
|
if !ok {
|
|
logger.Debug(ctx, "invalid method")
|
|
|
|
return nil, errors.WithStack(ErrMethodNotFound)
|
|
}
|
|
|
|
result, err := m.server.Exec(ctx, callable, request.Context, request.Params)
|
|
if err != nil {
|
|
logger.Error(
|
|
ctx, "rpc call error",
|
|
logger.CapturedE(errors.WithStack(err)),
|
|
)
|
|
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (m *Module) handleIncomingHTTPMessages(ctx context.Context, incoming <-chan bus.Envelope) {
|
|
defer func() {
|
|
m.bus.Unsubscribe(edgehttp.AddressIncomingMessage, incoming)
|
|
}()
|
|
|
|
for env := range incoming {
|
|
msg, ok := env.Message().(*edgehttp.IncomingMessage)
|
|
if !ok {
|
|
logger.Error(ctx, "unexpected incoming http message type", logger.F("message", env.Message()))
|
|
continue
|
|
}
|
|
|
|
jsonReq, ok := m.isRPCRequest(msg.Payload)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
sessionID, ok := edgehttp.ContextSessionID(msg.Context)
|
|
if !ok {
|
|
logger.Error(ctx, "could not find session id in context")
|
|
continue
|
|
}
|
|
|
|
request := NewRequestEnvelope(msg.Context, jsonReq.Method, jsonReq.Params)
|
|
|
|
requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))
|
|
|
|
reply, err := m.bus.Request(requestCtx, request)
|
|
if err != nil {
|
|
err = errors.WithStack(err)
|
|
|
|
logger.Error(
|
|
ctx, "could not execute rpc request",
|
|
logger.CapturedE(err),
|
|
)
|
|
|
|
if errors.Is(err, ErrMethodNotFound) {
|
|
if err := m.sendMethodNotFoundResponse(sessionID, jsonReq.ID); err != nil {
|
|
logger.Error(
|
|
ctx, "could not send json rpc error response",
|
|
logger.CapturedE(errors.WithStack(err)),
|
|
)
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if err := m.sendErrorResponse(sessionID, jsonReq.ID, err); err != nil {
|
|
logger.Error(
|
|
ctx, "could not send json rpc error response",
|
|
logger.CapturedE(errors.WithStack(err)),
|
|
)
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if err := m.sendResponse(sessionID, jsonReq.ID, reply.Message(), nil); err != nil {
|
|
logger.Error(
|
|
ctx, "could not send json rpc result response",
|
|
logger.CapturedE(err),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Module) sendErrorResponse(sessionID string, requestID any, err error) error {
|
|
return m.sendResponse(sessionID, requestID, nil, &JSONRPCError{
|
|
Code: -32603,
|
|
Message: err.Error(),
|
|
})
|
|
}
|
|
|
|
func (m *Module) sendMethodNotFoundResponse(sessionID string, requestID any) error {
|
|
return m.sendResponse(sessionID, requestID, nil, &JSONRPCError{
|
|
Code: -32601,
|
|
Message: "method not found",
|
|
})
|
|
}
|
|
|
|
func (m *Module) sendResponse(sessionID string, requestID any, result any, err error) error {
|
|
env := edgehttp.NewOutgoingMessageEnvelope(sessionID, map[string]interface{}{
|
|
"jsonrpc": "2.0",
|
|
"id": requestID,
|
|
"error": err,
|
|
"result": result,
|
|
})
|
|
|
|
if err := m.bus.Publish(env); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Module) isRPCRequest(payload map[string]any) (*JSONRPCRequest, bool) {
|
|
jsonRPC, exists := payload["jsonrpc"]
|
|
if !exists || jsonRPC != "2.0" {
|
|
return nil, false
|
|
}
|
|
|
|
rawMethod, exists := payload["method"]
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
method, ok := rawMethod.(string)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
id := payload["id"]
|
|
params := payload["params"]
|
|
|
|
return &JSONRPCRequest{
|
|
ID: id,
|
|
Method: method,
|
|
Params: params,
|
|
}, true
|
|
}
|
|
|
|
func ModuleFactory(bus bus.Bus) app.ServerModuleFactory {
|
|
return func(server *app.Server) app.ServerModule {
|
|
mod := &Module{
|
|
server: server,
|
|
bus: bus,
|
|
}
|
|
|
|
return mod
|
|
}
|
|
}
|
|
|
|
var _ app.InitializableModule = &Module{}
|