package memory import ( "context" "forge.cadoles.com/arcad/edge/pkg/bus" cmap "github.com/orcaman/concurrent-map" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type Bus struct { opt *Option dispatchers cmap.ConcurrentMap nextRequestID uint64 } func (b *Bus) Subscribe(ctx context.Context, ns bus.MessageNamespace) (<-chan bus.Message, error) { logger.Debug( ctx, "subscribing to messages", logger.F("messageNamespace", ns), ) dispatchers := b.getDispatchers(ns) disp := newEventDispatcher(b.opt.BufferSize) go disp.Run(ctx) dispatchers.Add(disp) return disp.Out(), nil } func (b *Bus) Unsubscribe(ctx context.Context, ns bus.MessageNamespace, ch <-chan bus.Message) { logger.Debug( ctx, "unsubscribing from messages", logger.F("messageNamespace", ns), ) dispatchers := b.getDispatchers(ns) dispatchers.RemoveByOutChannel(ch) } func (b *Bus) Publish(ctx context.Context, msg bus.Message) error { dispatchers := b.getDispatchers(msg.MessageNamespace()) dispatchersList := dispatchers.List() logger.Debug( ctx, "publishing message", logger.F("dispatchers", len(dispatchersList)), logger.F("messageNamespace", msg.MessageNamespace()), ) for _, d := range dispatchersList { if d.Closed() { dispatchers.Remove(d) continue } if err := d.In(msg); err != nil { return errors.WithStack(err) } } return nil } func (b *Bus) getDispatchers(namespace bus.MessageNamespace) *eventDispatcherSet { strNamespace := string(namespace) rawDispatchers, exists := b.dispatchers.Get(strNamespace) dispatchers, ok := rawDispatchers.(*eventDispatcherSet) if !exists || !ok { dispatchers = newEventDispatcherSet() b.dispatchers.Set(strNamespace, dispatchers) } return dispatchers } func NewBus(funcs ...OptionFunc) *Bus { opt := DefaultOption() for _, fn := range funcs { fn(opt) } return &Bus{ opt: opt, dispatchers: cmap.New(), } } // Check bus implementation. var _ bus.Bus = NewBus()