2023-02-09 12:16:36 +01:00
|
|
|
package memory
|
|
|
|
|
|
|
|
import (
|
2023-04-26 15:53:23 +02:00
|
|
|
"context"
|
2023-02-09 12:16:36 +01:00
|
|
|
"sync"
|
2023-04-26 15:53:23 +02:00
|
|
|
"time"
|
2023-02-09 12:16:36 +01:00
|
|
|
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/bus"
|
2023-04-26 15:53:23 +02:00
|
|
|
"github.com/pkg/errors"
|
|
|
|
"gitlab.com/wpetit/goweb/logger"
|
2023-02-09 12:16:36 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
type eventDispatcherSet struct {
|
|
|
|
mutex sync.Mutex
|
|
|
|
items map[*eventDispatcher]struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *eventDispatcherSet) Add(d *eventDispatcher) {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
s.items[d] = struct{}{}
|
|
|
|
}
|
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
func (s *eventDispatcherSet) Remove(d *eventDispatcher) {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
d.close()
|
|
|
|
delete(s.items, d)
|
|
|
|
}
|
|
|
|
|
2023-02-09 12:16:36 +01:00
|
|
|
func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
for d := range s.items {
|
|
|
|
if d.IsOut(out) {
|
2023-04-26 15:53:23 +02:00
|
|
|
d.close()
|
2023-02-09 12:16:36 +01:00
|
|
|
delete(s.items, d)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *eventDispatcherSet) List() []*eventDispatcher {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
dispatchers := make([]*eventDispatcher, 0, len(s.items))
|
|
|
|
|
|
|
|
for d := range s.items {
|
|
|
|
dispatchers = append(dispatchers, d)
|
|
|
|
}
|
|
|
|
|
|
|
|
return dispatchers
|
|
|
|
}
|
|
|
|
|
|
|
|
func newEventDispatcherSet() *eventDispatcherSet {
|
|
|
|
return &eventDispatcherSet{
|
|
|
|
items: make(map[*eventDispatcher]struct{}),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type eventDispatcher struct {
|
|
|
|
in chan bus.Message
|
|
|
|
out chan bus.Message
|
|
|
|
mutex sync.RWMutex
|
|
|
|
closed bool
|
|
|
|
}
|
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
func (d *eventDispatcher) Closed() bool {
|
|
|
|
d.mutex.RLock()
|
|
|
|
defer d.mutex.RUnlock()
|
|
|
|
|
|
|
|
return d.closed
|
|
|
|
}
|
|
|
|
|
2023-02-09 12:16:36 +01:00
|
|
|
func (d *eventDispatcher) Close() {
|
|
|
|
d.mutex.Lock()
|
|
|
|
defer d.mutex.Unlock()
|
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
d.close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *eventDispatcher) close() {
|
2023-02-09 12:16:36 +01:00
|
|
|
d.closed = true
|
|
|
|
close(d.in)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *eventDispatcher) In(msg bus.Message) (err error) {
|
|
|
|
d.mutex.RLock()
|
|
|
|
defer d.mutex.RUnlock()
|
|
|
|
|
|
|
|
if d.closed {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
d.in <- msg
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *eventDispatcher) Out() <-chan bus.Message {
|
|
|
|
return d.out
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
|
|
|
|
return d.out == out
|
|
|
|
}
|
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
func (d *eventDispatcher) Run(ctx context.Context) {
|
|
|
|
defer func() {
|
|
|
|
for {
|
|
|
|
logger.Debug(ctx, "closing dispatcher, flushing out incoming messages")
|
|
|
|
|
|
|
|
close(d.out)
|
|
|
|
|
|
|
|
// Flush all incoming messages
|
|
|
|
for {
|
|
|
|
_, ok := <-d.in
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2023-02-09 12:16:36 +01:00
|
|
|
for {
|
|
|
|
msg, ok := <-d.in
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-04-26 15:53:23 +02:00
|
|
|
timeout := time.After(time.Second)
|
|
|
|
|
|
|
|
select {
|
|
|
|
case d.out <- msg:
|
|
|
|
case <-timeout:
|
|
|
|
logger.Error(
|
|
|
|
ctx,
|
|
|
|
"out message channel timeout",
|
|
|
|
logger.F("message", msg),
|
|
|
|
)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
logger.Error(
|
|
|
|
ctx,
|
|
|
|
"message subscription context canceled",
|
|
|
|
logger.F("message", msg),
|
2023-10-19 21:47:09 +02:00
|
|
|
logger.CapturedE(errors.WithStack(ctx.Err())),
|
2023-04-26 15:53:23 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
2023-02-09 12:16:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newEventDispatcher(bufferSize int64) *eventDispatcher {
|
|
|
|
return &eventDispatcher{
|
|
|
|
in: make(chan bus.Message, bufferSize),
|
|
|
|
out: make(chan bus.Message, bufferSize),
|
|
|
|
closed: false,
|
|
|
|
}
|
|
|
|
}
|