diff --git a/cqrs/bus.go b/cqrs/bus.go new file mode 100644 index 0000000..7868c24 --- /dev/null +++ b/cqrs/bus.go @@ -0,0 +1,92 @@ +package cqrs + +import ( + "context" +) + +type Bus struct { + reg *Registry + cmdFactory CommandFactory + cmdResultFactory CommandResultFactory + qryFactory QueryFactory + qryResultFactory QueryResultFactory +} + +func (b *Bus) Exec(ctx context.Context, req CommandRequest) (CommandResult, error) { + cmd, err := b.cmdFactory(req) + if err != nil { + return nil, err + } + + hdlr, mdlwrs, err := b.reg.MatchCommand(cmd) + if err != nil { + return nil, err + } + + if len(mdlwrs) > 0 { + for i := len(mdlwrs) - 1; i >= 0; i-- { + hdlr = mdlwrs[i](hdlr) + } + } + + if err := hdlr.Handle(ctx, cmd); err != nil { + return nil, err + } + + result, err := b.cmdResultFactory(cmd) + if err != nil { + return nil, err + } + + return result, nil +} + +func (b *Bus) Query(ctx context.Context, req QueryRequest) (QueryResult, error) { + qry, err := b.qryFactory(req) + if err != nil { + return nil, err + } + + hdlr, mdlwrs, err := b.reg.MatchQuery(qry) + if err != nil { + return nil, err + } + + if len(mdlwrs) > 0 { + for i := len(mdlwrs) - 1; i >= 0; i-- { + hdlr = mdlwrs[i](hdlr) + } + } + + data, err := hdlr.Handle(ctx, qry) + if err != nil { + return nil, err + } + + result, err := b.qryResultFactory(qry, data) + if err != nil { + return nil, err + } + + return result, nil +} + +func (b *Bus) RegisterCommand(match MatchFunc, hdlr CommandHandler, mdlwrs ...CommandMiddleware) { + b.reg.RegisterCommand(match, hdlr, mdlwrs...) +} + +func (b *Bus) RegisterQuery(match MatchFunc, hdlr QueryHandler, mdlwrs ...QueryMiddleware) { + b.reg.RegisterQuery(match, hdlr, mdlwrs...) +} + +func NewBus(funcs ...OptionFunc) *Bus { + opt := CreateOption(funcs...) + + return &Bus{ + reg: NewRegistry(), + cmdFactory: opt.CommandFactory, + cmdResultFactory: opt.CommandResultFactory, + qryFactory: opt.QueryFactory, + qryResultFactory: opt.QueryResultFactory, + } +} diff --git a/cqrs/bus_test.go b/cqrs/bus_test.go new file mode 100644 index 0000000..6093a9c --- /dev/null +++ b/cqrs/bus_test.go @@ -0,0 +1,52 @@ +package cqrs + +import ( + "context" + "testing" +) + +type testCommandRequest struct { + Foo string +} + +func TestSimpleCommandExec(t *testing.T) { + bus := NewBus() + + handlerCalled := false + + handleTestCommand := func(ctx context.Context, cmd Command) error { + handlerCalled = true + return nil + } + + bus.RegisterCommand( + MatchCommandRequest(&testCommandRequest{}), + CommandHandlerFunc(handleTestCommand), + ) + + cmd := &testCommandRequest{ + Foo: "bar", + } + ctx := context.Background() + + result, err := bus.Exec(ctx, cmd) + if err != nil { + t.Error(err) + } + + if result == nil { + t.Error("result should not be nil") + } + + if result.Command() == nil { + t.Error("result.Command() should not be nil") + } + + if e, g := result.Command().Request(), cmd; e != g { + t.Errorf("result.Command().Request(): expected '%v', got '%v'", e, g) + } + + if e, g := handlerCalled, true; e != g { + t.Errorf("handlerCalled: expected '%v', got '%v'", e, g) + } +} diff --git a/cqrs/command.go b/cqrs/command.go new file mode 100644 index 0000000..8db8021 --- /dev/null +++ b/cqrs/command.go @@ -0,0 +1,61 @@ +package cqrs + +import ( + "context" + + "github.com/google/uuid" +) + +type CommandID string + +type CommandRequest interface { +} + +type Command interface { + ID() CommandID + Request() CommandRequest +} + +type BaseCommand struct { + id CommandID + req CommandRequest +} + +func (c *BaseCommand) ID() CommandID { + return c.id +} + +func (c *BaseCommand) Request() CommandRequest { + return c.req +} + +func NewBaseCommand(req CommandRequest) *BaseCommand { + id := CommandID(uuid.New().String()) + return &BaseCommand{id, req} +} + +type CommandResult interface { + Command() Command +} + +type BaseCommandResult struct { + cmd Command +} + +func (r *BaseCommandResult) Command() Command { + return r.cmd +} + +func NewBaseCommandResult(cmd Command) *BaseCommandResult { + return &BaseCommandResult{cmd} +} + +type CommandHandlerFunc func(context.Context, Command) error + +func (h CommandHandlerFunc) Handle(ctx context.Context, cmd Command) error { + return h(ctx, cmd) +} + +type CommandHandler interface { + Handle(context.Context, Command) error +} diff --git a/cqrs/error.go b/cqrs/error.go new file mode 100644 index 0000000..f32fa4e --- /dev/null +++ b/cqrs/error.go @@ -0,0 +1,9 @@ +package cqrs + +import "errors" + +var ( + ErrInvalidRequestType = errors.New("invalid request type") + ErrHandlerNotFound = errors.New("handler not found") + ErrUnexpectedRequest = errors.New("unexpected request") +) diff --git a/cqrs/match.go b/cqrs/match.go new file mode 100644 index 0000000..2a863dd --- /dev/null +++ b/cqrs/match.go @@ -0,0 +1,33 @@ +package cqrs + +import ( + "reflect" +) + +type MatchFunc func(interface{}) (bool, error) + +func MatchCommandRequest(req interface{}) MatchFunc { + reqType := reflect.TypeOf(req) + + return func(raw interface{}) (bool, error) { + cmd, ok := raw.(Command) + if !ok { + return false, ErrInvalidRequestType + } + + return reflect.TypeOf(cmd.Request()) == reqType, nil + } +} + +func MatchQueryRequest(req interface{}) MatchFunc { + reqType := reflect.TypeOf(req) + + return func(raw interface{}) (bool, error) { + cmd, ok := raw.(Query) + if !ok { + return false, ErrInvalidRequestType + } + + return reflect.TypeOf(cmd.Request()) == reqType, nil + } +} diff --git a/cqrs/match_test.go b/cqrs/match_test.go new file mode 100644 index 0000000..2fb5c41 --- /dev/null +++ b/cqrs/match_test.go @@ -0,0 +1,42 @@ +package cqrs + +import "testing" + +type testType struct{} + +func TestMatchCommandRequestType(t *testing.T) { + match := MatchCommandRequest(&testType{}) + + cmd := NewBaseCommand(&testType{}) + + matches, err := match(cmd) + if err != nil { + t.Error(err) + } + + if e, g := true, matches; e != g { + t.Errorf("match(&testType{}): expected '%v', got '%v'", e, g) + } + + cmd = NewBaseCommand(nil) + + matches, err = match(cmd) + if err != nil { + t.Error(err) + } + + if e, g := false, matches; e != g { + t.Errorf("match(nil): expected '%v', got '%v'", e, g) + } + + cmd = NewBaseCommand("test") + + matches, err = match(cmd) + if err != nil { + t.Error(err) + } + + if e, g := false, matches; e != g { + t.Errorf("match(\"test\"): expected '%v', got '%v'", e, g) + } +} diff --git a/cqrs/middleware_test.go b/cqrs/middleware_test.go new file mode 100644 index 0000000..bfde7b1 --- /dev/null +++ b/cqrs/middleware_test.go @@ -0,0 +1,64 @@ +package cqrs + +import ( + "context" + "testing" +) + +func TestBasicCommandMiddleware(t *testing.T) { + bus := NewBus() + + handlerCalled := false + + handleTestCommand := func(ctx context.Context, cmd Command) error { + handlerCalled = true + return nil + } + + middlewareCalled := false + + commandMiddleware := func(next CommandHandler) CommandHandler { + fn := func(ctx context.Context, cmd Command) error { + middlewareCalled = true + return next.Handle(ctx, cmd) + } + + return CommandHandlerFunc(fn) + } + + bus.RegisterCommand( + MatchCommandRequest(&testCommandRequest{}), + CommandHandlerFunc(handleTestCommand), + commandMiddleware, + ) + + cmd := &testCommandRequest{ + Foo: "bar", + } + ctx := context.Background() + + result, err := bus.Exec(ctx, cmd) + if err != nil { + t.Error(err) + } + + if result == nil { + t.Error("result should not be nil") + } + + if result.Command() == nil { + t.Error("result.Command() should not be nil") + } + + if e, g := result.Command().Request(), cmd; e != g { + t.Errorf("result.Command().Request(): expected '%v', got '%v'", e, g) + } + + if e, g := middlewareCalled, true; e != g { + t.Errorf("middlewareCalled: expected '%v', got '%v'", e, g) + } + + if e, g := handlerCalled, true; e != g { + t.Errorf("handlerCalled: expected '%v', got '%v'", e, g) + } +} diff --git a/cqrs/option.go b/cqrs/option.go new file mode 100644 index 0000000..2a3cb75 --- /dev/null +++ b/cqrs/option.go @@ -0,0 +1,86 @@ +package cqrs + +type Option struct { + CommandFactory CommandFactory + QueryFactory QueryFactory + CommandResultFactory CommandResultFactory + QueryResultFactory QueryResultFactory +} + +type CommandFactory func(CommandRequest) (Command, error) +type QueryFactory func(QueryRequest) (Query, error) +type CommandResultFactory func(Command) (CommandResult, error) +type QueryResultFactory func(Query, interface{}) (QueryResult, error) + +type OptionFunc func(*Option) + +func DefaultOption() *Option { + funcs := []OptionFunc{ + WithBaseQueryFactory(), + WithBaseCommandFactory(), + WithBaseQueryResultFactory(), + WithBaseCommandResultFactory(), + } + + opt := &Option{} + + for _, fn := range funcs { + fn(opt) + } + + return opt +} + +func CreateOption(funcs ...OptionFunc) *Option { + opt := DefaultOption() + + for _, fn := range funcs { + fn(opt) + } + + return opt +} + +func WithCommandFactory(factory CommandFactory) OptionFunc { + return func(opt *Option) { + opt.CommandFactory = factory + } +} + +func WithQueryFactory(factory QueryFactory) OptionFunc { + return func(opt *Option) { + opt.QueryFactory = factory + } +} + +func WithBaseQueryFactory() OptionFunc { + return func(opt *Option) { + opt.QueryFactory = func(req QueryRequest) (Query, error) { + return NewBaseQuery(req), nil + } + } +} + +func WithBaseCommandFactory() OptionFunc { + return func(opt *Option) { + opt.CommandFactory = func(req CommandRequest) (Command, error) { + return NewBaseCommand(req), nil + } + } +} + +func WithBaseQueryResultFactory() OptionFunc { + return func(opt *Option) { + opt.QueryResultFactory = func(qry Query, data interface{}) (QueryResult, error) { + return NewBaseQueryResult(qry, data), nil + } + } +} + +func WithBaseCommandResultFactory() OptionFunc { + return func(opt *Option) { + opt.CommandResultFactory = func(cmd Command) (CommandResult, error) { + return NewBaseCommandResult(cmd), nil + } + } +} diff --git a/cqrs/provider.go b/cqrs/provider.go new file mode 100644 index 0000000..42e961f --- /dev/null +++ b/cqrs/provider.go @@ -0,0 +1,13 @@ +package cqrs + +import "gitlab.com/wpetit/goweb/service" + +// ServiceProvider returns a service.Provider for the +// the cqrs service +func ServiceProvider() service.Provider { + bus := NewBus() + + return func(container *service.Container) (interface{}, error) { + return bus, nil + } +} diff --git a/cqrs/query.go b/cqrs/query.go new file mode 100644 index 0000000..b6b48ba --- /dev/null +++ b/cqrs/query.go @@ -0,0 +1,66 @@ +package cqrs + +import ( + "context" + + "github.com/google/uuid" +) + +type QueryID string + +type QueryRequest interface{} + +type Query interface { + ID() QueryID + Request() QueryRequest +} + +type BaseQuery struct { + id QueryID + req QueryRequest +} + +func (q *BaseQuery) ID() QueryID { + return q.id +} + +func (q *BaseQuery) Request() QueryRequest { + return q.req +} + +func NewBaseQuery(req QueryRequest) *BaseQuery { + id := QueryID(uuid.New().String()) + return &BaseQuery{id, req} +} + +type QueryResult interface { + Query() Query + Data() interface{} +} + +type BaseQueryResult struct { + qry Query + data interface{} +} + +func (r *BaseQueryResult) Query() Query { + return r.qry +} + +func (r *BaseQueryResult) Data() interface{} { + return r.data +} + +func NewBaseQueryResult(qry Query, data interface{}) *BaseQueryResult { + return &BaseQueryResult{qry, data} +} + +type QueryHandlerFunc func(context.Context, Query) (interface{}, error) + +func (h QueryHandlerFunc) Handle(ctx context.Context, qry Query) (interface{}, error) { + return h(ctx, qry) +} + +type QueryHandler interface { + Handle(context.Context, Query) (interface{}, error) +} diff --git a/cqrs/registry.go b/cqrs/registry.go new file mode 100644 index 0000000..2915944 --- /dev/null +++ b/cqrs/registry.go @@ -0,0 +1,68 @@ +package cqrs + +type CommandMiddleware func(next CommandHandler) CommandHandler +type QueryMiddleware func(next QueryHandler) QueryHandler + +type Registry struct { + commands []*commandHandler + queries []*queryHandler +} + +type commandHandler struct { + Match MatchFunc + Handler CommandHandler + Middlewares []CommandMiddleware +} + +type queryHandler struct { + Match MatchFunc + Handler QueryHandler + Middlewares []QueryMiddleware +} + +func (r *Registry) MatchCommand(cmd Command) (CommandHandler, []CommandMiddleware, error) { + // TODO cache matching to avoid unnecessary ops + for _, reg := range r.commands { + matches, err := reg.Match(cmd) + if err != nil { + return nil, nil, err + } + + if matches { + return reg.Handler, reg.Middlewares, nil + } + } + + return nil, nil, ErrHandlerNotFound +} + +func (r *Registry) MatchQuery(qry Query) (QueryHandler, []QueryMiddleware, error) { + // TODO cache matching to avoid unnecessary ops + for _, reg := range r.queries { + matches, err := reg.Match(qry) + if err != nil { + return nil, nil, err + } + + if matches { + return reg.Handler, reg.Middlewares, nil + } + } + + return nil, nil, ErrHandlerNotFound +} + +func (r *Registry) RegisterCommand(match MatchFunc, hdlr CommandHandler, mdlwrs ...CommandMiddleware) { + r.commands = append(r.commands, &commandHandler{match, hdlr, mdlwrs}) +} + +func (r *Registry) RegisterQuery(match MatchFunc, hdlr QueryHandler, mdlwrs ...QueryMiddleware) { + r.queries = append(r.queries, &queryHandler{match, hdlr, mdlwrs}) +} + +func NewRegistry() *Registry { + return &Registry{ + commands: make([]*commandHandler, 0), + queries: make([]*queryHandler, 0), + } +} diff --git a/cqrs/service.go b/cqrs/service.go new file mode 100644 index 0000000..fe6d4b7 --- /dev/null +++ b/cqrs/service.go @@ -0,0 +1,34 @@ +package cqrs + +import ( + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/service" +) + +// ServiceName defines the cqrs service name +const ServiceName service.Name = "cqrs" + +// From retrieves the cqrs service in the given container +func From(container *service.Container) (*Bus, error) { + raw, err := container.Service(ServiceName) + if err != nil { + return nil, errors.Wrapf(err, "error while retrieving '%s' service", ServiceName) + } + + srv, ok := raw.(*Bus) + if !ok { + return nil, errors.Errorf("retrieved service is not a valid '%s' service", ServiceName) + } + + return srv, nil +} + +// Must retrieves the cqrs service in the given container or panic otherwise +func Must(container *service.Container) *Bus { + service, err := From(container) + if err != nil { + panic(err) + } + + return service +} diff --git a/go.mod b/go.mod index fd38668..2418211 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,11 @@ go 1.12 require ( cdr.dev/slog v1.3.0 + github.com/davecgh/go-spew v1.1.1 github.com/go-chi/chi v4.0.2+incompatible github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect + github.com/google/uuid v1.1.1 github.com/gorilla/securecookie v1.1.1 github.com/gorilla/sessions v1.2.0 github.com/leodido/go-urn v1.1.0 // indirect diff --git a/go.sum b/go.sum index eec87db..8a15a96 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gorilla/csrf v1.6.0/go.mod h1:7tSf8kmjNYr7IWDCYhd3U8Ck34iQ/Yw5CJu7bAkHEGI=