From 2fc590d708edf1b19a9327dc7fb3caf0777de649 Mon Sep 17 00:00:00 2001 From: William Petit Date: Sun, 22 Oct 2023 23:18:02 +0200 Subject: [PATCH] feat(storage): retry sqlite failed transaction when database is busy --- pkg/storage/driver/sqlite/blob_bucket.go | 6 +-- pkg/storage/driver/sqlite/blob_store.go | 2 +- pkg/storage/driver/sqlite/const.go | 3 ++ pkg/storage/driver/sqlite/document_store.go | 2 +- pkg/storage/driver/sqlite/share_store.go | 2 +- pkg/storage/driver/sqlite/sql.go | 54 +++++++++++++++++++++ 6 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 pkg/storage/driver/sqlite/const.go diff --git a/pkg/storage/driver/sqlite/blob_bucket.go b/pkg/storage/driver/sqlite/blob_bucket.go index db525b6..28e8703 100644 --- a/pkg/storage/driver/sqlite/blob_bucket.go +++ b/pkg/storage/driver/sqlite/blob_bucket.go @@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro return errors.WithStack(err) } - if err := WithTx(ctx, db, fn); err != nil { + if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } @@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err return errors.WithStack(err) } - if err := WithTx(ctx, db, fn); err != nil { + if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } @@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err return errors.WithStack(err) } - if err := WithTx(ctx, db, fn); err != nil { + if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } diff --git a/pkg/storage/driver/sqlite/blob_store.go b/pkg/storage/driver/sqlite/blob_store.go index 1e43304..dcfc8cc 100644 --- a/pkg/storage/driver/sqlite/blob_store.go +++ b/pkg/storage/driver/sqlite/blob_store.go @@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error return errors.WithStack(err) } - if err := WithTx(ctx, db, fn); err != nil { + if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } diff --git a/pkg/storage/driver/sqlite/const.go b/pkg/storage/driver/sqlite/const.go new file mode 100644 index 0000000..c31e989 --- /dev/null +++ b/pkg/storage/driver/sqlite/const.go @@ -0,0 +1,3 @@ +package sqlite + +const sqliteBusyMaxRetry = 5 diff --git a/pkg/storage/driver/sqlite/document_store.go b/pkg/storage/driver/sqlite/document_store.go index 2166ee1..4364229 100644 --- a/pkg/storage/driver/sqlite/document_store.go +++ b/pkg/storage/driver/sqlite/document_store.go @@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e return errors.WithStack(err) } - if err := WithTx(ctx, db, fn); err != nil { + if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } diff --git a/pkg/storage/driver/sqlite/share_store.go b/pkg/storage/driver/sqlite/share_store.go index ee1567f..3e2e6ed 100644 --- a/pkg/storage/driver/sqlite/share_store.go +++ b/pkg/storage/driver/sqlite/share_store.go @@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro return errors.WithStack(err) } - if err := WithTx(ctx, db, fn); err != nil { + if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil { return errors.WithStack(err) } diff --git a/pkg/storage/driver/sqlite/sql.go b/pkg/storage/driver/sqlite/sql.go index 3a28caa..9ad7a49 100644 --- a/pkg/storage/driver/sqlite/sql.go +++ b/pkg/storage/driver/sqlite/sql.go @@ -3,7 +3,9 @@ package sqlite import ( "context" "database/sql" + "strings" "sync" + "time" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" @@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) { return db, nil } +func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error { + attempts := 0 + + ctx = logger.With(ctx, logger.F("max", max)) + + var err error + for { + ctx = logger.With(ctx) + + if attempts >= max { + logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts)) + + return errors.Wrapf(err, "transaction failed after %d attempts", max) + } + + err = WithTx(ctx, db, fn) + if err != nil { + if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") { + return errors.WithStack(err) + } + + err = errors.WithStack(err) + + logger.Warn(ctx, "database is busy", logger.E(err)) + + wait := time.Duration(8<<(attempts+1)) * time.Millisecond + + logger.Debug( + ctx, "database is busy, waiting before retrying transaction", + logger.F("wait", wait.String()), + logger.F("attempts", attempts), + ) + + timer := time.NewTimer(wait) + select { + case <-timer.C: + attempts++ + continue + + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + return errors.WithStack(err) + } + + return nil + } + } + + return nil + } +} + func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { var tx *sql.Tx