From fbf818e423a634ad9acdcbae8dd0eb974975ab4e Mon Sep 17 00:00:00 2001 From: William Petit Date: Sun, 22 Oct 2023 09:50:22 +0200 Subject: [PATCH] feat(storage): retry transaction when sqlite database is busy --- internal/config/database.go | 2 +- internal/datastore/sqlite/agent_repository.go | 85 +++++++++++++++---- .../datastore/sqlite/agent_repository_test.go | 2 +- internal/setup/repository.go | 2 +- 4 files changed, 73 insertions(+), 18 deletions(-) diff --git a/internal/config/database.go b/internal/config/database.go index 1714c55..23da959 100644 --- a/internal/config/database.go +++ b/internal/config/database.go @@ -15,6 +15,6 @@ type DatabaseConfig struct { func NewDefaultDatabaseConfig() DatabaseConfig { return DatabaseConfig{ Driver: "sqlite", - DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", + DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=150000&_pragma=journal_mode=WAL", } } diff --git a/internal/datastore/sqlite/agent_repository.go b/internal/datastore/sqlite/agent_repository.go index 4107813..9b22100 100644 --- a/internal/datastore/sqlite/agent_repository.go +++ b/internal/datastore/sqlite/agent_repository.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "strings" "time" "forge.cadoles.com/Cadoles/emissary/internal/datastore" @@ -15,12 +16,13 @@ import ( ) type AgentRepository struct { - db *sql.DB + db *sql.DB + sqliteBusyRetryMaxAttempts int } // DeleteSpec implements datastore.AgentRepository. func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error { - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { exists, err := r.agentExists(ctx, tx, agentID) if err != nil { return errors.WithStack(err) @@ -49,7 +51,7 @@ func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.Agen func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentID) ([]*datastore.Spec, error) { specs := make([]*datastore.Spec, 0) - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { exists, err := r.agentExists(ctx, tx, agentID) if err != nil { return errors.WithStack(err) @@ -108,7 +110,7 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) { spec := &datastore.Spec{} - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { exists, err := r.agentExists(ctx, tx, agentID) if err != nil { return errors.WithStack(err) @@ -167,7 +169,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer agents := make([]*datastore.Agent, 0) count := 0 - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { query := `SELECT id, label, thumbprint, status, contacted_at, created_at, updated_at FROM agents` limit := 10 @@ -220,7 +222,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args)) - rows, err := r.db.QueryContext(ctx, query, args...) + rows, err := tx.QueryContext(ctx, query, args...) if err != nil { return errors.WithStack(err) } @@ -272,7 +274,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*datastore.Agent, error) { agent := &datastore.Agent{} - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { query := `SELECT count(id) FROM agents WHERE thumbprint = $1` row := tx.QueryRowContext(ctx, query, thumbprint) @@ -331,15 +333,15 @@ func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet // Delete implements datastore.AgentRepository func (r *AgentRepository) Delete(ctx context.Context, id datastore.AgentID) error { - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { query := `DELETE FROM agents WHERE id = $1` - _, err := r.db.ExecContext(ctx, query, id) + _, err := tx.ExecContext(ctx, query, id) if err != nil { return errors.WithStack(err) } query = `DELETE FROM specs WHERE agent_id = $1` - _, err = r.db.ExecContext(ctx, query, id) + _, err = tx.ExecContext(ctx, query, id) if err != nil { return errors.WithStack(err) } @@ -359,14 +361,14 @@ func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datas ID: id, } - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { query := ` SELECT "id", "label", "thumbprint", "keyset", "metadata", "status", "contacted_at", "created_at", "updated_at" FROM agents WHERE id = $1 ` - row := r.db.QueryRowContext(ctx, query, id) + row := tx.QueryRowContext(ctx, query, id) metadata := JSONMap{} contactedAt := sql.NullTime{} @@ -410,7 +412,7 @@ func (r *AgentRepository) Update(ctx context.Context, id datastore.AgentID, opts agent := &datastore.Agent{} - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := r.withTxRetry(ctx, func(tx *sql.Tx) error { query := ` UPDATE agents SET id = $1 ` @@ -534,6 +536,59 @@ func (r *AgentRepository) agentExists(ctx context.Context, tx *sql.Tx, agentID d return true, nil } +func (r *AgentRepository) withTxRetry(ctx context.Context, fn func(*sql.Tx) error) error { + attempts := 0 + max := r.sqliteBusyRetryMaxAttempts + + ctx = logger.With(ctx, logger.F("max", max)) + + var err error + for { + ctx = logger.With(ctx) + + if attempts >= max { + logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts)) + + return errors.Wrapf(err, "transaction failed after %d attempts", max) + } + + err = r.withTx(ctx, fn) + if err != nil { + if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") { + return errors.WithStack(err) + } + + err = errors.WithStack(err) + + logger.Warn(ctx, "database is busy", logger.E(err)) + + wait := time.Duration(8<<(attempts+1)) * time.Millisecond + + logger.Debug( + ctx, "database is busy, waiting before retrying transaction", + logger.F("wait", wait.String()), + logger.F("attempts", attempts), + ) + + timer := time.NewTimer(wait) + select { + case <-timer.C: + attempts++ + continue + + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + return errors.WithStack(err) + } + + return nil + } + } + + return nil + } +} + func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error { tx, err := r.db.BeginTx(ctx, nil) if err != nil { @@ -562,8 +617,8 @@ func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) er return nil } -func NewAgentRepository(db *sql.DB) *AgentRepository { - return &AgentRepository{db} +func NewAgentRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *AgentRepository { + return &AgentRepository{db, sqliteBusyRetryMaxAttempts} } var _ datastore.AgentRepository = &AgentRepository{} diff --git a/internal/datastore/sqlite/agent_repository_test.go b/internal/datastore/sqlite/agent_repository_test.go index 47e70c9..ed3c3a5 100644 --- a/internal/datastore/sqlite/agent_repository_test.go +++ b/internal/datastore/sqlite/agent_repository_test.go @@ -40,7 +40,7 @@ func TestSQLiteAgentRepository(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - repo := NewAgentRepository(db) + repo := NewAgentRepository(db, 5) testsuite.TestAgentRepository(t, repo) } diff --git a/internal/setup/repository.go b/internal/setup/repository.go index da824b2..611cff3 100644 --- a/internal/setup/repository.go +++ b/internal/setup/repository.go @@ -59,7 +59,7 @@ func NewAgentRepository(ctx context.Context, conf config.DatabaseConfig) (datast return nil, errors.WithStack(err) } - agentRepository = sqlite.NewAgentRepository(db) + agentRepository = sqlite.NewAgentRepository(db, 5) default: return nil, errors.Errorf("unsupported database driver '%s'", driver)