feat(storage): implements 'offline' capable document store driver
arcad/edge/pipeline/head This commit looks good Details

This commit is contained in:
wpetit 2024-01-11 19:30:30 +01:00
parent a268759d33
commit 8a0f283333
8 changed files with 153 additions and 43 deletions

View File

@ -9,12 +9,16 @@ import (
"github.com/oklog/ulid/v2"
)
var ErrDocumentNotFound = errors.New("document not found")
var (
ErrDocumentNotFound = errors.New("document not found")
ErrDocumentRevisionConflict = errors.New("document revision conflict")
)
type DocumentID string
const (
DocumentAttrID = "_id"
DocumentAttrRevision = "_revision"
DocumentAttrCreatedAt = "_createdAt"
DocumentAttrUpdatedAt = "_updatedAt"
)
@ -44,6 +48,20 @@ func (d Document) ID() (DocumentID, bool) {
return "", false
}
func (d Document) Revision() (int, bool) {
rawRevision, exists := d[DocumentAttrRevision]
if !exists {
return 0, false
}
revision, ok := rawRevision.(int)
if ok {
return revision, true
}
return 0, false
}
func (d Document) CreatedAt() (time.Time, bool) {
return d.timeAttr(DocumentAttrCreatedAt)
}

View File

@ -24,7 +24,7 @@ func TestBlobStore(t *testing.T) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds())
store := NewBlobStore(dsn)
testsuite.TestBlobStore(context.Background(), t, store)

View File

@ -13,9 +13,14 @@ import (
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
_ "embed"
_ "modernc.org/sqlite"
)
//go:embed document_store.sql
var documentStoreSchema string
type DocumentStore struct {
getDB GetDBFunc
}
@ -48,7 +53,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
SELECT id, data, created_at, updated_at
SELECT id, revision, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND id = $2
`
@ -59,9 +64,10 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
createdAt time.Time
updatedAt time.Time
data JSONMap
revision int
)
err := row.Scan(&id, &data, &createdAt, &updatedAt)
err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(storage.ErrDocumentNotFound)
@ -77,6 +83,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
document = storage.Document(data)
document[storage.DocumentAttrID] = id
document[storage.DocumentAttrRevision] = revision
document[storage.DocumentAttrCreatedAt] = createdAt
document[storage.DocumentAttrUpdatedAt] = updatedAt
@ -119,7 +126,7 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
}
query := `
SELECT id, data, created_at, updated_at
SELECT id, revision, data, created_at, updated_at
FROM documents
WHERE collection = $1 AND (` + criteria + `)
`
@ -171,17 +178,19 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
for rows.Next() {
var (
id storage.DocumentID
revision int
createdAt time.Time
updatedAt time.Time
data JSONMap
)
if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil {
if err := rows.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil {
return errors.WithStack(err)
}
document := storage.Document(data)
document[storage.DocumentAttrID] = id
document[storage.DocumentAttrRevision] = revision
document[storage.DocumentAttrCreatedAt] = createdAt
document[storage.DocumentAttrUpdatedAt] = updatedAt
@ -206,22 +215,16 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
var upsertedDocument storage.Document
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `
INSERT INTO documents (id, collection, data, created_at, updated_at)
VALUES($1, $2, $3, $4, $4)
ON CONFLICT (id, collection) DO UPDATE SET
data = $3, updated_at = $4
RETURNING "id", "data", "created_at", "updated_at"
`
now := time.Now().UTC()
id, exists := document.ID()
if !exists || id == "" {
id = storage.NewDocumentID()
}
args := []any{id, collection, JSONMap(document), now, now}
query := `
SELECT revision FROM documents WHERE id = $1
`
args := []any{id}
logger.Debug(
ctx, "executing query",
@ -231,14 +234,44 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
row := tx.QueryRowContext(ctx, query, args...)
var revision int
if err := row.Scan(&revision); err != nil && !errors.Is(err, sql.ErrNoRows) {
return errors.WithStack(err)
}
docRevision, _ := document.Revision()
if revision != docRevision {
return errors.Wrapf(storage.ErrDocumentRevisionConflict, "document revision '%d' does not match current '%d'", docRevision, revision)
}
query = `
INSERT INTO documents (id, collection, data, created_at, updated_at)
VALUES($1, $2, $3, $4, $4)
ON CONFLICT (id, collection) DO UPDATE SET
data = $3, updated_at = $4, revision = revision + 1
RETURNING "id", "revision", "data", "created_at", "updated_at"
`
now := time.Now().UTC()
args = []any{id, collection, JSONMap(document), now, now}
logger.Debug(
ctx, "executing query",
logger.F("query", query),
logger.F("args", args),
)
row = tx.QueryRowContext(ctx, query, args...)
var (
createdAt time.Time
updatedAt time.Time
data JSONMap
)
err := row.Scan(&id, &data, &createdAt, &updatedAt)
if err != nil {
if err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil {
return errors.WithStack(err)
}
@ -249,6 +282,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
upsertedDocument = storage.Document(data)
upsertedDocument[storage.DocumentAttrID] = id
upsertedDocument[storage.DocumentAttrRevision] = revision
upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt
upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt
@ -278,27 +312,11 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
func ensureDocumentTables(ctx context.Context, db *sql.DB) error {
err := WithTx(ctx, db, func(tx *sql.Tx) error {
query := `
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
collection TEXT NOT NULL,
data TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
UNIQUE(id, collection) ON CONFLICT REPLACE
);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
for _, migr := range documentStoreMigrations {
if err := migr(ctx, tx); err != nil {
return errors.WithStack(err)
}
query = `
CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
})
if err != nil {

View File

@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
collection TEXT NOT NULL,
data TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
UNIQUE(id, collection) ON CONFLICT REPLACE
);
---
CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
---
ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0;

View File

@ -0,0 +1,59 @@
package sqlite
import (
"context"
"database/sql"
"github.com/pkg/errors"
)
type MigrateFunc func(ctx context.Context, tx *sql.Tx) error
var documentStoreMigrations = []MigrateFunc{
documentStoreMigrationBaseSchema,
documentStoreMigrationAddIDIndex,
documentStoreMigrationAddRevisionColumn,
}
func documentStoreMigrationBaseSchema(ctx context.Context, tx *sql.Tx) error {
query := `
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
collection TEXT NOT NULL,
data TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
UNIQUE(id, collection) ON CONFLICT REPLACE
);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
}
func documentStoreMigrationAddIDIndex(ctx context.Context, tx *sql.Tx) error {
query := `
CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
}
func documentStoreMigrationAddRevisionColumn(ctx context.Context, tx *sql.Tx) error {
query := `
ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0;
`
if _, err := tx.ExecContext(ctx, query); err != nil {
return errors.WithStack(err)
}
return nil
}

View File

@ -22,7 +22,7 @@ func TestDocumentStore(t *testing.T) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds())
store := NewDocumentStore(dsn)
testsuite.TestDocumentStore(context.Background(), t, store)

View File

@ -26,7 +26,7 @@ func newTestStore(testName string) (share.Store, error) {
return nil, errors.WithStack(err)
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds())
store := NewShareStore(dsn)
return store, nil

View File

@ -186,7 +186,7 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
}
// Check that document does not have unexpected properties
if e, g := 4, len(upsertedDoc); e != g {
if e, g := 5, len(upsertedDoc); e != g {
return errors.Errorf("len(upsertedDoc): expected '%v', got '%v'", e, g)
}
@ -437,7 +437,6 @@ func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.Docum
for _, tc := range documentStoreOpsTestCases {
func(tc documentStoreOpsTestCase) {
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
if err := tc.Run(ctx, store); err != nil {
t.Errorf("%+v", errors.WithStack(err))
}