fix: prevent bus congestion by flushing out messages
arcad/edge/pipeline/head This commit looks good Details

This commit is contained in:
wpetit 2023-04-26 15:53:23 +02:00
parent ba9ae6e391
commit 17808d14c9
3 changed files with 138 additions and 69 deletions

View File

@ -22,13 +22,13 @@ func (b *Bus) Subscribe(ctx context.Context, ns bus.MessageNamespace) (<-chan bu
) )
dispatchers := b.getDispatchers(ns) dispatchers := b.getDispatchers(ns)
d := newEventDispatcher(b.opt.BufferSize) disp := newEventDispatcher(b.opt.BufferSize)
go d.Run() go disp.Run(ctx)
dispatchers.Add(d) dispatchers.Add(disp)
return d.Out(), nil return disp.Out(), nil
} }
func (b *Bus) Unsubscribe(ctx context.Context, ns bus.MessageNamespace, ch <-chan bus.Message) { func (b *Bus) Unsubscribe(ctx context.Context, ns bus.MessageNamespace, ch <-chan bus.Message) {
@ -52,6 +52,12 @@ func (b *Bus) Publish(ctx context.Context, msg bus.Message) error {
) )
for _, d := range dispatchersList { for _, d := range dispatchersList {
if d.Closed() {
dispatchers.Remove(d)
continue
}
if err := d.In(msg); err != nil { if err := d.In(msg); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }

View File

@ -1,9 +1,13 @@
package memory package memory
import ( import (
"context"
"sync" "sync"
"time"
"forge.cadoles.com/arcad/edge/pkg/bus" "forge.cadoles.com/arcad/edge/pkg/bus"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
) )
type eventDispatcherSet struct { type eventDispatcherSet struct {
@ -18,13 +22,21 @@ func (s *eventDispatcherSet) Add(d *eventDispatcher) {
s.items[d] = struct{}{} s.items[d] = struct{}{}
} }
func (s *eventDispatcherSet) Remove(d *eventDispatcher) {
s.mutex.Lock()
defer s.mutex.Unlock()
d.close()
delete(s.items, d)
}
func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) { func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
for d := range s.items { for d := range s.items {
if d.IsOut(out) { if d.IsOut(out) {
d.Close() d.close()
delete(s.items, d) delete(s.items, d)
} }
} }
@ -56,10 +68,21 @@ type eventDispatcher struct {
closed bool closed bool
} }
func (d *eventDispatcher) Closed() bool {
d.mutex.RLock()
defer d.mutex.RUnlock()
return d.closed
}
func (d *eventDispatcher) Close() { func (d *eventDispatcher) Close() {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock() defer d.mutex.Unlock()
d.close()
}
func (d *eventDispatcher) close() {
d.closed = true d.closed = true
close(d.in) close(d.in)
} }
@ -85,16 +108,52 @@ func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
return d.out == out return d.out == out
} }
func (d *eventDispatcher) Run() { func (d *eventDispatcher) Run(ctx context.Context) {
defer func() {
for {
logger.Debug(ctx, "closing dispatcher, flushing out incoming messages")
close(d.out)
// Flush all incoming messages
for {
_, ok := <-d.in
if !ok {
return
}
}
}
}()
for { for {
msg, ok := <-d.in msg, ok := <-d.in
if !ok { if !ok {
close(d.out)
return return
} }
d.out <- msg timeout := time.After(time.Second)
select {
case d.out <- msg:
case <-timeout:
logger.Error(
ctx,
"out message channel timeout",
logger.F("message", msg),
)
return
case <-ctx.Done():
logger.Error(
ctx,
"message subscription context canceled",
logger.F("message", msg),
logger.E(errors.WithStack(ctx.Err())),
)
return
}
} }
} }

View File

@ -123,79 +123,83 @@ func (m *RPCModule) handleMessages() {
} }
for msg := range clientMessages { for msg := range clientMessages {
clientMessage, ok := msg.(*ClientMessage) go m.handleMessage(ctx, msg, sendRes)
if !ok { }
logger.Warn(ctx, "unexpected bus message", logger.F("message", msg)) }
continue func (m *RPCModule) handleMessage(ctx context.Context, msg bus.Message, sendRes func(ctx context.Context, req *RPCRequest, result goja.Value)) {
} clientMessage, ok := msg.(*ClientMessage)
if !ok {
logger.Warn(ctx, "unexpected bus message", logger.F("message", msg))
ok, req := m.isRPCRequest(clientMessage) return
if !ok { }
continue
}
logger.Debug(ctx, "received rpc request", logger.F("request", req)) ok, req := m.isRPCRequest(clientMessage)
if !ok {
return
}
rawCallable, exists := m.callbacks.Load(req.Method) logger.Debug(ctx, "received rpc request", logger.F("request", req))
if !exists {
logger.Debug(ctx, "method not found", logger.F("req", req))
if err := m.sendMethodNotFoundResponse(clientMessage.Context, req); err != nil { rawCallable, exists := m.callbacks.Load(req.Method)
logger.Error( if !exists {
ctx, "could not send method not found response", logger.Debug(ctx, "method not found", logger.F("req", req))
logger.E(errors.WithStack(err)),
logger.F("request", req),
)
}
continue if err := m.sendMethodNotFoundResponse(clientMessage.Context, req); err != nil {
}
callable, ok := rawCallable.(goja.Callable)
if !ok {
logger.Debug(ctx, "invalid method", logger.F("req", req))
if err := m.sendMethodNotFoundResponse(clientMessage.Context, req); err != nil {
logger.Error(
ctx, "could not send method not found response",
logger.E(errors.WithStack(err)),
logger.F("request", req),
)
}
continue
}
result, err := m.server.Exec(clientMessage.Context, callable, clientMessage.Context, req.Params)
if err != nil {
logger.Error( logger.Error(
ctx, "rpc call error", ctx, "could not send method not found response",
logger.E(errors.WithStack(err)), logger.E(errors.WithStack(err)),
logger.F("request", req), logger.F("request", req),
) )
if err := m.sendErrorResponse(clientMessage.Context, req, err); err != nil {
logger.Error(
ctx, "could not send error response",
logger.E(errors.WithStack(err)),
logger.F("originalError", err),
logger.F("request", req),
)
}
continue
} }
promise, ok := app.IsPromise(result) return
if ok { }
go func(ctx context.Context, req *RPCRequest, promise *goja.Promise) {
result := m.server.WaitForPromise(promise) callable, ok := rawCallable.(goja.Callable)
sendRes(ctx, req, result) if !ok {
}(clientMessage.Context, req, promise) logger.Debug(ctx, "invalid method", logger.F("req", req))
} else {
sendRes(clientMessage.Context, req, result) if err := m.sendMethodNotFoundResponse(clientMessage.Context, req); err != nil {
logger.Error(
ctx, "could not send method not found response",
logger.E(errors.WithStack(err)),
logger.F("request", req),
)
} }
return
}
result, err := m.server.Exec(clientMessage.Context, callable, clientMessage.Context, req.Params)
if err != nil {
logger.Error(
ctx, "rpc call error",
logger.E(errors.WithStack(err)),
logger.F("request", req),
)
if err := m.sendErrorResponse(clientMessage.Context, req, err); err != nil {
logger.Error(
ctx, "could not send error response",
logger.E(errors.WithStack(err)),
logger.F("originalError", err),
logger.F("request", req),
)
}
return
}
promise, ok := app.IsPromise(result)
if ok {
go func(ctx context.Context, req *RPCRequest, promise *goja.Promise) {
result := m.server.WaitForPromise(promise)
sendRes(ctx, req, result)
}(clientMessage.Context, req, promise)
} else {
sendRes(clientMessage.Context, req, result)
} }
} }