edge/pkg/bus/memory/bus.go

100 lines
2.0 KiB
Go

package memory
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/bus"
cmap "github.com/orcaman/concurrent-map"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
// Bus is the in-memory bus of this package. It keeps, per address, a set of
// event dispatchers to which published envelopes are handed.
type Bus struct {
// opt holds the bus options; Subscribe reads BufferSize from it to size
// each new dispatcher.
opt *Option
// dispatchers maps the string form of a bus.Address to the
// *eventDispatcherSet registered for that address.
dispatchers cmap.ConcurrentMap
// nextRequestID is not referenced in this part of the file.
// NOTE(review): presumably a counter used to number requests elsewhere in
// the package — confirm against its users before relying on this.
nextRequestID uint64
}
// Subscribe creates a new dispatcher for address and returns its output
// channel.
//
// The dispatcher is sized with the bus BufferSize option, its Run loop is
// started in a separate goroutine bound to ctx, and it is then added to the
// dispatcher set of address. The returned error is always nil.
func (b *Bus) Subscribe(ctx context.Context, address bus.Address) (<-chan bus.Envelope, error) {
	logger.Debug(ctx, "subscribing", logger.F("address", address))

	set := b.getDispatchers(address)

	subscriber := newEventDispatcher(b.opt.BufferSize)
	go subscriber.Run(ctx)
	set.Add(subscriber)

	return subscriber.Out(), nil
}
// Unsubscribe removes from the dispatcher set of address the dispatcher whose
// output channel is ch.
func (b *Bus) Unsubscribe(address bus.Address, ch <-chan bus.Envelope) {
	ctx := context.Background()
	logger.Debug(ctx, "unsubscribing", logger.F("address", address))

	b.getDispatchers(address).RemoveByOutChannel(ch)
}
// Publish hands env to every dispatcher currently registered for the
// envelope's address.
//
// Dispatchers reporting themselves as closed are removed from the set instead
// of being fed. Each remaining dispatcher receives the envelope from its own
// goroutine, so Publish does not wait for delivery; delivery errors are only
// logged. The returned error is always nil.
func (b *Bus) Publish(env bus.Envelope) error {
	set := b.getDispatchers(env.Address())
	targets := set.List()

	logger.Debug(
		context.Background(), "publish",
		logger.F("dispatchers", len(targets)),
		logger.F("address", env.Address()),
	)

	// deliver pushes env into a single dispatcher and logs any failure.
	deliver := func(target *eventDispatcher) {
		err := target.In(env)
		if err == nil {
			return
		}

		logger.Error(context.Background(), "could not publish message", logger.CapturedE(errors.WithStack(err)))
	}

	for _, target := range targets {
		if !target.Closed() {
			go deliver(target)

			continue
		}

		// Closed dispatchers are pruned from the set on the way.
		set.Remove(target)
	}

	return nil
}
// getDispatchers returns the dispatcher set registered for address, creating
// and storing an empty one when none exists yet or when the stored value is
// not a *eventDispatcherSet.
//
// Lookup and insertion are performed by a single Upsert call. With a separate
// Get followed by Set, two goroutines reaching a new address at the same time
// could each create a set, and the later Set would replace a set that may
// already hold a subscriber, silently dropping it.
func (b *Bus) getDispatchers(address bus.Address) *eventDispatcherSet {
	rawAddress := string(address)

	rawDispatchers := b.dispatchers.Upsert(
		rawAddress, nil,
		func(exists bool, current interface{}, _ interface{}) interface{} {
			// Keep the set already stored for this address when it is usable.
			if dispatchers, ok := current.(*eventDispatcherSet); exists && ok {
				return dispatchers
			}

			return newEventDispatcherSet()
		},
	)

	// The callback above only ever returns a *eventDispatcherSet.
	return rawDispatchers.(*eventDispatcherSet)
}
// NewBus returns a Bus with an empty dispatcher map, configured with the
// default options after each of funcs has been applied to them in order.
func NewBus(funcs ...OptionFunc) *Bus {
	b := &Bus{
		opt:         DefaultOption(),
		dispatchers: cmap.New(),
	}

	for i := range funcs {
		funcs[i](b.opt)
	}

	return b
}
// Compile-time check that *Bus implements bus.Bus. A typed nil pointer is
// enough for the assertion and avoids building a throwaway Bus at package
// initialization.
var _ bus.Bus = (*Bus)(nil)