feat(storage): rpc based implementation
All checks were successful
arcad/edge/pipeline/pr-master This commit looks good

This commit is contained in:
2023-09-12 22:03:25 -06:00
parent c3535a4a9b
commit 8e574c299b
113 changed files with 3007 additions and 263 deletions

View File

@ -0,0 +1,459 @@
package sqlite
import (
"bytes"
"context"
"database/sql"
"io"
"sync"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/gabriel-vasile/mimetype"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobBucket struct {
name string
getDB GetDBFunc
closed bool
}
// Size implements storage.BlobBucket
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
var size int64
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT SUM(size) FROM blobs WHERE bucket = $1`
row := tx.QueryRowContext(ctx, query, b.name)
var nullSize sql.NullInt64
if err := row.Scan(&nullSize); err != nil {
return errors.WithStack(err)
}
if err := row.Err(); err != nil {
return errors.WithStack(err)
}
size = nullSize.Int64
return nil
})
if err != nil {
return 0, errors.WithStack(err)
}
return size, nil
}
// Name implements storage.BlobBucket
func (b *BlobBucket) Name() string {
return b.name
}
// Close implements storage.BlobBucket
func (b *BlobBucket) Close() error {
logger.Debug(
context.Background(), "closing bucket",
logger.F("alreadyClosed", b.closed),
logger.F("name", b.name),
)
b.closed = true
return nil
}
// Delete implements storage.BlobBucket
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `DELETE FROM blobs WHERE bucket = $1 AND id = $2`
args := []any{b.name, id}
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
// Get implements storage.BlobBucket
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
var blobInfo *BlobInfo
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2`
args := []any{b.name, id}
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
row := tx.QueryRowContext(ctx, query, args...)
var (
contentType string
modTime time.Time
size int64
)
if err := row.Scan(&contentType, &modTime, &size); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(storage.ErrBlobNotFound)
}
return errors.WithStack(err)
}
if err := row.Err(); err != nil {
return errors.WithStack(err)
}
blobInfo = &BlobInfo{
id: id,
bucket: b.name,
contentType: contentType,
modTime: modTime,
size: size,
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return blobInfo, nil
}
// List implements storage.BlobBucket
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
var blobs []storage.BlobInfo
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1`
args := []any{b.name}
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
rows, err := tx.QueryContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := rows.Close(); err != nil {
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
}
}()
blobs = make([]storage.BlobInfo, 0)
for rows.Next() {
var (
blobID string
contentType string
modTime time.Time
size int64
)
if err := rows.Scan(&blobID, &contentType, &modTime, &size); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(storage.ErrBlobNotFound)
}
return errors.WithStack(err)
}
blobInfo := &BlobInfo{
id: storage.BlobID(blobID),
bucket: b.name,
contentType: contentType,
modTime: modTime,
size: size,
}
blobs = append(blobs, blobInfo)
}
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return blobs, nil
}
// NewReader implements storage.BlobBucket
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if b.closed {
return nil, errors.WithStack(storage.ErrBucketClosed)
}
return &blobReaderCloser{
id: id,
bucket: b.name,
getDB: b.getDB,
}, nil
}
// NewWriter implements storage.BlobBucket
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
if b.closed {
return nil, errors.WithStack(storage.ErrBucketClosed)
}
return &blobWriterCloser{
id: id,
bucket: b.name,
getDB: b.getDB,
buf: bytes.Buffer{},
}, nil
}
func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
if b.closed {
return errors.WithStack(storage.ErrBucketClosed)
}
db, err := b.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithTx(ctx, db, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
type blobWriterCloser struct {
id storage.BlobID
bucket string
getDB GetDBFunc
buf bytes.Buffer
closed bool
}
// Write implements io.WriteCloser
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
logger.Debug(
context.Background(), "writing data to blob",
logger.F("size", len(p)),
logger.F("blobID", wbc.id),
logger.F("bucket", wbc.bucket),
)
n, err := wbc.buf.Write(p)
if err != nil {
return n, errors.WithStack(err)
}
return n, nil
}
// Close implements io.WriteCloser
func (wbc *blobWriterCloser) Close() error {
ctx := context.Background()
logger.Debug(
ctx, "closing writer",
logger.F("alreadyClosed", wbc.closed),
logger.F("bucket", wbc.bucket),
logger.F("blobID", wbc.id),
)
if wbc.closed {
return nil
}
err := wbc.withTx(ctx, func(tx *sql.Tx) error {
query := `
INSERT INTO blobs (bucket, id, data, content_type, mod_time, size)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id, bucket) DO UPDATE SET
data = $3, content_type = $4, mod_time = $5, size = $6
`
data := wbc.buf.Bytes()
mime := mimetype.Detect(data)
modTime := time.Now().UTC()
args := []any{
wbc.bucket,
wbc.id,
data,
mime.String(),
modTime,
len(data),
}
logger.Debug(ctx, "executing query", logger.F("query", query))
_, err := tx.Exec(
query,
args...,
)
if err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
wbc.closed = true
return nil
}
func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
if wbc.closed {
return errors.WithStack(io.ErrClosedPipe)
}
db, err := wbc.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithTx(ctx, db, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
type blobReaderCloser struct {
id storage.BlobID
bucket string
getDB GetDBFunc
reader bytes.Reader
once sync.Once
closed bool
}
// Read implements io.ReadSeekCloser
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
var err error
brc.once.Do(func() {
err = brc.loadBlob()
})
if err != nil {
return 0, errors.WithStack(err)
}
n, err := brc.reader.Read(p)
if err != nil {
if errors.Is(err, io.EOF) {
return n, io.EOF
}
return n, errors.WithStack(err)
}
return n, nil
}
// Seek implements io.ReadSeekCloser
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
var err error
brc.once.Do(func() {
err = brc.loadBlob()
})
if err != nil {
return 0, errors.WithStack(err)
}
n, err := brc.reader.Seek(offset, whence)
if err != nil {
return n, errors.WithStack(err)
}
return n, nil
}
func (brc *blobReaderCloser) loadBlob() error {
ctx := context.Background()
logger.Debug(ctx, "loading blob", logger.F("alreadyClosed", brc.closed))
err := brc.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT data FROM blobs WHERE bucket = $1 AND id = $2`
row := tx.QueryRow(query, brc.bucket, brc.id)
var data []byte
if err := row.Scan(&data); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(storage.ErrBlobNotFound)
}
return errors.WithStack(err)
}
brc.reader = *bytes.NewReader(data)
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
// Close implements io.ReadSeekCloser
func (brc *blobReaderCloser) Close() error {
logger.Debug(
context.Background(), "closing reader",
logger.F("alreadyClosed", brc.closed),
logger.F("bucket", brc.bucket),
logger.F("blobID", brc.id),
)
brc.closed = true
return nil
}
func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
db, err := brc.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithTx(ctx, db, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
var (
_ storage.BlobBucket = &BlobBucket{}
_ storage.BlobInfo = &BlobInfo{}
_ io.WriteCloser = &blobWriterCloser{}
_ io.ReadSeekCloser = &blobReaderCloser{}
)

View File

@ -0,0 +1,40 @@
package sqlite
import (
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
)
type BlobInfo struct {
id storage.BlobID
bucket string
contentType string
modTime time.Time
size int64
}
// Bucket implements storage.BlobInfo
func (i *BlobInfo) Bucket() string {
return i.bucket
}
// ID implements storage.BlobInfo
func (i *BlobInfo) ID() storage.BlobID {
return i.id
}
// ContentType implements storage.BlobInfo
func (i *BlobInfo) ContentType() string {
return i.contentType
}
// ModTime implements storage.BlobInfo
func (i *BlobInfo) ModTime() time.Time {
return i.modTime
}
// Size implements storage.BlobInfo
func (i *BlobInfo) Size() int64 {
return i.size
}

View File

@ -0,0 +1,136 @@
package sqlite
import (
"context"
"database/sql"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobStore struct {
getDB GetDBFunc
}
// DeleteBucket implements storage.BlobStore
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `DELETE FROM blobs WHERE bucket = $1`
_, err := tx.ExecContext(ctx, query, name)
if err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
// ListBuckets implements storage.BlobStore
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
buckets := make([]string, 0)
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT DISTINCT bucket FROM blobs`
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := rows.Close(); err != nil {
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
}
}()
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return errors.WithStack(err)
}
buckets = append(buckets, name)
}
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return buckets, nil
}
// OpenBucket implements storage.BlobStore
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
return &BlobBucket{
name: name,
getDB: s.getDB,
}, nil
}
func ensureBlobTables(ctx context.Context, db *sql.DB) error {
logger.Debug(ctx, "creating blobs table")
err := WithTx(ctx, db, func(tx *sql.Tx) error {
query := `
CREATE TABLE IF NOT EXISTS blobs (
id TEXT,
bucket TEXT,
data BLOB,
content_type TEXT NOT NULL,
mod_time TIMESTAMP NOT NULL,
size INTEGER,
PRIMARY KEY (id, bucket)
);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
var db *sql.DB
db, err := s.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithTx(ctx, db, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
func NewBlobStore(dsn string) *BlobStore {
getDB := NewGetDBFunc(dsn, ensureBlobTables)
return &BlobStore{getDB}
}
func NewBlobStoreWithDB(db *sql.DB) *BlobStore {
getDB := NewGetDBFuncFromDB(db, ensureBlobTables)
return &BlobStore{getDB}
}
var _ storage.BlobStore = &BlobStore{}

View File

@ -0,0 +1,46 @@
package sqlite
import (
"context"
"fmt"
"os"
"testing"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestBlobStore(t *testing.T) {
t.Parallel()
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := NewBlobStore(dsn)
testsuite.TestBlobStore(context.Background(), t, store)
}
func BenchmarkBlobStore(t *testing.B) {
logger.SetLevel(logger.LevelError)
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := NewBlobStore(dsn)
testsuite.BenchmarkBlobStore(t, store)
}

View File

@ -0,0 +1,362 @@
package sqlite
import (
"context"
"database/sql"
"fmt"
"math"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
filterSQL "forge.cadoles.com/arcad/edge/pkg/storage/filter/sql"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
_ "modernc.org/sqlite"
)
type DocumentStore struct {
getDB GetDBFunc
}
// Delete implements storage.DocumentStore
func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
DELETE FROM documents
WHERE collection = $1 AND id = $2
`
_, err := tx.ExecContext(ctx, query, collection, string(id))
if err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
// Get implements storage.DocumentStore
func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
var document storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
SELECT id, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND id = $2
`
row := tx.QueryRowContext(ctx, query, collection, string(id))
var (
createdAt time.Time
updatedAt time.Time
data JSONMap
)
err := row.Scan(&id, &data, &createdAt, &updatedAt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(storage.ErrDocumentNotFound)
}
return errors.WithStack(err)
}
if err := row.Err(); err != nil {
return errors.WithStack(err)
}
document = storage.Document(data)
document[storage.DocumentAttrID] = id
document[storage.DocumentAttrCreatedAt] = createdAt
document[storage.DocumentAttrUpdatedAt] = updatedAt
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return document, nil
}
// Query implements storage.DocumentStore
func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
opts := &storage.QueryOptions{}
for _, fn := range funcs {
fn(opts)
}
var documents []storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
criteria := "1 = 1"
args := make([]any, 0)
var err error
if filter != nil {
criteria, args, err = filterSQL.ToSQL(
filter.Root(),
filterSQL.WithPreparedParameter("$", 2),
filterSQL.WithTransform(transformOperator),
filterSQL.WithKeyTransform(func(key string) string {
return fmt.Sprintf("json_extract(data, '$.%s')", key)
}),
)
if err != nil {
return errors.WithStack(err)
}
}
query := `
SELECT id, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND (` + criteria + `)
`
args = append([]interface{}{collection}, args...)
if opts.OrderBy != nil {
direction := storage.OrderDirectionAsc
if opts.OrderDirection != nil {
direction = *opts.OrderDirection
}
query, args = withOrderByClause(query, args, *opts.OrderBy, direction)
}
if opts.Offset != nil || opts.Limit != nil {
offset := 0
if opts.Offset != nil {
offset = *opts.Offset
}
limit := math.MaxInt
if opts.Limit != nil {
limit = *opts.Limit
}
query, args = withLimitOffsetClause(query, args, limit, offset)
}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
rows, err := tx.QueryContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := rows.Close(); err != nil {
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
}
}()
documents = make([]storage.Document, 0)
for rows.Next() {
var (
id storage.DocumentID
createdAt time.Time
updatedAt time.Time
data JSONMap
)
if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil {
return errors.WithStack(err)
}
document := storage.Document(data)
document[storage.DocumentAttrID] = id
document[storage.DocumentAttrCreatedAt] = createdAt
document[storage.DocumentAttrUpdatedAt] = updatedAt
documents = append(documents, document)
}
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return documents, nil
}
// Upsert implements storage.DocumentStore
func (s *DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
var upsertedDocument storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
INSERT INTO documents (id, collection, data, created_at, updated_at)
VALUES($1, $2, $3, $4, $4)
ON CONFLICT (id, collection) DO UPDATE SET
data = $3, updated_at = $4
RETURNING "id", "data", "created_at", "updated_at"
`
now := time.Now().UTC()
id, exists := document.ID()
if !exists || id == "" {
id = storage.NewDocumentID()
}
args := []any{id, collection, JSONMap(document), now, now}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
row := tx.QueryRowContext(ctx, query, args...)
var (
createdAt time.Time
updatedAt time.Time
data JSONMap
)
err := row.Scan(&id, &data, &createdAt, &updatedAt)
if err != nil {
return errors.WithStack(err)
}
if err := row.Err(); err != nil {
return errors.WithStack(err)
}
upsertedDocument = storage.Document(data)
upsertedDocument[storage.DocumentAttrID] = id
upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt
upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return upsertedDocument, nil
}
func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
var db *sql.DB
db, err := s.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithTx(ctx, db, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
func ensureDocumentTables(ctx context.Context, db *sql.DB) error {
err := WithTx(ctx, db, func(tx *sql.Tx) error {
query := `
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
collection TEXT NOT NULL,
data TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
UNIQUE(id, collection) ON CONFLICT REPLACE
);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
query = `
CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
func withOrderByClause(query string, args []any, orderBy string, orderDirection storage.OrderDirection) (string, []any) {
direction := "ASC"
if orderDirection == storage.OrderDirectionDesc {
direction = "DESC"
}
var column string
switch orderBy {
case storage.DocumentAttrID:
column = "id"
case storage.DocumentAttrCreatedAt:
column = "created_at"
case storage.DocumentAttrUpdatedAt:
column = "updated_at"
default:
column = fmt.Sprintf("json_extract(data, '$.' || $%d)", len(args)+1)
args = append(args, orderBy)
}
query += fmt.Sprintf(` ORDER BY %s %s`, column, direction)
return query, args
}
func withLimitOffsetClause(query string, args []any, limit int, offset int) (string, []any) {
query += fmt.Sprintf(` LIMIT $%d OFFSET $%d`, len(args)+1, len(args)+2)
args = append(args, limit, offset)
return query, args
}
func NewDocumentStore(path string) *DocumentStore {
getDB := NewGetDBFunc(path, ensureDocumentTables)
return &DocumentStore{
getDB: getDB,
}
}
func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore {
getDB := NewGetDBFuncFromDB(db, ensureDocumentTables)
return &DocumentStore{
getDB: getDB,
}
}
var _ storage.DocumentStore = &DocumentStore{}

View File

@ -0,0 +1,29 @@
package sqlite
import (
"context"
"fmt"
"os"
"testing"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestDocumentStore(t *testing.T) {
t.Parallel()
logger.SetLevel(logger.LevelDebug)
file := "./testdata/documentstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := NewDocumentStore(dsn)
testsuite.TestDocumentStore(context.Background(), t, store)
}

View File

@ -0,0 +1,75 @@
package sqlite
import (
"net/url"
"os"
"path/filepath"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
"forge.cadoles.com/arcad/edge/pkg/storage/share"
"github.com/pkg/errors"
)
func init() {
driver.RegisterDocumentStoreFactory("sqlite", documentStoreFactory)
driver.RegisterBlobStoreFactory("sqlite", blobStoreFactory)
driver.RegisterShareStoreFactory("sqlite", shareStoreFactory)
}
func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
dir := filepath.Dir(url.Host + url.Path)
if dir != ":memory:" {
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
}
path := url.Host + url.Path + "?" + url.RawQuery
db, err := Open(path)
if err != nil {
return nil, errors.WithStack(err)
}
return NewDocumentStoreWithDB(db), nil
}
func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
dir := filepath.Dir(url.Host + url.Path)
if dir != ":memory:" {
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
}
path := url.Host + url.Path + "?" + url.RawQuery
db, err := Open(path)
if err != nil {
return nil, errors.WithStack(err)
}
return NewBlobStoreWithDB(db), nil
}
func shareStoreFactory(url *url.URL) (share.Store, error) {
dir := filepath.Dir(url.Host + url.Path)
if dir != ":memory:" {
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
}
path := url.Host + url.Path + "?" + url.RawQuery
db, err := Open(path)
if err != nil {
return nil, errors.WithStack(err)
}
return NewShareStoreWithDB(db), nil
}

View File

@ -0,0 +1,50 @@
package sqlite
import (
"fmt"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/filter/sql"
)
func transformOperator(operator string, invert bool, key string, value any, option *sql.Option) (string, any, error) {
isDataAttr := true
switch key {
case storage.DocumentAttrCreatedAt:
key = "created_at"
isDataAttr = false
case storage.DocumentAttrUpdatedAt:
key = "updated_at"
isDataAttr = false
case storage.DocumentAttrID:
key = "id"
isDataAttr = false
}
if !isDataAttr {
option = &sql.Option{
PreparedParameter: option.PreparedParameter,
KeyTransform: func(key string) string {
return key
},
ValueTransform: option.ValueTransform,
Transform: option.Transform,
}
}
switch operator {
case sql.OpIn:
return transformInOperator(key, value, option)
default:
return sql.DefaultTransform(operator, invert, key, value, option)
}
}
func transformInOperator(key string, value any, option *sql.Option) (string, any, error) {
return fmt.Sprintf(
"EXISTS (SELECT 1 FROM json_each(json_extract(data, \"$.%v\")) WHERE value = %v)",
key,
option.PreparedParameter(),
), option.ValueTransform(value), nil
}

View File

@ -0,0 +1,42 @@
package sqlite
import (
"database/sql/driver"
"encoding/json"
"github.com/pkg/errors"
)
type JSONMap map[string]any
func (j *JSONMap) Scan(value interface{}) error {
if value == nil {
return nil
}
var data []byte
switch typ := value.(type) {
case []byte:
data = typ
case string:
data = []byte(typ)
default:
return errors.Errorf("unexpected type '%T'", value)
}
if err := json.Unmarshal(data, &j); err != nil {
return errors.WithStack(err)
}
return nil
}
func (j JSONMap) Value() (driver.Value, error) {
data, err := json.Marshal(j)
if err != nil {
return nil, errors.WithStack(err)
}
return data, nil
}

View File

@ -0,0 +1,428 @@
package sqlite
import (
"context"
"database/sql"
"fmt"
"time"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/storage/share"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type ShareStore struct {
getDB GetDBFunc
}
// DeleteAttributes implements share.Repository
func (s *ShareStore) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error {
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
DELETE FROM resources
WHERE origin = $1 AND resource_id = $2
`
args := []any{origin, resourceID}
criteria := ""
for idx, name := range names {
if idx == 0 {
criteria += " AND ("
}
if idx != 0 {
criteria += " OR "
}
criteria += fmt.Sprintf(" name = $%d", len(args)+1)
args = append(args, name)
if idx == len(names)-1 {
criteria += " )"
}
}
query += criteria
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
res, err := tx.ExecContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
affected, err := res.RowsAffected()
if err != nil {
return errors.WithStack(err)
}
if affected == 0 {
return errors.WithStack(share.ErrNotFound)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return err
}
// DeleteResource implements share.Repository
func (s *ShareStore) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error {
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
DELETE FROM resources
WHERE origin = $1 AND resource_id = $2
`
args := []any{origin, resourceID}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
res, err := tx.ExecContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
affected, err := res.RowsAffected()
if err != nil {
return errors.WithStack(err)
}
if affected == 0 {
return errors.WithStack(share.ErrNotFound)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return err
}
// FindResources implements share.Repository
func (s *ShareStore) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) {
opts := share.NewFindResourcesOptions(funcs...)
var resources []share.Resource
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
SELECT
main.origin, main.resource_id,
main.name, main.type, main.value,
main.created_at, main.updated_at
FROM resources AS main
JOIN resources AS sub ON
main.resource_id = sub.resource_id
AND main.origin = sub.origin
`
criteria := " WHERE 1 = 1"
preparedArgIndex := 1
args := make([]any, 0)
if opts.Name != nil {
criteria += fmt.Sprintf(" AND sub.name = $%d", preparedArgIndex)
args = append(args, *opts.Name)
preparedArgIndex++
}
if opts.ValueType != nil {
criteria += fmt.Sprintf(" AND sub.type = $%d", preparedArgIndex)
args = append(args, *opts.ValueType)
preparedArgIndex++
}
query += criteria
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
rows, err := tx.QueryContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := rows.Close(); err != nil {
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
}
}()
indexedResources := make(map[string]*share.BaseResource)
for rows.Next() {
var (
origin string
resourceID string
name string
valueType string
value any
updatedAt time.Time
createdAt time.Time
)
if err := rows.Scan(&origin, &resourceID, &name, &valueType, &value, &createdAt, &updatedAt); err != nil {
return errors.WithStack(err)
}
resourceKey := origin + resourceID
resource, exists := indexedResources[resourceKey]
if !exists {
resource = share.NewBaseResource(app.ID(origin), share.ResourceID(resourceID))
indexedResources[resourceKey] = resource
}
attr := share.NewBaseAttribute(
name,
share.ValueType(valueType),
value,
)
attr.SetCreatedAt(createdAt)
attr.SetUpdatedAt(updatedAt)
resource.SetAttribute(attr)
}
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
resources = make([]share.Resource, 0, len(indexedResources))
for _, res := range indexedResources {
resources = append(resources, res)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return resources, nil
}
// GetResource implements share.Repository
func (s *ShareStore) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) {
var (
resource *share.BaseResource
err error
)
err = s.withTx(ctx, func(tx *sql.Tx) error {
resource, err = s.getResourceWithinTx(ctx, tx, origin, resourceID)
if err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return resource, nil
}
// UpdateAttributes implements share.Repository
func (s *ShareStore) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) {
if len(attributes) == 0 {
return nil, errors.WithStack(share.ErrAttributeRequired)
}
var resource *share.BaseResource
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
INSERT INTO resources (origin, resource_id, name, type, value, created_at, updated_at)
VALUES($1, $2, $3, $4, $5, $6, $6)
ON CONFLICT (origin, resource_id, name) DO UPDATE SET
type = $4, value = $5, updated_at = $6
`
stmt, err := tx.PrepareContext(ctx, query)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := stmt.Close(); err != nil {
logger.Error(ctx, "could not close statement", logger.E(errors.WithStack(err)))
}
}()
now := time.Now().UTC()
for _, attr := range attributes {
args := []any{
string(origin), string(resourceID),
attr.Name(), string(attr.Type()), attr.Value(),
now, now,
}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
if _, err := stmt.ExecContext(ctx, args...); err != nil {
return errors.WithStack(err)
}
}
resource, err = s.getResourceWithinTx(ctx, tx, origin, resourceID)
if err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return resource, nil
}
func (s *ShareStore) getResourceWithinTx(ctx context.Context, tx *sql.Tx, origin app.ID, resourceID share.ResourceID) (*share.BaseResource, error) {
query := `
SELECT name, type, value, created_at, updated_at
FROM resources
WHERE origin = $1 AND resource_id = $2
`
rows, err := tx.QueryContext(ctx, query, origin, resourceID)
if err != nil {
return nil, errors.WithStack(err)
}
defer func() {
if err := rows.Close(); err != nil {
logger.Error(ctx, "could not close rows", logger.E(errors.WithStack(err)))
}
}()
attributes := make([]share.Attribute, 0)
for rows.Next() {
var (
name string
valueType string
value any
updatedAt time.Time
createdAt time.Time
)
if err := rows.Scan(&name, &valueType, &value, &createdAt, &updatedAt); err != nil {
return nil, errors.WithStack(err)
}
attr := share.NewBaseAttribute(
name,
share.ValueType(valueType),
value,
)
attr.SetCreatedAt(createdAt)
attr.SetUpdatedAt(updatedAt)
attributes = append(attributes, attr)
}
if err := rows.Err(); err != nil {
return nil, errors.WithStack(err)
}
if len(attributes) == 0 {
return nil, errors.WithStack(share.ErrNotFound)
}
resource := share.NewBaseResource(origin, resourceID, attributes...)
return resource, nil
}
func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
var db *sql.DB
db, err := s.getDB(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := WithTx(ctx, db, fn); err != nil {
return errors.WithStack(err)
}
return nil
}
func ensureShareTables(ctx context.Context, db *sql.DB) error {
err := WithTx(ctx, db, func(tx *sql.Tx) error {
query := `
CREATE TABLE IF NOT EXISTS resources (
resource_id TEXT NOT NULL,
origin TEXT NOT NULL,
name TEXT NOT NULL,
type TEXT NOT NULL,
value TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
UNIQUE(origin, resource_id, name) ON CONFLICT REPLACE
);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
query = `
CREATE INDEX IF NOT EXISTS resource_idx ON resources (origin, resource_id, name);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
func NewShareStore(path string) *ShareStore {
getDB := NewGetDBFunc(path, ensureShareTables)
return &ShareStore{
getDB: getDB,
}
}
func NewShareStoreWithDB(db *sql.DB) *ShareStore {
getDB := NewGetDBFuncFromDB(db, ensureShareTables)
return &ShareStore{
getDB: getDB,
}
}
var _ share.Store = &ShareStore{}

View File

@ -0,0 +1,33 @@
package sqlite
import (
"fmt"
"os"
"strings"
"testing"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage/share"
"forge.cadoles.com/arcad/edge/pkg/storage/share/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestRepository(t *testing.T) {
logger.SetLevel(logger.LevelDebug)
testsuite.TestStore(t, newTestStore)
}
func newTestStore(testName string) (share.Store, error) {
filename := strings.ToLower(strings.ReplaceAll(testName, " ", "_"))
file := fmt.Sprintf("./testdata/%s.sqlite", filename)
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, errors.WithStack(err)
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := NewShareStore(dsn)
return store, nil
}

View File

@ -0,0 +1,131 @@
package sqlite
import (
"context"
"database/sql"
"sync"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
"modernc.org/sqlite"
_ "modernc.org/sqlite"
sqlite3 "modernc.org/sqlite/lib"
)
func Open(path string) (*sql.DB, error) {
db, err := sql.Open("sqlite", path)
if err != nil {
return nil, errors.Wrapf(err, "could not open database with path '%s'", path)
}
return db, nil
}
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
var tx *sql.Tx
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := tx.Rollback(); err != nil {
if errors.Is(err, sql.ErrTxDone) {
return
}
panic(errors.WithStack(err))
}
}()
for {
if err = fn(tx); err != nil {
var sqlErr *sqlite.Error
if errors.As(err, &sqlErr) {
if sqlErr.Code() == sqlite3.SQLITE_BUSY {
logger.Warn(ctx, "database busy, retrying transaction")
if err := ctx.Err(); err != nil {
logger.Error(ctx, "could not execute transaction", logger.E(errors.WithStack(err)))
return errors.WithStack(err)
}
continue
}
}
return errors.WithStack(err)
}
break
}
if err = tx.Commit(); err != nil {
return errors.WithStack(err)
}
return nil
}
type GetDBFunc func(ctx context.Context) (*sql.DB, error)
func NewGetDBFunc(dsn string, initFunc func(ctx context.Context, db *sql.DB) error) GetDBFunc {
var (
db *sql.DB
mutex sync.RWMutex
)
return func(ctx context.Context) (*sql.DB, error) {
mutex.RLock()
if db != nil {
defer mutex.RUnlock()
return db, nil
}
mutex.RUnlock()
mutex.Lock()
defer mutex.Unlock()
logger.Debug(ctx, "opening database", logger.F("dsn", dsn))
newDB, err := sql.Open("sqlite", dsn)
if err != nil {
return nil, errors.WithStack(err)
}
logger.Debug(ctx, "initializing database")
if err = initFunc(ctx, newDB); err != nil {
return nil, errors.WithStack(err)
}
db = newDB
return db, nil
}
}
func NewGetDBFuncFromDB(db *sql.DB, initFunc func(ctx context.Context, db *sql.DB) error) GetDBFunc {
var err error
initOnce := &sync.Once{}
return func(ctx context.Context) (*sql.DB, error) {
initOnce.Do(func() {
logger.Debug(ctx, "initializing database")
err = initFunc(ctx, db)
})
if err != nil {
return nil, errors.WithStack(err)
}
return db, nil
}
}

View File

@ -0,0 +1 @@
/*.sqlite*