diff --git a/pkg/storage/document_store.go b/pkg/storage/document_store.go index 8182bfb..5255e1d 100644 --- a/pkg/storage/document_store.go +++ b/pkg/storage/document_store.go @@ -9,12 +9,16 @@ import ( "github.com/oklog/ulid/v2" ) -var ErrDocumentNotFound = errors.New("document not found") +var ( + ErrDocumentNotFound = errors.New("document not found") + ErrDocumentRevisionConflict = errors.New("document revision conflict") +) type DocumentID string const ( DocumentAttrID = "_id" + DocumentAttrRevision = "_revision" DocumentAttrCreatedAt = "_createdAt" DocumentAttrUpdatedAt = "_updatedAt" ) @@ -44,6 +48,20 @@ func (d Document) ID() (DocumentID, bool) { return "", false } +func (d Document) Revision() (int, bool) { + rawRevision, exists := d[DocumentAttrRevision] + if !exists { + return 0, false + } + + revision, ok := rawRevision.(int) + if ok { + return revision, true + } + + return 0, false +} + func (d Document) CreatedAt() (time.Time, bool) { return d.timeAttr(DocumentAttrCreatedAt) } diff --git a/pkg/storage/driver/sqlite/blob_store_test.go b/pkg/storage/driver/sqlite/blob_store_test.go index f3812f4..34db917 100644 --- a/pkg/storage/driver/sqlite/blob_store_test.go +++ b/pkg/storage/driver/sqlite/blob_store_test.go @@ -24,7 +24,7 @@ func TestBlobStore(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds()) store := NewBlobStore(dsn) testsuite.TestBlobStore(context.Background(), t, store) diff --git a/pkg/storage/driver/sqlite/document_store.go b/pkg/storage/driver/sqlite/document_store.go index 4364229..763f84f 100644 --- a/pkg/storage/driver/sqlite/document_store.go +++ b/pkg/storage/driver/sqlite/document_store.go @@ -13,9 +13,14 @@ import ( "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" + _ "embed" + _ "modernc.org/sqlite" ) +//go:embed document_store.sql +var documentStoreSchema string + type DocumentStore struct { getDB GetDBFunc } @@ -48,7 +53,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` - SELECT id, data, created_at, updated_at + SELECT id, revision, data, created_at, updated_at FROM documents WHERE collection = $1 AND id = $2 ` @@ -59,9 +64,10 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D createdAt time.Time updatedAt time.Time data JSONMap + revision int ) - err := row.Scan(&id, &data, &createdAt, &updatedAt) + err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return errors.WithStack(storage.ErrDocumentNotFound) @@ -77,6 +83,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D document = storage.Document(data) document[storage.DocumentAttrID] = id + document[storage.DocumentAttrRevision] = revision document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt @@ -119,7 +126,7 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi } query := ` - SELECT id, data, created_at, updated_at + SELECT id, revision, data, created_at, updated_at FROM documents WHERE collection = $1 AND (` + criteria + `) ` @@ -171,17 +178,19 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi for rows.Next() { var ( id storage.DocumentID + revision int createdAt time.Time updatedAt time.Time data JSONMap ) - if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil { + if err := rows.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } document := storage.Document(data) document[storage.DocumentAttrID] = id + document[storage.DocumentAttrRevision] = revision document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt @@ -206,22 +215,16 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document var upsertedDocument storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { - query := ` - INSERT INTO documents (id, collection, data, created_at, updated_at) - VALUES($1, $2, $3, $4, $4) - ON CONFLICT (id, collection) DO UPDATE SET - data = $3, updated_at = $4 - RETURNING "id", "data", "created_at", "updated_at" - ` - - now := time.Now().UTC() - id, exists := document.ID() if !exists || id == "" { id = storage.NewDocumentID() } - args := []any{id, collection, JSONMap(document), now, now} + query := ` + SELECT revision FROM documents WHERE id = $1 + ` + + args := []any{id} logger.Debug( ctx, "executing query", @@ -231,14 +234,44 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document row := tx.QueryRowContext(ctx, query, args...) + var revision int + + if err := row.Scan(&revision); err != nil && !errors.Is(err, sql.ErrNoRows) { + return errors.WithStack(err) + } + + docRevision, _ := document.Revision() + if revision != docRevision { + return errors.Wrapf(storage.ErrDocumentRevisionConflict, "document revision '%d' does not match current '%d'", docRevision, revision) + } + + query = ` + INSERT INTO documents (id, collection, data, created_at, updated_at) + VALUES($1, $2, $3, $4, $4) + ON CONFLICT (id, collection) DO UPDATE SET + data = $3, updated_at = $4, revision = revision + 1 + RETURNING "id", "revision", "data", "created_at", "updated_at" + ` + + now := time.Now().UTC() + + args = []any{id, collection, JSONMap(document), now, now} + + logger.Debug( + ctx, "executing query", + logger.F("query", query), + logger.F("args", args), + ) + + row = tx.QueryRowContext(ctx, query, args...) + var ( createdAt time.Time updatedAt time.Time data JSONMap ) - err := row.Scan(&id, &data, &createdAt, &updatedAt) - if err != nil { + if err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } @@ -249,6 +282,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document upsertedDocument = storage.Document(data) upsertedDocument[storage.DocumentAttrID] = id + upsertedDocument[storage.DocumentAttrRevision] = revision upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt @@ -278,27 +312,11 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e func ensureDocumentTables(ctx context.Context, db *sql.DB) error { err := WithTx(ctx, db, func(tx *sql.Tx) error { - query := ` - CREATE TABLE IF NOT EXISTS documents ( - id TEXT PRIMARY KEY, - collection TEXT NOT NULL, - data TEXT, - created_at TIMESTAMP NOT NULL, - updated_at TIMESTAMP NOT NULL, - UNIQUE(id, collection) ON CONFLICT REPLACE - ); - ` - if _, err := tx.ExecContext(ctx, query); err != nil { - return errors.WithStack(err) + for _, migr := range documentStoreMigrations { + if err := migr(ctx, tx); err != nil { + return errors.WithStack(err) + } } - - query = ` - CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); - ` - if _, err := tx.ExecContext(ctx, query); err != nil { - return errors.WithStack(err) - } - return nil }) if err != nil { diff --git a/pkg/storage/driver/sqlite/document_store.sql b/pkg/storage/driver/sqlite/document_store.sql new file mode 100644 index 0000000..85761ec --- /dev/null +++ b/pkg/storage/driver/sqlite/document_store.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS documents ( + id TEXT PRIMARY KEY, + collection TEXT NOT NULL, + data TEXT, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + UNIQUE(id, collection) ON CONFLICT REPLACE +); + +--- + +CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); + +--- + +ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0; \ No newline at end of file diff --git a/pkg/storage/driver/sqlite/document_store_migrations.go b/pkg/storage/driver/sqlite/document_store_migrations.go new file mode 100644 index 0000000..0125403 --- /dev/null +++ b/pkg/storage/driver/sqlite/document_store_migrations.go @@ -0,0 +1,59 @@ +package sqlite + +import ( + "context" + "database/sql" + + "github.com/pkg/errors" +) + +type MigrateFunc func(ctx context.Context, tx *sql.Tx) error + +var documentStoreMigrations = []MigrateFunc{ + documentStoreMigrationBaseSchema, + documentStoreMigrationAddIDIndex, + documentStoreMigrationAddRevisionColumn, +} + +func documentStoreMigrationBaseSchema(ctx context.Context, tx *sql.Tx) error { + query := ` + CREATE TABLE IF NOT EXISTS documents ( + id TEXT PRIMARY KEY, + collection TEXT NOT NULL, + data TEXT, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + UNIQUE(id, collection) ON CONFLICT REPLACE + ); + ` + + if _, err := tx.ExecContext(ctx, query); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func documentStoreMigrationAddIDIndex(ctx context.Context, tx *sql.Tx) error { + query := ` + CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); + ` + + if _, err := tx.ExecContext(ctx, query); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func documentStoreMigrationAddRevisionColumn(ctx context.Context, tx *sql.Tx) error { + query := ` + ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0; + ` + + if _, err := tx.ExecContext(ctx, query); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/driver/sqlite/document_store_test.go b/pkg/storage/driver/sqlite/document_store_test.go index 1833b43..ec0d2a7 100644 --- a/pkg/storage/driver/sqlite/document_store_test.go +++ b/pkg/storage/driver/sqlite/document_store_test.go @@ -22,7 +22,7 @@ func TestDocumentStore(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds()) store := NewDocumentStore(dsn) testsuite.TestDocumentStore(context.Background(), t, store) diff --git a/pkg/storage/driver/sqlite/share_store_test.go b/pkg/storage/driver/sqlite/share_store_test.go index 4d1ff27..821d32c 100644 --- a/pkg/storage/driver/sqlite/share_store_test.go +++ b/pkg/storage/driver/sqlite/share_store_test.go @@ -26,7 +26,7 @@ func newTestStore(testName string) (share.Store, error) { return nil, errors.WithStack(err) } - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds()) store := NewShareStore(dsn) return store, nil diff --git a/pkg/storage/testsuite/document_store_ops.go b/pkg/storage/testsuite/document_store_ops.go index 1de8e46..a2eafde 100644 --- a/pkg/storage/testsuite/document_store_ops.go +++ b/pkg/storage/testsuite/document_store_ops.go @@ -186,7 +186,7 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{ } // Check that document does not have unexpected properties - if e, g := 4, len(upsertedDoc); e != g { + if e, g := 5, len(upsertedDoc); e != g { return errors.Errorf("len(upsertedDoc): expected '%v', got '%v'", e, g) } @@ -437,7 +437,6 @@ func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.Docum for _, tc := range documentStoreOpsTestCases { func(tc documentStoreOpsTestCase) { t.Run(tc.Name, func(t *testing.T) { - t.Parallel() if err := tc.Run(ctx, store); err != nil { t.Errorf("%+v", errors.WithStack(err)) }