Compare commits

..

No commits in common. "870db072e0431ebdc4074fa3ddf54badfe81550c" and "f4a7366aad19e327583f8ed4ea7abdfbcd522ea5" have entirely different histories.

50 changed files with 1339 additions and 1624 deletions

View File

@ -23,11 +23,10 @@ import (
authModule "forge.cadoles.com/arcad/edge/pkg/module/auth" authModule "forge.cadoles.com/arcad/edge/pkg/module/auth"
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http" authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
authModuleMiddleware "forge.cadoles.com/arcad/edge/pkg/module/auth/middleware" authModuleMiddleware "forge.cadoles.com/arcad/edge/pkg/module/auth/middleware"
blobModule "forge.cadoles.com/arcad/edge/pkg/module/blob" "forge.cadoles.com/arcad/edge/pkg/module/blob"
castModule "forge.cadoles.com/arcad/edge/pkg/module/cast" "forge.cadoles.com/arcad/edge/pkg/module/cast"
fetchModule "forge.cadoles.com/arcad/edge/pkg/module/fetch" "forge.cadoles.com/arcad/edge/pkg/module/fetch"
netModule "forge.cadoles.com/arcad/edge/pkg/module/net" netModule "forge.cadoles.com/arcad/edge/pkg/module/net"
rpcModule "forge.cadoles.com/arcad/edge/pkg/module/rpc"
shareModule "forge.cadoles.com/arcad/edge/pkg/module/share" shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
"forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage"
"gitlab.com/wpetit/goweb/logger" "gitlab.com/wpetit/goweb/logger"
@ -107,11 +106,6 @@ func RunCommand() *cli.Command {
Usage: "use `FILE` as local accounts", Usage: "use `FILE` as local accounts",
Value: ".edge/%APPID%/accounts.json", Value: ".edge/%APPID%/accounts.json",
}, },
&cli.IntFlag{
Name: "max-upload-size",
Usage: "use `MAX-UPLOAD-SIZE` as blob max upload size",
Value: 10 << (10 * 2), // 10Mb
},
}, },
Action: func(ctx *cli.Context) error { Action: func(ctx *cli.Context) error {
address := ctx.String("address") address := ctx.String("address")
@ -123,7 +117,6 @@ func RunCommand() *cli.Command {
documentstoreDSN := ctx.String("documentstore-dsn") documentstoreDSN := ctx.String("documentstore-dsn")
shareStoreDSN := ctx.String("sharestore-dsn") shareStoreDSN := ctx.String("sharestore-dsn")
accountsFile := ctx.String("accounts-file") accountsFile := ctx.String("accounts-file")
maxUploadSize := ctx.Int("max-upload-size")
logger.SetFormat(logger.Format(logFormat)) logger.SetFormat(logger.Format(logFormat))
logger.SetLevel(logger.Level(logLevel)) logger.SetLevel(logger.Level(logLevel))
@ -169,7 +162,7 @@ func RunCommand() *cli.Command {
appCtx := logger.With(cmdCtx, logger.F("address", address)) appCtx := logger.With(cmdCtx, logger.F("address", address))
if err := runApp(appCtx, path, address, documentstoreDSN, blobstoreDSN, shareStoreDSN, accountsFile, appsRepository, maxUploadSize); err != nil { if err := runApp(appCtx, path, address, documentstoreDSN, blobstoreDSN, shareStoreDSN, accountsFile, appsRepository); err != nil {
logger.Error(appCtx, "could not run app", logger.CapturedE(errors.WithStack(err))) logger.Error(appCtx, "could not run app", logger.CapturedE(errors.WithStack(err)))
} }
}(p, port, idx) }(p, port, idx)
@ -182,7 +175,7 @@ func RunCommand() *cli.Command {
} }
} }
func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int) error { func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository) error {
absPath, err := filepath.Abs(path) absPath, err := filepath.Abs(path)
if err != nil { if err != nil {
return errors.Wrapf(err, "could not resolve path '%s'", path) return errors.Wrapf(err, "could not resolve path '%s'", path)
@ -243,8 +236,6 @@ func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN,
return jwtutil.NewSymmetricKeySet(dummySecret) return jwtutil.NewSymmetricKeySet(dummySecret)
}), }),
), ),
blobModule.Mount(maxUploadSize), // 10Mb,
fetchModule.Mount(),
), ),
appHTTP.WithHTTPMiddlewares( appHTTP.WithHTTPMiddlewares(
authModuleMiddleware.AnonymousUser(key, jwa.HS256), authModuleMiddleware.AnonymousUser(key, jwa.HS256),
@ -287,18 +278,18 @@ func getServerModules(deps *moduleDeps) []app.ServerModuleFactory {
module.LifecycleModuleFactory(), module.LifecycleModuleFactory(),
module.ContextModuleFactory(), module.ContextModuleFactory(),
module.ConsoleModuleFactory(), module.ConsoleModuleFactory(),
castModule.CastModuleFactory(), cast.CastModuleFactory(),
netModule.ModuleFactory(deps.Bus), netModule.ModuleFactory(deps.Bus),
rpcModule.ModuleFactory(deps.Bus), module.RPCModuleFactory(deps.Bus),
module.StoreModuleFactory(deps.DocumentStore), module.StoreModuleFactory(deps.DocumentStore),
blobModule.ModuleFactory(deps.Bus, deps.BlobStore), blob.ModuleFactory(deps.Bus, deps.BlobStore),
authModule.ModuleFactory( authModule.ModuleFactory(
authModule.WithJWT(func() (jwk.Set, error) { authModule.WithJWT(func() (jwk.Set, error) {
return jwtutil.NewSymmetricKeySet(dummySecret) return jwtutil.NewSymmetricKeySet(dummySecret)
}), }),
), ),
appModule.ModuleFactory(deps.AppRepository), appModule.ModuleFactory(deps.AppRepository),
fetchModule.ModuleFactory(deps.Bus), fetch.ModuleFactory(deps.Bus),
shareModule.ModuleFactory(deps.AppID, deps.ShareStore), shareModule.ModuleFactory(deps.AppID, deps.ShareStore),
} }
} }

1
go.mod
View File

@ -29,7 +29,6 @@ require (
github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/option v1.0.0 // indirect github.com/lestrrat-go/option v1.0.0 // indirect
github.com/miekg/dns v1.1.53 // indirect github.com/miekg/dns v1.1.53 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/sync v0.1.0 // indirect golang.org/x/sync v0.1.0 // indirect
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
google.golang.org/grpc v1.35.0 // indirect google.golang.org/grpc v1.35.0 // indirect

2
go.sum
View File

@ -326,8 +326,6 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

View File

@ -47,10 +47,6 @@ func NewPromiseProxyFrom(rt *goja.Runtime) *PromiseProxy {
} }
func IsPromise(v goja.Value) (*goja.Promise, bool) { func IsPromise(v goja.Value) (*goja.Promise, bool) {
if v == nil {
return nil, false
}
promise, ok := v.Export().(*goja.Promise) promise, ok := v.Export().(*goja.Promise)
return promise, ok return promise, ok
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"math/rand" "math/rand"
"sync" "sync"
"time"
"github.com/dop251/goja" "github.com/dop251/goja"
"github.com/dop251/goja_nodejs/eventloop" "github.com/dop251/goja_nodejs/eventloop"
@ -23,7 +22,23 @@ type Server struct {
modules []ServerModule modules []ServerModule
} }
func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...interface{}) (any, error) { func (s *Server) Load(name string, src string) error {
var err error
s.loop.RunOnLoop(func(rt *goja.Runtime) {
_, err = rt.RunScript(name, src)
if err != nil {
err = errors.Wrap(err, "could not run js script")
}
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...interface{}) (goja.Value, error) {
ctx = logger.With(ctx, logger.F("function", funcName), logger.F("args", args)) ctx = logger.With(ctx, logger.F("function", funcName), logger.F("args", args))
ret, err := s.Exec(ctx, funcName, args...) ret, err := s.Exec(ctx, funcName, args...)
@ -34,23 +49,16 @@ func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...in
return ret, nil return ret, nil
} }
func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...interface{}) (any, error) { func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...interface{}) (goja.Value, error) {
type result struct { var (
wg sync.WaitGroup
value goja.Value value goja.Value
err error err error
} )
done := make(chan result) wg.Add(1)
defer func() {
// Drain done channel
for range done {
}
}()
s.loop.RunOnLoop(func(rt *goja.Runtime) { s.loop.RunOnLoop(func(rt *goja.Runtime) {
defer close(done)
var callable goja.Callable var callable goja.Callable
switch typ := callableOrFuncname.(type) { switch typ := callableOrFuncname.(type) {
case goja.Callable: case goja.Callable:
@ -59,9 +67,7 @@ func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...inter
case string: case string:
call, ok := goja.AssertFunction(rt.Get(typ)) call, ok := goja.AssertFunction(rt.Get(typ))
if !ok { if !ok {
done <- result{ err = errors.WithStack(ErrFuncDoesNotExist)
err: errors.WithStack(ErrFuncDoesNotExist),
}
return return
} }
@ -69,27 +75,28 @@ func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...inter
callable = call callable = call
default: default:
done <- result{ err = errors.Errorf("callableOrFuncname: expected callable or function name, got '%T'", callableOrFuncname)
err: errors.Errorf("callableOrFuncname: expected callable or function name, got '%T'", callableOrFuncname),
}
return return
} }
logger.Debug(ctx, "executing callable")
defer wg.Done()
defer func() { defer func() {
recovered := recover() if recovered := recover(); recovered != nil {
if recovered == nil { revoveredErr, ok := recovered.(error)
if ok {
logger.Error(ctx, "recovered runtime error", logger.CapturedE(errors.WithStack(revoveredErr)))
err = errors.WithStack(ErrUnknownError)
return return
} }
recoveredErr, ok := recovered.(error)
if !ok {
panic(recovered) panic(recovered)
} }
done <- result{
err: recoveredErr,
}
}() }()
jsArgs := make([]goja.Value, 0, len(args)) jsArgs := make([]goja.Value, 0, len(args))
@ -97,49 +104,22 @@ func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...inter
jsArgs = append(jsArgs, rt.ToValue(a)) jsArgs = append(jsArgs, rt.ToValue(a))
} }
logger.Debug(ctx, "executing callable", logger.F("callable", callableOrFuncname)) value, err = callable(nil, jsArgs...)
start := time.Now()
value, err := callable(nil, jsArgs...)
if err != nil { if err != nil {
done <- result{ err = errors.WithStack(err)
err: errors.WithStack(err),
} }
return
}
done <- result{
value: value,
}
logger.Debug(ctx, "executed callable", logger.F("callable", callableOrFuncname), logger.F("duration", time.Since(start).String()))
}) })
select { wg.Wait()
case <-ctx.Done():
if err := ctx.Err(); err != nil { if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
return nil, nil return value, nil
case result := <-done:
if result.err != nil {
return nil, errors.WithStack(result.err)
}
value := result.value
if promise, ok := IsPromise(value); ok {
value = s.waitForPromise(promise)
}
return value.Export(), nil
}
} }
func (s *Server) waitForPromise(promise *goja.Promise) goja.Value { func (s *Server) WaitForPromise(promise *goja.Promise) goja.Value {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
value goja.Value value goja.Value
@ -182,40 +162,20 @@ func (s *Server) waitForPromise(promise *goja.Promise) goja.Value {
return value return value
} }
func (s *Server) Start(ctx context.Context, name string, src string) error { func (s *Server) Start(ctx context.Context) error {
s.loop.Start() s.loop.Start()
done := make(chan error) var err error
s.loop.RunOnLoop(func(rt *goja.Runtime) { s.loop.RunOnLoop(func(rt *goja.Runtime) {
defer close(done)
rt.SetFieldNameMapper(goja.TagFieldNameMapper("goja", true)) rt.SetFieldNameMapper(goja.TagFieldNameMapper("goja", true))
rt.SetRandSource(createRandomSource()) rt.SetRandSource(createRandomSource())
if err := s.loadModules(ctx, rt); err != nil { if err = s.initModules(ctx, rt); err != nil {
err = errors.WithStack(err) err = errors.WithStack(err)
done <- err
return
} }
if _, err := rt.RunScript(name, src); err != nil {
done <- errors.Wrap(err, "could not run js script")
return
}
if err := s.initModules(ctx, rt); err != nil {
err = errors.WithStack(err)
done <- err
return
}
done <- nil
}) })
if err != nil {
if err := <-done; err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -226,7 +186,7 @@ func (s *Server) Stop() {
s.loop.Stop() s.loop.Stop()
} }
func (s *Server) loadModules(ctx context.Context, rt *goja.Runtime) error { func (s *Server) initModules(ctx context.Context, rt *goja.Runtime) error {
modules := make([]ServerModule, 0, len(s.factories)) modules := make([]ServerModule, 0, len(s.factories))
for _, moduleFactory := range s.factories { for _, moduleFactory := range s.factories {
@ -240,13 +200,7 @@ func (s *Server) loadModules(ctx context.Context, rt *goja.Runtime) error {
modules = append(modules, mod) modules = append(modules, mod)
} }
s.modules = modules for _, mod := range modules {
return nil
}
func (s *Server) initModules(ctx context.Context, rt *goja.Runtime) error {
for _, mod := range s.modules {
initMod, ok := mod.(InitializableModule) initMod, ok := mod.(InitializableModule)
if !ok { if !ok {
continue continue
@ -259,6 +213,8 @@ func (s *Server) initModules(ctx context.Context, rt *goja.Runtime) error {
} }
} }
s.modules = modules
return nil return nil
} }

View File

@ -3,11 +3,11 @@ package bus
import "context" import "context"
type Bus interface { type Bus interface {
Subscribe(ctx context.Context, addr Address) (<-chan Envelope, error) Subscribe(ctx context.Context, ns MessageNamespace) (<-chan Message, error)
Unsubscribe(addr Address, ch <-chan Envelope) Unsubscribe(ctx context.Context, ns MessageNamespace, ch <-chan Message)
Publish(env Envelope) error Publish(ctx context.Context, msg Message) error
Request(ctx context.Context, env Envelope) (Envelope, error) Request(ctx context.Context, msg Message) (Message, error)
Reply(ctx context.Context, addr Address, h RequestHandler) chan error Reply(ctx context.Context, ns MessageNamespace, h RequestHandler) error
} }
type RequestHandler func(env Envelope) (any, error) type RequestHandler func(msg Message) (Message, error)

View File

@ -1,32 +0,0 @@
package bus
type Address string
type Envelope interface {
Message() any
Address() Address
}
type BaseEnvelope struct {
msg any
addr Address
}
// Address implements Envelope.
func (e *BaseEnvelope) Address() Address {
return e.addr
}
// Message implements Envelope.
func (e *BaseEnvelope) Message() any {
return e.msg
}
func NewEnvelope(addr Address, msg any) *BaseEnvelope {
return &BaseEnvelope{
addr: addr,
msg: msg,
}
}
var _ Envelope = &BaseEnvelope{}

View File

@ -15,13 +15,13 @@ type Bus struct {
nextRequestID uint64 nextRequestID uint64
} }
func (b *Bus) Subscribe(ctx context.Context, address bus.Address) (<-chan bus.Envelope, error) { func (b *Bus) Subscribe(ctx context.Context, ns bus.MessageNamespace) (<-chan bus.Message, error) {
logger.Debug( logger.Debug(
ctx, "subscribing", ctx, "subscribing to messages",
logger.F("address", address), logger.F("messageNamespace", ns),
) )
dispatchers := b.getDispatchers(address) dispatchers := b.getDispatchers(ns)
disp := newEventDispatcher(b.opt.BufferSize) disp := newEventDispatcher(b.opt.BufferSize)
go disp.Run(ctx) go disp.Run(ctx)
@ -31,41 +31,50 @@ func (b *Bus) Subscribe(ctx context.Context, address bus.Address) (<-chan bus.En
return disp.Out(), nil return disp.Out(), nil
} }
func (b *Bus) Unsubscribe(address bus.Address, ch <-chan bus.Envelope) { func (b *Bus) Unsubscribe(ctx context.Context, ns bus.MessageNamespace, ch <-chan bus.Message) {
logger.Debug( logger.Debug(
context.Background(), "unsubscribing", ctx, "unsubscribing from messages",
logger.F("address", address), logger.F("messageNamespace", ns),
) )
dispatchers := b.getDispatchers(address) dispatchers := b.getDispatchers(ns)
dispatchers.RemoveByOutChannel(ch) dispatchers.RemoveByOutChannel(ch)
} }
func (b *Bus) Publish(env bus.Envelope) error { func (b *Bus) Publish(ctx context.Context, msg bus.Message) error {
dispatchers := b.getDispatchers(env.Address()) dispatchers := b.getDispatchers(msg.MessageNamespace())
dispatchersList := dispatchers.List()
logger.Debug( logger.Debug(
context.Background(), "publish", ctx, "publishing message",
logger.F("address", env.Address()), logger.F("dispatchers", len(dispatchersList)),
logger.F("messageNamespace", msg.MessageNamespace()),
) )
dispatchers.Range(func(d *eventDispatcher) { for _, d := range dispatchersList {
if err := d.In(env); err != nil { if d.Closed() {
logger.Error(context.Background(), "could not publish message", logger.CapturedE(errors.WithStack(err))) dispatchers.Remove(d)
continue
}
if err := d.In(msg); err != nil {
return errors.WithStack(err)
}
} }
})
return nil return nil
} }
func (b *Bus) getDispatchers(address bus.Address) *eventDispatcherSet { func (b *Bus) getDispatchers(namespace bus.MessageNamespace) *eventDispatcherSet {
rawAddress := string(address) strNamespace := string(namespace)
rawDispatchers, exists := b.dispatchers.Get(rawAddress) rawDispatchers, exists := b.dispatchers.Get(strNamespace)
dispatchers, ok := rawDispatchers.(*eventDispatcherSet) dispatchers, ok := rawDispatchers.(*eventDispatcherSet)
if !exists || !ok { if !exists || !ok {
dispatchers = newEventDispatcherSet() dispatchers = newEventDispatcherSet()
b.dispatchers.Set(rawAddress, dispatchers) b.dispatchers.Set(strNamespace, dispatchers)
} }
return dispatchers return dispatchers

View File

@ -4,23 +4,13 @@ import (
"testing" "testing"
busTesting "forge.cadoles.com/arcad/edge/pkg/bus/testing" busTesting "forge.cadoles.com/arcad/edge/pkg/bus/testing"
"gitlab.com/wpetit/goweb/logger"
"go.uber.org/goleak"
) )
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestMemoryBus(t *testing.T) { func TestMemoryBus(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("Test disabled when -short flag is set") t.Skip("Test disabled when -short flag is set")
} }
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
t.Parallel() t.Parallel()
t.Run("PublishSubscribe", func(t *testing.T) { t.Run("PublishSubscribe", func(t *testing.T) {
@ -36,11 +26,4 @@ func TestMemoryBus(t *testing.T) {
b := NewBus() b := NewBus()
busTesting.TestRequestReply(t, b) busTesting.TestRequestReply(t, b)
}) })
t.Run("CanceledRequestReply", func(t *testing.T) {
t.Parallel()
b := NewBus()
busTesting.TestCanceledRequest(t, b)
})
} }

View File

@ -3,6 +3,7 @@ package memory
import ( import (
"context" "context"
"sync" "sync"
"time"
"forge.cadoles.com/arcad/edge/pkg/bus" "forge.cadoles.com/arcad/edge/pkg/bus"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -29,7 +30,7 @@ func (s *eventDispatcherSet) Remove(d *eventDispatcher) {
delete(s.items, d) delete(s.items, d)
} }
func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Envelope) { func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -41,18 +42,17 @@ func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Envelope) {
} }
} }
func (s *eventDispatcherSet) Range(fn func(d *eventDispatcher)) { func (s *eventDispatcherSet) List() []*eventDispatcher {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
dispatchers := make([]*eventDispatcher, 0, len(s.items))
for d := range s.items { for d := range s.items {
if d.Closed() { dispatchers = append(dispatchers, d)
s.Remove(d)
continue
} }
fn(d) return dispatchers
}
} }
func newEventDispatcherSet() *eventDispatcherSet { func newEventDispatcherSet() *eventDispatcherSet {
@ -62,8 +62,8 @@ func newEventDispatcherSet() *eventDispatcherSet {
} }
type eventDispatcher struct { type eventDispatcher struct {
in chan bus.Envelope in chan bus.Message
out chan bus.Envelope out chan bus.Message
mutex sync.RWMutex mutex sync.RWMutex
closed bool closed bool
} }
@ -91,7 +91,7 @@ func (d *eventDispatcher) close() {
d.closed = true d.closed = true
} }
func (d *eventDispatcher) In(msg bus.Envelope) (err error) { func (d *eventDispatcher) In(msg bus.Message) (err error) {
d.mutex.RLock() d.mutex.RLock()
defer d.mutex.RUnlock() defer d.mutex.RUnlock()
@ -104,52 +104,67 @@ func (d *eventDispatcher) In(msg bus.Envelope) (err error) {
return nil return nil
} }
func (d *eventDispatcher) Out() <-chan bus.Envelope { func (d *eventDispatcher) Out() <-chan bus.Message {
return d.out return d.out
} }
func (d *eventDispatcher) IsOut(out <-chan bus.Envelope) bool { func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
return d.out == out return d.out == out
} }
func (d *eventDispatcher) Run(ctx context.Context) { func (d *eventDispatcher) Run(ctx context.Context) {
defer func() { defer func() {
for {
logger.Debug(ctx, "closing dispatcher, flushing out incoming messages") logger.Debug(ctx, "closing dispatcher, flushing out incoming messages")
close(d.out) close(d.out)
for range d.in {
// Flush all incoming messages // Flush all incoming messages
for {
_, ok := <-d.in
if !ok {
return
}
}
} }
}() }()
for { for {
select { msg, ok := <-d.in
case <-ctx.Done():
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
logger.Error(
ctx,
"message subscription context canceled",
logger.CapturedE(errors.WithStack(err)),
)
}
return
case msg, ok := <-d.in:
if !ok { if !ok {
return return
} }
d.out <- msg timeout := time.After(time.Second)
select {
case d.out <- msg:
case <-timeout:
logger.Error(
ctx,
"out message channel timeout",
logger.F("message", msg),
)
return
case <-ctx.Done():
logger.Error(
ctx,
"message subscription context canceled",
logger.F("message", msg),
logger.CapturedE(errors.WithStack(ctx.Err())),
)
return
} }
} }
} }
func newEventDispatcher(bufferSize int64) *eventDispatcher { func newEventDispatcher(bufferSize int64) *eventDispatcher {
return &eventDispatcher{ return &eventDispatcher{
in: make(chan bus.Envelope, bufferSize), in: make(chan bus.Message, bufferSize),
out: make(chan bus.Envelope, bufferSize), out: make(chan bus.Message, bufferSize),
closed: false, closed: false,
} }
} }

View File

@ -11,78 +11,57 @@ import (
) )
const ( const (
AddressRequest bus.Address = "bus/memory/request" MessageNamespaceRequest bus.MessageNamespace = "reqrep/request"
AddressReply bus.Address = "bus/memory/reply" MessageNamespaceReply bus.MessageNamespace = "reqrep/reply"
) )
type RequestEnvelope struct { type RequestMessage struct {
requestID uint64 RequestID uint64
wrapped bus.Envelope
Message bus.Message
ns bus.MessageNamespace
} }
func (e *RequestEnvelope) Address() bus.Address { func (m *RequestMessage) MessageNamespace() bus.MessageNamespace {
return getRequestAddress(e.wrapped.Address()) return m.ns
} }
func (e *RequestEnvelope) Message() any { type ReplyMessage struct {
return e.wrapped.Message() RequestID uint64
Message bus.Message
Error error
ns bus.MessageNamespace
} }
func (e *RequestEnvelope) RequestID() uint64 { func (m *ReplyMessage) MessageNamespace() bus.MessageNamespace {
return e.requestID return m.ns
} }
func (e *RequestEnvelope) Unwrap() bus.Envelope { func (b *Bus) Request(ctx context.Context, msg bus.Message) (bus.Message, error) {
return e.wrapped
}
type ReplyEnvelope struct {
requestID uint64
wrapped bus.Envelope
err error
}
func (e *ReplyEnvelope) Address() bus.Address {
return getReplyAddress(e.wrapped.Address(), e.requestID)
}
func (e *ReplyEnvelope) Message() any {
return e.wrapped.Message()
}
func (e *ReplyEnvelope) Err() error {
return e.err
}
func (e *ReplyEnvelope) Unwrap() bus.Envelope {
return e.wrapped
}
func (b *Bus) Request(ctx context.Context, env bus.Envelope) (bus.Envelope, error) {
requestID := atomic.AddUint64(&b.nextRequestID, 1) requestID := atomic.AddUint64(&b.nextRequestID, 1)
req := &RequestEnvelope{ req := &RequestMessage{
requestID: requestID, RequestID: requestID,
wrapped: env, Message: msg,
ns: msg.MessageNamespace(),
} }
replyAddress := getReplyAddress(env.Address(), requestID) replyNamespace := createReplyNamespace(requestID)
subCtx, cancel := context.WithCancel(ctx) replies, err := b.Subscribe(ctx, replyNamespace)
defer cancel()
replies, err := b.Subscribe(subCtx, replyAddress)
if err != nil { if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
defer func() { defer func() {
b.Unsubscribe(replyAddress, replies) b.Unsubscribe(ctx, replyNamespace, replies)
}() }()
logger.Debug(ctx, "publishing request", logger.F("request", req)) logger.Debug(ctx, "publishing request", logger.F("request", req))
if err := b.Publish(req); err != nil { if err := b.Publish(ctx, req); err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
@ -91,93 +70,82 @@ func (b *Bus) Request(ctx context.Context, env bus.Envelope) (bus.Envelope, erro
case <-ctx.Done(): case <-ctx.Done():
return nil, errors.WithStack(ctx.Err()) return nil, errors.WithStack(ctx.Err())
case env, ok := <-replies: case msg, ok := <-replies:
if !ok { if !ok {
return nil, errors.WithStack(bus.ErrNoResponse) return nil, errors.WithStack(bus.ErrNoResponse)
} }
reply, ok := env.(*ReplyEnvelope) reply, ok := msg.(*ReplyMessage)
if !ok { if !ok {
return nil, errors.WithStack(bus.ErrUnexpectedMessage) return nil, errors.WithStack(bus.ErrUnexpectedMessage)
} }
if err := reply.Err(); err != nil { if reply.Error != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
return reply.Unwrap(), nil return reply.Message, nil
} }
} }
} }
func (b *Bus) Reply(ctx context.Context, address bus.Address, handler bus.RequestHandler) chan error { type RequestHandler func(evt bus.Message) (bus.Message, error)
requestAddress := getRequestAddress(address)
errs := make(chan error) func (b *Bus) Reply(ctx context.Context, msgNamespace bus.MessageNamespace, h bus.RequestHandler) error {
requests, err := b.Subscribe(ctx, msgNamespace)
requests, err := b.Subscribe(ctx, requestAddress)
if err != nil { if err != nil {
go func() { return errors.WithStack(err)
errs <- errors.WithStack(err)
close(errs)
}()
return errs
} }
go func() {
defer func() { defer func() {
b.Unsubscribe(requestAddress, requests) b.Unsubscribe(ctx, msgNamespace, requests)
close(errs)
}() }()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
errs <- errors.WithStack(ctx.Err()) return errors.WithStack(ctx.Err())
return
case env, ok := <-requests: case msg, ok := <-requests:
if !ok { if !ok {
return return nil
} }
request, ok := env.(*RequestEnvelope) request, ok := msg.(*RequestMessage)
if !ok { if !ok {
errs <- errors.WithStack(bus.ErrUnexpectedMessage) return errors.WithStack(bus.ErrUnexpectedMessage)
continue
} }
logger.Debug(ctx, "handling request", logger.F("request", request)) logger.Debug(ctx, "handling request", logger.F("request", request))
msg, err := handler(request.Unwrap()) msg, err := h(request.Message)
reply := &ReplyEnvelope{ reply := &ReplyMessage{
requestID: request.RequestID(), RequestID: request.RequestID,
wrapped: bus.NewEnvelope(request.Unwrap().Address(), msg), Message: nil,
Error: nil,
ns: createReplyNamespace(request.RequestID),
} }
if err != nil { if err != nil {
reply.err = errors.WithStack(err) reply.Error = errors.WithStack(err)
} else {
reply.Message = msg
} }
logger.Debug(ctx, "publishing reply", logger.F("reply", reply)) logger.Debug(ctx, "publishing reply", logger.F("reply", reply))
if err := b.Publish(reply); err != nil { if err := b.Publish(ctx, reply); err != nil {
errs <- errors.WithStack(err) return errors.WithStack(err)
continue
} }
} }
} }
}()
return errs
} }
func getRequestAddress(addr bus.Address) bus.Address { func createReplyNamespace(requestID uint64) bus.MessageNamespace {
return AddressRequest + "/" + addr return bus.NewMessageNamespace(
} MessageNamespaceReply,
bus.MessageNamespace(strconv.FormatUint(requestID, 10)),
func getReplyAddress(addr bus.Address, requestID uint64) bus.Address { )
return AddressReply + "/" + addr + "/" + bus.Address(strconv.FormatUint(requestID, 10))
} }

33
pkg/bus/message.go Normal file
View File

@ -0,0 +1,33 @@
package bus
import (
"strings"
"github.com/pkg/errors"
)
type (
MessageNamespace string
)
type Message interface {
MessageNamespace() MessageNamespace
}
func NewMessageNamespace(namespaces ...MessageNamespace) MessageNamespace {
var sb strings.Builder
for i, ns := range namespaces {
if i != 0 {
if _, err := sb.WriteString(":"); err != nil {
panic(errors.Wrap(err, "could not build new message namespace"))
}
}
if _, err := sb.WriteString(string(ns)); err != nil {
panic(errors.Wrap(err, "could not build new message namespace"))
}
}
return MessageNamespace(sb.String())
}

View File

@ -2,7 +2,6 @@ package testing
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -13,52 +12,74 @@ import (
) )
const ( const (
testAddress bus.Address = "testAddress" testNamespace bus.MessageNamespace = "testNamespace"
) )
type testMessage struct{}
func (e *testMessage) MessageNamespace() bus.MessageNamespace {
return testNamespace
}
func TestPublishSubscribe(t *testing.T, b bus.Bus) { func TestPublishSubscribe(t *testing.T, b bus.Bus) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel() defer cancel()
t.Log("subscribe") t.Log("subscribe")
envelopes, err := b.Subscribe(ctx, testAddress) messages, err := b.Subscribe(ctx, testNamespace)
if err != nil { if err != nil {
t.Fatal(errors.WithStack(err)) t.Fatal(errors.WithStack(err))
} }
expectedTotal := 5
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(expectedTotal) wg.Add(5)
go func() { go func() {
// 5 events should be received
t.Log("publish 0")
count := expectedTotal if err := b.Publish(ctx, &testMessage{}); err != nil {
for i := 0; i < count; i++ {
env := bus.NewEnvelope(testAddress, fmt.Sprintf("message %d", i))
if err := b.Publish(env); err != nil {
t.Error(errors.WithStack(err)) t.Error(errors.WithStack(err))
} }
t.Logf("published %d", i) t.Log("publish 1")
if err := b.Publish(ctx, &testMessage{}); err != nil {
t.Error(errors.WithStack(err))
}
t.Log("publish 2")
if err := b.Publish(ctx, &testMessage{}); err != nil {
t.Error(errors.WithStack(err))
}
t.Log("publish 3")
if err := b.Publish(ctx, &testMessage{}); err != nil {
t.Error(errors.WithStack(err))
}
t.Log("publish 4")
if err := b.Publish(ctx, &testMessage{}); err != nil {
t.Error(errors.WithStack(err))
} }
}() }()
var count int32 = 0 var count int32 = 0
go func() { go func() {
t.Log("range for received envelopes") t.Log("range for events")
for env := range envelopes { for msg := range messages {
t.Logf("received msg %d", atomic.LoadInt32(&count)) t.Logf("received msg %d", atomic.LoadInt32(&count))
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
if e, g := testAddress, env.Address(); e != g { if e, g := testNamespace, msg.MessageNamespace(); e != g {
t.Errorf("env.Address(): expected '%v', got '%v'", e, g) t.Errorf("evt.MessageNamespace(): expected '%v', got '%v'", e, g)
} }
wg.Done() wg.Done()
@ -67,9 +88,9 @@ func TestPublishSubscribe(t *testing.T, b bus.Bus) {
wg.Wait() wg.Wait()
b.Unsubscribe(testAddress, envelopes) b.Unsubscribe(ctx, testNamespace, messages)
if e, g := int32(expectedTotal), count; e != g { if e, g := int32(5), count; e != g {
t.Errorf("envelopes received count: expected '%v', got '%v'", e, g) t.Errorf("message received count: expected '%v', got '%v'", e, g)
} }
} }

View File

@ -11,42 +11,58 @@ import (
) )
const ( const (
testTypeReqResAddress bus.Address = "testTypeReqResAddress" testTypeReqRes bus.MessageNamespace = "testNamspaceReqRes"
) )
type testReqResMessage struct {
i int
}
func (m *testReqResMessage) MessageNamespace() bus.MessageNamespace {
return testNamespace
}
func TestRequestReply(t *testing.T, b bus.Bus) { func TestRequestReply(t *testing.T, b bus.Bus) {
expectedRoundTrips := 256 expectedRoundTrips := 256
timeout := time.Now().Add(time.Duration(expectedRoundTrips) * time.Second) timeout := time.Now().Add(time.Duration(expectedRoundTrips) * time.Second)
replyCtx, cancelReply := context.WithDeadline(context.Background(), timeout) var (
defer cancelReply() initWaitGroup sync.WaitGroup
resWaitGroup sync.WaitGroup
)
var resWaitGroup sync.WaitGroup initWaitGroup.Add(1)
replyErrs := b.Reply(replyCtx, testTypeReqResAddress, func(env bus.Envelope) (any, error) { go func() {
repondCtx, cancelRespond := context.WithDeadline(context.Background(), timeout)
defer cancelRespond()
initWaitGroup.Done()
err := b.Reply(repondCtx, testNamespace, func(msg bus.Message) (bus.Message, error) {
defer resWaitGroup.Done() defer resWaitGroup.Done()
req, ok := env.Message().(int) req, ok := msg.(*testReqResMessage)
if !ok { if !ok {
return nil, errors.WithStack(bus.ErrUnexpectedMessage) return nil, errors.WithStack(bus.ErrUnexpectedMessage)
} }
result := &testReqResMessage{req.i}
// Simulate random work // Simulate random work
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
t.Logf("[RES] sending res #%d", req) t.Logf("[RES] sending res #%d", req.i)
return req, nil return result, nil
}) })
if err != nil {
go func() { t.Error(err)
for err := range replyErrs {
if !errors.Is(err, context.Canceled) {
t.Errorf("%+v", errors.WithStack(err))
}
} }
}() }()
initWaitGroup.Wait()
var reqWaitGroup sync.WaitGroup var reqWaitGroup sync.WaitGroup
for i := 0; i < expectedRoundTrips; i++ { for i := 0; i < expectedRoundTrips; i++ {
@ -59,30 +75,32 @@ func TestRequestReply(t *testing.T, b bus.Bus) {
requestCtx, cancelRequest := context.WithDeadline(context.Background(), timeout) requestCtx, cancelRequest := context.WithDeadline(context.Background(), timeout)
defer cancelRequest() defer cancelRequest()
req := &testReqResMessage{i}
t.Logf("[REQ] sending req #%d", i) t.Logf("[REQ] sending req #%d", i)
response, err := b.Request(requestCtx, bus.NewEnvelope(testTypeReqResAddress, i)) result, err := b.Request(requestCtx, req)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
t.Logf("[REQ] received req #%d reply", i) t.Logf("[REQ] received req #%d reply", i)
if response == nil { if result == nil {
t.Error("response should not be nil") t.Error("result should not be nil")
return return
} }
result, ok := response.Message().(int) res, ok := result.(*testReqResMessage)
if !ok { if !ok {
t.Error(errors.WithStack(bus.ErrUnexpectedMessage)) t.Error(errors.WithStack(bus.ErrUnexpectedMessage))
return return
} }
if e, g := i, result; e != g { if e, g := req.i, res.i; e != g {
t.Errorf("response.Message(): expected '%v', got '%v'", e, g) t.Errorf("res.i: expected '%v', got '%v'", e, g)
} }
}(i) }(i)
} }
@ -90,77 +108,3 @@ func TestRequestReply(t *testing.T, b bus.Bus) {
reqWaitGroup.Wait() reqWaitGroup.Wait()
resWaitGroup.Wait() resWaitGroup.Wait()
} }
func TestCanceledRequest(t *testing.T, b bus.Bus) {
replyCtx, cancelReply := context.WithCancel(context.Background())
defer cancelReply()
errs := b.Reply(replyCtx, testTypeReqResAddress, func(env bus.Envelope) (any, error) {
return env.Message(), nil
})
go func() {
for err := range errs {
if !errors.Is(err, context.Canceled) {
t.Errorf("%+v", errors.WithStack(err))
}
}
}()
var wg sync.WaitGroup
count := 100
wg.Add(count)
for i := 0; i < count; i++ {
go func(i int) {
defer wg.Done()
t.Logf("calling %d", i)
isCanceled := i%2 == 0
var ctx context.Context
if isCanceled {
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
ctx = canceledCtx
} else {
ctx = context.Background()
}
t.Logf("publishing envelope #%d", i)
reply, err := b.Request(ctx, bus.NewEnvelope(testTypeReqResAddress, int64(i)))
if err != nil {
if errors.Is(err, context.Canceled) && isCanceled {
return
}
if errors.Is(err, bus.ErrNoResponse) && isCanceled {
return
}
t.Errorf("%+v", errors.WithStack(err))
return
}
result, ok := reply.Message().(int64)
if !ok {
t.Errorf("response.Result: expected type '%T', got '%T'", int64(0), reply.Message())
return
}
if e, g := i, int(result); e != g {
t.Errorf("response.Result: expected '%v', got '%v'", e, g)
return
}
}(i)
}
wg.Wait()
}

282
pkg/http/blob.go Normal file
View File

@ -0,0 +1,282 @@
package http
import (
"encoding/json"
"io"
"io/fs"
"mime/multipart"
"net/http"
"os"
"time"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/blob"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
const (
errorCodeForbidden = "forbidden"
errorCodeInternalError = "internal-error"
errorCodeBadRequest = "bad-request"
errorCodeNotFound = "not-found"
)
type uploadResponse struct {
Bucket string `json:"bucket"`
BlobID storage.BlobID `json:"blobId"`
}
func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
h.mutex.RLock()
defer h.mutex.RUnlock()
ctx := r.Context()
r.Body = http.MaxBytesReader(w, r.Body, h.uploadMaxFileSize)
if err := r.ParseMultipartForm(h.uploadMaxFileSize); err != nil {
logger.Error(ctx, "could not parse multipart form", logger.CapturedE(errors.WithStack(err)))
jsonError(w, http.StatusBadRequest, errorCodeBadRequest)
return
}
_, fileHeader, err := r.FormFile("file")
if err != nil {
logger.Error(ctx, "could not read form file", logger.CapturedE(errors.WithStack(err)))
jsonError(w, http.StatusBadRequest, errorCodeBadRequest)
return
}
var metadata map[string]any
rawMetadata := r.Form.Get("metadata")
if rawMetadata != "" {
if err := json.Unmarshal([]byte(rawMetadata), &metadata); err != nil {
logger.Error(ctx, "could not parse metadata", logger.CapturedE(errors.WithStack(err)))
jsonError(w, http.StatusBadRequest, errorCodeBadRequest)
return
}
}
ctx = module.WithContext(ctx, map[module.ContextKey]any{
ContextKeyOriginRequest: r,
})
requestMsg := blob.NewMessageUploadRequest(ctx, fileHeader, metadata)
reply, err := h.bus.Request(ctx, requestMsg)
if err != nil {
logger.Error(ctx, "could not retrieve file", logger.CapturedE(errors.WithStack(err)))
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return
}
logger.Debug(ctx, "upload reply", logger.F("reply", reply))
responseMsg, ok := reply.(*blob.MessageUploadResponse)
if !ok {
logger.Error(
ctx, "unexpected upload response message",
logger.F("message", reply),
)
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return
}
if !responseMsg.Allow {
jsonError(w, http.StatusForbidden, errorCodeForbidden)
return
}
encoder := json.NewEncoder(w)
res := &uploadResponse{
Bucket: responseMsg.Bucket,
BlobID: responseMsg.BlobID,
}
if err := encoder.Encode(res); err != nil {
panic(errors.Wrap(err, "could not encode upload response"))
}
}
func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
h.mutex.RLock()
defer h.mutex.RUnlock()
bucket := chi.URLParam(r, "bucket")
blobID := chi.URLParam(r, "blobID")
ctx := logger.With(r.Context(), logger.F("blobID", blobID), logger.F("bucket", bucket))
ctx = module.WithContext(ctx, map[module.ContextKey]any{
ContextKeyOriginRequest: r,
})
requestMsg := blob.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
reply, err := h.bus.Request(ctx, requestMsg)
if err != nil {
logger.Error(ctx, "could not retrieve file", logger.CapturedE(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
replyMsg, ok := reply.(*blob.MessageDownloadResponse)
if !ok {
logger.Error(
ctx, "unexpected download response message",
logger.CapturedE(errors.WithStack(bus.ErrUnexpectedMessage)),
logger.F("message", reply),
)
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return
}
if !replyMsg.Allow {
jsonError(w, http.StatusForbidden, errorCodeForbidden)
return
}
if replyMsg.Blob == nil {
jsonError(w, http.StatusNotFound, errorCodeNotFound)
return
}
defer func() {
if err := replyMsg.Blob.Close(); err != nil {
logger.Error(ctx, "could not close blob", logger.CapturedE(errors.WithStack(err)))
}
}()
http.ServeContent(w, r, string(replyMsg.BlobInfo.ID()), replyMsg.BlobInfo.ModTime(), replyMsg.Blob)
}
func serveFile(w http.ResponseWriter, r *http.Request, fs fs.FS, path string) {
ctx := logger.With(r.Context(), logger.F("path", path))
file, err := fs.Open(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
logger.Error(ctx, "error while opening fs file", logger.CapturedE(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
defer func() {
if err := file.Close(); err != nil {
logger.Error(ctx, "error while closing fs file", logger.CapturedE(errors.WithStack(err)))
}
}()
info, err := file.Stat()
if err != nil {
logger.Error(ctx, "error while retrieving fs file stat", logger.CapturedE(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
reader, ok := file.(io.ReadSeeker)
if !ok {
return
}
http.ServeContent(w, r, path, info.ModTime(), reader)
}
type jsonErrorResponse struct {
Error jsonErr `json:"error"`
}
type jsonErr struct {
Code string `json:"code"`
}
func jsonError(w http.ResponseWriter, status int, code string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
encoder := json.NewEncoder(w)
response := jsonErrorResponse{
Error: jsonErr{
Code: code,
},
}
if err := encoder.Encode(response); err != nil {
panic(errors.WithStack(err))
}
}
type uploadedFile struct {
multipart.File
header *multipart.FileHeader
modTime time.Time
}
// Stat implements fs.File
func (f *uploadedFile) Stat() (fs.FileInfo, error) {
return &uploadedFileInfo{
header: f.header,
modTime: f.modTime,
}, nil
}
type uploadedFileInfo struct {
header *multipart.FileHeader
modTime time.Time
}
// IsDir implements fs.FileInfo
func (i *uploadedFileInfo) IsDir() bool {
return false
}
// ModTime implements fs.FileInfo
func (i *uploadedFileInfo) ModTime() time.Time {
return i.modTime
}
// Mode implements fs.FileInfo
func (i *uploadedFileInfo) Mode() fs.FileMode {
return os.ModePerm
}
// Name implements fs.FileInfo
func (i *uploadedFileInfo) Name() string {
return i.header.Filename
}
// Size implements fs.FileInfo
func (i *uploadedFileInfo) Size() int64 {
return i.header.Size
}
// Sys implements fs.FileInfo
func (i *uploadedFileInfo) Sys() any {
return nil
}
var (
_ fs.File = &uploadedFile{}
_ fs.FileInfo = &uploadedFileInfo{}
)

View File

@ -7,11 +7,11 @@ import (
) )
func (h *Handler) handleSDKClient(w http.ResponseWriter, r *http.Request) { func (h *Handler) handleSDKClient(w http.ResponseWriter, r *http.Request) {
ServeFile(w, r, &sdk.FS, "client/dist/client.js") serveFile(w, r, &sdk.FS, "client/dist/client.js")
} }
func (h *Handler) handleSDKClientMap(w http.ResponseWriter, r *http.Request) { func (h *Handler) handleSDKClientMap(w http.ResponseWriter, r *http.Request) {
ServeFile(w, r, &sdk.FS, "client/dist/client.js.map") serveFile(w, r, &sdk.FS, "client/dist/client.js.map")
} }
func (h *Handler) handleAppFiles(w http.ResponseWriter, r *http.Request) { func (h *Handler) handleAppFiles(w http.ResponseWriter, r *http.Request) {

View File

@ -1,55 +0,0 @@
package http
import (
"context"
"net/http"
"forge.cadoles.com/arcad/edge/pkg/bus"
"github.com/pkg/errors"
)
type contextKey string
var (
contextKeyBus contextKey = "bus"
contextKeyHTTPRequest contextKey = "httpRequest"
contextKeyHTTPClient contextKey = "httpClient"
)
func (h *Handler) contextMiddleware(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = context.WithValue(ctx, contextKeyBus, h.bus)
ctx = context.WithValue(ctx, contextKeyHTTPRequest, r)
ctx = context.WithValue(ctx, contextKeyHTTPClient, h.httpClient)
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
func ContextBus(ctx context.Context) bus.Bus {
return contextValue[bus.Bus](ctx, contextKeyBus)
}
func ContextHTTPRequest(ctx context.Context) *http.Request {
return contextValue[*http.Request](ctx, contextKeyHTTPRequest)
}
func ContextHTTPClient(ctx context.Context) *http.Client {
return contextValue[*http.Client](ctx, contextKeyHTTPClient)
}
func contextValue[T any](ctx context.Context, key any) T {
value, ok := ctx.Value(key).(T)
if !ok {
panic(errors.Errorf("could not find key '%v' on context", key))
}
return value
}

View File

@ -1,30 +0,0 @@
package http
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/bus"
)
var (
AddressIncomingMessage bus.Address = "http/incoming-message"
AddressOutgoingMessage bus.Address = "http/outgoing-message"
)
type IncomingMessage struct {
Context context.Context
Payload map[string]any
}
func NewIncomingMessageEnvelope(ctx context.Context, payload map[string]any) bus.Envelope {
return bus.NewEnvelope(AddressIncomingMessage, &IncomingMessage{ctx, payload})
}
type OutgoingMessage struct {
SessionID string
Data any
}
func NewOutgoingMessageEnvelope(sessionID string, data any) bus.Envelope {
return bus.NewEnvelope(AddressOutgoingMessage, &OutgoingMessage{sessionID, data})
}

View File

@ -1,61 +1,60 @@
package fetch package http
import ( import (
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http" "forge.cadoles.com/arcad/edge/pkg/module"
"github.com/go-chi/chi/v5" "forge.cadoles.com/arcad/edge/pkg/module/fetch"
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger" "gitlab.com/wpetit/goweb/logger"
) )
func Mount() func(r chi.Router) { func (h *Handler) handleAppFetch(w http.ResponseWriter, r *http.Request) {
return func(r chi.Router) { h.mutex.RLock()
r.Get("/api/v1/fetch", handleAppFetch) defer h.mutex.RUnlock()
}
}
func handleAppFetch(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
ctx = module.WithContext(ctx, map[module.ContextKey]any{
ContextKeyOriginRequest: r,
})
rawURL := r.URL.Query().Get("url") rawURL := r.URL.Query().Get("url")
url, err := url.Parse(rawURL) url, err := url.Parse(rawURL)
if err != nil { if err != nil {
edgehttp.JSONError(w, http.StatusBadRequest, edgehttp.ErrCodeBadRequest) jsonError(w, http.StatusBadRequest, errorCodeBadRequest)
return return
} }
requestMsg := NewFetchRequestEnvelope(ctx, r.RemoteAddr, url) requestMsg := fetch.NewMessageFetchRequest(ctx, r.RemoteAddr, url)
bus := edgehttp.ContextBus(ctx) reply, err := h.bus.Request(ctx, requestMsg)
reply, err := bus.Request(ctx, requestMsg)
if err != nil { if err != nil {
logger.Error(ctx, "could not retrieve fetch request reply", logger.CapturedE(errors.WithStack(err))) logger.Error(ctx, "could not retrieve fetch request reply", logger.CapturedE(errors.WithStack(err)))
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError) jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return return
} }
logger.Debug(ctx, "fetch reply", logger.F("reply", reply)) logger.Debug(ctx, "fetch reply", logger.F("reply", reply))
responseMsg, ok := reply.Message().(*FetchResponse) responseMsg, ok := reply.(*fetch.MessageFetchResponse)
if !ok { if !ok {
logger.Error( logger.Error(
ctx, "unexpected fetch response message", ctx, "unexpected fetch response message",
logger.F("message", reply), logger.F("message", reply),
) )
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError) jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return return
} }
if !responseMsg.Allow { if !responseMsg.Allow {
edgehttp.JSONError(w, http.StatusForbidden, edgehttp.ErrCodeForbidden) jsonError(w, http.StatusForbidden, errorCodeForbidden)
return return
} }
@ -66,7 +65,7 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {
ctx, "could not create proxy request", ctx, "could not create proxy request",
logger.CapturedE(errors.WithStack(err)), logger.CapturedE(errors.WithStack(err)),
) )
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError) jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return return
} }
@ -79,15 +78,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {
proxyReq.Header.Add("X-Forwarded-From", r.RemoteAddr) proxyReq.Header.Add("X-Forwarded-From", r.RemoteAddr)
httpClient := edgehttp.ContextHTTPClient(ctx) res, err := h.httpClient.Do(proxyReq)
res, err := httpClient.Do(proxyReq)
if err != nil { if err != nil {
logger.Error( logger.Error(
ctx, "could not execute proxy request", ctx, "could not execute proxy request",
logger.CapturedE(errors.WithStack(err)), logger.CapturedE(errors.WithStack(err)),
) )
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError) jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
return return
} }

View File

@ -27,6 +27,7 @@ type Handler struct {
sockjs http.Handler sockjs http.Handler
bus bus.Bus bus bus.Bus
sockjsOpts sockjs.Options sockjsOpts sockjs.Options
uploadMaxFileSize int64
server *app.Server server *app.Server
serverModuleFactories []app.ServerModuleFactory serverModuleFactories []app.ServerModuleFactory
@ -56,6 +57,10 @@ func (h *Handler) Load(ctx context.Context, bdle bundle.Bundle) error {
server := app.NewServer(h.serverModuleFactories...) server := app.NewServer(h.serverModuleFactories...)
if err := server.Load(serverMainScript, string(mainScript)); err != nil {
return errors.WithStack(err)
}
fs := bundle.NewFileSystem("public", bdle) fs := bundle.NewFileSystem("public", bdle)
public := HTML5Fileserver(fs) public := HTML5Fileserver(fs)
sockjs := sockjs.NewHandler(sockJSPathPrefix, h.sockjsOpts, h.handleSockJSSession) sockjs := sockjs.NewHandler(sockJSPathPrefix, h.sockjsOpts, h.handleSockJSSession)
@ -64,7 +69,7 @@ func (h *Handler) Load(ctx context.Context, bdle bundle.Bundle) error {
h.server.Stop() h.server.Stop()
} }
if err := server.Start(ctx, serverMainScript, string(mainScript)); err != nil { if err := server.Start(ctx); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -85,6 +90,7 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
router := chi.NewRouter() router := chi.NewRouter()
handler := &Handler{ handler := &Handler{
uploadMaxFileSize: opts.UploadMaxFileSize,
sockjsOpts: opts.SockJS, sockjsOpts: opts.SockJS,
router: router, router: router,
serverModuleFactories: opts.ServerModuleFactories, serverModuleFactories: opts.ServerModuleFactories,
@ -102,14 +108,18 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
r.Get("/client.js.map", handler.handleSDKClientMap) r.Get("/client.js.map", handler.handleSDKClientMap)
}) })
r.Group(func(r chi.Router) { r.Route("/api", func(r chi.Router) {
r.Use(handler.contextMiddleware) r.Post("/v1/upload", handler.handleAppUpload)
r.Get("/v1/download/{bucket}/{blobID}", handler.handleAppDownload)
r.Get("/v1/fetch", handler.handleAppFetch)
})
for _, fn := range opts.HTTPMounts { for _, fn := range opts.HTTPMounts {
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
fn(r) fn(r)
}) })
} }
})
r.HandleFunc("/sock/*", handler.handleSockJS) r.HandleFunc("/sock/*", handler.handleSockJS)
}) })

View File

@ -15,6 +15,7 @@ type HandlerOptions struct {
Bus bus.Bus Bus bus.Bus
SockJS sockjs.Options SockJS sockjs.Options
ServerModuleFactories []app.ServerModuleFactory ServerModuleFactories []app.ServerModuleFactory
UploadMaxFileSize int64
HTTPClient *http.Client HTTPClient *http.Client
HTTPMounts []func(r chi.Router) HTTPMounts []func(r chi.Router)
HTTPMiddlewares []func(next http.Handler) http.Handler HTTPMiddlewares []func(next http.Handler) http.Handler
@ -30,6 +31,7 @@ func defaultHandlerOptions() *HandlerOptions {
Bus: memory.NewBus(), Bus: memory.NewBus(),
SockJS: sockjsOptions, SockJS: sockjsOptions,
ServerModuleFactories: make([]app.ServerModuleFactory, 0), ServerModuleFactories: make([]app.ServerModuleFactory, 0),
UploadMaxFileSize: 10 << (10 * 2), // 10Mb
HTTPClient: &http.Client{ HTTPClient: &http.Client{
Timeout: time.Second * 30, Timeout: time.Second * 30,
}, },
@ -58,6 +60,12 @@ func WithBus(bus bus.Bus) HandlerOptionFunc {
} }
} }
func WithUploadMaxFileSize(size int64) HandlerOptionFunc {
return func(opts *HandlerOptions) {
opts.UploadMaxFileSize = size
}
}
func WithHTTPClient(client *http.Client) HandlerOptionFunc { func WithHTTPClient(client *http.Client) HandlerOptionFunc {
return func(opts *HandlerOptions) { return func(opts *HandlerOptions) {
opts.HTTPClient = client opts.HTTPClient = client

View File

@ -42,18 +42,19 @@ func (h *Handler) handleSockJSSession(sess sockjs.Session) {
} }
}() }()
go h.handleOutgoingMessages(ctx, sess) go h.handleServerMessages(ctx, sess)
h.handleIncomingMessages(ctx, sess) h.handleClientMessages(ctx, sess)
} }
func (h *Handler) handleOutgoingMessages(ctx context.Context, sess sockjs.Session) { func (h *Handler) handleServerMessages(ctx context.Context, sess sockjs.Session) {
envelopes, err := h.bus.Subscribe(ctx, AddressOutgoingMessage) messages, err := h.bus.Subscribe(ctx, module.MessageNamespaceServer)
if err != nil { if err != nil {
panic(errors.WithStack(err)) panic(errors.WithStack(err))
} }
defer func() { defer func() {
h.bus.Unsubscribe(AddressOutgoingMessage, envelopes) // Close messages subscriber
h.bus.Unsubscribe(ctx, module.MessageNamespaceServer, messages)
logger.Debug(ctx, "unsubscribed") logger.Debug(ctx, "unsubscribed")
@ -71,22 +72,26 @@ func (h *Handler) handleOutgoingMessages(ctx context.Context, sess sockjs.Sessio
case <-ctx.Done(): case <-ctx.Done():
return return
case env := <-envelopes: case msg := <-messages:
outgoingMessage, ok := env.Message().(*OutgoingMessage) serverMessage, ok := msg.(*module.ServerMessage)
if !ok { if !ok {
logger.Error( logger.Error(
ctx, ctx,
"unexpected outgoing message", "unexpected server message",
logger.F("message", env.Message()), logger.F("message", msg),
) )
continue
} }
isDest := outgoingMessage.SessionID == "" || outgoingMessage.SessionID == sess.ID() sessionID := module.ContextValue[string](serverMessage.Context, ContextKeySessionID)
isDest := sessionID == "" || sessionID == sess.ID()
if !isDest { if !isDest {
continue continue
} }
payload, err := json.Marshal(outgoingMessage.Data) payload, err := json.Marshal(serverMessage.Data)
if err != nil { if err != nil {
logger.Error( logger.Error(
ctx, ctx,
@ -127,7 +132,7 @@ func (h *Handler) handleOutgoingMessages(ctx context.Context, sess sockjs.Sessio
} }
} }
func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Session) { func (h *Handler) handleClientMessages(ctx context.Context, sess sockjs.Session) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -140,7 +145,7 @@ func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Sessio
data, err := sess.RecvCtx(ctx) data, err := sess.RecvCtx(ctx)
if err != nil { if err != nil {
if errors.Is(err, sockjs.ErrSessionNotOpen) || errors.Is(err, context.Canceled) { if errors.Is(err, sockjs.ErrSessionNotOpen) {
break break
} }
@ -169,7 +174,7 @@ func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Sessio
switch { switch {
case message.Type == WebsocketMessageTypeMessage: case message.Type == WebsocketMessageTypeMessage:
var payload map[string]any var payload map[string]interface{}
if err := json.Unmarshal(message.Payload, &payload); err != nil { if err := json.Unmarshal(message.Payload, &payload); err != nil {
logger.Error( logger.Error(
ctx, ctx,
@ -186,19 +191,21 @@ func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Sessio
ContextKeyOriginRequest: sess.Request(), ContextKeyOriginRequest: sess.Request(),
}) })
incomingMessage := NewIncomingMessageEnvelope(ctx, payload) clientMessage := module.NewClientMessage(ctx, payload)
logger.Debug(ctx, "publishing new incoming message", logger.F("message", incomingMessage)) logger.Debug(ctx, "publishing new client message", logger.F("message", clientMessage))
if err := h.bus.Publish(incomingMessage); err != nil { if err := h.bus.Publish(ctx, clientMessage); err != nil {
logger.Error(ctx, "could not publish message", logger.Error(ctx, "could not publish message",
logger.CapturedE(errors.WithStack(err)), logger.CapturedE(errors.WithStack(err)),
logger.F("message", incomingMessage), logger.F("message", clientMessage),
) )
return return
} }
logger.Debug(ctx, "new client message published", logger.F("message", clientMessage))
default: default:
logger.Error( logger.Error(
ctx, ctx,

View File

@ -1,82 +0,0 @@
package http
import (
"encoding/json"
"io"
"io/fs"
"net/http"
"os"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
const (
ErrCodeForbidden = "forbidden"
ErrCodeInternalError = "internal-error"
ErrCodeBadRequest = "bad-request"
ErrCodeNotFound = "not-found"
)
type jsonErrorResponse struct {
Error jsonErr `json:"error"`
}
type jsonErr struct {
Code string `json:"code"`
}
func JSONError(w http.ResponseWriter, status int, code string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
encoder := json.NewEncoder(w)
response := jsonErrorResponse{
Error: jsonErr{
Code: code,
},
}
if err := encoder.Encode(response); err != nil {
panic(errors.WithStack(err))
}
}
func ServeFile(w http.ResponseWriter, r *http.Request, fs fs.FS, path string) {
ctx := logger.With(r.Context(), logger.F("path", path))
file, err := fs.Open(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
logger.Error(ctx, "error while opening fs file", logger.CapturedE(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
defer func() {
if err := file.Close(); err != nil {
logger.Error(ctx, "error while closing fs file", logger.CapturedE(errors.WithStack(err)))
}
}()
info, err := file.Stat()
if err != nil {
logger.Error(ctx, "error while retrieving fs file stat", logger.CapturedE(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
reader, ok := file.(io.ReadSeeker)
if !ok {
return
}
http.ServeContent(w, r, path, info.ModTime(), reader)
}

View File

@ -39,17 +39,21 @@ func TestAppModuleWithMemoryRepository(t *testing.T) {
)), )),
) )
script := "testdata/app.js" file := "testdata/app.js"
data, err := os.ReadFile(script) data, err := os.ReadFile(file)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ctx := context.Background() if err := server.Load(file, string(data)); err != nil {
if err := server.Start(ctx, script, string(data)); err != nil { t.Fatal(err)
t.Fatalf("%+v", errors.WithStack(err))
} }
defer server.Stop() defer server.Stop()
ctx := context.Background()
if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
} }

View File

@ -2,8 +2,8 @@ package auth
import ( import (
"context" "context"
"io/ioutil"
"net/http" "net/http"
"os"
"testing" "testing"
"time" "time"
@ -22,9 +22,7 @@ import (
func TestAuthModule(t *testing.T) { func TestAuthModule(t *testing.T) {
t.Parallel() t.Parallel()
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
key := getDummyKey() key := getDummyKey()
@ -35,15 +33,17 @@ func TestAuthModule(t *testing.T) {
), ),
) )
script := "testdata/auth.js" data, err := ioutil.ReadFile("testdata/auth.js")
data, err := os.ReadFile(script)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := server.Load("testdata/auth.js", string(data)); err != nil {
t.Fatal(err)
}
ctx := context.Background() ctx := context.Background()
if err := server.Start(ctx, script, string(data)); err != nil { if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
@ -81,9 +81,7 @@ func TestAuthModule(t *testing.T) {
func TestAuthAnonymousModule(t *testing.T) { func TestAuthAnonymousModule(t *testing.T) {
t.Parallel() t.Parallel()
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
key := getDummyKey() key := getDummyKey()
@ -92,15 +90,17 @@ func TestAuthAnonymousModule(t *testing.T) {
ModuleFactory(WithJWT(getDummyKeySet(key))), ModuleFactory(WithJWT(getDummyKeySet(key))),
) )
script := "testdata/auth_anonymous.js" data, err := ioutil.ReadFile("testdata/auth_anonymous.js")
data, err := os.ReadFile("testdata/auth_anonymous.js")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := server.Load("testdata/auth_anonymous.js", string(data)); err != nil {
t.Fatal(err)
}
ctx := context.Background() ctx := context.Background()
if err := server.Start(ctx, script, string(data)); err != nil { if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }

View File

@ -0,0 +1,92 @@
package blob
import (
"context"
"io"
"mime/multipart"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/oklog/ulid/v2"
)
const (
MessageNamespaceUploadRequest bus.MessageNamespace = "uploadRequest"
MessageNamespaceUploadResponse bus.MessageNamespace = "uploadResponse"
MessageNamespaceDownloadRequest bus.MessageNamespace = "downloadRequest"
MessageNamespaceDownloadResponse bus.MessageNamespace = "downloadResponse"
)
type MessageUploadRequest struct {
Context context.Context
RequestID string
FileHeader *multipart.FileHeader
Metadata map[string]interface{}
}
func (m *MessageUploadRequest) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceUploadRequest
}
func NewMessageUploadRequest(ctx context.Context, fileHeader *multipart.FileHeader, metadata map[string]interface{}) *MessageUploadRequest {
return &MessageUploadRequest{
Context: ctx,
RequestID: ulid.Make().String(),
FileHeader: fileHeader,
Metadata: metadata,
}
}
type MessageUploadResponse struct {
RequestID string
BlobID storage.BlobID
Bucket string
Allow bool
}
func (m *MessageUploadResponse) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceDownloadResponse
}
func NewMessageUploadResponse(requestID string) *MessageUploadResponse {
return &MessageUploadResponse{
RequestID: requestID,
}
}
type MessageDownloadRequest struct {
Context context.Context
RequestID string
Bucket string
BlobID storage.BlobID
}
func (m *MessageDownloadRequest) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceDownloadRequest
}
func NewMessageDownloadRequest(ctx context.Context, bucket string, blobID storage.BlobID) *MessageDownloadRequest {
return &MessageDownloadRequest{
Context: ctx,
RequestID: ulid.Make().String(),
Bucket: bucket,
BlobID: blobID,
}
}
type MessageDownloadResponse struct {
RequestID string
Allow bool
BlobInfo storage.BlobInfo
Blob io.ReadSeekCloser
}
func (m *MessageDownloadResponse) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceDownloadResponse
}
func NewMessageDownloadResponse(requestID string) *MessageDownloadResponse {
return &MessageDownloadResponse{
RequestID: requestID,
}
}

View File

@ -1,55 +0,0 @@
package blob
import (
"context"
"io"
"mime/multipart"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/storage"
)
const (
AddressUpload bus.Address = "module/blob/upload"
AddressDownload bus.Address = "module/blob/download"
)
type UploadRequest struct {
Context context.Context
FileHeader *multipart.FileHeader
Metadata map[string]interface{}
}
func NewUploadRequestEnvelope(ctx context.Context, fileHeader *multipart.FileHeader, metadata map[string]interface{}) bus.Envelope {
return bus.NewEnvelope(AddressUpload, &UploadRequest{
Context: ctx,
FileHeader: fileHeader,
Metadata: metadata,
})
}
type UploadResponse struct {
Allow bool
Bucket string
BlobID storage.BlobID
}
type DownloadRequest struct {
Context context.Context
Bucket string
BlobID storage.BlobID
}
func NewDownloadRequestEnvelope(ctx context.Context, bucket string, blobID storage.BlobID) bus.Envelope {
return bus.NewEnvelope(AddressDownload, &DownloadRequest{
Context: ctx,
Bucket: bucket,
BlobID: blobID,
})
}
type DownloadResponse struct {
Allow bool
Blob io.ReadSeekCloser
BlobInfo storage.BlobInfo
}

View File

@ -1,212 +0,0 @@
package blob
import (
"encoding/json"
"io/fs"
"mime/multipart"
"net/http"
"os"
"time"
"forge.cadoles.com/arcad/edge/pkg/bus"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type uploadResponse struct {
Bucket string `json:"bucket"`
BlobID storage.BlobID `json:"blobId"`
}
func Mount(uploadMaxFileSize int) func(r chi.Router) {
return func(r chi.Router) {
r.Post("/api/v1/upload", getAppUploadHandler(uploadMaxFileSize))
r.Get("/api/v1/download/{bucket}/{blobID}", handleAppDownload)
}
}
func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
uploadMaxFileSize := int64(8000)
r.Body = http.MaxBytesReader(w, r.Body, uploadMaxFileSize)
if err := r.ParseMultipartForm(uploadMaxFileSize); err != nil {
logger.Error(ctx, "could not parse multipart form", logger.CapturedE(errors.WithStack(err)))
edgehttp.JSONError(w, http.StatusBadRequest, edgehttp.ErrCodeBadRequest)
return
}
_, fileHeader, err := r.FormFile("file")
if err != nil {
logger.Error(ctx, "could not read form file", logger.CapturedE(errors.WithStack(err)))
edgehttp.JSONError(w, http.StatusBadRequest, edgehttp.ErrCodeBadRequest)
return
}
var metadata map[string]any
rawMetadata := r.Form.Get("metadata")
if rawMetadata != "" {
if err := json.Unmarshal([]byte(rawMetadata), &metadata); err != nil {
logger.Error(ctx, "could not parse metadata", logger.CapturedE(errors.WithStack(err)))
edgehttp.JSONError(w, http.StatusBadRequest, edgehttp.ErrCodeBadRequest)
return
}
}
requestEnv := NewUploadRequestEnvelope(ctx, fileHeader, metadata)
bus := edgehttp.ContextBus(ctx)
reply, err := bus.Request(ctx, requestEnv)
if err != nil {
logger.Error(ctx, "could not retrieve file", logger.CapturedE(errors.WithStack(err)))
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
logger.Debug(ctx, "upload reply", logger.F("reply", reply))
replyMessage, ok := reply.Message().(*UploadResponse)
if !ok {
logger.Error(
ctx, "unexpected upload response message",
logger.F("message", reply.Message()),
)
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
if !replyMessage.Allow {
edgehttp.JSONError(w, http.StatusForbidden, edgehttp.ErrCodeForbidden)
return
}
encoder := json.NewEncoder(w)
res := &uploadResponse{
Bucket: replyMessage.Bucket,
BlobID: replyMessage.BlobID,
}
if err := encoder.Encode(res); err != nil {
panic(errors.Wrap(err, "could not encode upload response"))
}
}
}
func handleAppDownload(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
blobID := chi.URLParam(r, "blobID")
ctx := logger.With(r.Context(), logger.F("blobID", blobID), logger.F("bucket", bucket))
requestMsg := NewDownloadRequestEnvelope(ctx, bucket, storage.BlobID(blobID))
bs := edgehttp.ContextBus(ctx)
reply, err := bs.Request(ctx, requestMsg)
if err != nil {
logger.Error(ctx, "could not retrieve file", logger.CapturedE(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
replyMessage, ok := reply.Message().(*DownloadResponse)
if !ok {
logger.Error(
ctx, "unexpected download response message",
logger.CapturedE(errors.WithStack(bus.ErrUnexpectedMessage)),
logger.F("message", reply),
)
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
if !replyMessage.Allow {
edgehttp.JSONError(w, http.StatusForbidden, edgehttp.ErrCodeForbidden)
return
}
if replyMessage.Blob == nil {
edgehttp.JSONError(w, http.StatusNotFound, edgehttp.ErrCodeNotFound)
return
}
defer func() {
if err := replyMessage.Blob.Close(); err != nil {
logger.Error(ctx, "could not close blob", logger.CapturedE(errors.WithStack(err)))
}
}()
http.ServeContent(w, r, string(replyMessage.BlobInfo.ID()), replyMessage.BlobInfo.ModTime(), replyMessage.Blob)
}
type uploadedFile struct {
multipart.File
header *multipart.FileHeader
modTime time.Time
}
// Stat implements fs.File
func (f *uploadedFile) Stat() (fs.FileInfo, error) {
return &uploadedFileInfo{
header: f.header,
modTime: f.modTime,
}, nil
}
type uploadedFileInfo struct {
header *multipart.FileHeader
modTime time.Time
}
// IsDir implements fs.FileInfo
func (i *uploadedFileInfo) IsDir() bool {
return false
}
// ModTime implements fs.FileInfo
func (i *uploadedFileInfo) ModTime() time.Time {
return i.modTime
}
// Mode implements fs.FileInfo
func (i *uploadedFileInfo) Mode() fs.FileMode {
return os.ModePerm
}
// Name implements fs.FileInfo
func (i *uploadedFileInfo) Name() string {
return i.header.Filename
}
// Size implements fs.FileInfo
func (i *uploadedFileInfo) Size() int64 {
return i.header.Size
}
// Sys implements fs.FileInfo
func (i *uploadedFileInfo) Sys() any {
return nil
}
var (
_ fs.File = &uploadedFile{}
_ fs.FileInfo = &uploadedFileInfo{}
)

View File

@ -236,10 +236,11 @@ func (m *Module) getBucketSize(call goja.FunctionCall, rt *goja.Runtime) goja.Va
func (m *Module) handleMessages() { func (m *Module) handleMessages() {
ctx := context.Background() ctx := context.Background()
uploadRequestErrs := m.bus.Reply(ctx, AddressUpload, func(env bus.Envelope) (any, error) { go func() {
uploadRequest, ok := env.Message().(*UploadRequest) err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
uploadRequest, ok := msg.(*MessageUploadRequest)
if !ok { if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", env.Message()) return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
} }
res, err := m.handleUploadRequest(uploadRequest) res, err := m.handleUploadRequest(uploadRequest)
@ -253,17 +254,15 @@ func (m *Module) handleMessages() {
return res, nil return res, nil
}) })
if err != nil {
go func() { panic(errors.WithStack(err))
for err := range uploadRequestErrs {
logger.Error(ctx, "error while replying to upload requests", logger.CapturedE(errors.WithStack(err)))
} }
}() }()
downloadRequestErrs := m.bus.Reply(ctx, AddressDownload, func(env bus.Envelope) (any, error) { err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
downloadRequest, ok := env.Message().(*DownloadRequest) downloadRequest, ok := msg.(*MessageDownloadRequest)
if !ok { if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", env.Message()) return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
} }
res, err := m.handleDownloadRequest(downloadRequest) res, err := m.handleDownloadRequest(downloadRequest)
@ -275,15 +274,14 @@ func (m *Module) handleMessages() {
return res, nil return res, nil
}) })
if err != nil {
for err := range downloadRequestErrs { panic(errors.WithStack(err))
logger.Fatal(ctx, "error while replying to download requests", logger.CapturedE(errors.WithStack(err)))
} }
} }
func (m *Module) handleUploadRequest(req *UploadRequest) (*UploadResponse, error) { func (m *Module) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
blobID := storage.NewBlobID() blobID := storage.NewBlobID()
res := &UploadResponse{} res := NewMessageUploadResponse(req.RequestID)
ctx := logger.With(req.Context, logger.F("blobID", blobID)) ctx := logger.With(req.Context, logger.F("blobID", blobID))
@ -304,11 +302,11 @@ func (m *Module) handleUploadRequest(req *UploadRequest) (*UploadResponse, error
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
result, ok := rawResult.(map[string]interface{}) result, ok := rawResult.Export().(map[string]interface{})
if !ok { if !ok {
return nil, errors.Errorf( return nil, errors.Errorf(
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'", "unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
rawResult, rawResult.Export(),
) )
} }
@ -395,8 +393,8 @@ func (m *Module) saveBlob(ctx context.Context, bucketName string, blobID storage
return nil return nil
} }
func (m *Module) handleDownloadRequest(req *DownloadRequest) (*DownloadResponse, error) { func (m *Module) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
res := &DownloadResponse{} res := NewMessageDownloadResponse(req.RequestID)
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID) rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
if err != nil { if err != nil {
@ -409,11 +407,11 @@ func (m *Module) handleDownloadRequest(req *DownloadRequest) (*DownloadResponse,
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
result, ok := rawResult.(map[string]interface{}) result, ok := rawResult.Export().(map[string]interface{})
if !ok { if !ok {
return nil, errors.Errorf( return nil, errors.Errorf(
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'", "unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
rawResult, rawResult.Export(),
) )
} }

View File

@ -17,9 +17,7 @@ import (
func TestBlobModule(t *testing.T) { func TestBlobModule(t *testing.T) {
t.Parallel() t.Parallel()
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
bus := memory.NewBus() bus := memory.NewBus()
store := sqlite.NewBlobStore(":memory:?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000") store := sqlite.NewBlobStore(":memory:?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000")
@ -30,17 +28,19 @@ func TestBlobModule(t *testing.T) {
ModuleFactory(bus, store), ModuleFactory(bus, store),
) )
script := "testdata/blob.js" data, err := os.ReadFile("testdata/blob.js")
data, err := os.ReadFile(script)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ctx := context.Background() if err := server.Load("testdata/blob.js", string(data)); err != nil {
if err := server.Start(ctx, script, string(data)); err != nil { t.Fatal(err)
t.Fatalf("%+v", errors.WithStack(err))
} }
defer server.Stop() defer server.Stop()
ctx := context.Background()
if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
} }

View File

@ -21,9 +21,7 @@ func TestCastLoadURL(t *testing.T) {
return return
} }
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()

View File

@ -2,6 +2,7 @@ package cast
import ( import (
"context" "context"
"io/ioutil"
"os" "os"
"testing" "testing"
"time" "time"
@ -23,24 +24,24 @@ func TestCastModule(t *testing.T) {
return return
} }
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
server := app.NewServer( server := app.NewServer(
module.ConsoleModuleFactory(), module.ConsoleModuleFactory(),
CastModuleFactory(), CastModuleFactory(),
) )
script := "testdata/cast.js" data, err := ioutil.ReadFile("testdata/cast.js")
data, err := os.ReadFile(script)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := server.Load("testdata/cast.js", string(data)); err != nil {
t.Fatal(err)
}
ctx := context.Background() ctx := context.Background()
if err := server.Start(ctx, script, string(data)); err != nil { if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
@ -58,24 +59,24 @@ func TestCastModuleRefreshDevices(t *testing.T) {
return return
} }
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
server := app.NewServer( server := app.NewServer(
module.ConsoleModuleFactory(), module.ConsoleModuleFactory(),
CastModuleFactory(), CastModuleFactory(),
) )
script := "testdata/refresh_devices.js" data, err := ioutil.ReadFile("testdata/refresh_devices.js")
data, err := os.ReadFile(script)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := server.Load("testdata/refresh_devices.js", string(data)); err != nil {
t.Fatal(err)
}
ctx := context.Background() ctx := context.Background()
if err := server.Start(ctx, script, string(data)); err != nil { if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
@ -86,5 +87,12 @@ func TestCastModuleRefreshDevices(t *testing.T) {
t.Error(errors.WithStack(err)) t.Error(errors.WithStack(err))
} }
spew.Dump(result) promise, ok := app.IsPromise(result)
if !ok {
t.Fatal("expected promise")
}
value := server.WaitForPromise(promise)
spew.Dump(value.Export())
} }

View File

@ -1,38 +0,0 @@
package fetch
import (
"context"
"net/url"
"forge.cadoles.com/arcad/edge/pkg/bus"
)
const (
AddressFetchRequest bus.Address = "module/fetch/request"
AddressFetchResponse bus.Address = "module/fetch/response"
)
type FetchRequest struct {
Context context.Context
RequestID string
URL *url.URL
RemoteAddr string
}
func NewFetchRequestEnvelope(ctx context.Context, remoteAddr string, url *url.URL) bus.Envelope {
return bus.NewEnvelope(AddressFetchRequest, &FetchRequest{
Context: ctx,
URL: url,
RemoteAddr: remoteAddr,
})
}
type FetchResponse struct {
Allow bool
}
func NewFetchResponseEnvelope(allow bool) bus.Envelope {
return bus.NewEnvelope(AddressFetchResponse, &FetchResponse{
Allow: allow,
})
}

View File

@ -0,0 +1,49 @@
package fetch
import (
"context"
"net/url"
"forge.cadoles.com/arcad/edge/pkg/bus"
"github.com/oklog/ulid/v2"
)
const (
MessageNamespaceFetchRequest bus.MessageNamespace = "fetchRequest"
MessageNamespaceFetchResponse bus.MessageNamespace = "fetchResponse"
)
type MessageFetchRequest struct {
Context context.Context
RequestID string
URL *url.URL
RemoteAddr string
}
func (m *MessageFetchRequest) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceFetchRequest
}
func NewMessageFetchRequest(ctx context.Context, remoteAddr string, url *url.URL) *MessageFetchRequest {
return &MessageFetchRequest{
Context: ctx,
RequestID: ulid.Make().String(),
RemoteAddr: remoteAddr,
URL: url,
}
}
type MessageFetchResponse struct {
RequestID string
Allow bool
}
func (m *MessageFetchResponse) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceFetchResponse
}
func NewMessageFetchResponse(requestID string) *MessageFetchResponse {
return &MessageFetchResponse{
RequestID: requestID,
}
}

View File

@ -40,10 +40,10 @@ func (m *Module) get(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
func (m *Module) handleMessages() { func (m *Module) handleMessages() {
ctx := context.Background() ctx := context.Background()
fetchErrs := m.bus.Reply(ctx, AddressFetchRequest, func(env bus.Envelope) (any, error) { err := m.bus.Reply(ctx, MessageNamespaceFetchRequest, func(msg bus.Message) (bus.Message, error) {
fetchRequest, ok := env.Message().(*FetchRequest) fetchRequest, ok := msg.(*MessageFetchRequest)
if !ok { if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected fetch request, got '%T'", env.Message()) return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message fetch request, got '%T'", msg)
} }
res, err := m.handleFetchRequest(fetchRequest) res, err := m.handleFetchRequest(fetchRequest)
@ -57,14 +57,13 @@ func (m *Module) handleMessages() {
return res, nil return res, nil
}) })
if err != nil {
for err := range fetchErrs { panic(errors.WithStack(err))
logger.Fatal(ctx, "error while replying to fetch requests", logger.CapturedE(errors.WithStack(err)))
} }
} }
func (m *Module) handleFetchRequest(req *FetchRequest) (*FetchResponse, error) { func (m *Module) handleFetchRequest(req *MessageFetchRequest) (*MessageFetchResponse, error) {
res := &FetchResponse{} res := NewMessageFetchResponse(req.RequestID)
ctx := logger.With( ctx := logger.With(
req.Context, req.Context,
@ -84,11 +83,11 @@ func (m *Module) handleFetchRequest(req *FetchRequest) (*FetchResponse, error) {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
result, ok := rawResult.(map[string]interface{}) result, ok := rawResult.Export().(map[string]interface{})
if !ok { if !ok {
return nil, errors.Errorf( return nil, errors.Errorf(
"unexpected onClientFetch result: expected 'map[string]interface{}', got '%T'", "unexpected onClientFetch result: expected 'map[string]interface{}', got '%T'",
rawResult, rawResult.Export(),
) )
} }

View File

@ -2,8 +2,8 @@ package fetch
import ( import (
"context" "context"
"io/ioutil"
"net/url" "net/url"
"os"
"testing" "testing"
"time" "time"
@ -18,9 +18,7 @@ import (
func TestFetchModule(t *testing.T) { func TestFetchModule(t *testing.T) {
t.Parallel() t.Parallel()
if testing.Verbose() {
logger.SetLevel(slog.LevelDebug) logger.SetLevel(slog.LevelDebug)
}
bus := memory.NewBus() bus := memory.NewBus()
@ -30,20 +28,22 @@ func TestFetchModule(t *testing.T) {
ModuleFactory(bus), ModuleFactory(bus),
) )
path := "testdata/fetch.js" data, err := ioutil.ReadFile("testdata/fetch.js")
data, err := os.ReadFile(path)
if err != nil { if err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
ctx := context.Background() if err := server.Load("testdata/fetch.js", string(data)); err != nil {
if err := server.Start(ctx, path, string(data)); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
defer server.Stop() defer server.Stop()
ctx := context.Background()
if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
// Wait for module to startup // Wait for module to startup
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -53,33 +53,33 @@ func TestFetchModule(t *testing.T) {
remoteAddr := "127.0.0.1" remoteAddr := "127.0.0.1"
url, _ := url.Parse("http://example.com") url, _ := url.Parse("http://example.com")
reply, err := bus.Request(ctx, NewFetchRequestEnvelope(ctx, remoteAddr, url)) rawReply, err := bus.Request(ctx, NewMessageFetchRequest(ctx, remoteAddr, url))
if err != nil { if err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
response, ok := reply.Message().(*FetchResponse) reply, ok := rawReply.(*MessageFetchResponse)
if !ok { if !ok {
t.Fatalf("unexpected reply message type '%T'", reply.Message()) t.Fatalf("unexpected reply type '%T'", rawReply)
} }
if e, g := true, response.Allow; e != g { if e, g := true, reply.Allow; e != g {
t.Errorf("reply.Allow: expected '%v', got '%v'", e, g) t.Errorf("reply.Allow: expected '%v', got '%v'", e, g)
} }
url, _ = url.Parse("https://google.com") url, _ = url.Parse("https://google.com")
reply, err = bus.Request(ctx, NewFetchRequestEnvelope(ctx, remoteAddr, url)) rawReply, err = bus.Request(ctx, NewMessageFetchRequest(ctx, remoteAddr, url))
if err != nil { if err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
response, ok = reply.Message().(*FetchResponse) reply, ok = rawReply.(*MessageFetchResponse)
if !ok { if !ok {
t.Fatalf("unexpected reply message type '%T'", reply.Message()) t.Fatalf("unexpected reply type '%T'", rawReply)
} }
if e, g := false, response.Allow; e != g { if e, g := false, reply.Allow; e != g {
t.Errorf("reply.Allow: expected '%v', got '%v'", e, g) t.Errorf("reply.Allow: expected '%v', got '%v'", e, g)
} }
} }

View File

@ -5,6 +5,7 @@ import (
"forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/app"
"github.com/dop251/goja" "github.com/dop251/goja"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger" "gitlab.com/wpetit/goweb/logger"
) )
@ -18,29 +19,17 @@ func (m *LifecycleModule) Export(export *goja.Object) {
} }
func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) { func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) {
call, ok := goja.AssertFunction(rt.Get("onInit")) _, ok := goja.AssertFunction(rt.Get("onInit"))
if !ok { if !ok {
logger.Warn(ctx, "could not find onInit() function") logger.Warn(ctx, "could not find onInit() function")
return nil return nil
} }
defer func() { if _, err := rt.RunString("setTimeout(onInit, 0)"); err != nil {
recovered := recover() return errors.WithStack(err)
if recovered == nil {
return
} }
recoveredErr, ok := recovered.(error)
if !ok {
panic(recovered)
}
err = recoveredErr
}()
call(nil, rt.ToValue(ctx))
return nil return nil
} }

38
pkg/module/message.go Normal file
View File

@ -0,0 +1,38 @@
package module
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/bus"
)
const (
MessageNamespaceClient bus.MessageNamespace = "client"
MessageNamespaceServer bus.MessageNamespace = "server"
)
type ServerMessage struct {
Context context.Context
Data interface{}
}
func (m *ServerMessage) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceServer
}
func NewServerMessage(ctx context.Context, data interface{}) *ServerMessage {
return &ServerMessage{ctx, data}
}
type ClientMessage struct {
Context context.Context
Data map[string]interface{}
}
func (m *ClientMessage) MessageNamespace() bus.MessageNamespace {
return MessageNamespaceClient
}
func NewClientMessage(ctx context.Context, data map[string]interface{}) *ClientMessage {
return &ClientMessage{ctx, data}
}

View File

@ -5,7 +5,7 @@ import (
"forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus" "forge.cadoles.com/arcad/edge/pkg/bus"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http" edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/module" "forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/util" "forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja" "github.com/dop251/goja"
@ -38,9 +38,10 @@ func (m *Module) broadcast(call goja.FunctionCall, rt *goja.Runtime) goja.Value
} }
data := call.Argument(0).Export() data := call.Argument(0).Export()
ctx := context.Background()
env := edgehttp.NewOutgoingMessageEnvelope("", data) msg := module.NewServerMessage(ctx, data)
if err := m.bus.Publish(env); err != nil { if err := m.bus.Publish(ctx, msg); err != nil {
panic(rt.ToValue(errors.WithStack(err))) panic(rt.ToValue(errors.WithStack(err)))
} }
@ -52,33 +53,38 @@ func (m *Module) send(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
panic(rt.ToValue(errors.New("invalid number of argument"))) panic(rt.ToValue(errors.New("invalid number of argument")))
} }
var ctx context.Context
firstArg := call.Argument(0) firstArg := call.Argument(0)
sessionID, ok := firstArg.Export().(string) sessionID, ok := firstArg.Export().(string)
if !ok { if ok {
ctx := util.AssertContext(firstArg, rt) ctx = module.WithContext(context.Background(), map[module.ContextKey]any{
sessionID = module.ContextValue[string](ctx, edgehttp.ContextKeySessionID) edgeHTTP.ContextKeySessionID: sessionID,
})
} else {
ctx = util.AssertContext(firstArg, rt)
} }
data := call.Argument(1).Export() data := call.Argument(1).Export()
env := edgehttp.NewOutgoingMessageEnvelope(sessionID, data) msg := module.NewServerMessage(ctx, data)
if err := m.bus.Publish(env); err != nil { if err := m.bus.Publish(ctx, msg); err != nil {
panic(rt.ToValue(errors.WithStack(err))) panic(rt.ToValue(errors.WithStack(err)))
} }
return nil return nil
} }
func (m *Module) handleIncomingMessages() { func (m *Module) handleClientMessages() {
ctx := context.Background() ctx := context.Background()
logger.Debug( logger.Debug(
ctx, ctx,
"subscribing to bus envelopes", "subscribing to bus messages",
) )
envelopes, err := m.bus.Subscribe(ctx, edgehttp.AddressIncomingMessage) clientMessages, err := m.bus.Subscribe(ctx, module.MessageNamespaceClient)
if err != nil { if err != nil {
panic(errors.WithStack(err)) panic(errors.WithStack(err))
} }
@ -86,16 +92,16 @@ func (m *Module) handleIncomingMessages() {
defer func() { defer func() {
logger.Debug( logger.Debug(
ctx, ctx,
"unsubscribing from bus envelopes", "unsubscribing from bus messages",
) )
m.bus.Unsubscribe(edgehttp.AddressIncomingMessage, envelopes) m.bus.Unsubscribe(ctx, module.MessageNamespaceClient, clientMessages)
}() }()
for { for {
logger.Debug( logger.Debug(
ctx, ctx,
"waiting for next envelope", "waiting for next message",
) )
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -106,13 +112,13 @@ func (m *Module) handleIncomingMessages() {
return return
case env := <-envelopes: case msg := <-clientMessages:
incomingMessage, ok := env.Message().(*edgehttp.IncomingMessage) clientMessage, ok := msg.(*module.ClientMessage)
if !ok { if !ok {
logger.Warn( logger.Warn(
ctx, ctx,
"unexpected message type", "unexpected message type",
logger.F("message", env.Message()), logger.F("message", msg),
) )
continue continue
@ -120,11 +126,11 @@ func (m *Module) handleIncomingMessages() {
logger.Debug( logger.Debug(
ctx, ctx,
"received incoming message", "received client message",
logger.F("message", incomingMessage), logger.F("message", clientMessage),
) )
if _, err := m.server.ExecFuncByName(incomingMessage.Context, "onClientMessage", incomingMessage.Context, incomingMessage.Payload); err != nil { if _, err := m.server.ExecFuncByName(clientMessage.Context, "onClientMessage", clientMessage.Context, clientMessage.Data); err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) { if errors.Is(err, app.ErrFuncDoesNotExist) {
continue continue
} }
@ -146,7 +152,7 @@ func ModuleFactory(bus bus.Bus) app.ServerModuleFactory {
bus: bus, bus: bus,
} }
go module.handleIncomingMessages() go module.handleClientMessages()
return module return module
} }

278
pkg/module/rpc.go Normal file
View File

@ -0,0 +1,278 @@
package module
import (
"context"
"fmt"
"sync"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type RPCRequest struct {
Method string
Params interface{}
ID interface{}
}
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data"`
}
type RPCResponse struct {
Result interface{}
Error *RPCError
ID interface{}
}
type RPCModule struct {
server *app.Server
bus bus.Bus
callbacks sync.Map
}
func (m *RPCModule) Name() string {
return "rpc"
}
func (m *RPCModule) Export(export *goja.Object) {
if err := export.Set("register", m.register); err != nil {
panic(errors.Wrap(err, "could not set 'register' function"))
}
if err := export.Set("unregister", m.unregister); err != nil {
panic(errors.Wrap(err, "could not set 'unregister' function"))
}
}
func (m *RPCModule) OnInit(ctx context.Context, rt *goja.Runtime) error {
go m.handleMessages(ctx)
return nil
}
func (m *RPCModule) register(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
fnName := util.AssertString(call.Argument(0), rt)
var (
callable goja.Callable
ok bool
)
if len(call.Arguments) > 1 {
callable, ok = goja.AssertFunction(call.Argument(1))
} else {
callable, ok = goja.AssertFunction(rt.Get(fnName))
}
if !ok {
panic(rt.NewTypeError("method should be a valid function"))
}
ctx := context.Background()
logger.Debug(ctx, "registering method", logger.F("method", fnName))
m.callbacks.Store(fnName, callable)
return nil
}
func (m *RPCModule) unregister(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
fnName := util.AssertString(call.Argument(0), rt)
m.callbacks.Delete(fnName)
return nil
}
func (m *RPCModule) handleMessages(ctx context.Context) {
clientMessages, err := m.bus.Subscribe(ctx, MessageNamespaceClient)
if err != nil {
panic(errors.WithStack(err))
}
defer func() {
m.bus.Unsubscribe(ctx, MessageNamespaceClient, clientMessages)
}()
sendRes := func(ctx context.Context, req *RPCRequest, result goja.Value) {
res := &RPCResponse{
ID: req.ID,
Result: result.Export(),
}
logger.Debug(ctx, "sending rpc response", logger.F("response", res))
if err := m.sendResponse(ctx, res); err != nil {
logger.Error(
ctx, "could not send response",
logger.CapturedE(errors.WithStack(err)),
logger.F("response", res),
logger.F("request", req),
)
}
}
for msg := range clientMessages {
go m.handleMessage(ctx, msg, sendRes)
}
}
func (m *RPCModule) handleMessage(ctx context.Context, msg bus.Message, sendRes func(ctx context.Context, req *RPCRequest, result goja.Value)) {
clientMessage, ok := msg.(*ClientMessage)
if !ok {
logger.Warn(ctx, "unexpected bus message", logger.F("message", msg))
return
}
ok, req := m.isRPCRequest(clientMessage)
if !ok {
return
}
logger.Debug(ctx, "received rpc request", logger.F("request", req))
rawCallable, exists := m.callbacks.Load(req.Method)
if !exists {
logger.Debug(ctx, "method not found", logger.F("req", req))
if err := m.sendMethodNotFoundResponse(clientMessage.Context, req); err != nil {
logger.Error(
ctx, "could not send method not found response",
logger.CapturedE(errors.WithStack(err)),
logger.F("request", req),
)
}
return
}
callable, ok := rawCallable.(goja.Callable)
if !ok {
logger.Debug(ctx, "invalid method", logger.F("req", req))
if err := m.sendMethodNotFoundResponse(clientMessage.Context, req); err != nil {
logger.Error(
ctx, "could not send method not found response",
logger.CapturedE(errors.WithStack(err)),
logger.F("request", req),
)
}
return
}
result, err := m.server.Exec(clientMessage.Context, callable, clientMessage.Context, req.Params)
if err != nil {
logger.Error(
ctx, "rpc call error",
logger.CapturedE(errors.WithStack(err)),
logger.F("request", req),
)
if err := m.sendErrorResponse(clientMessage.Context, req, err); err != nil {
logger.Error(
ctx, "could not send error response",
logger.CapturedE(errors.WithStack(err)),
logger.F("originalError", err),
logger.F("request", req),
)
}
return
}
promise, ok := app.IsPromise(result)
if ok {
go func(ctx context.Context, req *RPCRequest, promise *goja.Promise) {
result := m.server.WaitForPromise(promise)
sendRes(ctx, req, result)
}(clientMessage.Context, req, promise)
} else {
sendRes(clientMessage.Context, req, result)
}
}
func (m *RPCModule) sendErrorResponse(ctx context.Context, req *RPCRequest, err error) error {
return m.sendResponse(ctx, &RPCResponse{
ID: req.ID,
Result: nil,
Error: &RPCError{
Code: -32603,
Message: err.Error(),
},
})
}
func (m *RPCModule) sendMethodNotFoundResponse(ctx context.Context, req *RPCRequest) error {
return m.sendResponse(ctx, &RPCResponse{
ID: req.ID,
Result: nil,
Error: &RPCError{
Code: -32601,
Message: fmt.Sprintf("method not found"),
},
})
}
func (m *RPCModule) sendResponse(ctx context.Context, res *RPCResponse) error {
msg := NewServerMessage(ctx, map[string]interface{}{
"jsonrpc": "2.0",
"id": res.ID,
"error": res.Error,
"result": res.Result,
})
if err := m.bus.Publish(ctx, msg); err != nil {
return errors.WithStack(err)
}
return nil
}
func (m *RPCModule) isRPCRequest(msg *ClientMessage) (bool, *RPCRequest) {
jsonRPC, exists := msg.Data["jsonrpc"]
if !exists || jsonRPC != "2.0" {
return false, nil
}
rawMethod, exists := msg.Data["method"]
if !exists {
return false, nil
}
method, ok := rawMethod.(string)
if !ok {
return false, nil
}
id := msg.Data["id"]
params := msg.Data["params"]
return true, &RPCRequest{
ID: id,
Method: method,
Params: params,
}
}
func RPCModuleFactory(bus bus.Bus) app.ServerModuleFactory {
return func(server *app.Server) app.ServerModule {
mod := &RPCModule{
server: server,
bus: bus,
}
return mod
}
}
var _ app.InitializableModule = &RPCModule{}

View File

@ -1,21 +0,0 @@
package rpc
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/bus"
)
const (
Address bus.Address = "module/rpc"
)
type Request struct {
Context context.Context
Method string
Params any
}
func NewRequestEnvelope(ctx context.Context, method string, params any) bus.Envelope {
return bus.NewEnvelope(Address, &Request{ctx, method, params})
}

View File

@ -1,7 +0,0 @@
package rpc
import "errors"
var (
ErrMethodNotFound = errors.New("method not found")
)

View File

@ -1,19 +0,0 @@
package rpc
import "fmt"
type JSONRPCRequest struct {
ID any
Method string
Params any
}
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data"`
}
func (e *JSONRPCError) Error() string {
return fmt.Sprintf("json-rpc error: %d - %s", e.Code, e.Message)
}

View File

@ -1,256 +0,0 @@
package rpc
import (
"context"
"sync"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type Module struct {
server *app.Server
bus bus.Bus
callbacks sync.Map
}
func (m *Module) Name() string {
return "rpc"
}
func (m *Module) Export(export *goja.Object) {
if err := export.Set("register", m.register); err != nil {
panic(errors.Wrap(err, "could not set 'register' function"))
}
if err := export.Set("unregister", m.unregister); err != nil {
panic(errors.Wrap(err, "could not set 'unregister' function"))
}
}
func (m *Module) OnInit(ctx context.Context, rt *goja.Runtime) error {
requestErrs := m.bus.Reply(ctx, Address, m.handleRequest)
go func() {
for err := range requestErrs {
logger.Error(ctx, "error while replying to rpc requests", logger.CapturedE(errors.WithStack(err)))
}
}()
httpIncomingMessages, err := m.bus.Subscribe(ctx, edgehttp.AddressIncomingMessage)
if err != nil {
return errors.WithStack(err)
}
go m.handleIncomingHTTPMessages(ctx, httpIncomingMessages)
return nil
}
func (m *Module) register(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
fnName := util.AssertString(call.Argument(0), rt)
var (
callable goja.Callable
ok bool
)
if len(call.Arguments) > 1 {
callable, ok = goja.AssertFunction(call.Argument(1))
} else {
callable, ok = goja.AssertFunction(rt.Get(fnName))
}
if !ok {
panic(rt.NewTypeError("method should be a valid function"))
}
ctx := context.Background()
logger.Debug(ctx, "registering method", logger.F("method", fnName))
m.callbacks.Store(fnName, callable)
return nil
}
func (m *Module) unregister(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
fnName := util.AssertString(call.Argument(0), rt)
m.callbacks.Delete(fnName)
return nil
}
func (m *Module) handleRequest(env bus.Envelope) (any, error) {
request, ok := env.Message().(*Request)
if !ok {
logger.Warn(context.Background(), "unexpected bus message", logger.F("message", env.Message()))
return nil, errors.WithStack(bus.ErrUnexpectedMessage)
}
ctx := logger.With(request.Context, logger.F("request", request))
logger.Debug(ctx, "received rpc request")
rawCallable, exists := m.callbacks.Load(request.Method)
if !exists {
logger.Debug(ctx, "method not found")
return nil, errors.WithStack(ErrMethodNotFound)
}
callable, ok := rawCallable.(goja.Callable)
if !ok {
logger.Debug(ctx, "invalid method")
return nil, errors.WithStack(ErrMethodNotFound)
}
result, err := m.server.Exec(ctx, callable, request.Context, request.Params)
if err != nil {
logger.Error(
ctx, "rpc call error",
logger.CapturedE(errors.WithStack(err)),
)
return nil, errors.WithStack(err)
}
return result, nil
}
func (m *Module) handleIncomingHTTPMessages(ctx context.Context, incoming <-chan bus.Envelope) {
defer func() {
m.bus.Unsubscribe(edgehttp.AddressIncomingMessage, incoming)
}()
for env := range incoming {
msg, ok := env.Message().(*edgehttp.IncomingMessage)
if !ok {
logger.Error(ctx, "unexpected incoming http message type", logger.F("message", env.Message()))
continue
}
jsonReq, ok := m.isRPCRequest(msg.Payload)
if !ok {
continue
}
requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))
request := NewRequestEnvelope(msg.Context, jsonReq.Method, jsonReq.Params)
sessionID := module.ContextValue[string](msg.Context, edgehttp.ContextKeySessionID)
reply, err := m.bus.Request(requestCtx, request)
if err != nil {
err = errors.WithStack(err)
logger.Error(
ctx, "could not execute rpc request",
logger.CapturedE(err),
)
if errors.Is(err, ErrMethodNotFound) {
if err := m.sendMethodNotFoundResponse(sessionID, jsonReq.ID); err != nil {
logger.Error(
ctx, "could not send json rpc error response",
logger.CapturedE(errors.WithStack(err)),
)
}
continue
}
if err := m.sendErrorResponse(sessionID, jsonReq.ID, err); err != nil {
logger.Error(
ctx, "could not send json rpc error response",
logger.CapturedE(errors.WithStack(err)),
)
}
continue
}
if err := m.sendResponse(sessionID, jsonReq.ID, reply.Message(), nil); err != nil {
logger.Error(
ctx, "could not send json rpc result response",
logger.CapturedE(err),
)
}
}
}
func (m *Module) sendErrorResponse(sessionID string, requestID any, err error) error {
return m.sendResponse(sessionID, requestID, nil, &JSONRPCError{
Code: -32603,
Message: err.Error(),
})
}
func (m *Module) sendMethodNotFoundResponse(sessionID string, requestID any) error {
return m.sendResponse(sessionID, requestID, nil, &JSONRPCError{
Code: -32601,
Message: "method not found",
})
}
func (m *Module) sendResponse(sessionID string, requestID any, result any, err error) error {
env := edgehttp.NewOutgoingMessageEnvelope(sessionID, map[string]interface{}{
"jsonrpc": "2.0",
"id": requestID,
"error": err,
"result": result,
})
if err := m.bus.Publish(env); err != nil {
return errors.WithStack(err)
}
return nil
}
func (m *Module) isRPCRequest(payload map[string]any) (*JSONRPCRequest, bool) {
jsonRPC, exists := payload["jsonrpc"]
if !exists || jsonRPC != "2.0" {
return nil, false
}
rawMethod, exists := payload["method"]
if !exists {
return nil, false
}
method, ok := rawMethod.(string)
if !ok {
return nil, false
}
id := payload["id"]
params := payload["params"]
return &JSONRPCRequest{
ID: id,
Method: method,
Params: params,
}, true
}
func ModuleFactory(bus bus.Bus) app.ServerModuleFactory {
return func(server *app.Server) app.ServerModule {
mod := &Module{
server: server,
bus: bus,
}
return mod
}
}
var _ app.InitializableModule = &Module{}

View File

@ -1,109 +0,0 @@
package rpc
import (
"context"
"os"
"sync"
"testing"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
"forge.cadoles.com/arcad/edge/pkg/module"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestServerExecDeadlock(t *testing.T) {
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
b := memory.NewBus(memory.WithBufferSize(1))
server := app.NewServer(
module.ConsoleModuleFactory(),
ModuleFactory(b),
module.LifecycleModuleFactory(),
)
data, err := os.ReadFile("testdata/deadlock.js")
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
ctx := context.Background()
t.Log("starting server")
if err := server.Start(ctx, "deadlock.js", string(data)); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer server.Stop()
t.Log("server started")
count := 100
delay := 100
var wg sync.WaitGroup
wg.Add(count)
for i := 0; i < count; i++ {
go func(i int) {
defer wg.Done()
t.Logf("calling %d", i)
isCanceled := i%2 == 0
var ctx context.Context
if isCanceled {
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
ctx = canceledCtx
} else {
ctx = context.Background()
}
env := NewRequestEnvelope(ctx, "doSomethingLong", map[string]any{
"i": i,
"delay": delay,
})
t.Logf("publishing envelope #%d", i)
reply, err := b.Request(ctx, env)
if err != nil {
if errors.Is(err, context.Canceled) && isCanceled {
return
}
if errors.Is(err, bus.ErrNoResponse) && isCanceled {
return
}
t.Errorf("%+v", errors.WithStack(err))
return
}
result, ok := reply.Message().(int64)
if !ok {
t.Errorf("response.Result: expected type '%T', got '%T'", int64(0), reply.Message())
return
}
if e, g := i, int(result); e != g {
t.Errorf("response.Result: expected '%v', got '%v'", e, g)
return
}
}(i)
}
wg.Wait()
}

View File

@ -1,14 +0,0 @@
function onInit() {
rpc.register("doSomethingLong", doSomethingLong)
}
function doSomethingLong(ctx, params) {
var start = Date.now()
while (true) {
var now = Date.now()
if (now - start >= params.delay) break
}
return params.i;
}

View File

@ -33,14 +33,18 @@ func TestModule(t *testing.T) {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
ctx := context.Background() if err := server.Load("testdata/share.js", string(data)); err != nil {
if err := server.Start(ctx, "testdata/share.js", string(data)); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
defer server.Stop() ctx := context.Background()
if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
if _, err := server.ExecFuncByName(context.Background(), "testModule"); err != nil { if _, err := server.ExecFuncByName(context.Background(), "testModule"); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
server.Stop()
} }

View File

@ -27,14 +27,18 @@ func TestStoreModule(t *testing.T) {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
ctx := context.Background() if err := server.Load("testdata/store.js", string(data)); err != nil {
if err := server.Start(ctx, "testdata/store.js", string(data)); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
defer server.Stop() ctx := context.Background()
if err := server.Start(ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
if _, err := server.ExecFuncByName(context.Background(), "testStore"); err != nil { if _, err := server.ExecFuncByName(context.Background(), "testStore"); err != nil {
t.Fatalf("%+v", errors.WithStack(err)) t.Fatalf("%+v", errors.WithStack(err))
} }
server.Stop()
} }

View File

@ -5,7 +5,6 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"sync" "sync"
"time"
"github.com/jackc/puddle/v2" "github.com/jackc/puddle/v2"
"github.com/keegancsmith/rpc" "github.com/keegancsmith/rpc"
@ -75,53 +74,16 @@ func WithPooledClient(serverURL *url.URL) WithClientFunc {
return errors.WithStack(err) return errors.WithStack(err)
} }
attempts := 0
max := 5
for {
if attempts >= max {
logger.Debug(ctx, "rpc client call retrying failed", logger.F("attempts", attempts))
return errors.Wrapf(err, "rpc client call failed after %d attempts", max)
}
clientResource, err := pool.Acquire(ctx) clientResource, err := pool.Acquire(ctx)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
client := clientResource.Value() if err := fn(ctx, clientResource.Value()); err != nil {
if err := fn(ctx, client); err != nil {
if errors.Is(err, rpc.ErrShutdown) { if errors.Is(err, rpc.ErrShutdown) {
clientResource.Destroy() clientResource.Destroy()
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
logger.Warn(
ctx, "rpc client connection is shutdown, retrying",
logger.F("attempts", attempts),
logger.F("max", max),
logger.F("delay", wait),
)
timer := time.NewTimer(wait)
select {
case <-timer.C:
attempts++
continue
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return errors.WithStack(err)
} }
return nil
}
}
clientResource.Release()
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -129,5 +91,4 @@ func WithPooledClient(serverURL *url.URL) WithClientFunc {
return nil return nil
} }
}
} }

View File

@ -45,12 +45,12 @@ func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, re
func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) { func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) {
raw, exists := s.readers.Load(id) raw, exists := s.readers.Load(id)
if !exists { if !exists {
return nil, errors.Errorf("could not find reader '%s'", id) return nil, errors.Errorf("could not find writer '%s'", id)
} }
reader, ok := raw.(io.ReadSeekCloser) reader, ok := raw.(io.ReadSeekCloser)
if !ok { if !ok {
return nil, errors.Errorf("unexpected type '%T' for reader", raw) return nil, errors.Errorf("unexpected type '%T' for writer", raw)
} }
return reader, nil return reader, nil