feat(storage): retry sqlite failed transaction when database is busy
arcad/edge/pipeline/head This commit looks good
Details
arcad/edge/pipeline/head This commit looks good
Details
This commit is contained in:
parent
6e4bf2f025
commit
2fc590d708
|
@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WithTx(ctx, db, fn); err != nil {
|
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WithTx(ctx, db, fn); err != nil {
|
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WithTx(ctx, db, fn); err != nil {
|
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WithTx(ctx, db, fn); err != nil {
|
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
package sqlite
|
||||||
|
|
||||||
|
const sqliteBusyMaxRetry = 5
|
|
@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WithTx(ctx, db, fn); err != nil {
|
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := WithTx(ctx, db, fn); err != nil {
|
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,9 @@ package sqlite
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"gitlab.com/wpetit/goweb/logger"
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) {
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error {
|
||||||
|
attempts := 0
|
||||||
|
|
||||||
|
ctx = logger.With(ctx, logger.F("max", max))
|
||||||
|
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
ctx = logger.With(ctx)
|
||||||
|
|
||||||
|
if attempts >= max {
|
||||||
|
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||||
|
|
||||||
|
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = WithTx(ctx, db, fn)
|
||||||
|
if err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = errors.WithStack(err)
|
||||||
|
|
||||||
|
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||||
|
|
||||||
|
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||||
|
|
||||||
|
logger.Debug(
|
||||||
|
ctx, "database is busy, waiting before retrying transaction",
|
||||||
|
logger.F("wait", wait.String()),
|
||||||
|
logger.F("attempts", attempts),
|
||||||
|
)
|
||||||
|
|
||||||
|
timer := time.NewTimer(wait)
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
attempts++
|
||||||
|
continue
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
|
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
|
||||||
var tx *sql.Tx
|
var tx *sql.Tx
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue