2023-02-09 12:16:36 +01:00
|
|
|
package memory
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/bus"
|
|
|
|
cmap "github.com/orcaman/concurrent-map"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Bus struct {
|
|
|
|
opt *Option
|
|
|
|
dispatchers cmap.ConcurrentMap
|
|
|
|
nextRequestID uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *Bus) Subscribe(ctx context.Context, ns bus.MessageNamespace) (<-chan bus.Message, error) {
|
|
|
|
logger.Debug(
|
|
|
|
ctx, "subscribing to messages",
|
|
|
|
logger.F("messageNamespace", ns),
|
|
|
|
)
|
|
|
|
|
|
|
|
dispatchers := b.getDispatchers(ns)
|
2023-04-26 15:53:23 +02:00
|
|
|
disp := newEventDispatcher(b.opt.BufferSize)
|
2023-02-09 12:16:36 +01:00
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
go disp.Run(ctx)
|
2023-02-09 12:16:36 +01:00
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
dispatchers.Add(disp)
|
2023-02-09 12:16:36 +01:00
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
return disp.Out(), nil
|
2023-02-09 12:16:36 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (b *Bus) Unsubscribe(ctx context.Context, ns bus.MessageNamespace, ch <-chan bus.Message) {
|
|
|
|
logger.Debug(
|
|
|
|
ctx, "unsubscribing from messages",
|
|
|
|
logger.F("messageNamespace", ns),
|
|
|
|
)
|
|
|
|
|
|
|
|
dispatchers := b.getDispatchers(ns)
|
|
|
|
dispatchers.RemoveByOutChannel(ch)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *Bus) Publish(ctx context.Context, msg bus.Message) error {
|
|
|
|
dispatchers := b.getDispatchers(msg.MessageNamespace())
|
|
|
|
dispatchersList := dispatchers.List()
|
|
|
|
|
|
|
|
logger.Debug(
|
|
|
|
ctx, "publishing message",
|
|
|
|
logger.F("dispatchers", len(dispatchersList)),
|
|
|
|
logger.F("messageNamespace", msg.MessageNamespace()),
|
|
|
|
)
|
|
|
|
|
|
|
|
for _, d := range dispatchersList {
|
2023-04-26 15:53:23 +02:00
|
|
|
if d.Closed() {
|
|
|
|
dispatchers.Remove(d)
|
|
|
|
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-02-09 12:16:36 +01:00
|
|
|
if err := d.In(msg); err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *Bus) getDispatchers(namespace bus.MessageNamespace) *eventDispatcherSet {
|
|
|
|
strNamespace := string(namespace)
|
|
|
|
|
|
|
|
rawDispatchers, exists := b.dispatchers.Get(strNamespace)
|
|
|
|
dispatchers, ok := rawDispatchers.(*eventDispatcherSet)
|
|
|
|
|
|
|
|
if !exists || !ok {
|
|
|
|
dispatchers = newEventDispatcherSet()
|
|
|
|
b.dispatchers.Set(strNamespace, dispatchers)
|
|
|
|
}
|
|
|
|
|
|
|
|
return dispatchers
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewBus(funcs ...OptionFunc) *Bus {
|
|
|
|
opt := DefaultOption()
|
|
|
|
|
|
|
|
for _, fn := range funcs {
|
|
|
|
fn(opt)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &Bus{
|
|
|
|
opt: opt,
|
|
|
|
dispatchers: cmap.New(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check bus implementation.
|
|
|
|
var _ bus.Bus = NewBus()
|