package memory import ( "context" "sync" "time" "forge.cadoles.com/arcad/edge/pkg/bus" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type eventDispatcherSet struct { mutex sync.Mutex items map[*eventDispatcher]struct{} } func (s *eventDispatcherSet) Add(d *eventDispatcher) { s.mutex.Lock() defer s.mutex.Unlock() s.items[d] = struct{}{} } func (s *eventDispatcherSet) Remove(d *eventDispatcher) { s.mutex.Lock() defer s.mutex.Unlock() d.close() delete(s.items, d) } func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) { s.mutex.Lock() defer s.mutex.Unlock() for d := range s.items { if d.IsOut(out) { d.close() delete(s.items, d) } } } func (s *eventDispatcherSet) List() []*eventDispatcher { s.mutex.Lock() defer s.mutex.Unlock() dispatchers := make([]*eventDispatcher, 0, len(s.items)) for d := range s.items { dispatchers = append(dispatchers, d) } return dispatchers } func newEventDispatcherSet() *eventDispatcherSet { return &eventDispatcherSet{ items: make(map[*eventDispatcher]struct{}), } } type eventDispatcher struct { in chan bus.Message out chan bus.Message mutex sync.RWMutex closed bool } func (d *eventDispatcher) Closed() bool { d.mutex.RLock() defer d.mutex.RUnlock() return d.closed } func (d *eventDispatcher) Close() { d.mutex.Lock() defer d.mutex.Unlock() d.close() } func (d *eventDispatcher) close() { if d.closed { return } close(d.in) d.closed = true } func (d *eventDispatcher) In(msg bus.Message) (err error) { d.mutex.RLock() defer d.mutex.RUnlock() if d.closed { return } d.in <- msg return nil } func (d *eventDispatcher) Out() <-chan bus.Message { return d.out } func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool { return d.out == out } func (d *eventDispatcher) Run(ctx context.Context) { defer func() { for { logger.Debug(ctx, "closing dispatcher, flushing out incoming messages") close(d.out) // Flush all incoming messages for { _, ok := <-d.in if !ok { return } } } }() for { msg, ok := <-d.in if !ok { return } timeout := time.After(time.Second) select { case d.out <- msg: case <-timeout: logger.Error( ctx, "out message channel timeout", logger.F("message", msg), ) return case <-ctx.Done(): logger.Error( ctx, "message subscription context canceled", logger.F("message", msg), logger.CapturedE(errors.WithStack(ctx.Err())), ) return } } } func newEventDispatcher(bufferSize int64) *eventDispatcher { return &eventDispatcher{ in: make(chan bus.Message, bufferSize), out: make(chan bus.Message, bufferSize), closed: false, } }