package memory import ( "context" "sync" "time" "forge.cadoles.com/arcad/edge/pkg/bus" "gitlab.com/wpetit/goweb/logger" ) type eventDispatcherSet struct { mutex sync.Mutex items map[*eventDispatcher]struct{} } func (s *eventDispatcherSet) Add(d *eventDispatcher) { s.mutex.Lock() defer s.mutex.Unlock() s.items[d] = struct{}{} } func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) { s.mutex.Lock() defer s.mutex.Unlock() for d := range s.items { if d.IsOut(out) { d.Close() delete(s.items, d) } } } func (s *eventDispatcherSet) List() []*eventDispatcher { s.mutex.Lock() defer s.mutex.Unlock() dispatchers := make([]*eventDispatcher, 0, len(s.items)) for d := range s.items { dispatchers = append(dispatchers, d) } return dispatchers } func newEventDispatcherSet() *eventDispatcherSet { return &eventDispatcherSet{ items: make(map[*eventDispatcher]struct{}), } } type eventDispatcher struct { in chan bus.Message out chan bus.Message mutex sync.RWMutex closed bool } func (d *eventDispatcher) Close() { d.mutex.Lock() defer d.mutex.Unlock() d.closed = true close(d.in) } func (d *eventDispatcher) In(msg bus.Message) (err error) { d.mutex.RLock() defer d.mutex.RUnlock() if d.closed { return } d.in <- msg return nil } func (d *eventDispatcher) Out() <-chan bus.Message { return d.out } func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool { return d.out == out } func (d *eventDispatcher) Run() { ctx := context.Background() for { msg, ok := <-d.in if !ok { close(d.out) return } timeout := time.After(2 * time.Second) select { case d.out <- msg: case <-timeout: logger.Error(ctx, "message out chan timed out", logger.F("message", msg)) } } } func newEventDispatcher(bufferSize int64) *eventDispatcher { return &eventDispatcher{ in: make(chan bus.Message, bufferSize), out: make(chan bus.Message, bufferSize), closed: false, } }