edge/pkg/bus/memory/request_reply.go

184 lines
3.5 KiB
Go
Raw Normal View History

2023-02-09 12:16:36 +01:00
package memory
import (
"context"
"strconv"
"sync/atomic"
"forge.cadoles.com/arcad/edge/pkg/bus"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
const (
2023-11-28 16:35:49 +01:00
AddressRequest bus.Address = "bus/memory/request"
AddressReply bus.Address = "bus/memory/reply"
2023-02-09 12:16:36 +01:00
)
2023-11-28 16:35:49 +01:00
// RequestEnvelope pairs an envelope with the identifier of the request it
// is sent as.
type RequestEnvelope struct {
	// requestID identifies the request; Bus.Request draws it from an atomic
	// counter.
	requestID uint64

	// wrapped is the envelope handed in by the requester.
	wrapped bus.Envelope
}
// Address returns the request address derived from the wrapped envelope's
// own address (see getRequestAddress).
func (e *RequestEnvelope) Address() bus.Address {
	inner := e.wrapped.Address()

	return getRequestAddress(inner)
}
// Message returns the message of the wrapped envelope, unchanged.
func (e *RequestEnvelope) Message() any {
	msg := e.wrapped.Message()

	return msg
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
// RequestID reports the identifier of the request carried by this envelope.
func (e *RequestEnvelope) RequestID() uint64 {
	id := e.requestID

	return id
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
func (e *RequestEnvelope) Unwrap() bus.Envelope {
return e.wrapped
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
type ReplyEnvelope struct {
requestID uint64
wrapped bus.Envelope
err error
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
// Address returns the reply address built from the wrapped envelope's
// address and the request identifier (see getReplyAddress).
func (e *ReplyEnvelope) Address() bus.Address {
	inner := e.wrapped.Address()

	return getReplyAddress(inner, e.requestID)
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
func (e *ReplyEnvelope) Message() any {
return e.wrapped.Message()
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
func (e *ReplyEnvelope) Err() error {
return e.err
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
// Unwrap gives back the envelope that was wrapped into this reply.
func (e *ReplyEnvelope) Unwrap() bus.Envelope {
	inner := e.wrapped

	return inner
}
func (b *Bus) Request(ctx context.Context, env bus.Envelope) (bus.Envelope, error) {
2023-02-09 12:16:36 +01:00
requestID := atomic.AddUint64(&b.nextRequestID, 1)
2023-11-28 16:35:49 +01:00
req := &RequestEnvelope{
requestID: requestID,
wrapped: env,
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
replyAddress := getReplyAddress(env.Address(), requestID)
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
replies, err := b.Subscribe(subCtx, replyAddress)
2023-02-09 12:16:36 +01:00
if err != nil {
return nil, errors.WithStack(err)
}
defer func() {
2023-11-28 16:35:49 +01:00
b.Unsubscribe(replyAddress, replies)
2023-02-09 12:16:36 +01:00
}()
logger.Debug(ctx, "publishing request", logger.F("request", req))
2023-11-28 16:35:49 +01:00
if err := b.Publish(req); err != nil {
2023-02-09 12:16:36 +01:00
return nil, errors.WithStack(err)
}
for {
select {
case <-ctx.Done():
return nil, errors.WithStack(ctx.Err())
2023-11-28 16:35:49 +01:00
case env, ok := <-replies:
2023-02-09 12:16:36 +01:00
if !ok {
return nil, errors.WithStack(bus.ErrNoResponse)
}
2023-11-28 16:35:49 +01:00
reply, ok := env.(*ReplyEnvelope)
2023-02-09 12:16:36 +01:00
if !ok {
return nil, errors.WithStack(bus.ErrUnexpectedMessage)
}
2023-11-28 16:35:49 +01:00
if err := reply.Err(); err != nil {
2023-02-09 12:16:36 +01:00
return nil, errors.WithStack(err)
}
2023-11-28 16:35:49 +01:00
return reply.Unwrap(), nil
2023-02-09 12:16:36 +01:00
}
}
}
2023-11-28 16:35:49 +01:00
func (b *Bus) Reply(ctx context.Context, address bus.Address, handler bus.RequestHandler) chan error {
requestAddress := getRequestAddress(address)
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
errs := make(chan error)
requests, err := b.Subscribe(ctx, requestAddress)
2023-02-09 12:16:36 +01:00
if err != nil {
2023-11-28 16:35:49 +01:00
go func() {
errs <- errors.WithStack(err)
close(errs)
}()
return errs
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
go func() {
defer func() {
b.Unsubscribe(requestAddress, requests)
close(errs)
}()
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
for {
select {
case <-ctx.Done():
errs <- errors.WithStack(ctx.Err())
return
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
case env, ok := <-requests:
if !ok {
return
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
request, ok := env.(*RequestEnvelope)
if !ok {
errs <- errors.WithStack(bus.ErrUnexpectedMessage)
continue
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
logger.Debug(ctx, "handling request", logger.F("request", request))
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
msg, err := handler(request.Unwrap())
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
reply := &ReplyEnvelope{
requestID: request.RequestID(),
wrapped: bus.NewEnvelope(request.Unwrap().Address(), msg),
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
if err != nil {
reply.err = errors.WithStack(err)
}
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
logger.Debug(ctx, "publishing reply", logger.F("reply", reply))
2023-02-09 12:16:36 +01:00
2023-11-28 16:35:49 +01:00
if err := b.Publish(reply); err != nil {
errs <- errors.WithStack(err)
continue
}
2023-02-09 12:16:36 +01:00
}
}
2023-11-28 16:35:49 +01:00
}()
return errs
}
func getRequestAddress(addr bus.Address) bus.Address {
return AddressRequest + "/" + addr
2023-02-09 12:16:36 +01:00
}
2023-11-28 16:35:49 +01:00
func getReplyAddress(addr bus.Address, requestID uint64) bus.Address {
return AddressReply + "/" + addr + "/" + bus.Address(strconv.FormatUint(requestID, 10))
2023-02-09 12:16:36 +01:00
}