package sqlite

import (
	"bytes"
	"context"
	"database/sql"
	"io"
	"sync"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage"
	"github.com/gabriel-vasile/mimetype"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// BlobBucket is a storage.BlobBucket whose blobs live in the "blobs" table
// of a SQLite database, scoped by the bucket name.
type BlobBucket struct {
	name   string
	getDB  GetDBFunc
	closed bool
}

// Size implements storage.BlobBucket.
//
// It returns the sum of the "size" column for every blob of the bucket.
// A bucket without blobs yields 0 (SUM over no rows is NULL).
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
	var size int64

	err := b.withTx(ctx, func(tx *sql.Tx) error {
		query := `SELECT SUM(size) FROM blobs WHERE bucket = $1`

		row := tx.QueryRowContext(ctx, query, b.name)

		var nullSize sql.NullInt64
		if err := row.Scan(&nullSize); err != nil {
			return errors.WithStack(err)
		}

		if err := row.Err(); err != nil {
			return errors.WithStack(err)
		}

		size = nullSize.Int64

		return nil
	})
	if err != nil {
		return 0, errors.WithStack(err)
	}

	return size, nil
}

// Name implements storage.BlobBucket.
func (b *BlobBucket) Name() string {
	return b.name
}

// Close implements storage.BlobBucket.
//
// It only flags the bucket as closed; once closed, the query methods,
// NewReader and NewWriter return storage.ErrBucketClosed. Calling Close
// more than once is harmless.
func (b *BlobBucket) Close() error {
	logger.Debug(
		context.Background(), "closing bucket",
		logger.F("alreadyClosed", b.closed),
		logger.F("name", b.name),
	)

	b.closed = true

	return nil
}

// Delete implements storage.BlobBucket.
//
// It removes the row matching the given id in this bucket. Deleting an
// unknown id is not reported as an error.
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
	err := b.withTx(ctx, func(tx *sql.Tx) error {
		query := `DELETE FROM blobs WHERE bucket = $1 AND id = $2`
		args := []any{b.name, id}

		logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))

		if _, err := tx.ExecContext(ctx, query, args...); err != nil {
			return errors.WithStack(err)
		}

		return nil
	})
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}

// Get implements storage.BlobBucket.
//
// It returns the metadata (content type, modification time, size) of the
// blob with the given id, or storage.ErrBlobNotFound when no row matches.
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
	var blobInfo *BlobInfo

	err := b.withTx(ctx, func(tx *sql.Tx) error {
		query := `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2`
		args := []any{b.name, id}

		logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))

		row := tx.QueryRowContext(ctx, query, args...)

		var (
			contentType string
			modTime     time.Time
			size        int64
		)

		if err := row.Scan(&contentType, &modTime, &size); err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				return errors.WithStack(storage.ErrBlobNotFound)
			}

			return errors.WithStack(err)
		}

		if err := row.Err(); err != nil {
			return errors.WithStack(err)
		}

		blobInfo = &BlobInfo{
			id:          id,
			bucket:      b.name,
			contentType: contentType,
			modTime:     modTime,
			size:        size,
		}

		return nil
	})
	if err != nil {
		// Return an untyped nil so callers never see a non-nil interface
		// wrapping a nil *BlobInfo.
		return nil, errors.WithStack(err)
	}

	return blobInfo, nil
}

// List implements storage.BlobBucket.
//
// It returns the metadata of every blob of the bucket. An empty bucket
// yields an empty, non-nil slice.
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
	var blobs []storage.BlobInfo

	err := b.withTx(ctx, func(tx *sql.Tx) error {
		query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1`
		args := []any{b.name}

		logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))

		rows, err := tx.QueryContext(ctx, query, args...)
		if err != nil {
			return errors.WithStack(err)
		}

		defer func() {
			if err := rows.Close(); err != nil {
				logger.Error(ctx, "could not close rows", logger.CapturedE(errors.WithStack(err)))
			}
		}()

		// (Re)initialized inside the callback so that a re-run of the
		// callback does not accumulate rows from a previous attempt.
		blobs = make([]storage.BlobInfo, 0)

		for rows.Next() {
			var (
				blobID      string
				contentType string
				modTime     time.Time
				size        int64
			)

			// Rows.Scan never reports sql.ErrNoRows (only Row.Scan does),
			// so any error here is a genuine scan failure.
			if err := rows.Scan(&blobID, &contentType, &modTime, &size); err != nil {
				return errors.WithStack(err)
			}

			blobs = append(blobs, &BlobInfo{
				id:          storage.BlobID(blobID),
				bucket:      b.name,
				contentType: contentType,
				modTime:     modTime,
				size:        size,
			})
		}

		if err := rows.Err(); err != nil {
			return errors.WithStack(err)
		}

		return nil
	})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return blobs, nil
}

// NewReader implements storage.BlobBucket.
//
// The returned reader is lazy: the database is not queried until the first
// Read or Seek, so a missing blob is only reported at that point.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
	if b.closed {
		return nil, errors.WithStack(storage.ErrBucketClosed)
	}

	return &blobReaderCloser{
		id:     id,
		bucket: b.name,
		getDB:  b.getDB,
	}, nil
}

// NewWriter implements storage.BlobBucket.
//
// The returned writer buffers everything in memory; the blob is persisted
// when the writer is closed.
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
	if b.closed {
		return nil, errors.WithStack(storage.ErrBucketClosed)
	}

	return &blobWriterCloser{
		id:     id,
		bucket: b.name,
		getDB:  b.getDB,
		buf:    bytes.Buffer{},
	}, nil
}

// withTx obtains the database through getDB and hands fn to WithRetry with
// sqliteBusyMaxRetry. It returns storage.ErrBucketClosed if the bucket has
// been closed.
func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	if b.closed {
		return errors.WithStack(storage.ErrBucketClosed)
	}

	db, err := b.getDB(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

// blobWriterCloser accumulates written bytes in memory and upserts them as
// a single blob row when closed.
type blobWriterCloser struct {
	id     storage.BlobID
	bucket string
	getDB  GetDBFunc
	buf    bytes.Buffer
	closed bool
}

// Write implements io.WriteCloser.
//
// Data is only appended to the in-memory buffer. Writing after a successful
// Close returns io.ErrClosedPipe, since such data could never be persisted.
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
	logger.Debug(
		context.Background(), "writing data to blob",
		logger.F("size", len(p)),
		logger.F("blobID", wbc.id),
		logger.F("bucket", wbc.bucket),
	)

	if wbc.closed {
		return 0, errors.WithStack(io.ErrClosedPipe)
	}

	n, err := wbc.buf.Write(p)
	if err != nil {
		return n, errors.WithStack(err)
	}

	return n, nil
}

// Close implements io.WriteCloser.
//
// It inserts the buffered data as the blob's content, or replaces the
// existing row for the same (id, bucket), together with its detected
// content type, the current UTC time and its size. The writer is only
// marked closed when the write succeeds, so a failed Close may be retried.
// Closing an already closed writer is a no-op.
func (wbc *blobWriterCloser) Close() error {
	ctx := context.Background()

	logger.Debug(
		ctx, "closing writer",
		logger.F("alreadyClosed", wbc.closed),
		logger.F("bucket", wbc.bucket),
		logger.F("blobID", wbc.id),
	)

	if wbc.closed {
		return nil
	}

	// Computed once, outside of the transaction callback, in case WithRetry
	// invokes the callback more than once.
	data := wbc.buf.Bytes()
	contentType := mimetype.Detect(data).String()

	err := wbc.withTx(ctx, func(tx *sql.Tx) error {
		query := `
			INSERT INTO blobs (bucket, id, data, content_type, mod_time, size)
			VALUES ($1, $2, $3, $4, $5, $6)
			ON CONFLICT (id, bucket) DO UPDATE SET
				data = $3, content_type = $4, mod_time = $5, size = $6
		`

		modTime := time.Now().UTC()

		args := []any{
			wbc.bucket,
			wbc.id,
			data,
			contentType,
			modTime,
			len(data),
		}

		logger.Debug(ctx, "executing query", logger.F("query", query))

		if _, err := tx.ExecContext(ctx, query, args...); err != nil {
			return errors.WithStack(err)
		}

		return nil
	})
	if err != nil {
		return errors.WithStack(err)
	}

	wbc.closed = true

	return nil
}

// withTx obtains the database through getDB and hands fn to WithRetry with
// sqliteBusyMaxRetry. It returns io.ErrClosedPipe if the writer is closed.
func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	if wbc.closed {
		return errors.WithStack(io.ErrClosedPipe)
	}

	db, err := wbc.getDB(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

// blobReaderCloser lazily loads a whole blob in memory on first use and
// then serves reads and seeks from that in-memory copy.
type blobReaderCloser struct {
	id     storage.BlobID
	bucket string
	getDB  GetDBFunc
	reader bytes.Reader
	once   sync.Once
	// loadErr keeps the outcome of the one-time load so that a failure is
	// reported by every subsequent Read/Seek, not only by the first one.
	loadErr error
	closed  bool
}

// ensureLoaded loads the blob content on first call and returns the
// (sticky) result of that single load attempt on every call.
func (brc *blobReaderCloser) ensureLoaded() error {
	brc.once.Do(func() {
		brc.loadErr = brc.loadBlob()
	})

	return brc.loadErr
}

// Read implements io.ReadSeekCloser.
//
// The first call triggers the loading of the blob; if that load failed,
// its error is returned by this and every later call. io.EOF is returned
// unwrapped so callers may compare it directly.
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
	if err := brc.ensureLoaded(); err != nil {
		return 0, errors.WithStack(err)
	}

	n, err := brc.reader.Read(p)
	if err != nil {
		if errors.Is(err, io.EOF) {
			return n, io.EOF
		}

		return n, errors.WithStack(err)
	}

	return n, nil
}

// Seek implements io.ReadSeekCloser.
//
// Like Read, it triggers the loading of the blob on first use and reports
// a failed load on every call.
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
	if err := brc.ensureLoaded(); err != nil {
		return 0, errors.WithStack(err)
	}

	n, err := brc.reader.Seek(offset, whence)
	if err != nil {
		return n, errors.WithStack(err)
	}

	return n, nil
}

// loadBlob fetches the blob's data column and points the in-memory reader
// at it. It returns storage.ErrBlobNotFound when no row matches.
func (brc *blobReaderCloser) loadBlob() error {
	ctx := context.Background()

	logger.Debug(ctx, "loading blob", logger.F("alreadyClosed", brc.closed))

	err := brc.withTx(ctx, func(tx *sql.Tx) error {
		query := `SELECT data FROM blobs WHERE bucket = $1 AND id = $2`

		row := tx.QueryRowContext(ctx, query, brc.bucket, brc.id)

		var data []byte
		if err := row.Scan(&data); err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				return errors.WithStack(storage.ErrBlobNotFound)
			}

			return errors.WithStack(err)
		}

		// Reset re-targets the embedded reader in place instead of copying
		// a freshly allocated bytes.Reader over it.
		brc.reader.Reset(data)

		return nil
	})
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}

// Close implements io.ReadSeekCloser.
//
// It only flags the reader as closed; there is no underlying resource to
// release since the blob is held in memory.
func (brc *blobReaderCloser) Close() error {
	logger.Debug(
		context.Background(), "closing reader",
		logger.F("alreadyClosed", brc.closed),
		logger.F("bucket", brc.bucket),
		logger.F("blobID", brc.id),
	)

	brc.closed = true

	return nil
}

// withTx obtains the database through getDB and hands fn to WithRetry with
// sqliteBusyMaxRetry.
func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
	db, err := brc.getDB(ctx)
	if err != nil {
		return errors.WithStack(err)
	}

	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

// Compile-time interface satisfaction checks.
var (
	_ storage.BlobBucket = (*BlobBucket)(nil)
	_ storage.BlobInfo   = (*BlobInfo)(nil)
	_ io.WriteCloser     = (*blobWriterCloser)(nil)
	_ io.ReadSeekCloser  = (*blobReaderCloser)(nil)
)