Compare commits
4 Commits
2023.10.21
...
2023.10.22
Author | SHA1 | Date | |
---|---|---|---|
0d2aac41a8 | |||
38795a9767 | |||
327226aa74 | |||
fbf818e423 |
2
go.mod
2
go.mod
@ -5,7 +5,7 @@ go 1.21
|
|||||||
toolchain go1.21.2
|
toolchain go1.21.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
forge.cadoles.com/arcad/edge v0.0.0-20231021194651-0cfb132b659a
|
forge.cadoles.com/arcad/edge v0.0.0-20231022211802-2fc590d708ed
|
||||||
github.com/Masterminds/sprig/v3 v3.2.3
|
github.com/Masterminds/sprig/v3 v3.2.3
|
||||||
github.com/alecthomas/participle/v2 v2.0.0-beta.5
|
github.com/alecthomas/participle/v2 v2.0.0-beta.5
|
||||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
|
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
|
||||||
|
4
go.sum
4
go.sum
@ -54,8 +54,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
|||||||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
forge.cadoles.com/arcad/edge v0.0.0-20231021194651-0cfb132b659a h1:lnd/CbsHVrkgoX0QSjCKNpqnH0yp5LEAuFZVzFpT3RI=
|
forge.cadoles.com/arcad/edge v0.0.0-20231022211802-2fc590d708ed h1:+tog5I64ej9mQK7WqBSJxCmpQprGNqXx4/5ORNxXK4o=
|
||||||
forge.cadoles.com/arcad/edge v0.0.0-20231021194651-0cfb132b659a/go.mod h1:8AYyWhcvG1to3Ig+WcG3TGSs1pp7qZwsXK7tG3Py3Es=
|
forge.cadoles.com/arcad/edge v0.0.0-20231022211802-2fc590d708ed/go.mod h1:8AYyWhcvG1to3Ig+WcG3TGSs1pp7qZwsXK7tG3Py3Es=
|
||||||
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
|
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
|
||||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
|
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
|
||||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||||
|
@ -15,6 +15,6 @@ type DatabaseConfig struct {
|
|||||||
func NewDefaultDatabaseConfig() DatabaseConfig {
|
func NewDefaultDatabaseConfig() DatabaseConfig {
|
||||||
return DatabaseConfig{
|
return DatabaseConfig{
|
||||||
Driver: "sqlite",
|
Driver: "sqlite",
|
||||||
DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
DSN: "sqlite://emissary.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=150000&_pragma=journal_mode=WAL",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
"forge.cadoles.com/Cadoles/emissary/internal/datastore"
|
||||||
@ -15,12 +16,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type AgentRepository struct {
|
type AgentRepository struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
sqliteBusyRetryMaxAttempts int
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSpec implements datastore.AgentRepository.
|
// DeleteSpec implements datastore.AgentRepository.
|
||||||
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error {
|
func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.AgentID, name string) error {
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
exists, err := r.agentExists(ctx, tx, agentID)
|
exists, err := r.agentExists(ctx, tx, agentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
@ -49,7 +51,7 @@ func (r *AgentRepository) DeleteSpec(ctx context.Context, agentID datastore.Agen
|
|||||||
func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentID) ([]*datastore.Spec, error) {
|
func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentID) ([]*datastore.Spec, error) {
|
||||||
specs := make([]*datastore.Spec, 0)
|
specs := make([]*datastore.Spec, 0)
|
||||||
|
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
exists, err := r.agentExists(ctx, tx, agentID)
|
exists, err := r.agentExists(ctx, tx, agentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
@ -108,7 +110,7 @@ func (r *AgentRepository) GetSpecs(ctx context.Context, agentID datastore.AgentI
|
|||||||
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) {
|
func (r *AgentRepository) UpdateSpec(ctx context.Context, agentID datastore.AgentID, name string, revision int, data map[string]any) (*datastore.Spec, error) {
|
||||||
spec := &datastore.Spec{}
|
spec := &datastore.Spec{}
|
||||||
|
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
exists, err := r.agentExists(ctx, tx, agentID)
|
exists, err := r.agentExists(ctx, tx, agentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
@ -167,7 +169,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
|||||||
agents := make([]*datastore.Agent, 0)
|
agents := make([]*datastore.Agent, 0)
|
||||||
count := 0
|
count := 0
|
||||||
|
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
query := `SELECT id, label, thumbprint, status, contacted_at, created_at, updated_at FROM agents`
|
query := `SELECT id, label, thumbprint, status, contacted_at, created_at, updated_at FROM agents`
|
||||||
|
|
||||||
limit := 10
|
limit := 10
|
||||||
@ -220,7 +222,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
|||||||
|
|
||||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||||
|
|
||||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
rows, err := tx.QueryContext(ctx, query, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
@ -272,7 +274,7 @@ func (r *AgentRepository) Query(ctx context.Context, opts ...datastore.AgentQuer
|
|||||||
func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*datastore.Agent, error) {
|
func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet jwk.Set, metadata map[string]any) (*datastore.Agent, error) {
|
||||||
agent := &datastore.Agent{}
|
agent := &datastore.Agent{}
|
||||||
|
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
query := `SELECT count(id) FROM agents WHERE thumbprint = $1`
|
query := `SELECT count(id) FROM agents WHERE thumbprint = $1`
|
||||||
row := tx.QueryRowContext(ctx, query, thumbprint)
|
row := tx.QueryRowContext(ctx, query, thumbprint)
|
||||||
|
|
||||||
@ -331,15 +333,15 @@ func (r *AgentRepository) Create(ctx context.Context, thumbprint string, keySet
|
|||||||
|
|
||||||
// Delete implements datastore.AgentRepository
|
// Delete implements datastore.AgentRepository
|
||||||
func (r *AgentRepository) Delete(ctx context.Context, id datastore.AgentID) error {
|
func (r *AgentRepository) Delete(ctx context.Context, id datastore.AgentID) error {
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
query := `DELETE FROM agents WHERE id = $1`
|
query := `DELETE FROM agents WHERE id = $1`
|
||||||
_, err := r.db.ExecContext(ctx, query, id)
|
_, err := tx.ExecContext(ctx, query, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
query = `DELETE FROM specs WHERE agent_id = $1`
|
query = `DELETE FROM specs WHERE agent_id = $1`
|
||||||
_, err = r.db.ExecContext(ctx, query, id)
|
_, err = tx.ExecContext(ctx, query, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
@ -359,14 +361,14 @@ func (r *AgentRepository) Get(ctx context.Context, id datastore.AgentID) (*datas
|
|||||||
ID: id,
|
ID: id,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
query := `
|
query := `
|
||||||
SELECT "id", "label", "thumbprint", "keyset", "metadata", "status", "contacted_at", "created_at", "updated_at"
|
SELECT "id", "label", "thumbprint", "keyset", "metadata", "status", "contacted_at", "created_at", "updated_at"
|
||||||
FROM agents
|
FROM agents
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
`
|
`
|
||||||
|
|
||||||
row := r.db.QueryRowContext(ctx, query, id)
|
row := tx.QueryRowContext(ctx, query, id)
|
||||||
|
|
||||||
metadata := JSONMap{}
|
metadata := JSONMap{}
|
||||||
contactedAt := sql.NullTime{}
|
contactedAt := sql.NullTime{}
|
||||||
@ -410,7 +412,7 @@ func (r *AgentRepository) Update(ctx context.Context, id datastore.AgentID, opts
|
|||||||
|
|
||||||
agent := &datastore.Agent{}
|
agent := &datastore.Agent{}
|
||||||
|
|
||||||
err := r.withTx(ctx, func(tx *sql.Tx) error {
|
err := r.withTxRetry(ctx, func(tx *sql.Tx) error {
|
||||||
query := `
|
query := `
|
||||||
UPDATE agents SET id = $1
|
UPDATE agents SET id = $1
|
||||||
`
|
`
|
||||||
@ -534,6 +536,59 @@ func (r *AgentRepository) agentExists(ctx context.Context, tx *sql.Tx, agentID d
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *AgentRepository) withTxRetry(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||||
|
attempts := 0
|
||||||
|
max := r.sqliteBusyRetryMaxAttempts
|
||||||
|
|
||||||
|
ctx = logger.With(ctx, logger.F("max", max))
|
||||||
|
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
ctx = logger.With(ctx)
|
||||||
|
|
||||||
|
if attempts >= max {
|
||||||
|
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||||
|
|
||||||
|
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = r.withTx(ctx, fn)
|
||||||
|
if err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = errors.WithStack(err)
|
||||||
|
|
||||||
|
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||||
|
|
||||||
|
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||||
|
|
||||||
|
logger.Debug(
|
||||||
|
ctx, "database is busy, waiting before retrying transaction",
|
||||||
|
logger.F("wait", wait.String()),
|
||||||
|
logger.F("attempts", attempts),
|
||||||
|
)
|
||||||
|
|
||||||
|
timer := time.NewTimer(wait)
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
attempts++
|
||||||
|
continue
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||||
tx, err := r.db.BeginTx(ctx, nil)
|
tx, err := r.db.BeginTx(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -562,8 +617,8 @@ func (r *AgentRepository) withTx(ctx context.Context, fn func(*sql.Tx) error) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAgentRepository(db *sql.DB) *AgentRepository {
|
func NewAgentRepository(db *sql.DB, sqliteBusyRetryMaxAttempts int) *AgentRepository {
|
||||||
return &AgentRepository{db}
|
return &AgentRepository{db, sqliteBusyRetryMaxAttempts}
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ datastore.AgentRepository = &AgentRepository{}
|
var _ datastore.AgentRepository = &AgentRepository{}
|
||||||
|
@ -40,7 +40,7 @@ func TestSQLiteAgentRepository(t *testing.T) {
|
|||||||
t.Fatalf("%+v", errors.WithStack(err))
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
repo := NewAgentRepository(db)
|
repo := NewAgentRepository(db, 5)
|
||||||
|
|
||||||
testsuite.TestAgentRepository(t, repo)
|
testsuite.TestAgentRepository(t, repo)
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ func NewAgentRepository(ctx context.Context, conf config.DatabaseConfig) (datast
|
|||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
agentRepository = sqlite.NewAgentRepository(db)
|
agentRepository = sqlite.NewAgentRepository(db, 5)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, errors.Errorf("unsupported database driver '%s'", driver)
|
return nil, errors.Errorf("unsupported database driver '%s'", driver)
|
||||||
|
Reference in New Issue
Block a user