package memory

import (
	"context"
	"sync"

	"forge.cadoles.com/arcad/edge/pkg/bus"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// eventDispatcherSet is a set of event dispatchers whose membership is
// guarded by a mutex.
type eventDispatcherSet struct {
	mutex sync.Mutex
	items map[*eventDispatcher]struct{}
}

// Add registers d in the set.
func (s *eventDispatcherSet) Add(d *eventDispatcher) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.items[d] = struct{}{}
}

// Remove closes d and drops it from the set.
func (s *eventDispatcherSet) Remove(d *eventDispatcher) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Close (not close) so the dispatcher's own lock is taken: this
	// serialises with In, which would otherwise risk sending on a channel
	// that is being closed underneath it.
	d.Close()
	delete(s.items, d)
}

// RemoveByOutChannel closes and drops every dispatcher of the set whose
// output channel is out.
func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Envelope) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for d := range s.items {
		if !d.IsOut(out) {
			continue
		}

		// See Remove: the dispatcher must be closed under its own lock.
		d.Close()
		delete(s.items, d)
	}
}

// Range calls fn for every dispatcher of the set that is still open.
// Dispatchers found closed are pruned from the set instead of being passed
// to fn.
//
// The set's mutex is held for the whole iteration, fn included, so fn must
// not call back into the set's methods.
func (s *eventDispatcherSet) Range(fn func(d *eventDispatcher)) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for d := range s.items {
		if d.Closed() {
			// Delete directly: s.mutex is already held and sync.Mutex is
			// not reentrant, so going through Remove would deadlock. The
			// dispatcher is already closed, nothing else is left to do.
			delete(s.items, d)

			continue
		}

		fn(d)
	}
}

// newEventDispatcherSet returns an empty, ready to use dispatcher set.
func newEventDispatcherSet() *eventDispatcherSet {
	return &eventDispatcherSet{
		items: make(map[*eventDispatcher]struct{}),
	}
}

// eventDispatcher relays envelopes from its input channel to its output
// channel (see Run). The mutex guards the closed flag and the closing of
// the input channel.
type eventDispatcher struct {
	in     chan bus.Envelope
	out    chan bus.Envelope
	mutex  sync.RWMutex
	closed bool
}

// Closed reports whether the dispatcher has been closed.
func (d *eventDispatcher) Closed() bool {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	return d.closed
}

// Close closes the dispatcher's input channel. Calling it more than once is
// harmless.
func (d *eventDispatcher) Close() {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	d.close()
}

// close closes the input channel and marks the dispatcher as closed, unless
// that was already done. The caller must hold d.mutex for writing.
func (d *eventDispatcher) close() {
	if d.closed {
		return
	}

	close(d.in)
	d.closed = true
}

// In queues msg on the dispatcher's input channel, blocking while the
// channel's buffer is full. A message sent to a closed dispatcher is
// silently dropped. The returned error is currently always nil.
func (d *eventDispatcher) In(msg bus.Envelope) error {
	// The read lock is held during the send so that Close cannot close the
	// input channel while a send is pending.
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	if d.closed {
		return nil
	}

	d.in <- msg

	return nil
}

// Out returns the channel on which relayed envelopes are delivered. It is
// closed when Run returns.
func (d *eventDispatcher) Out() <-chan bus.Envelope {
	return d.out
}

// IsOut reports whether out is this dispatcher's output channel.
func (d *eventDispatcher) IsOut(out <-chan bus.Envelope) bool {
	return d.out == out
}

// Run relays envelopes from the input channel to the output channel until
// ctx is done or the input channel is closed.
//
// On the way out it closes the output channel, then keeps draining the
// input channel until that channel is closed; the call therefore only
// returns once the dispatcher has been closed.
func (d *eventDispatcher) Run(ctx context.Context) {
	defer func() {
		logger.Debug(ctx, "closing dispatcher, flushing out incoming messages")

		close(d.out)

		for range d.in {
			// Flush all incoming messages
		}
	}()

	// logContextError reports why ctx ended, except for a plain
	// cancellation, which is not logged as an error.
	logContextError := func() {
		if err := ctx.Err(); !errors.Is(err, context.Canceled) {
			logger.Error(
				ctx, "message subscription context canceled",
				logger.CapturedE(errors.WithStack(err)),
			)
		}
	}

	for {
		select {
		case <-ctx.Done():
			logContextError()

			return

		case msg, ok := <-d.in:
			if !ok {
				return
			}

			// Also watch ctx while forwarding: if the consumer stopped
			// reading the output channel, a bare send would block forever
			// and the context end would never be observed.
			select {
			case d.out <- msg:
			case <-ctx.Done():
				logContextError()

				return
			}
		}
	}
}

// newEventDispatcher returns a dispatcher whose input and output channels
// are both buffered with a capacity of bufferSize.
func newEventDispatcher(bufferSize int64) *eventDispatcher {
	return &eventDispatcher{
		in:     make(chan bus.Envelope, bufferSize),
		out:    make(chan bus.Envelope, bufferSize),
		closed: false,
	}
}