edge/pkg/bus/memory/bus.go

89 lines
1.8 KiB
Go
Raw Permalink Normal View History

2023-02-09 12:16:36 +01:00
package memory
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/bus"
cmap "github.com/orcaman/concurrent-map"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
// Bus is the in-memory bus implementation of this package. Subscribers are
// tracked per address through sets of event dispatchers.
type Bus struct {
// opt holds the bus options; its BufferSize is passed to every new event
// dispatcher created by Subscribe.
opt *Option
// dispatchers maps a raw address string to the *eventDispatcherSet
// registered for that address.
dispatchers cmap.ConcurrentMap
// nextRequestID is not referenced in this part of the file.
// NOTE(review): presumably a counter for request identifiers used by code
// defined elsewhere — confirm.
nextRequestID uint64
}
2023-11-28 16:35:49 +01:00
func (b *Bus) Subscribe(ctx context.Context, address bus.Address) (<-chan bus.Envelope, error) {
2023-02-09 12:16:36 +01:00
logger.Debug(
2023-11-28 16:35:49 +01:00
ctx, "subscribing",
logger.F("address", address),
2023-02-09 12:16:36 +01:00
)
2023-11-28 16:35:49 +01:00
dispatchers := b.getDispatchers(address)
disp := newEventDispatcher(b.opt.BufferSize)
2023-02-09 12:16:36 +01:00
go disp.Run(ctx)
2023-02-09 12:16:36 +01:00
dispatchers.Add(disp)
2023-02-09 12:16:36 +01:00
return disp.Out(), nil
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
func (b *Bus) Unsubscribe(address bus.Address, ch <-chan bus.Envelope) {
2023-02-09 12:16:36 +01:00
logger.Debug(
2023-11-28 16:35:49 +01:00
context.Background(), "unsubscribing",
logger.F("address", address),
2023-02-09 12:16:36 +01:00
)
2023-11-28 16:35:49 +01:00
dispatchers := b.getDispatchers(address)
2023-02-09 12:16:36 +01:00
dispatchers.RemoveByOutChannel(ch)
}
2023-11-28 16:35:49 +01:00
func (b *Bus) Publish(env bus.Envelope) error {
dispatchers := b.getDispatchers(env.Address())
2023-02-09 12:16:36 +01:00
logger.Debug(
2023-11-28 16:35:49 +01:00
context.Background(), "publish",
logger.F("address", env.Address()),
2023-02-09 12:16:36 +01:00
)
2023-11-28 16:35:49 +01:00
dispatchers.Range(func(d *eventDispatcher) {
if err := d.In(env); err != nil {
logger.Error(context.Background(), "could not publish message", logger.CapturedE(errors.WithStack(err)))
}
2023-11-28 16:35:49 +01:00
})
2023-02-09 12:16:36 +01:00
return nil
}
2023-11-28 16:35:49 +01:00
func (b *Bus) getDispatchers(address bus.Address) *eventDispatcherSet {
rawAddress := string(address)
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
rawDispatchers, exists := b.dispatchers.Get(rawAddress)
2023-02-09 12:16:36 +01:00
dispatchers, ok := rawDispatchers.(*eventDispatcherSet)
if !exists || !ok {
dispatchers = newEventDispatcherSet()
2023-11-28 16:35:49 +01:00
b.dispatchers.Set(rawAddress, dispatchers)
2023-02-09 12:16:36 +01:00
}
return dispatchers
}
// NewBus returns a Bus configured with DefaultOption, on which the given
// option functions are applied in order, and an empty dispatcher map.
func NewBus(funcs ...OptionFunc) *Bus {
	options := DefaultOption()

	for i := range funcs {
		funcs[i](options)
	}

	b := &Bus{
		opt:         options,
		dispatchers: cmap.New(),
	}

	return b
}
// Compile-time check that *Bus implements bus.Bus. A typed nil pointer is
// used so that no Bus has to be built at package initialization.
var _ bus.Bus = (*Bus)(nil)