fix(storage,document): sequential upserts
This commit is contained in:
parent
a13dfffd5c
commit
c721d46218
@ -31,12 +31,17 @@ func (d Document) ID() (DocumentID, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
id, ok := rawID.(string)
|
||||
strID, ok := rawID.(string)
|
||||
if ok {
|
||||
return "", false
|
||||
return DocumentID(strID), true
|
||||
}
|
||||
|
||||
return DocumentID(id), true
|
||||
docID, ok := rawID.(DocumentID)
|
||||
if ok {
|
||||
return docID, true
|
||||
}
|
||||
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (d Document) CreatedAt() (time.Time, bool) {
|
||||
@ -54,7 +59,7 @@ func (d Document) timeAttr(attr string) (time.Time, bool) {
|
||||
}
|
||||
|
||||
t, ok := rawTime.(time.Time)
|
||||
if ok {
|
||||
if !ok {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
|
@ -188,10 +188,6 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
|
||||
id = storage.NewDocumentID()
|
||||
}
|
||||
|
||||
delete(document, storage.DocumentAttrID)
|
||||
delete(document, storage.DocumentAttrCreatedAt)
|
||||
delete(document, storage.DocumentAttrUpdatedAt)
|
||||
|
||||
args := []any{id, collection, JSONMap(document), now, now}
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
@ -7,8 +7,8 @@ import (
|
||||
)
|
||||
|
||||
func TestDocumentStore(t *testing.T, store storage.DocumentStore) {
|
||||
t.Run("Query", func(t *testing.T) {
|
||||
t.Run("Ops", func(t *testing.T) {
|
||||
// t.Parallel()
|
||||
testDocumentStoreQuery(t, store)
|
||||
testDocumentStoreOps(t, store)
|
||||
})
|
||||
}
|
||||
|
212
pkg/storage/testsuite/document_store_ops.go
Normal file
212
pkg/storage/testsuite/document_store_ops.go
Normal file
@ -0,0 +1,212 @@
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type documentStoreOpsTestCase struct {
|
||||
Name string
|
||||
Run func(ctx context.Context, store storage.DocumentStore) error
|
||||
}
|
||||
|
||||
var documentStoreOpsTestCases = []documentStoreOpsTestCase{
|
||||
{
|
||||
Name: "Basic query",
|
||||
Run: func(ctx context.Context, store storage.DocumentStore) error {
|
||||
collection := "simple_select"
|
||||
|
||||
docs := []storage.Document{
|
||||
{
|
||||
"attr1": "Foo",
|
||||
},
|
||||
{
|
||||
"attr1": "Bar",
|
||||
},
|
||||
}
|
||||
|
||||
for _, d := range docs {
|
||||
if _, err := store.Upsert(ctx, collection, d); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
filter := filter.New(
|
||||
filter.NewEqOperator(map[string]interface{}{
|
||||
"attr1": "Foo",
|
||||
}),
|
||||
)
|
||||
|
||||
results, err := store.Query(ctx, collection, filter, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if e, g := 1, len(results); e != g {
|
||||
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
if e, g := "Foo", results[0]["attr1"]; e != g {
|
||||
return errors.Errorf("results[0][\"Attr1\"]: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Query with 'IN' operator",
|
||||
Run: func(ctx context.Context, store storage.DocumentStore) error {
|
||||
docs := []storage.Document{
|
||||
{
|
||||
"counter": 1,
|
||||
"tags": []string{"foo", "bar"},
|
||||
},
|
||||
{
|
||||
"counter": 1,
|
||||
"tags": []string{"nope"},
|
||||
},
|
||||
}
|
||||
|
||||
collection := "in_operator"
|
||||
|
||||
for _, doc := range docs {
|
||||
if _, err := store.Upsert(ctx, collection, doc); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
filter := filter.New(
|
||||
filter.NewAndOperator(
|
||||
filter.NewEqOperator(map[string]any{
|
||||
"counter": 1,
|
||||
}),
|
||||
filter.NewInOperator(map[string]any{
|
||||
"tags": "foo",
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
results, err := store.Query(ctx, collection, filter, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if e, g := 1, len(results); e != g {
|
||||
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Double upsert",
|
||||
Run: func(ctx context.Context, store storage.DocumentStore) error {
|
||||
collection := "double_upsert"
|
||||
|
||||
oriDoc := storage.Document{
|
||||
"attr1": "Foo",
|
||||
}
|
||||
|
||||
// Upsert document for the first time
|
||||
upsertedDoc, err := store.Upsert(ctx, collection, oriDoc)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
id, exists := upsertedDoc.ID()
|
||||
if !exists {
|
||||
return errors.New("id, exists := upsertedDoc.ID(): 'exists' should be true")
|
||||
}
|
||||
|
||||
if id == storage.DocumentID("") {
|
||||
return errors.New("id, exists := upsertedDoc.ID(): 'id' should not be an empty string")
|
||||
}
|
||||
|
||||
createdAt, exists := upsertedDoc.CreatedAt()
|
||||
if !exists {
|
||||
return errors.New("createdAt, exists := upsertedDoc.CreatedAt(): 'exists' should be true")
|
||||
}
|
||||
|
||||
if createdAt.IsZero() {
|
||||
return errors.New("createdAt, exists := upsertedDoc.CreatedAt(): 'createdAt' should not be zero time")
|
||||
}
|
||||
|
||||
updatedAt, exists := upsertedDoc.UpdatedAt()
|
||||
if !exists {
|
||||
return errors.New("updatedAt, exists := upsertedDoc.UpdatedAt(): 'exists' should be true")
|
||||
}
|
||||
|
||||
if updatedAt.IsZero() {
|
||||
return errors.New("updatedAt, exists := upsertedDoc.UpdatedAt(): 'updatedAt' should not be zero time")
|
||||
}
|
||||
|
||||
if e, g := oriDoc["attr1"], upsertedDoc["attr1"]; e != g {
|
||||
return errors.Errorf("upsertedDoc[\"attr1\"]: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
// Check that document does not have unexpected properties
|
||||
if e, g := 4, len(upsertedDoc); e != g {
|
||||
return errors.Errorf("len(upsertedDoc): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
// Upsert document for the second time
|
||||
upsertedDoc2, err := store.Upsert(ctx, collection, upsertedDoc)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
spew.Dump(upsertedDoc, upsertedDoc2)
|
||||
|
||||
prevID, _ := upsertedDoc.ID()
|
||||
newID, _ := upsertedDoc2.ID()
|
||||
|
||||
if e, g := prevID, newID; e != g {
|
||||
return errors.Errorf("newID: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
createdAt1, _ := upsertedDoc.CreatedAt()
|
||||
createdAt2, _ := upsertedDoc2.CreatedAt()
|
||||
|
||||
if e, g := createdAt1, createdAt2; e != g {
|
||||
return errors.Errorf("upsertedDoc2.CreatedAt(): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
updatedAt1, _ := upsertedDoc.UpdatedAt()
|
||||
updatedAt2, _ := upsertedDoc2.UpdatedAt()
|
||||
|
||||
if e, g := updatedAt1, updatedAt2; e == g {
|
||||
return errors.New("upsertedDoc2.UpdatedAt() should have been different than upsertedDoc.UpdatedAt()")
|
||||
}
|
||||
|
||||
// Verify that there is no additional created document in the collection
|
||||
|
||||
results, err := store.Query(ctx, collection, nil, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if e, g := 1, len(results); e != g {
|
||||
return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) {
|
||||
for _, tc := range documentStoreOpsTestCases {
|
||||
func(tc documentStoreOpsTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
if err := tc.Run(context.Background(), store); err != nil {
|
||||
t.Errorf("%+v", errors.WithStack(err))
|
||||
}
|
||||
})
|
||||
}(tc)
|
||||
}
|
||||
}
|
@ -1,128 +0,0 @@
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type documentStoreQueryTestCase struct {
|
||||
Name string
|
||||
Before func(ctx context.Context, store storage.DocumentStore) error
|
||||
Collection string
|
||||
Filter *filter.Filter
|
||||
QueryOptionsFuncs []storage.QueryOptionFunc
|
||||
After func(t *testing.T, results []storage.Document, err error)
|
||||
}
|
||||
|
||||
var documentStoreQueryTestCases = []documentStoreQueryTestCase{
|
||||
{
|
||||
Name: "Simple select",
|
||||
Before: func(ctx context.Context, store storage.DocumentStore) error {
|
||||
doc1 := storage.Document{
|
||||
"attr1": "Foo",
|
||||
}
|
||||
|
||||
if _, err := store.Upsert(ctx, "simple_select", doc1); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
doc2 := storage.Document{
|
||||
"attr1": "Bar",
|
||||
}
|
||||
|
||||
if _, err := store.Upsert(ctx, "simple_select", doc2); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Collection: "simple_select",
|
||||
Filter: filter.New(
|
||||
filter.NewEqOperator(map[string]interface{}{
|
||||
"attr1": "Foo",
|
||||
}),
|
||||
),
|
||||
After: func(t *testing.T, results []storage.Document, err error) {
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
if e, g := 1, len(results); e != g {
|
||||
t.Errorf("len(results): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
if e, g := "Foo", results[0]["attr1"]; e != g {
|
||||
t.Errorf("results[0][\"Attr1\"]: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "IN Operator",
|
||||
Before: func(ctx context.Context, store storage.DocumentStore) error {
|
||||
docs := []storage.Document{
|
||||
{
|
||||
"counter": 1,
|
||||
"tags": []string{"foo", "bar"},
|
||||
},
|
||||
{
|
||||
"counter": 1,
|
||||
"tags": []string{"nope"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, doc := range docs {
|
||||
if _, err := store.Upsert(ctx, "in_operator", doc); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Collection: "in_operator",
|
||||
Filter: filter.New(
|
||||
filter.NewAndOperator(
|
||||
filter.NewEqOperator(map[string]any{
|
||||
"counter": 1,
|
||||
}),
|
||||
filter.NewInOperator(map[string]any{
|
||||
"tags": "foo",
|
||||
}),
|
||||
),
|
||||
),
|
||||
After: func(t *testing.T, results []storage.Document, err error) {
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
if e, g := 1, len(results); e != g {
|
||||
t.Errorf("len(results): expected '%v', got '%v'", e, g)
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func testDocumentStoreQuery(t *testing.T, store storage.DocumentStore) {
|
||||
for _, tc := range documentStoreQueryTestCases {
|
||||
func(tc documentStoreQueryTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
// t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
if tc.Before != nil {
|
||||
if err := tc.Before(ctx, store); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
documents, err := store.Query(ctx, tc.Collection, tc.Filter, tc.QueryOptionsFuncs...)
|
||||
|
||||
tc.After(t, documents, err)
|
||||
})
|
||||
}(tc)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user