edge/pkg/storage/driver/sqlite/blob_bucket.go
William Petit 2fc590d708
All checks were successful
arcad/edge/pipeline/head This commit looks good
feat(storage): retry sqlite failed transaction when database is busy
2023-10-22 23:18:02 +02:00

460 lines
9.1 KiB
Go

package sqlite
import (
"bytes"
"context"
"database/sql"
"io"
"sync"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/gabriel-vasile/mimetype"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
// BlobBucket is the SQLite-backed implementation of storage.BlobBucket.
// Its queries target the "blobs" table and are scoped by the bucket name.
type BlobBucket struct {
	// name identifies the bucket; queries match it against the "bucket" column.
	name string
	// getDB provides the database handle on which transactions are run.
	getDB GetDBFunc
	// closed is set by Close; once true, operations fail with storage.ErrBucketClosed.
	closed bool
}
// Size implements storage.BlobBucket.
//
// It returns the sum of the "size" column over every blob of the bucket.
// The sum is scanned into a nullable integer, so a NULL result is reported
// as 0.
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
	const query = `SELECT SUM(size) FROM blobs WHERE bucket = $1`

	var total int64

	sumSizes := func(tx *sql.Tx) error {
		var sum sql.NullInt64

		row := tx.QueryRowContext(ctx, query, b.name)
		if err := row.Scan(&sum); err != nil {
			return errors.WithStack(err)
		}

		if err := row.Err(); err != nil {
			return errors.WithStack(err)
		}

		// When the sum is NULL, Int64 holds its zero value.
		total = sum.Int64

		return nil
	}

	if err := b.withTx(ctx, sumSizes); err != nil {
		return 0, errors.WithStack(err)
	}

	return total, nil
}
// Name implements storage.BlobBucket.
//
// It returns the bucket name, i.e. the value this bucket's queries match
// against the "bucket" column.
func (b *BlobBucket) Name() string {
	return b.name
}
// Close implements storage.BlobBucket.
//
// It flags the bucket as closed and always returns nil. Closing twice is
// harmless; the debug log records whether the bucket was already closed.
func (b *BlobBucket) Close() error {
	wasClosed := b.closed

	logger.Debug(
		context.Background(), "closing bucket",
		logger.F("alreadyClosed", wasClosed),
		logger.F("name", b.name),
	)

	b.closed = true

	return nil
}
// Delete implements storage.BlobBucket.
//
// It removes the blob identified by id from this bucket. The statement
// result is discarded, so deleting a blob that does not exist is not
// reported as an error.
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
	const query = `DELETE FROM blobs WHERE bucket = $1 AND id = $2`

	remove := func(tx *sql.Tx) error {
		params := []any{b.name, id}

		logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", params))

		_, err := tx.ExecContext(ctx, query, params...)
		if err != nil {
			return errors.WithStack(err)
		}

		return nil
	}

	if err := b.withTx(ctx, remove); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// Get implements storage.BlobBucket.
//
// It looks up the metadata (content type, modification time and size) of the
// blob identified by id in this bucket; the payload itself is not read.
// storage.ErrBlobNotFound is returned when no matching row exists.
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
	const query = `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2`

	var found *BlobInfo

	fetch := func(tx *sql.Tx) error {
		params := []any{b.name, id}

		logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", params))

		var (
			contentType string
			modTime     time.Time
			size        int64
		)

		row := tx.QueryRowContext(ctx, query, params...)

		switch err := row.Scan(&contentType, &modTime, &size); {
		case errors.Is(err, sql.ErrNoRows):
			return errors.WithStack(storage.ErrBlobNotFound)
		case err != nil:
			return errors.WithStack(err)
		}

		if err := row.Err(); err != nil {
			return errors.WithStack(err)
		}

		found = &BlobInfo{
			id:          id,
			bucket:      b.name,
			contentType: contentType,
			modTime:     modTime,
			size:        size,
		}

		return nil
	}

	if err := b.withTx(ctx, fetch); err != nil {
		// Return an untyped nil so the interface result is really nil.
		return nil, errors.WithStack(err)
	}

	return found, nil
}
// List implements storage.BlobBucket.
//
// It returns the metadata (not the payload) of every blob stored in this
// bucket, in whatever order the database yields the rows. A bucket without
// blobs yields an empty, non-nil slice.
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
	var blobs []storage.BlobInfo

	err := b.withTx(ctx, func(tx *sql.Tx) error {
		query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1`
		args := []any{b.name}

		logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))

		rows, err := tx.QueryContext(ctx, query, args...)
		if err != nil {
			return errors.WithStack(err)
		}

		defer func() {
			if err := rows.Close(); err != nil {
				logger.Error(ctx, "could not close rows", logger.CapturedE(errors.WithStack(err)))
			}
		}()

		// Start from a fresh slice on every invocation so that rows gathered
		// by an earlier, failed attempt are never carried over.
		// NOTE(review): assumes WithRetry may run this callback more than once.
		blobs = make([]storage.BlobInfo, 0)

		for rows.Next() {
			var (
				blobID      string
				contentType string
				modTime     time.Time
				size        int64
			)

			// Rows.Scan never reports sql.ErrNoRows (only Row.Scan does), so
			// any error here is a genuine failure and is returned as is.
			if err := rows.Scan(&blobID, &contentType, &modTime, &size); err != nil {
				return errors.WithStack(err)
			}

			blobs = append(blobs, &BlobInfo{
				id:          storage.BlobID(blobID),
				bucket:      b.name,
				contentType: contentType,
				modTime:     modTime,
				size:        size,
			})
		}

		// Surface any error that made the iteration stop early.
		if err := rows.Err(); err != nil {
			return errors.WithStack(err)
		}

		return nil
	})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return blobs, nil
}
// NewReader implements storage.BlobBucket.
//
// It fails with storage.ErrBucketClosed once the bucket has been closed.
// Otherwise it returns a reader holding only the blob id, the bucket name and
// the database accessor: no query is issued here and ctx is not used.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
	if b.closed {
		return nil, errors.WithStack(storage.ErrBucketClosed)
	}

	reader := &blobReaderCloser{
		id:     id,
		bucket: b.name,
		getDB:  b.getDB,
	}

	return reader, nil
}
// NewWriter implements storage.BlobBucket.
//
// It fails with storage.ErrBucketClosed once the bucket has been closed.
// Otherwise it returns a writer with an empty buffer, bound to the blob id,
// the bucket name and the database accessor. No query is issued here and
// ctx is not used.
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
	if b.closed {
		return nil, errors.WithStack(storage.ErrBucketClosed)
	}

	// The buffer field is left at its zero value, which is an empty buffer.
	writer := &blobWriterCloser{
		id:     id,
		bucket: b.name,
		getDB:  b.getDB,
	}

	return writer, nil
}
// withTx obtains the database handle through getDB and hands fn to WithRetry
// together with sqliteBusyMaxRetry. It refuses to run, returning
// storage.ErrBucketClosed, once the bucket has been closed.
func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	if b.closed {
		return errors.WithStack(storage.ErrBucketClosed)
	}

	db, err := b.getDB(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	// errors.WithStack returns nil for a nil error, so success stays nil.
	return errors.WithStack(WithRetry(ctx, db, sqliteBusyMaxRetry, fn))
}
// blobWriterCloser is the io.WriteCloser handed out by BlobBucket.NewWriter.
// Written bytes accumulate in buf and are stored in the database by Close.
type blobWriterCloser struct {
	// id is the identifier under which the blob is stored.
	id storage.BlobID
	// bucket is the name of the bucket the blob belongs to.
	bucket string
	// getDB provides the database handle on which the transaction is run.
	getDB GetDBFunc
	// buf holds everything written so far.
	buf bytes.Buffer
	// closed is set once Close has stored the blob successfully.
	closed bool
}
// Write implements io.WriteCloser.
//
// It appends p to the in-memory buffer; nothing reaches the database until
// Close is called. Once the writer has been closed successfully, Write fails
// with io.ErrClosedPipe. Close becomes a no-op at that point, so any data
// accepted afterwards would be dropped silently.
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
	logger.Debug(
		context.Background(), "writing data to blob",
		logger.F("size", len(p)),
		logger.F("blobID", wbc.id),
		logger.F("bucket", wbc.bucket),
	)

	// Same sentinel as the one withTx reports for a closed writer.
	if wbc.closed {
		return 0, errors.WithStack(io.ErrClosedPipe)
	}

	n, err := wbc.buf.Write(p)
	if err != nil {
		return n, errors.WithStack(err)
	}

	return n, nil
}
// Close implements io.WriteCloser.
//
// It stores the buffered bytes as the blob's content, inserting the row or
// replacing an existing one with the same (id, bucket). The content type is
// detected from the bytes, and the modification time is the current UTC time.
// The writer is marked closed only after the write succeeded, so a failed
// Close can be attempted again. Closing an already closed writer returns nil.
func (wbc *blobWriterCloser) Close() error {
	const query = `
INSERT INTO blobs (bucket, id, data, content_type, mod_time, size)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id, bucket) DO UPDATE SET
data = $3, content_type = $4, mod_time = $5, size = $6
`

	ctx := context.Background()

	logger.Debug(
		ctx, "closing writer",
		logger.F("alreadyClosed", wbc.closed),
		logger.F("bucket", wbc.bucket),
		logger.F("blobID", wbc.id),
	)

	if wbc.closed {
		return nil
	}

	persist := func(tx *sql.Tx) error {
		payload := wbc.buf.Bytes()
		contentType := mimetype.Detect(payload)
		now := time.Now().UTC()

		params := []any{
			wbc.bucket,
			wbc.id,
			payload,
			contentType.String(),
			now,
			len(payload),
		}

		logger.Debug(ctx, "executing query", logger.F("query", query))

		// ctx is the background context, which is what tx.Exec uses internally.
		if _, err := tx.ExecContext(ctx, query, params...); err != nil {
			return errors.WithStack(err)
		}

		return nil
	}

	if err := wbc.withTx(ctx, persist); err != nil {
		return errors.WithStack(err)
	}

	wbc.closed = true

	return nil
}
// withTx obtains the database handle through getDB and hands fn to WithRetry
// together with sqliteBusyMaxRetry. It returns io.ErrClosedPipe when the
// writer has already been closed.
func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	if wbc.closed {
		return errors.WithStack(io.ErrClosedPipe)
	}

	db, err := wbc.getDB(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	// errors.WithStack returns nil for a nil error, so success stays nil.
	return errors.WithStack(WithRetry(ctx, db, sqliteBusyMaxRetry, fn))
}
// blobReaderCloser is the io.ReadSeekCloser handed out by
// BlobBucket.NewReader. The blob payload is fetched lazily, on the first Read
// or Seek, and is then served from memory.
type blobReaderCloser struct {
	// id is the identifier of the blob to read.
	id storage.BlobID
	// bucket is the name of the bucket the blob belongs to.
	bucket string
	// getDB provides the database handle on which the transaction is run.
	getDB GetDBFunc
	// reader serves the payload once it has been loaded.
	reader bytes.Reader
	// once guards the single attempt at loading the payload.
	once sync.Once
	// loadErr keeps the outcome of that attempt so that every later call
	// reports it, not only the call that triggered the load.
	loadErr error
	// closed is set by Close.
	closed bool
}

// ensureLoaded loads the blob payload on first use and returns the outcome of
// that single attempt on every call.
func (brc *blobReaderCloser) ensureLoaded() error {
	brc.once.Do(func() {
		brc.loadErr = brc.loadBlob()
	})

	return brc.loadErr
}

// Read implements io.ReadSeekCloser.
//
// The first Read or Seek loads the blob. If that load failed, the same error
// is returned by every subsequent call instead of reporting an empty blob.
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
	if err := brc.ensureLoaded(); err != nil {
		return 0, errors.WithStack(err)
	}

	n, err := brc.reader.Read(p)
	if err != nil {
		// io.EOF is returned bare so that callers comparing with == still
		// recognise the end of the blob.
		if errors.Is(err, io.EOF) {
			return n, io.EOF
		}

		return n, errors.WithStack(err)
	}

	return n, nil
}

// Seek implements io.ReadSeekCloser.
//
// The first Read or Seek loads the blob. If that load failed, the same error
// is returned by every subsequent call instead of seeking in an empty blob.
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
	if err := brc.ensureLoaded(); err != nil {
		return 0, errors.WithStack(err)
	}

	n, err := brc.reader.Seek(offset, whence)
	if err != nil {
		return n, errors.WithStack(err)
	}

	return n, nil
}
// loadBlob reads the whole payload of the blob from the database and points
// the in-memory reader at it. It returns storage.ErrBlobNotFound when the
// bucket holds no blob with this id. The query runs with a background
// context.
func (brc *blobReaderCloser) loadBlob() error {
	const query = `SELECT data FROM blobs WHERE bucket = $1 AND id = $2`

	ctx := context.Background()

	logger.Debug(ctx, "loading blob", logger.F("alreadyClosed", brc.closed))

	fetch := func(tx *sql.Tx) error {
		var payload []byte

		// ctx is the background context, which is what tx.QueryRow uses internally.
		row := tx.QueryRowContext(ctx, query, brc.bucket, brc.id)

		switch err := row.Scan(&payload); {
		case errors.Is(err, sql.ErrNoRows):
			return errors.WithStack(storage.ErrBlobNotFound)
		case err != nil:
			return errors.WithStack(err)
		}

		brc.reader = *bytes.NewReader(payload)

		return nil
	}

	if err := brc.withTx(ctx, fetch); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// Close implements io.ReadSeekCloser.
//
// It flags the reader as closed and always returns nil. The debug log records
// whether the reader was already closed.
func (brc *blobReaderCloser) Close() error {
	wasClosed := brc.closed

	logger.Debug(
		context.Background(), "closing reader",
		logger.F("alreadyClosed", wasClosed),
		logger.F("bucket", brc.bucket),
		logger.F("blobID", brc.id),
	)

	brc.closed = true

	return nil
}
// withTx obtains the database handle through getDB and hands fn to WithRetry
// together with sqliteBusyMaxRetry. It does not look at the closed flag.
func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	db, err := brc.getDB(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	// errors.WithStack returns nil for a nil error, so success stays nil.
	return errors.WithStack(WithRetry(ctx, db, sqliteBusyMaxRetry, fn))
}
// Compile-time checks that the types of this file satisfy the interfaces
// they are meant to implement.
var (
	_ storage.BlobBucket = (*BlobBucket)(nil)
	_ storage.BlobInfo   = (*BlobInfo)(nil)
	_ io.WriteCloser     = (*blobWriterCloser)(nil)
	_ io.ReadSeekCloser  = (*blobReaderCloser)(nil)
)