diff --git a/pkg/storage/document_store.go b/pkg/storage/document_store.go index 8182bfb..5255e1d 100644 --- a/pkg/storage/document_store.go +++ b/pkg/storage/document_store.go @@ -9,12 +9,16 @@ import ( "github.com/oklog/ulid/v2" ) -var ErrDocumentNotFound = errors.New("document not found") +var ( + ErrDocumentNotFound = errors.New("document not found") + ErrDocumentRevisionConflict = errors.New("document revision conflict") +) type DocumentID string const ( DocumentAttrID = "_id" + DocumentAttrRevision = "_revision" DocumentAttrCreatedAt = "_createdAt" DocumentAttrUpdatedAt = "_updatedAt" ) @@ -44,6 +48,20 @@ func (d Document) ID() (DocumentID, bool) { return "", false } +func (d Document) Revision() (int, bool) { + rawRevision, exists := d[DocumentAttrRevision] + if !exists { + return 0, false + } + + revision, ok := rawRevision.(int) + if ok { + return revision, true + } + + return 0, false +} + func (d Document) CreatedAt() (time.Time, bool) { return d.timeAttr(DocumentAttrCreatedAt) } diff --git a/pkg/storage/driver/sqlite/blob_store_test.go b/pkg/storage/driver/sqlite/blob_store_test.go index f3812f4..34db917 100644 --- a/pkg/storage/driver/sqlite/blob_store_test.go +++ b/pkg/storage/driver/sqlite/blob_store_test.go @@ -24,7 +24,7 @@ func TestBlobStore(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds()) store := NewBlobStore(dsn) testsuite.TestBlobStore(context.Background(), t, store) diff --git a/pkg/storage/driver/sqlite/document_store.go b/pkg/storage/driver/sqlite/document_store.go index 4364229..3f51808 100644 --- a/pkg/storage/driver/sqlite/document_store.go +++ b/pkg/storage/driver/sqlite/document_store.go @@ -13,9 +13,14 @@ import ( "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" + _ "embed" + _ "modernc.org/sqlite" ) +//go:embed document_store.sql +var documentStoreSchema string + type DocumentStore struct { getDB GetDBFunc } @@ -48,7 +53,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` - SELECT id, data, created_at, updated_at + SELECT id, revision, data, created_at, updated_at FROM documents WHERE collection = $1 AND id = $2 ` @@ -59,9 +64,10 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D createdAt time.Time updatedAt time.Time data JSONMap + revision int ) - err := row.Scan(&id, &data, &createdAt, &updatedAt) + err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return errors.WithStack(storage.ErrDocumentNotFound) @@ -77,6 +83,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D document = storage.Document(data) document[storage.DocumentAttrID] = id + document[storage.DocumentAttrRevision] = revision document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt @@ -119,7 +126,7 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi } query := ` - SELECT id, data, created_at, updated_at + SELECT id, revision, data, created_at, updated_at FROM documents WHERE collection = $1 AND (` + criteria + `) ` @@ -171,17 +178,19 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi for rows.Next() { var ( id storage.DocumentID + revision int createdAt time.Time updatedAt time.Time data JSONMap ) - if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil { + if err := rows.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } document := storage.Document(data) document[storage.DocumentAttrID] = id + document[storage.DocumentAttrRevision] = revision document[storage.DocumentAttrCreatedAt] = createdAt document[storage.DocumentAttrUpdatedAt] = updatedAt @@ -206,22 +215,16 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document var upsertedDocument storage.Document err := s.withTx(ctx, func(tx *sql.Tx) error { - query := ` - INSERT INTO documents (id, collection, data, created_at, updated_at) - VALUES($1, $2, $3, $4, $4) - ON CONFLICT (id, collection) DO UPDATE SET - data = $3, updated_at = $4 - RETURNING "id", "data", "created_at", "updated_at" - ` - - now := time.Now().UTC() - id, exists := document.ID() if !exists || id == "" { id = storage.NewDocumentID() } - args := []any{id, collection, JSONMap(document), now, now} + query := ` + SELECT revision FROM documents WHERE id = $1 + ` + + args := []any{id} logger.Debug( ctx, "executing query", @@ -231,14 +234,44 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document row := tx.QueryRowContext(ctx, query, args...) + var storedRevision int + + if err := row.Scan(&storedRevision); err != nil && !errors.Is(err, sql.ErrNoRows) { + return errors.WithStack(err) + } + + revision, found := document.Revision() + if found && storedRevision != revision { + return errors.Wrapf(storage.ErrDocumentRevisionConflict, "document revision '%d' does not match stored '%d'", revision, storedRevision) + } + + query = ` + INSERT INTO documents (id, collection, data, created_at, updated_at) + VALUES($1, $2, $3, $4, $4) + ON CONFLICT (id, collection) DO UPDATE SET + data = $3, updated_at = $4, revision = revision + 1 + RETURNING "id", "revision", "data", "created_at", "updated_at" + ` + + now := time.Now().UTC() + + args = []any{id, collection, JSONMap(document), now, now} + + logger.Debug( + ctx, "executing query", + logger.F("query", query), + logger.F("args", args), + ) + + row = tx.QueryRowContext(ctx, query, args...) + var ( createdAt time.Time updatedAt time.Time data JSONMap ) - err := row.Scan(&id, &data, &createdAt, &updatedAt) - if err != nil { + if err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil { return errors.WithStack(err) } @@ -249,6 +282,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document upsertedDocument = storage.Document(data) upsertedDocument[storage.DocumentAttrID] = id + upsertedDocument[storage.DocumentAttrRevision] = revision upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt @@ -276,29 +310,13 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e return nil } -func ensureDocumentTables(ctx context.Context, db *sql.DB) error { +func migrateSchema(ctx context.Context, db *sql.DB) error { err := WithTx(ctx, db, func(tx *sql.Tx) error { - query := ` - CREATE TABLE IF NOT EXISTS documents ( - id TEXT PRIMARY KEY, - collection TEXT NOT NULL, - data TEXT, - created_at TIMESTAMP NOT NULL, - updated_at TIMESTAMP NOT NULL, - UNIQUE(id, collection) ON CONFLICT REPLACE - ); - ` - if _, err := tx.ExecContext(ctx, query); err != nil { - return errors.WithStack(err) + for _, migr := range documentStoreMigrations { + if err := migr(ctx, tx); err != nil { + return errors.WithStack(err) + } } - - query = ` - CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); - ` - if _, err := tx.ExecContext(ctx, query); err != nil { - return errors.WithStack(err) - } - return nil }) if err != nil { @@ -344,7 +362,7 @@ func withLimitOffsetClause(query string, args []any, limit int, offset int) (str } func NewDocumentStore(path string) *DocumentStore { - getDB := NewGetDBFunc(path, ensureDocumentTables) + getDB := NewGetDBFunc(path, migrateSchema) return &DocumentStore{ getDB: getDB, @@ -352,7 +370,7 @@ func NewDocumentStore(path string) *DocumentStore { } func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore { - getDB := NewGetDBFuncFromDB(db, ensureDocumentTables) + getDB := NewGetDBFuncFromDB(db, migrateSchema) return &DocumentStore{ getDB: getDB, diff --git a/pkg/storage/driver/sqlite/document_store.sql b/pkg/storage/driver/sqlite/document_store.sql new file mode 100644 index 0000000..85761ec --- /dev/null +++ b/pkg/storage/driver/sqlite/document_store.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS documents ( + id TEXT PRIMARY KEY, + collection TEXT NOT NULL, + data TEXT, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + UNIQUE(id, collection) ON CONFLICT REPLACE +); + +--- + +CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); + +--- + +ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0; \ No newline at end of file diff --git a/pkg/storage/driver/sqlite/document_store_migrations.go b/pkg/storage/driver/sqlite/document_store_migrations.go new file mode 100644 index 0000000..4a79148 --- /dev/null +++ b/pkg/storage/driver/sqlite/document_store_migrations.go @@ -0,0 +1,98 @@ +package sqlite + +import ( + "context" + "database/sql" + + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +type MigrateFunc func(ctx context.Context, tx *sql.Tx) error + +var documentStoreMigrations = []MigrateFunc{ + documentStoreMigrationBaseSchema, + documentStoreMigrationAddIDIndex, + documentStoreMigrationAddRevisionColumn, +} + +func documentStoreMigrationBaseSchema(ctx context.Context, tx *sql.Tx) error { + query := ` + CREATE TABLE IF NOT EXISTS documents ( + id TEXT PRIMARY KEY, + collection TEXT NOT NULL, + data TEXT, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + UNIQUE(id, collection) ON CONFLICT REPLACE + ); + ` + + if _, err := tx.ExecContext(ctx, query); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func documentStoreMigrationAddIDIndex(ctx context.Context, tx *sql.Tx) error { + query := ` + CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection); + ` + + if _, err := tx.ExecContext(ctx, query); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func documentStoreMigrationAddRevisionColumn(ctx context.Context, tx *sql.Tx) error { + query := `PRAGMA table_info(documents)` + + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := rows.Close(); err != nil { + logger.Error(ctx, "could not close rows", logger.CapturedE(errors.WithStack(err))) + } + }() + + var hasRevisionColumn bool + + for rows.Next() { + var ( + id int + name string + dataType string + nullable int + defaultValue any + primaryKey int + ) + + if err := rows.Scan(&id, &name, &dataType, &nullable, &defaultValue, &primaryKey); err != nil { + return errors.WithStack(err) + } + + if name == "revision" { + hasRevisionColumn = true + break + } + } + + if err := rows.Err(); err != nil { + return errors.WithStack(err) + } + + if !hasRevisionColumn { + query = `ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0` + if _, err := tx.ExecContext(ctx, query); err != nil { + return errors.WithStack(err) + } + } + + return nil +} diff --git a/pkg/storage/driver/sqlite/document_store_test.go b/pkg/storage/driver/sqlite/document_store_test.go index 1833b43..ec0d2a7 100644 --- a/pkg/storage/driver/sqlite/document_store_test.go +++ b/pkg/storage/driver/sqlite/document_store_test.go @@ -22,7 +22,7 @@ func TestDocumentStore(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds()) store := NewDocumentStore(dsn) testsuite.TestDocumentStore(context.Background(), t, store) diff --git a/pkg/storage/driver/sqlite/share_store_test.go b/pkg/storage/driver/sqlite/share_store_test.go index 4d1ff27..821d32c 100644 --- a/pkg/storage/driver/sqlite/share_store_test.go +++ b/pkg/storage/driver/sqlite/share_store_test.go @@ -26,7 +26,7 @@ func newTestStore(testName string) (share.Store, error) { return nil, errors.WithStack(err) } - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds()) store := NewShareStore(dsn) return store, nil diff --git a/pkg/storage/testsuite/document_store_ops.go b/pkg/storage/testsuite/document_store_ops.go index 1de8e46..51d8aa8 100644 --- a/pkg/storage/testsuite/document_store_ops.go +++ b/pkg/storage/testsuite/document_store_ops.go @@ -185,8 +185,13 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{ return errors.Errorf("upsertedDoc[\"attr1\"]: expected '%v', got '%v'", e, g) } + upsertedDocRevision, _ := upsertedDoc.Revision() + if e, g := 0, upsertedDocRevision; e != g { + return errors.Errorf("upsertedDoc.Revision(): expected '%v', got '%v'", e, g) + } + // Check that document does not have unexpected properties - if e, g := 4, len(upsertedDoc); e != g { + if e, g := 5, len(upsertedDoc); e != g { return errors.Errorf("len(upsertedDoc): expected '%v', got '%v'", e, g) } @@ -217,6 +222,11 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{ return errors.New("upsertedDoc2.UpdatedAt() should have been different than upsertedDoc.UpdatedAt()") } + upsertedDoc2Revision, _ := upsertedDoc2.Revision() + if e, g := 1, upsertedDoc2Revision; e != g { + return errors.Errorf("upsertedDoc.Revision(): expected '%v', got '%v'", e, g) + } + // Verify that there is no additional created document in the collection results, err := store.Query(ctx, collection, nil) @@ -228,6 +238,11 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{ return errors.Errorf("len(results): expected '%v', got '%v'", e, g) } + firstResultRevision, _ := results[0].Revision() + if e, g := 1, firstResultRevision; e != g { + return errors.Errorf("results[0].Revision(): expected '%v', got '%v'", e, g) + } + return nil }, }, @@ -437,7 +452,6 @@ func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.Docum for _, tc := range documentStoreOpsTestCases { func(tc documentStoreOpsTestCase) { t.Run(tc.Name, func(t *testing.T) { - t.Parallel() if err := tc.Run(ctx, store); err != nil { t.Errorf("%+v", errors.WithStack(err)) }