feat(storage): retry transaction when sqlite database is busy
arcad/emissary/pipeline/head This commit looks good
Details
arcad/emissary/pipeline/head This commit looks good
Details
This commit is contained in:
parent
46a853a3f7
commit
fbf818e423
|
@ -15,6 +15,6 @@ type DatabaseConfig struct {
|
|||
func NewDefaultDatabaseConfig() DatabaseConfig {
|
||||
return DatabaseConfig{
|
||||
Driver: "sqlite",
|
||||
DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
||||
DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=150000&_pragma=journal_mode=WAL",
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
|
@ -16,11 +17,12 @@ import (
|
|||
|
||||
type AgentRepository struct {
|
||||
db *sql.DB
|
||||
sqliteBusyRetryMaxAttempts int
|
||||
}
|
||||
|
||||
// DeleteSpec implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error {
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -49,7 +51,7 @@ func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.Agen
|
|||
func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentID) ([]*datastore.Spec, error) {
|
||||
specs := make([]*datastore.Spec, 0)
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -108,7 +110,7 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI
|
|||
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) {
|
||||
spec := &datastore.Spec{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -167,7 +169,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
|||
agents := make([]*datastore.Agent, 0)
|
||||
count := 0
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT id, label, thumbprint, status, contacted_at, created_at, updated_at FROM agents`
|
||||
|
||||
limit := 10
|
||||
|
@ -220,7 +222,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
|||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -272,7 +274,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
|||
func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*datastore.Agent, error) {
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT count(id) FROM agents WHERE thumbprint = $1`
|
||||
row := tx.QueryRowContext(ctx, query, thumbprint)
|
||||
|
||||
|
@ -331,15 +333,15 @@ func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet
|
|||
|
||||
// Delete implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Delete(ctx context.Context, id datastore.AgentID) error {
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `DELETE FROM agents WHERE id = $1`
|
||||
_, err := r.db.ExecContext(ctx, query, id)
|
||||
_, err := tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query = `DELETE FROM specs WHERE agent_id = $1`
|
||||
_, err = r.db.ExecContext(ctx, query, id)
|
||||
_, err = tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -359,14 +361,14 @@ func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datas
|
|||
ID: id,
|
||||
}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT "id", "label", "thumbprint", "keyset", "metadata", "status", "contacted_at", "created_at", "updated_at"
|
||||
FROM agents
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
row := r.db.QueryRowContext(ctx, query, id)
|
||||
row := tx.QueryRowContext(ctx, query, id)
|
||||
|
||||
metadata := JSONMap{}
|
||||
contactedAt := sql.NullTime{}
|
||||
|
@ -410,7 +412,7 @@ func (r *AgentRepository) Update(ctx context.Context, id datastore.AgentID, opts
|
|||
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
UPDATE agents SET id = $1
|
||||
`
|
||||
|
@ -534,6 +536,59 @@ func (r *AgentRepository) agentExists(ctx context.Context, tx *sql.Tx, agentID d
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTxRetry(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
attempts := 0
|
||||
max := r.sqliteBusyRetryMaxAttempts
|
||||
|
||||
ctx = logger.With(ctx, logger.F("max", max))
|
||||
|
||||
var err error
|
||||
for {
|
||||
ctx = logger.With(ctx)
|
||||
|
||||
if attempts >= max {
|
||||
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||
|
||||
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||
}
|
||||
|
||||
err = r.withTx(ctx, fn)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
|
||||
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||
|
||||
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||
|
||||
logger.Debug(
|
||||
ctx, "database is busy, waiting before retrying transaction",
|
||||
logger.F("wait", wait.String()),
|
||||
logger.F("attempts", attempts),
|
||||
)
|
||||
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-timer.C:
|
||||
attempts++
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
tx, err := r.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
|
@ -562,8 +617,8 @@ func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func NewAgentRepository(db *sql.DB) *AgentRepository {
|
||||
return &AgentRepository{db}
|
||||
func NewAgentRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *AgentRepository {
|
||||
return &AgentRepository{db, sqliteBusyRetryMaxAttempts}
|
||||
}
|
||||
|
||||
var _ datastore.AgentRepository = &AgentRepository{}
|
||||
|
|
|
@ -40,7 +40,7 @@ func TestSQLiteAgentRepository(t *testing.T) {
|
|||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
repo := NewAgentRepository(db)
|
||||
repo := NewAgentRepository(db, 5)
|
||||
|
||||
testsuite.TestAgentRepository(t, repo)
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ func NewAgentRepository(ctx context.Context, conf config.DatabaseConfig) (datast
|
|||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
agentRepository = sqlite.NewAgentRepository(db)
|
||||
agentRepository = sqlite.NewAgentRepository(db, 5)
|
||||
|
||||
default:
|
||||
return nil, errors.Errorf("unsupported database driver '%s'", driver)
|
||||
|
|
Loading…
Reference in New Issue