feat: initial commit
This commit is contained in:
50
internal/core/workflow/error.go
Normal file
50
internal/core/workflow/error.go
Normal file
@ -0,0 +1,50 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type CompensationError struct {
|
||||
executionErr error
|
||||
compensationErrs []error
|
||||
}
|
||||
|
||||
func (e *CompensationError) ExecutionError() error {
|
||||
return e.executionErr
|
||||
}
|
||||
|
||||
func (e *CompensationError) CompensationErrors() []error {
|
||||
return e.compensationErrs
|
||||
}
|
||||
|
||||
func (e *CompensationError) Error() string {
|
||||
var sb strings.Builder
|
||||
|
||||
sb.WriteString("compensation error: ")
|
||||
sb.WriteString("execution error '")
|
||||
sb.WriteString(e.ExecutionError().Error())
|
||||
sb.WriteString("' resulted in following compensation errors: ")
|
||||
|
||||
for idx, err := range e.CompensationErrors() {
|
||||
if idx > 0 {
|
||||
sb.WriteString(", ")
|
||||
}
|
||||
|
||||
sb.WriteString("[")
|
||||
sb.WriteString(strconv.FormatInt(int64(idx), 10))
|
||||
sb.WriteString("] ")
|
||||
sb.WriteString(err.Error())
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func NewCompensationError(executionErr error, compensationErrs ...error) *CompensationError {
|
||||
return &CompensationError{
|
||||
executionErr: executionErr,
|
||||
compensationErrs: compensationErrs,
|
||||
}
|
||||
}
|
||||
|
||||
var _ error = &CompensationError{}
|
40
internal/core/workflow/step.go
Normal file
40
internal/core/workflow/step.go
Normal file
@ -0,0 +1,40 @@
|
||||
package workflow
|
||||
|
||||
import "context"
|
||||
|
||||
type Step interface {
|
||||
Execute(ctx context.Context) error
|
||||
Compensate(ctx context.Context) error
|
||||
}
|
||||
|
||||
type step struct {
|
||||
execute func(ctx context.Context) error
|
||||
compensate func(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Compensate implements Step.
|
||||
func (s *step) Compensate(ctx context.Context) error {
|
||||
if s.compensate == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.compensate(ctx)
|
||||
}
|
||||
|
||||
// Execute implements Step.
|
||||
func (s *step) Execute(ctx context.Context) error {
|
||||
if s.execute == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.execute(ctx)
|
||||
}
|
||||
|
||||
var _ Step = &step{}
|
||||
|
||||
func StepFunc(execute func(ctx context.Context) error, compensate func(ctx context.Context) error) Step {
|
||||
return &step{
|
||||
execute: execute,
|
||||
compensate: compensate,
|
||||
}
|
||||
}
|
46
internal/core/workflow/workflow.go
Normal file
46
internal/core/workflow/workflow.go
Normal file
@ -0,0 +1,46 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Workflow struct {
|
||||
steps []Step
|
||||
}
|
||||
|
||||
func (w *Workflow) Execute(ctx context.Context) error {
|
||||
for idx, step := range w.steps {
|
||||
if executionErr := step.Execute(ctx); executionErr != nil {
|
||||
if compensationErrs := w.compensate(ctx, idx-1); compensationErrs != nil {
|
||||
return errors.WithStack(NewCompensationError(executionErr, compensationErrs...))
|
||||
}
|
||||
|
||||
return errors.WithStack(executionErr)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Workflow) compensate(ctx context.Context, fromIndex int) []error {
|
||||
errs := make([]error, 0)
|
||||
for idx := fromIndex; idx >= 0; idx -= 1 {
|
||||
act := w.steps[idx]
|
||||
|
||||
if err := act.Compensate(ctx); err != nil {
|
||||
errs = append(errs, errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errs
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func New(steps ...Step) *Workflow {
|
||||
return &Workflow{steps: steps}
|
||||
}
|
Reference in New Issue
Block a user