From 054e80bbfb569730d8fbc3af8b2d87a5a5371942 Mon Sep 17 00:00:00 2001 From: William Petit Date: Thu, 6 Apr 2023 14:45:50 +0200 Subject: [PATCH] fix(storage,sqlite): prevent 'database is busy' error by using busy_timeout pragma --- cmd/cli/command/app/run.go | 2 +- pkg/module/blob/module_test.go | 2 +- pkg/module/store/module_test.go | 2 +- pkg/storage/sqlite/blob_bucket.go | 14 +++ pkg/storage/sqlite/blob_store_test.go | 5 +- pkg/storage/sqlite/document_store.go | 96 +++++---------------- pkg/storage/sqlite/document_store_test.go | 7 +- pkg/storage/sqlite/sql.go | 25 +++++- pkg/storage/sqlite/testdata/.gitignore | 2 +- pkg/storage/testsuite/blob_store.go | 2 +- pkg/storage/testsuite/document_store.go | 2 +- pkg/storage/testsuite/document_store_ops.go | 1 + 12 files changed, 75 insertions(+), 85 deletions(-) diff --git a/cmd/cli/command/app/run.go b/cmd/cli/command/app/run.go index 4ce1e8d..0129ba9 100644 --- a/cmd/cli/command/app/run.go +++ b/cmd/cli/command/app/run.go @@ -73,7 +73,7 @@ func RunCommand() *cli.Command { &cli.StringFlag{ Name: "storage-file", Usage: "use `FILE` for SQLite storage database", - Value: ".edge/%APPID%/data.sqlite", + Value: ".edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", }, &cli.StringFlag{ Name: "accounts-file", diff --git a/pkg/module/blob/module_test.go b/pkg/module/blob/module_test.go index b979a60..e7bd736 100644 --- a/pkg/module/blob/module_test.go +++ b/pkg/module/blob/module_test.go @@ -19,7 +19,7 @@ func TestBlobModule(t *testing.T) { logger.SetLevel(slog.LevelDebug) bus := memory.NewBus() - store := sqlite.NewBlobStore(":memory:") + store := sqlite.NewBlobStore(":memory:?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000") server := app.NewServer( module.ContextModuleFactory(), diff --git a/pkg/module/store/module_test.go b/pkg/module/store/module_test.go index ebc4801..b48ccb7 100644 --- a/pkg/module/store/module_test.go +++ b/pkg/module/store/module_test.go @@ -15,7 +15,7 @@ import ( func TestStoreModule(t *testing.T) { logger.SetLevel(logger.LevelDebug) - store := sqlite.NewDocumentStore(":memory:") + store := sqlite.NewDocumentStore(":memory:?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000") server := app.NewServer( module.ContextModuleFactory(), module.ConsoleModuleFactory(), diff --git a/pkg/storage/sqlite/blob_bucket.go b/pkg/storage/sqlite/blob_bucket.go index 982325a..0661f81 100644 --- a/pkg/storage/sqlite/blob_bucket.go +++ b/pkg/storage/sqlite/blob_bucket.go @@ -35,6 +35,10 @@ func (b *BlobBucket) Size(ctx context.Context) (int64, error) { return errors.WithStack(err) } + if err := row.Err(); err != nil { + return errors.WithStack(err) + } + size = nullSize.Int64 return nil @@ -111,6 +115,10 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn return errors.WithStack(err) } + if err := row.Err(); err != nil { + return errors.WithStack(err) + } + blobInfo = &BlobInfo{ id: id, bucket: b.name, @@ -143,6 +151,12 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { return errors.WithStack(err) } + defer func() { + if err := rows.Close(); err != nil { + logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err))) + } + }() + blobs = make([]storage.BlobInfo, 0) for rows.Next() { diff --git a/pkg/storage/sqlite/blob_store_test.go b/pkg/storage/sqlite/blob_store_test.go index cfa5fb7..7fabd7c 100644 --- a/pkg/storage/sqlite/blob_store_test.go +++ b/pkg/storage/sqlite/blob_store_test.go @@ -1,8 +1,10 @@ package sqlite import ( + "fmt" "os" "testing" + "time" "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" "github.com/pkg/errors" @@ -19,7 +21,8 @@ func TestBlobStore(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - store := NewBlobStore(file) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := NewBlobStore(dsn) testsuite.TestBlobStore(t, store) } diff --git a/pkg/storage/sqlite/document_store.go b/pkg/storage/sqlite/document_store.go index e25a96c..057903a 100644 --- a/pkg/storage/sqlite/document_store.go +++ b/pkg/storage/sqlite/document_store.go @@ -5,7 +5,6 @@ import ( "database/sql" "fmt" "math" - "sync" "time" "forge.cadoles.com/arcad/edge/pkg/storage" @@ -18,10 +17,7 @@ import ( ) type DocumentStore struct { - db *sql.DB - path string - openOnce sync.Once - mutex sync.RWMutex + getDB getDBFunc } // Delete implements storage.DocumentStore @@ -74,6 +70,10 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D return errors.WithStack(err) } + if err := row.Err(); err != nil { + return errors.WithStack(err) + } + document = storage.Document(data) document[storage.DocumentAttrID] = id @@ -160,7 +160,11 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi return errors.WithStack(err) } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err))) + } + }() documents = make([]storage.Document, 0) @@ -238,6 +242,10 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document return errors.WithStack(err) } + if err := row.Err(); err != nil { + return errors.WithStack(err) + } + upsertedDocument = storage.Document(data) upsertedDocument[storage.DocumentAttrID] = id @@ -256,7 +264,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error { var db *sql.DB - db, err := s.getDatabase(ctx) + db, err := s.getDB(ctx) if err != nil { return errors.WithStack(err) } @@ -268,67 +276,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e return nil } -func (s *DocumentStore) getDatabase(ctx context.Context) (*sql.DB, error) { - s.mutex.RLock() - if s.db != nil { - defer s.mutex.RUnlock() - - var err error - - s.openOnce.Do(func() { - if err = s.ensureTables(ctx, s.db); err != nil { - err = errors.WithStack(err) - - return - } - }) - - if err != nil { - return nil, errors.WithStack(err) - } - - return s.db, nil - } - - s.mutex.RUnlock() - - var ( - db *sql.DB - err error - ) - - s.openOnce.Do(func() { - db, err = sql.Open("sqlite", s.path) - if err != nil { - err = errors.WithStack(err) - - return - } - - if err = s.ensureTables(ctx, db); err != nil { - err = errors.WithStack(err) - - return - } - }) - - if err != nil { - return nil, errors.WithStack(err) - } - - if db != nil { - s.mutex.Lock() - s.db = db - s.mutex.Unlock() - } - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return s.db, nil -} - -func (s *DocumentStore) ensureTables(ctx context.Context, db *sql.DB) error { +func ensureTables(ctx context.Context, db *sql.DB) error { err := withTx(ctx, db, func(tx *sql.Tx) error { query := ` CREATE TABLE IF NOT EXISTS documents ( @@ -396,18 +344,18 @@ func withLimitOffsetClause(query string, args []any, limit int, offset int) (str } func NewDocumentStore(path string) *DocumentStore { + getDB := newGetDBFunc(path, ensureTables) + return &DocumentStore{ - db: nil, - path: path, - openOnce: sync.Once{}, + getDB: getDB, } } func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore { + getDB := newGetDBFuncFromDB(db, ensureTables) + return &DocumentStore{ - db: db, - path: "", - openOnce: sync.Once{}, + getDB: getDB, } } diff --git a/pkg/storage/sqlite/document_store_test.go b/pkg/storage/sqlite/document_store_test.go index 157f918..a4a24de 100644 --- a/pkg/storage/sqlite/document_store_test.go +++ b/pkg/storage/sqlite/document_store_test.go @@ -1,8 +1,10 @@ package sqlite import ( + "fmt" "os" "testing" + "time" "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" "github.com/pkg/errors" @@ -10,7 +12,7 @@ import ( ) func TestDocumentStore(t *testing.T) { - // t.Parallel() + t.Parallel() logger.SetLevel(logger.LevelDebug) file := "./testdata/documentstore_test.sqlite" @@ -19,7 +21,8 @@ func TestDocumentStore(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - store := NewDocumentStore(file) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := NewDocumentStore(dsn) testsuite.TestDocumentStore(t, store) } diff --git a/pkg/storage/sqlite/sql.go b/pkg/storage/sqlite/sql.go index bd4f34b..0269a9a 100644 --- a/pkg/storage/sqlite/sql.go +++ b/pkg/storage/sqlite/sql.go @@ -8,7 +8,9 @@ import ( "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" + "modernc.org/sqlite" _ "modernc.org/sqlite" + sqlite3 "modernc.org/sqlite/lib" ) func Open(path string) (*sql.DB, error) { @@ -38,8 +40,27 @@ func withTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { } }() - if err = fn(tx); err != nil { - return errors.WithStack(err) + for { + if err = fn(tx); err != nil { + var sqlErr *sqlite.Error + if errors.As(err, &sqlErr) { + if sqlErr.Code() == sqlite3.SQLITE_BUSY { + logger.Warn(ctx, "database busy, retrying transaction") + + if err := ctx.Err(); err != nil { + logger.Error(ctx, "could not execute transaction", logger.E(errors.WithStack(err))) + + return errors.WithStack(err) + } + + continue + } + } + + return errors.WithStack(err) + } + + break } if err = tx.Commit(); err != nil { diff --git a/pkg/storage/sqlite/testdata/.gitignore b/pkg/storage/sqlite/testdata/.gitignore index 972b8db..2a2bc40 100644 --- a/pkg/storage/sqlite/testdata/.gitignore +++ b/pkg/storage/sqlite/testdata/.gitignore @@ -1 +1 @@ -/*.sqlite \ No newline at end of file +/*.sqlite* \ No newline at end of file diff --git a/pkg/storage/testsuite/blob_store.go b/pkg/storage/testsuite/blob_store.go index e2b4291..b73c086 100644 --- a/pkg/storage/testsuite/blob_store.go +++ b/pkg/storage/testsuite/blob_store.go @@ -8,7 +8,7 @@ import ( func TestBlobStore(t *testing.T, store storage.BlobStore) { t.Run("Ops", func(t *testing.T) { - // t.Parallel() + t.Parallel() testBlobStoreOps(t, store) }) } diff --git a/pkg/storage/testsuite/document_store.go b/pkg/storage/testsuite/document_store.go index 8e5c8c1..cdbea14 100644 --- a/pkg/storage/testsuite/document_store.go +++ b/pkg/storage/testsuite/document_store.go @@ -8,7 +8,7 @@ import ( func TestDocumentStore(t *testing.T, store storage.DocumentStore) { t.Run("Ops", func(t *testing.T) { - // t.Parallel() + t.Parallel() testDocumentStoreOps(t, store) }) } diff --git a/pkg/storage/testsuite/document_store_ops.go b/pkg/storage/testsuite/document_store_ops.go index efc2822..7281fed 100644 --- a/pkg/storage/testsuite/document_store_ops.go +++ b/pkg/storage/testsuite/document_store_ops.go @@ -437,6 +437,7 @@ func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) { for _, tc := range documentStoreOpsTestCases { func(tc documentStoreOpsTestCase) { t.Run(tc.Name, func(t *testing.T) { + t.Parallel() if err := tc.Run(context.Background(), store); err != nil { t.Errorf("%+v", errors.WithStack(err)) }