fix(storage,sqlite): prevent 'database is busy' error by using busy_timeout pragma
This commit is contained in:
parent
32c6f0a77e
commit
054e80bbfb
|
@ -73,7 +73,7 @@ func RunCommand() *cli.Command {
|
|||
&cli.StringFlag{
|
||||
Name: "storage-file",
|
||||
Usage: "use `FILE` for SQLite storage database",
|
||||
Value: ".edge/%APPID%/data.sqlite",
|
||||
Value: ".edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "accounts-file",
|
||||
|
|
|
@ -19,7 +19,7 @@ func TestBlobModule(t *testing.T) {
|
|||
logger.SetLevel(slog.LevelDebug)
|
||||
|
||||
bus := memory.NewBus()
|
||||
store := sqlite.NewBlobStore(":memory:")
|
||||
store := sqlite.NewBlobStore(":memory:?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000")
|
||||
|
||||
server := app.NewServer(
|
||||
module.ContextModuleFactory(),
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
func TestStoreModule(t *testing.T) {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
|
||||
store := sqlite.NewDocumentStore(":memory:")
|
||||
store := sqlite.NewDocumentStore(":memory:?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000")
|
||||
server := app.NewServer(
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
|
|
|
@ -35,6 +35,10 @@ func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := row.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
size = nullSize.Int64
|
||||
|
||||
return nil
|
||||
|
@ -111,6 +115,10 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := row.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
blobInfo = &BlobInfo{
|
||||
id: id,
|
||||
bucket: b.name,
|
||||
|
@ -143,6 +151,12 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := rows.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
blobs = make([]storage.BlobInfo, 0)
|
||||
|
||||
for rows.Next() {
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package sqlite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -19,7 +21,8 @@ func TestBlobStore(t *testing.T) {
|
|||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
store := NewBlobStore(file)
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
store := NewBlobStore(dsn)
|
||||
|
||||
testsuite.TestBlobStore(t, store)
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"database/sql"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
|
@ -18,10 +17,7 @@ import (
|
|||
)
|
||||
|
||||
type DocumentStore struct {
|
||||
db *sql.DB
|
||||
path string
|
||||
openOnce sync.Once
|
||||
mutex sync.RWMutex
|
||||
getDB getDBFunc
|
||||
}
|
||||
|
||||
// Delete implements storage.DocumentStore
|
||||
|
@ -74,6 +70,10 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := row.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
document = storage.Document(data)
|
||||
|
||||
document[storage.DocumentAttrID] = id
|
||||
|
@ -160,7 +160,11 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
defer func() {
|
||||
if err := rows.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
documents = make([]storage.Document, 0)
|
||||
|
||||
|
@ -238,6 +242,10 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := row.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
upsertedDocument = storage.Document(data)
|
||||
|
||||
upsertedDocument[storage.DocumentAttrID] = id
|
||||
|
@ -256,7 +264,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
|
|||
func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
|
||||
var db *sql.DB
|
||||
|
||||
db, err := s.getDatabase(ctx)
|
||||
db, err := s.getDB(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -268,67 +276,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *DocumentStore) getDatabase(ctx context.Context) (*sql.DB, error) {
|
||||
s.mutex.RLock()
|
||||
if s.db != nil {
|
||||
defer s.mutex.RUnlock()
|
||||
|
||||
var err error
|
||||
|
||||
s.openOnce.Do(func() {
|
||||
if err = s.ensureTables(ctx, s.db); err != nil {
|
||||
err = errors.WithStack(err)
|
||||
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return s.db, nil
|
||||
}
|
||||
|
||||
s.mutex.RUnlock()
|
||||
|
||||
var (
|
||||
db *sql.DB
|
||||
err error
|
||||
)
|
||||
|
||||
s.openOnce.Do(func() {
|
||||
db, err = sql.Open("sqlite", s.path)
|
||||
if err != nil {
|
||||
err = errors.WithStack(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if err = s.ensureTables(ctx, db); err != nil {
|
||||
err = errors.WithStack(err)
|
||||
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if db != nil {
|
||||
s.mutex.Lock()
|
||||
s.db = db
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
s.mutex.RLock()
|
||||
defer s.mutex.RUnlock()
|
||||
|
||||
return s.db, nil
|
||||
}
|
||||
|
||||
func (s *DocumentStore) ensureTables(ctx context.Context, db *sql.DB) error {
|
||||
func ensureTables(ctx context.Context, db *sql.DB) error {
|
||||
err := withTx(ctx, db, func(tx *sql.Tx) error {
|
||||
query := `
|
||||
CREATE TABLE IF NOT EXISTS documents (
|
||||
|
@ -396,18 +344,18 @@ func withLimitOffsetClause(query string, args []any, limit int, offset int) (str
|
|||
}
|
||||
|
||||
func NewDocumentStore(path string) *DocumentStore {
|
||||
getDB := newGetDBFunc(path, ensureTables)
|
||||
|
||||
return &DocumentStore{
|
||||
db: nil,
|
||||
path: path,
|
||||
openOnce: sync.Once{},
|
||||
getDB: getDB,
|
||||
}
|
||||
}
|
||||
|
||||
func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore {
|
||||
getDB := newGetDBFuncFromDB(db, ensureTables)
|
||||
|
||||
return &DocumentStore{
|
||||
db: db,
|
||||
path: "",
|
||||
openOnce: sync.Once{},
|
||||
getDB: getDB,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package sqlite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -10,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
func TestDocumentStore(t *testing.T) {
|
||||
// t.Parallel()
|
||||
t.Parallel()
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
|
||||
file := "./testdata/documentstore_test.sqlite"
|
||||
|
@ -19,7 +21,8 @@ func TestDocumentStore(t *testing.T) {
|
|||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
store := NewDocumentStore(file)
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
store := NewDocumentStore(dsn)
|
||||
|
||||
testsuite.TestDocumentStore(t, store)
|
||||
}
|
||||
|
|
|
@ -8,7 +8,9 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"modernc.org/sqlite"
|
||||
_ "modernc.org/sqlite"
|
||||
sqlite3 "modernc.org/sqlite/lib"
|
||||
)
|
||||
|
||||
func Open(path string) (*sql.DB, error) {
|
||||
|
@ -38,8 +40,27 @@ func withTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
|
|||
}
|
||||
}()
|
||||
|
||||
if err = fn(tx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
for {
|
||||
if err = fn(tx); err != nil {
|
||||
var sqlErr *sqlite.Error
|
||||
if errors.As(err, &sqlErr) {
|
||||
if sqlErr.Code() == sqlite3.SQLITE_BUSY {
|
||||
logger.Warn(ctx, "database busy, retrying transaction")
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
logger.Error(ctx, "could not execute transaction", logger.E(errors.WithStack(err)))
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
|
|
|
@ -1 +1 @@
|
|||
/*.sqlite
|
||||
/*.sqlite*
|
|
@ -8,7 +8,7 @@ import (
|
|||
|
||||
func TestBlobStore(t *testing.T, store storage.BlobStore) {
|
||||
t.Run("Ops", func(t *testing.T) {
|
||||
// t.Parallel()
|
||||
t.Parallel()
|
||||
testBlobStoreOps(t, store)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
|
||||
func TestDocumentStore(t *testing.T, store storage.DocumentStore) {
|
||||
t.Run("Ops", func(t *testing.T) {
|
||||
// t.Parallel()
|
||||
t.Parallel()
|
||||
testDocumentStoreOps(t, store)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -437,6 +437,7 @@ func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) {
|
|||
for _, tc := range documentStoreOpsTestCases {
|
||||
func(tc documentStoreOpsTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
if err := tc.Run(context.Background(), store); err != nil {
|
||||
t.Errorf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue