feat: initial commit
This commit is contained in:
91
pkg/bus/memory/bus.go
Normal file
91
pkg/bus/memory/bus.go
Normal file
@ -0,0 +1,91 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type Bus struct {
|
||||
opt *Option
|
||||
dispatchers cmap.ConcurrentMap
|
||||
nextRequestID uint64
|
||||
}
|
||||
|
||||
func (b *Bus) Subscribe(ctx context.Context, ns bus.MessageNamespace) (<-chan bus.Message, error) {
|
||||
logger.Debug(
|
||||
ctx, "subscribing to messages",
|
||||
logger.F("messageNamespace", ns),
|
||||
)
|
||||
|
||||
dispatchers := b.getDispatchers(ns)
|
||||
d := newEventDispatcher(b.opt.BufferSize)
|
||||
|
||||
go d.Run()
|
||||
|
||||
dispatchers.Add(d)
|
||||
|
||||
return d.Out(), nil
|
||||
}
|
||||
|
||||
func (b *Bus) Unsubscribe(ctx context.Context, ns bus.MessageNamespace, ch <-chan bus.Message) {
|
||||
logger.Debug(
|
||||
ctx, "unsubscribing from messages",
|
||||
logger.F("messageNamespace", ns),
|
||||
)
|
||||
|
||||
dispatchers := b.getDispatchers(ns)
|
||||
dispatchers.RemoveByOutChannel(ch)
|
||||
}
|
||||
|
||||
func (b *Bus) Publish(ctx context.Context, msg bus.Message) error {
|
||||
dispatchers := b.getDispatchers(msg.MessageNamespace())
|
||||
dispatchersList := dispatchers.List()
|
||||
|
||||
logger.Debug(
|
||||
ctx, "publishing message",
|
||||
logger.F("dispatchers", len(dispatchersList)),
|
||||
logger.F("messageNamespace", msg.MessageNamespace()),
|
||||
)
|
||||
|
||||
for _, d := range dispatchersList {
|
||||
if err := d.In(msg); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Bus) getDispatchers(namespace bus.MessageNamespace) *eventDispatcherSet {
|
||||
strNamespace := string(namespace)
|
||||
|
||||
rawDispatchers, exists := b.dispatchers.Get(strNamespace)
|
||||
dispatchers, ok := rawDispatchers.(*eventDispatcherSet)
|
||||
|
||||
if !exists || !ok {
|
||||
dispatchers = newEventDispatcherSet()
|
||||
b.dispatchers.Set(strNamespace, dispatchers)
|
||||
}
|
||||
|
||||
return dispatchers
|
||||
}
|
||||
|
||||
func NewBus(funcs ...OptionFunc) *Bus {
|
||||
opt := DefaultOption()
|
||||
|
||||
for _, fn := range funcs {
|
||||
fn(opt)
|
||||
}
|
||||
|
||||
return &Bus{
|
||||
opt: opt,
|
||||
dispatchers: cmap.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Check bus implementation.
|
||||
var _ bus.Bus = NewBus()
|
29
pkg/bus/memory/bus_test.go
Normal file
29
pkg/bus/memory/bus_test.go
Normal file
@ -0,0 +1,29 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
busTesting "forge.cadoles.com/arcad/edge/pkg/bus/testing"
|
||||
)
|
||||
|
||||
func TestMemoryBus(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Test disabled when -short flag is set")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
|
||||
t.Run("PublishSubscribe", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := NewBus()
|
||||
busTesting.TestPublishSubscribe(t, b)
|
||||
})
|
||||
|
||||
t.Run("RequestReply", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := NewBus()
|
||||
busTesting.TestRequestReply(t, b)
|
||||
})
|
||||
}
|
117
pkg/bus/memory/event_dispatcher.go
Normal file
117
pkg/bus/memory/event_dispatcher.go
Normal file
@ -0,0 +1,117 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type eventDispatcherSet struct {
|
||||
mutex sync.Mutex
|
||||
items map[*eventDispatcher]struct{}
|
||||
}
|
||||
|
||||
func (s *eventDispatcherSet) Add(d *eventDispatcher) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
s.items[d] = struct{}{}
|
||||
}
|
||||
|
||||
func (s *eventDispatcherSet) RemoveByOutChannel(out <-chan bus.Message) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
for d := range s.items {
|
||||
if d.IsOut(out) {
|
||||
d.Close()
|
||||
delete(s.items, d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *eventDispatcherSet) List() []*eventDispatcher {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
dispatchers := make([]*eventDispatcher, 0, len(s.items))
|
||||
|
||||
for d := range s.items {
|
||||
dispatchers = append(dispatchers, d)
|
||||
}
|
||||
|
||||
return dispatchers
|
||||
}
|
||||
|
||||
func newEventDispatcherSet() *eventDispatcherSet {
|
||||
return &eventDispatcherSet{
|
||||
items: make(map[*eventDispatcher]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
type eventDispatcher struct {
|
||||
in chan bus.Message
|
||||
out chan bus.Message
|
||||
mutex sync.RWMutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (d *eventDispatcher) Close() {
|
||||
d.mutex.Lock()
|
||||
defer d.mutex.Unlock()
|
||||
|
||||
d.closed = true
|
||||
close(d.in)
|
||||
}
|
||||
|
||||
func (d *eventDispatcher) In(msg bus.Message) (err error) {
|
||||
d.mutex.RLock()
|
||||
defer d.mutex.RUnlock()
|
||||
|
||||
if d.closed {
|
||||
return
|
||||
}
|
||||
|
||||
d.in <- msg
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *eventDispatcher) Out() <-chan bus.Message {
|
||||
return d.out
|
||||
}
|
||||
|
||||
func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
|
||||
return d.out == out
|
||||
}
|
||||
|
||||
func (d *eventDispatcher) Run() {
|
||||
ctx := context.Background()
|
||||
|
||||
for {
|
||||
msg, ok := <-d.in
|
||||
if !ok {
|
||||
close(d.out)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
timeout := time.After(2 * time.Second)
|
||||
select {
|
||||
case d.out <- msg:
|
||||
case <-timeout:
|
||||
logger.Error(ctx, "message out chan timed out", logger.F("message", msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newEventDispatcher(bufferSize int64) *eventDispatcher {
|
||||
return &eventDispatcher{
|
||||
in: make(chan bus.Message, bufferSize),
|
||||
out: make(chan bus.Message, bufferSize),
|
||||
closed: false,
|
||||
}
|
||||
}
|
19
pkg/bus/memory/option.go
Normal file
19
pkg/bus/memory/option.go
Normal file
@ -0,0 +1,19 @@
|
||||
package memory
|
||||
|
||||
type Option struct {
|
||||
BufferSize int64
|
||||
}
|
||||
|
||||
type OptionFunc func(*Option)
|
||||
|
||||
func DefaultOption() *Option {
|
||||
return &Option{
|
||||
BufferSize: 16, // nolint: gomnd
|
||||
}
|
||||
}
|
||||
|
||||
func WithBufferSize(size int64) OptionFunc {
|
||||
return func(o *Option) {
|
||||
o.BufferSize = size
|
||||
}
|
||||
}
|
151
pkg/bus/memory/request_reply.go
Normal file
151
pkg/bus/memory/request_reply.go
Normal file
@ -0,0 +1,151 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
MessageNamespaceRequest bus.MessageNamespace = "reqrep/request"
|
||||
MessageNamespaceReply bus.MessageNamespace = "reqrep/reply"
|
||||
)
|
||||
|
||||
type RequestMessage struct {
|
||||
RequestID uint64
|
||||
|
||||
Message bus.Message
|
||||
|
||||
ns bus.MessageNamespace
|
||||
}
|
||||
|
||||
func (m *RequestMessage) MessageNamespace() bus.MessageNamespace {
|
||||
return m.ns
|
||||
}
|
||||
|
||||
type ReplyMessage struct {
|
||||
RequestID uint64
|
||||
Message bus.Message
|
||||
Error error
|
||||
|
||||
ns bus.MessageNamespace
|
||||
}
|
||||
|
||||
func (m *ReplyMessage) MessageNamespace() bus.MessageNamespace {
|
||||
return m.ns
|
||||
}
|
||||
|
||||
func (b *Bus) Request(ctx context.Context, msg bus.Message) (bus.Message, error) {
|
||||
requestID := atomic.AddUint64(&b.nextRequestID, 1)
|
||||
|
||||
req := &RequestMessage{
|
||||
RequestID: requestID,
|
||||
Message: msg,
|
||||
ns: msg.MessageNamespace(),
|
||||
}
|
||||
|
||||
replyNamespace := createReplyNamespace(requestID)
|
||||
|
||||
replies, err := b.Subscribe(ctx, replyNamespace)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
b.Unsubscribe(ctx, replyNamespace, replies)
|
||||
}()
|
||||
|
||||
logger.Debug(ctx, "publishing request", logger.F("request", req))
|
||||
|
||||
if err := b.Publish(ctx, req); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, errors.WithStack(ctx.Err())
|
||||
|
||||
case msg, ok := <-replies:
|
||||
if !ok {
|
||||
return nil, errors.WithStack(bus.ErrNoResponse)
|
||||
}
|
||||
|
||||
reply, ok := msg.(*ReplyMessage)
|
||||
if !ok {
|
||||
return nil, errors.WithStack(bus.ErrUnexpectedMessage)
|
||||
}
|
||||
|
||||
if reply.Error != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Message, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type RequestHandler func(evt bus.Message) (bus.Message, error)
|
||||
|
||||
func (b *Bus) Reply(ctx context.Context, msgNamespace bus.MessageNamespace, h bus.RequestHandler) error {
|
||||
requests, err := b.Subscribe(ctx, msgNamespace)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
b.Unsubscribe(ctx, msgNamespace, requests)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errors.WithStack(ctx.Err())
|
||||
|
||||
case msg, ok := <-requests:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
request, ok := msg.(*RequestMessage)
|
||||
if !ok {
|
||||
return errors.WithStack(bus.ErrUnexpectedMessage)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "handling request", logger.F("request", request))
|
||||
|
||||
msg, err := h(request.Message)
|
||||
|
||||
reply := &ReplyMessage{
|
||||
RequestID: request.RequestID,
|
||||
Message: nil,
|
||||
Error: nil,
|
||||
|
||||
ns: createReplyNamespace(request.RequestID),
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
reply.Error = errors.WithStack(err)
|
||||
} else {
|
||||
reply.Message = msg
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "publishing reply", logger.F("reply", reply))
|
||||
|
||||
if err := b.Publish(ctx, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createReplyNamespace(requestID uint64) bus.MessageNamespace {
|
||||
return bus.NewMessageNamespace(
|
||||
MessageNamespaceReply,
|
||||
bus.MessageNamespace(strconv.FormatUint(requestID, 10)),
|
||||
)
|
||||
}
|
Reference in New Issue
Block a user