package memory

import (
	"sync"

	"forge.cadoles.com/arcad/edge/pkg/bus"
)

// eventDispatcherSet is a mutex-guarded set of event dispatchers.
type eventDispatcherSet struct {
	mutex sync.Mutex
	items map[*eventDispatcher]struct{}
}

// Add registers d in the set. Adding a dispatcher that is already present
// is a no-op.
func (s *eventDispatcherSet) Add(d *eventDispatcher) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.items[d] = struct{}{}
}

// RemoveByOutChannel closes and removes every dispatcher whose output
// channel is out.
//
// Each matching dispatcher is closed while the set's mutex is held, so this
// call (and any concurrent Add/List) waits for as long as Close does.
func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Deleting the current key while ranging over a map is permitted in Go.
	for d := range s.items {
		if !d.IsOut(out) {
			continue
		}

		d.Close()
		delete(s.items, d)
	}
}

// List returns a snapshot of the dispatchers currently in the set. The
// returned slice is freshly allocated and its order is unspecified (it
// follows map iteration order).
func (s *eventDispatcherSet) List() []*eventDispatcher {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	dispatchers := make([]*eventDispatcher, 0, len(s.items))
	for d := range s.items {
		dispatchers = append(dispatchers, d)
	}

	return dispatchers
}

// newEventDispatcherSet returns an empty, ready-to-use set.
func newEventDispatcherSet() *eventDispatcherSet {
	return &eventDispatcherSet{
		items: make(map[*eventDispatcher]struct{}),
	}
}

// eventDispatcher relays messages from an input channel to an output
// channel. Producers push with In, Run performs the forwarding, and
// consumers read from the channel returned by Out.
type eventDispatcher struct {
	in  chan bus.Message
	out chan bus.Message

	// mutex guards closed and serializes closing the in channel against
	// concurrent sends performed by In.
	mutex  sync.RWMutex
	closed bool
}

// Close marks the dispatcher as closed and closes its input channel, which
// lets Run drain the remaining buffered messages and then close the output
// channel.
//
// Close is idempotent: calls after the first are no-ops instead of
// panicking on a second close of the same channel. Because In holds the
// read lock for the duration of its send, Close waits for any in-flight In
// call to complete.
func (d *eventDispatcher) Close() {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	if d.closed {
		return
	}

	d.closed = true
	close(d.in)
}

// In queues msg for dispatch. If the dispatcher has already been closed the
// message is silently dropped. The returned error is always nil.
//
// The read lock is held during the send so that Close cannot close the
// input channel underneath it; the send blocks while the input buffer is
// full.
func (d *eventDispatcher) In(msg bus.Message) error {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	if d.closed {
		return nil
	}

	d.in <- msg

	return nil
}

// Out returns the receive-only channel on which dispatched messages are
// delivered.
func (d *eventDispatcher) Out() <-chan bus.Message {
	return d.out
}

// IsOut reports whether out is this dispatcher's output channel.
func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
	return d.out == out
}

// Run forwards every message received on the input channel to the output
// channel. It returns once the input channel has been closed and drained,
// closing the output channel on the way out. Each forward blocks while the
// output buffer is full.
func (d *eventDispatcher) Run() {
	for msg := range d.in {
		d.out <- msg
	}

	close(d.out)
}

// newEventDispatcher returns a dispatcher whose input and output channels
// are each buffered to bufferSize messages. Run must be started separately.
func newEventDispatcher(bufferSize int64) *eventDispatcher {
	return &eventDispatcher{
		in:  make(chan bus.Message, bufferSize),
		out: make(chan bus.Message, bufferSize),
	}
}