Compare commits
4 Commits
2023.10.21
...
2023.10.22
Author | SHA1 | Date | |
---|---|---|---|
0d2aac41a8 | |||
38795a9767 | |||
327226aa74 | |||
fbf818e423 |
2
go.mod
2
go.mod
@ -5,7 +5,7 @@ go 1.21
|
||||
toolchain go1.21.2
|
||||
|
||||
require (
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20231021194651-0cfb132b659a
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20231022211802-2fc590d708ed
|
||||
github.com/Masterminds/sprig/v3 v3.2.3
|
||||
github.com/alecthomas/participle/v2 v2.0.0-beta.5
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
|
||||
|
4
go.sum
4
go.sum
@ -54,8 +54,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
||||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20231021194651-0cfb132b659a h1:lnd/CbsHVrkgoX0QSjCKNpqnH0yp5LEAuFZVzFpT3RI=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20231021194651-0cfb132b659a/go.mod h1:8AYyWhcvG1to3Ig+WcG3TGSs1pp7qZwsXK7tG3Py3Es=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20231022211802-2fc590d708ed h1:+tog5I64ej9mQK7WqBSJxCmpQprGNqXx4/5ORNxXK4o=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20231022211802-2fc590d708ed/go.mod h1:8AYyWhcvG1to3Ig+WcG3TGSs1pp7qZwsXK7tG3Py3Es=
|
||||
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
|
@ -15,6 +15,6 @@ type DatabaseConfig struct {
|
||||
func NewDefaultDatabaseConfig() DatabaseConfig {
|
||||
return DatabaseConfig{
|
||||
Driver: "sqlite",
|
||||
DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
||||
DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=150000&_pragma=journal_mode=WAL",
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||
@ -15,12 +16,13 @@ import (
|
||||
)
|
||||
|
||||
type AgentRepository struct {
|
||||
db *sql.DB
|
||||
db *sql.DB
|
||||
sqliteBusyRetryMaxAttempts int
|
||||
}
|
||||
|
||||
// DeleteSpec implements datastore.AgentRepository.
|
||||
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error {
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -49,7 +51,7 @@ func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.Agen
|
||||
func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentID) ([]*datastore.Spec, error) {
|
||||
specs := make([]*datastore.Spec, 0)
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -108,7 +110,7 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI
|
||||
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) {
|
||||
spec := &datastore.Spec{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
exists, err := r.agentExists(ctx, tx, agentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -167,7 +169,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
agents := make([]*datastore.Agent, 0)
|
||||
count := 0
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT id, label, thumbprint, status, contacted_at, created_at, updated_at FROM agents`
|
||||
|
||||
limit := 10
|
||||
@ -220,7 +222,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
@ -272,7 +274,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
||||
func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*datastore.Agent, error) {
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT count(id) FROM agents WHERE thumbprint = $1`
|
||||
row := tx.QueryRowContext(ctx, query, thumbprint)
|
||||
|
||||
@ -331,15 +333,15 @@ func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet
|
||||
|
||||
// Delete implements datastore.AgentRepository
|
||||
func (r *AgentRepository) Delete(ctx context.Context, id datastore.AgentID) error {
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `DELETE FROM agents WHERE id = $1`
|
||||
_, err := r.db.ExecContext(ctx, query, id)
|
||||
_, err := tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
query = `DELETE FROM specs WHERE agent_id = $1`
|
||||
_, err = r.db.ExecContext(ctx, query, id)
|
||||
_, err = tx.ExecContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
@ -359,14 +361,14 @@ func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datas
|
||||
ID: id,
|
||||
}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
SELECT "id", "label", "thumbprint", "keyset", "metadata", "status", "contacted_at", "created_at", "updated_at"
|
||||
FROM agents
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
row := r.db.QueryRowContext(ctx, query, id)
|
||||
row := tx.QueryRowContext(ctx, query, id)
|
||||
|
||||
metadata := JSONMap{}
|
||||
contactedAt := sql.NullTime{}
|
||||
@ -410,7 +412,7 @@ func (r *AgentRepository) Update(ctx context.Context, id datastore.AgentID, opts
|
||||
|
||||
agent := &datastore.Agent{}
|
||||
|
||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
||||
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
UPDATE agents SET id = $1
|
||||
`
|
||||
@ -534,6 +536,59 @@ func (r *AgentRepository) agentExists(ctx context.Context, tx *sql.Tx, agentID d
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTxRetry(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
attempts := 0
|
||||
max := r.sqliteBusyRetryMaxAttempts
|
||||
|
||||
ctx = logger.With(ctx, logger.F("max", max))
|
||||
|
||||
var err error
|
||||
for {
|
||||
ctx = logger.With(ctx)
|
||||
|
||||
if attempts >= max {
|
||||
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||
|
||||
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||
}
|
||||
|
||||
err = r.withTx(ctx, fn)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
|
||||
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||
|
||||
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||
|
||||
logger.Debug(
|
||||
ctx, "database is busy, waiting before retrying transaction",
|
||||
logger.F("wait", wait.String()),
|
||||
logger.F("attempts", attempts),
|
||||
)
|
||||
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-timer.C:
|
||||
attempts++
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
tx, err := r.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
@ -562,8 +617,8 @@ func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewAgentRepository(db *sql.DB) *AgentRepository {
|
||||
return &AgentRepository{db}
|
||||
func NewAgentRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *AgentRepository {
|
||||
return &AgentRepository{db, sqliteBusyRetryMaxAttempts}
|
||||
}
|
||||
|
||||
var _ datastore.AgentRepository = &AgentRepository{}
|
||||
|
@ -40,7 +40,7 @@ func TestSQLiteAgentRepository(t *testing.T) {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
repo := NewAgentRepository(db)
|
||||
repo := NewAgentRepository(db, 5)
|
||||
|
||||
testsuite.TestAgentRepository(t, repo)
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ func NewAgentRepository(ctx context.Context, conf config.DatabaseConfig) (datast
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
agentRepository = sqlite.NewAgentRepository(db)
|
||||
agentRepository = sqlite.NewAgentRepository(db, 5)
|
||||
|
||||
default:
|
||||
return nil, errors.Errorf("unsupported database driver '%s'", driver)
|
||||
|
Reference in New Issue
Block a user