feat(client): tenant management commands
This commit is contained in:
@ -5,7 +5,6 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
@ -16,8 +15,7 @@ import (
|
||||
)
|
||||
|
||||
type AgentRepository struct {
|
||||
db *sql.DB
|
||||
sqliteBusyRetryMaxAttempts int
|
||||
repository
|
||||
}
|
||||
|
||||
// Attach implements datastore.AgentRepository.
|
||||
@ -652,89 +650,8 @@ func (r *AgentRepository) agentExists(ctx context.Context, tx *sql.Tx, agentID d
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTxRetry(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
attempts := 0
|
||||
max := r.sqliteBusyRetryMaxAttempts
|
||||
|
||||
ctx = logger.With(ctx, logger.F("max", max))
|
||||
|
||||
var err error
|
||||
for {
|
||||
ctx = logger.With(ctx)
|
||||
|
||||
if attempts >= max {
|
||||
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||
|
||||
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||
}
|
||||
|
||||
err = r.withTx(ctx, fn)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
|
||||
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||
|
||||
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||
|
||||
logger.Debug(
|
||||
ctx, "database is busy, waiting before retrying transaction",
|
||||
logger.F("wait", wait.String()),
|
||||
logger.F("attempts", attempts),
|
||||
)
|
||||
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-timer.C:
|
||||
attempts++
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
tx, err := r.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
if errors.Is(err, sql.ErrTxDone) {
|
||||
return
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
logger.Error(ctx, "could not rollback transaction", logger.CapturedE(err))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(tx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewAgentRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *AgentRepository {
|
||||
return &AgentRepository{db, sqliteBusyRetryMaxAttempts}
|
||||
return &AgentRepository{repository{db, sqliteBusyRetryMaxAttempts}}
|
||||
}
|
||||
|
||||
var _ datastore.AgentRepository = &AgentRepository{}
|
||||
|
97
internal/datastore/sqlite/repository.go
Normal file
97
internal/datastore/sqlite/repository.go
Normal file
@ -0,0 +1,97 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type repository struct {
|
||||
db *sql.DB
|
||||
sqliteBusyRetryMaxAttempts int
|
||||
}
|
||||
|
||||
func (r *repository) withTxRetry(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
attempts := 0
|
||||
max := r.sqliteBusyRetryMaxAttempts
|
||||
|
||||
ctx = logger.With(ctx, logger.F("max", max))
|
||||
|
||||
var err error
|
||||
for {
|
||||
ctx = logger.With(ctx)
|
||||
|
||||
if attempts >= max {
|
||||
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||
|
||||
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||
}
|
||||
|
||||
err = r.withTx(ctx, fn)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
|
||||
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||
|
||||
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||
|
||||
logger.Debug(
|
||||
ctx, "database is busy, waiting before retrying transaction",
|
||||
logger.F("wait", wait.String()),
|
||||
logger.F("attempts", attempts),
|
||||
)
|
||||
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-timer.C:
|
||||
attempts++
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *repository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
tx, err := r.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
if errors.Is(err, sql.ErrTxDone) {
|
||||
return
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
logger.Error(ctx, "could not rollback transaction", logger.CapturedE(err))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(tx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
202
internal/datastore/sqlite/tenant_repository.go
Normal file
202
internal/datastore/sqlite/tenant_repository.go
Normal file
@ -0,0 +1,202 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type TenantRepository struct {
|
||||
repository
|
||||
}
|
||||
|
||||
// Create implements datastore.TenantRepository.
|
||||
func (r *TenantRepository) Create(ctx context.Context, label string) (*datastore.Tenant, error) {
|
||||
var tenant datastore.Tenant
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
query := `
|
||||
INSERT INTO tenants (id, label, created_at, updated_at)
|
||||
VALUES($1, $2, $3, $3)
|
||||
RETURNING "id", "label", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
tenantID := datastore.NewTenantID()
|
||||
|
||||
row := tx.QueryRowContext(
|
||||
ctx, query,
|
||||
tenantID, label, now,
|
||||
)
|
||||
|
||||
if err := row.Scan(&tenant.ID, &tenant.Label, &tenant.CreatedAt, &tenant.UpdatedAt); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &tenant, nil
|
||||
}
|
||||
|
||||
// Delete implements datastore.TenantRepository.
|
||||
func (r *TenantRepository) Delete(ctx context.Context, id datastore.TenantID) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
if exists, err := r.tenantExists(ctx, tx, id); !exists {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query := `DELETE FROM tenants WHERE id = $1`
|
||||
_, err := tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query = `DELETE FROM agents WHERE tenant_id = $1`
|
||||
_, err = tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query = `DELETE FROM specs WHERE tenant_id = $1`
|
||||
_, err = tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements datastore.TenantRepository.
|
||||
func (r *TenantRepository) Get(ctx context.Context, id datastore.TenantID) (*datastore.Tenant, error) {
|
||||
var tenant datastore.Tenant
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT "id", "label", "created_at", "updated_at"
|
||||
FROM tenants
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, id)
|
||||
|
||||
if err := row.Scan(&tenant.ID, &tenant.Label, &tenant.CreatedAt, &tenant.UpdatedAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &tenant, nil
|
||||
}
|
||||
|
||||
// Update implements datastore.TenantRepository.
|
||||
func (r *TenantRepository) Update(ctx context.Context, id datastore.TenantID, updates ...datastore.TenantUpdateOptionFunc) (*datastore.Tenant, error) {
|
||||
options := &datastore.TenantUpdateOptions{}
|
||||
for _, fn := range updates {
|
||||
fn(options)
|
||||
}
|
||||
|
||||
var tenant datastore.Tenant
|
||||
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
if exists, err := r.tenantExists(ctx, tx, id); !exists {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query := `
|
||||
UPDATE tenants SET updated_at = $1
|
||||
`
|
||||
|
||||
args := []any{id}
|
||||
index := 2
|
||||
|
||||
if options.Label != nil {
|
||||
query += fmt.Sprintf(`, label = $%d`, index)
|
||||
args = append(args, *options.Label)
|
||||
index++
|
||||
}
|
||||
|
||||
updated := options.Label != nil
|
||||
if updated {
|
||||
now := time.Now().UTC()
|
||||
query += fmt.Sprintf(`, updated_at = $%d`, index)
|
||||
args = append(args, now)
|
||||
index++
|
||||
}
|
||||
|
||||
query += `
|
||||
WHERE id = $1
|
||||
RETURNING "id", "label", "created_at", "updated_at"
|
||||
`
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
if err := row.Scan(&tenant.ID, &tenant.Label, &tenant.CreatedAt, &tenant.UpdatedAt); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &tenant, nil
|
||||
}
|
||||
|
||||
func (r *TenantRepository) tenantExists(ctx context.Context, tx *sql.Tx, tenantID datastore.TenantID) (bool, error) {
|
||||
row := tx.QueryRowContext(ctx, `SELECT count(id) FROM tenants WHERE id = $1`, tenantID)
|
||||
|
||||
var count int
|
||||
|
||||
if err := row.Scan(&count); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
return false, errors.WithStack(datastore.ErrNotFound)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func NewTenantRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *TenantRepository {
|
||||
return &TenantRepository{
|
||||
repository: repository{db, sqliteBusyRetryMaxAttempts},
|
||||
}
|
||||
}
|
||||
|
||||
var _ datastore.TenantRepository = &TenantRepository{}
|
46
internal/datastore/sqlite/tenant_repository_test.go
Normal file
46
internal/datastore/sqlite/tenant_repository_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore/testsuite"
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/migrate"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func TestSQLiteTeantRepository(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
|
||||
file := "testdata/tenant_repository_test.sqlite"
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
|
||||
migr, err := migrate.New("../../../migrations", "sqlite", "sqlite://"+dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
if err := migr.Up(); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
repo := NewTenantRepository(db, 5)
|
||||
|
||||
testsuite.TestTenantRepository(t, repo)
|
||||
}
|
22
internal/datastore/tenant_repository.go
Normal file
22
internal/datastore/tenant_repository.go
Normal file
@ -0,0 +1,22 @@
|
||||
package datastore
|
||||
|
||||
import "context"
|
||||
|
||||
type TenantRepository interface {
|
||||
Create(ctx context.Context, label string) (*Tenant, error)
|
||||
Get(ctx context.Context, id TenantID) (*Tenant, error)
|
||||
Update(ctx context.Context, id TenantID, updates ...TenantUpdateOptionFunc) (*Tenant, error)
|
||||
Delete(ctx context.Context, id TenantID) error
|
||||
}
|
||||
|
||||
type TenantUpdateOptionFunc func(*TenantUpdateOptions)
|
||||
|
||||
type TenantUpdateOptions struct {
|
||||
Label *string
|
||||
}
|
||||
|
||||
func WithTenantUpdateLabel(label string) TenantUpdateOptionFunc {
|
||||
return func(opts *TenantUpdateOptions) {
|
||||
opts.Label = &label
|
||||
}
|
||||
}
|
14
internal/datastore/testsuite/tenant_repository.go
Normal file
14
internal/datastore/testsuite/tenant_repository.go
Normal file
@ -0,0 +1,14 @@
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
)
|
||||
|
||||
func TestTenantRepository(t *testing.T, repo datastore.TenantRepository) {
|
||||
t.Run("Cases", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
runTenantRepositoryTests(t, repo)
|
||||
})
|
||||
}
|
109
internal/datastore/testsuite/tenant_repository_cases.go
Normal file
109
internal/datastore/testsuite/tenant_repository_cases.go
Normal file
@ -0,0 +1,109 @@
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type tenantRepositoryTestCase struct {
|
||||
Name string
|
||||
Skip bool
|
||||
Run func(ctx context.Context, repo datastore.TenantRepository) error
|
||||
}
|
||||
|
||||
var tenantRepositoryTestCases = []tenantRepositoryTestCase{
|
||||
{
|
||||
Name: "Create a new tenant",
|
||||
Run: func(ctx context.Context, repo datastore.TenantRepository) error {
|
||||
label := "Foo"
|
||||
tenant, err := repo.Create(ctx, "Foo")
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if tenant.CreatedAt.IsZero() {
|
||||
return errors.Errorf("tenant.CreatedAt should not be zero time")
|
||||
}
|
||||
|
||||
if tenant.UpdatedAt.IsZero() {
|
||||
return errors.Errorf("tenant.UpdatedAt should not be zero time")
|
||||
}
|
||||
|
||||
if e, g := label, tenant.Label; e != g {
|
||||
return errors.Errorf("tenant.Label: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
if tenant.ID == "" {
|
||||
return errors.Errorf("tenant.ID should not be empty")
|
||||
}
|
||||
|
||||
if _, err := datastore.ParseTenantID(string(tenant.ID)); err != nil {
|
||||
return errors.Wrapf(err, "tenant.ID should be valid")
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Try to update an unexistant tenant",
|
||||
Run: func(ctx context.Context, repo datastore.TenantRepository) error {
|
||||
unexistantTenantID := datastore.TenantID("00000000-0000-0000-0000-000000000000")
|
||||
tenant, err := repo.Update(ctx, unexistantTenantID)
|
||||
if err == nil {
|
||||
return errors.New("error should not be nil")
|
||||
}
|
||||
|
||||
if !errors.Is(err, datastore.ErrNotFound) {
|
||||
return errors.Errorf("error should be datastore.ErrNotFound, got '%+v'", err)
|
||||
}
|
||||
|
||||
if tenant != nil {
|
||||
return errors.New("tenant should be nil")
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Try to delete spec of an unexistant agent",
|
||||
Run: func(ctx context.Context, repo datastore.TenantRepository) error {
|
||||
unexistantTenantID := datastore.TenantID("00000000-0000-0000-0000-000000000000")
|
||||
|
||||
err := repo.Delete(ctx, unexistantTenantID)
|
||||
if err == nil {
|
||||
return errors.New("error should not be nil")
|
||||
}
|
||||
|
||||
if !errors.Is(err, datastore.ErrNotFound) {
|
||||
return errors.Errorf("error should be datastore.ErrNotFound, got '%+v'", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func runTenantRepositoryTests(t *testing.T, repo datastore.TenantRepository) {
|
||||
for _, tc := range tenantRepositoryTestCases {
|
||||
func(tc tenantRepositoryTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if tc.Skip {
|
||||
t.SkipNow()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
if err := tc.Run(ctx, repo); err != nil {
|
||||
t.Errorf("%+v", errors.WithStack(err))
|
||||
}
|
||||
})
|
||||
}(tc)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user