Compare commits


1 commit

SHA1: 776d0cc3cd
Message: feat(storage): rpc based implementation
Date: 2023-09-26 19:54:20 -06:00
Checks: all checks were successful (arcad/edge/pipeline/pr-master: This commit looks good)
39 changed files with 787 additions and 58 deletions

.gitignore
View File

@ -4,4 +4,5 @@
 /tools
 *.sqlite
 /.gitea-release
 /.edge
+/data

View File

@ -11,9 +11,12 @@ DATE_VERSION := $(shell date +%Y.%-m.%-d)
 FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,)
 APP_PATH ?= misc/client-sdk-testsuite/dist
 RUN_APP_ARGS ?=
+RUN_STORAGE_SERVER_ARGS ?=
 SHELL := bash

-build: build-edge-cli build-client-sdk-test-app
+build: build-cli build-storage-server build-client-sdk-test-app

 watch: tools/modd/bin/modd
     tools/modd/bin/modd
@ -27,12 +30,18 @@ test-go:
 lint:
     golangci-lint run --enable-all $(LINT_ARGS)

-build-edge-cli: build-sdk
+build-cli: build-sdk
     CGO_ENABLED=0 go build \
         -v \
         -o ./bin/cli \
         ./cmd/cli

+build-storage-server: build-sdk
+    CGO_ENABLED=0 go build \
+        -v \
+        -o ./bin/storage-server \
+        ./cmd/storage-server
+
 build-client-sdk-test-app:
     cd misc/client-sdk-testsuite && $(MAKE) dist
@ -68,6 +77,9 @@ node_modules:
 run-app: .env
     ( set -o allexport && source .env && set +o allexport && bin/cli app run -p $(APP_PATH) $$RUN_APP_ARGS )

+run-storage-server: .env
+    ( set -o allexport && source .env && set +o allexport && bin/storage-server run $$RUN_STORAGE_SERVER_ARGS )
+
 .env:
     cp .env.dist .env

View File

@ -13,6 +13,8 @@ import (
     "strings"
     "sync"

+    "github.com/keegancsmith/rpc"
+
     "forge.cadoles.com/arcad/edge/pkg/app"
     "forge.cadoles.com/arcad/edge/pkg/bus"
     "forge.cadoles.com/arcad/edge/pkg/bus/memory"
@ -30,7 +32,7 @@ import (
     shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
     shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite"
     "forge.cadoles.com/arcad/edge/pkg/storage"
-    storageSqlite "forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
+    "forge.cadoles.com/arcad/edge/pkg/storage/rpc/client"
     "gitlab.com/wpetit/goweb/logger"

     "forge.cadoles.com/arcad/edge/pkg/bundle"
@ -439,19 +441,41 @@ func initMemoryBus(deps *moduleDeps) error {
 func initDatastores(storageFile string, appID app.ID) ModuleDepFunc {
     return func(deps *moduleDeps) error {
-        storageFile = injectAppID(storageFile, appID)
-
-        if err := ensureDir(storageFile); err != nil {
-            return errors.WithStack(err)
-        }
-
-        db, err := storageSqlite.Open(storageFile)
-        if err != nil {
-            return errors.WithStack(err)
-        }
-
-        deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db)
-        deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db)
+        // storageFile = injectAppID(storageFile, appID)
+
+        blobStoreClient, err := rpc.DialHTTPPath(
+            "tcp",
+            "localhost:3001",
+            injectAppID("/blobstore?tenant=%APPID%", appID),
+        )
+        if err != nil {
+            return errors.WithStack(err)
+        }
+
+        deps.BlobStore = client.NewBlobStore(blobStoreClient)
+
+        documentStoreClient, err := rpc.DialHTTPPath(
+            "tcp",
+            "localhost:3001",
+            injectAppID("/documentstore?tenant=%APPID%", appID),
+        )
+        if err != nil {
+            return errors.WithStack(err)
+        }
+
+        deps.DocumentStore = client.NewDocumentStore(documentStoreClient)
+
+        // if err := ensureDir(storageFile); err != nil {
+        //     return errors.WithStack(err)
+        // }
+
+        // db, err := storageSqlite.Open(storageFile)
+        // if err != nil {
+        //     return errors.WithStack(err)
+        // }
+
+        // deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db)
+        // deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db)

         return nil
     }

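Note: with this change the CLI no longer opens the SQLite databases itself; it dials the new storage server over RPC and wraps the connections with the rpc/client stores. A minimal standalone sketch of that client-side wiring, assuming the storage server from cmd/storage-server is listening on localhost:3001; the tenant "my-app", the collection "profiles" and the document id are made up, and error handling is reduced to panics:

package main

import (
    "context"
    "fmt"

    "github.com/keegancsmith/rpc"

    "forge.cadoles.com/arcad/edge/pkg/storage/rpc/client"
)

func main() {
    ctx := context.Background()

    // One connection per store; the tenant travels in the path query string.
    conn, err := rpc.DialHTTPPath("tcp", "localhost:3001", "/documentstore?tenant=my-app")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    store := client.NewDocumentStore(conn)

    doc, err := store.Get(ctx, "profiles", "some-document-id")
    if err != nil {
        panic(err)
    }

    fmt.Printf("%+v\n", doc)
}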
View File

@ -0,0 +1,13 @@
package auth
import (
"github.com/urfave/cli/v2"
)
func Root() *cli.Command {
return &cli.Command{
Name: "auth",
Usage: "Auth related command",
Subcommands: []*cli.Command{},
}
}

View File

@ -0,0 +1,48 @@
package command
import (
"context"
"fmt"
"os"
"sort"
"github.com/urfave/cli/v2"
)
func Main(commands ...*cli.Command) {
ctx := context.Background()
app := &cli.App{
Name: "storage-server",
Usage: "Edge storage server",
Commands: commands,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "debug",
EnvVars: []string{"DEBUG"},
Value: false,
},
},
}
app.ExitErrHandler = func(ctx *cli.Context, err error) {
if err == nil {
return
}
debug := ctx.Bool("debug")
if !debug {
fmt.Printf("[ERROR] %v\n", err)
} else {
fmt.Printf("%+v", err)
}
}
sort.Sort(cli.FlagsByName(app.Flags))
sort.Sort(cli.CommandsByName(app.Commands))
if err := app.RunContext(ctx, os.Args); err != nil {
os.Exit(1)
}
}

View File

@ -0,0 +1,145 @@
package command
import (
"fmt"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"github.com/keegancsmith/rpc"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
)
func Run() *cli.Command {
return &cli.Command{
Name: "run",
Usage: "Run server",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "address",
Aliases: []string{"addr"},
Value: ":3001",
},
&cli.StringFlag{
Name: "data-dir",
Value: "./data",
},
&cli.StringFlag{
Name: "dsn-query-string",
Value: fmt.Sprintf("_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
},
},
Action: func(ctx *cli.Context) error {
addr := ctx.String("address")
dataDir := ctx.String("data-dir")
dsnQueryString := ctx.String("dsn-query-string")
router := chi.NewRouter()
router.Use(middleware.RealIP)
router.Use(middleware.Logger)
router.Handle("/blobstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tenant := r.URL.Query().Get("tenant")
if tenant == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
server, err := getBlobStoreServer(dataDir, tenant, dsnQueryString)
if err != nil {
logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
server.ServeHTTP(w, r)
}))
router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tenant := r.URL.Query().Get("tenant")
if tenant == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
server, err := getDocumentStoreServer(dataDir, tenant, dsnQueryString)
if err != nil {
logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
server.ServeHTTP(w, r)
}))
if err := http.ListenAndServe(addr, router); err != nil {
return errors.WithStack(err)
}
return nil
},
}
}
var documentStoreTenants sync.Map
func getDocumentStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Server, error) {
dir := filepath.Join(dataDir, tenant)
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
file := filepath.Join(dir, "documentstore.sqlite")
dsn := fmt.Sprintf("%s?%s", file, dsnQueryString)
documentStore := sqlite.NewDocumentStore(dsn)
documentStoreServer := server.NewDocumentStoreServer(documentStore)
rawDocumentStoreServer, _ := documentStoreTenants.LoadOrStore(tenant, documentStoreServer)
documentStoreServer, ok := rawDocumentStoreServer.(*rpc.Server)
if !ok {
return nil, errors.Errorf("unexpected document store server value of type '%T'", rawDocumentStoreServer)
}
return documentStoreServer, nil
}
var blobStoreTenants sync.Map
func getBlobStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Server, error) {
dir := filepath.Join(dataDir, tenant)
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
file := filepath.Join(dir, "blobstore.sqlite")
dsn := fmt.Sprintf("%s?%s", file, dsnQueryString)
blobStore := sqlite.NewBlobStore(dsn)
blobStoreServer := server.NewBlobStoreServer(blobStore)
rawBlobStoreServer, _ := blobStoreTenants.LoadOrStore(tenant, blobStoreServer)
blobStoreServer, ok := rawBlobStoreServer.(*rpc.Server)
if !ok {
return nil, errors.Errorf("unexpected blob store server value of type '%T'", rawBlobStoreServer)
}
return blobStoreServer, nil
}

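Note on the per-tenant caching above: sync.Map.LoadOrStore keeps the first value stored for a tenant and returns it on later calls, so the sqlite-backed RPC server built on each request is discarded whenever the tenant is already cached. A small sketch of a variant that only builds the server on a cache miss (the helper name getOrCreateServer and the package name are made up; a benign race remains where two concurrent first requests may both build and one result is dropped):

package command

import (
    "sync"

    "github.com/keegancsmith/rpc"
    "github.com/pkg/errors"
)

var tenantServers sync.Map

// getOrCreateServer returns the RPC server cached for a tenant, calling build()
// only when the tenant has not been seen yet.
func getOrCreateServer(tenant string, build func() (*rpc.Server, error)) (*rpc.Server, error) {
    if cached, ok := tenantServers.Load(tenant); ok {
        if server, ok := cached.(*rpc.Server); ok {
            return server, nil
        }

        return nil, errors.Errorf("unexpected cached value for tenant '%s'", tenant)
    }

    server, err := build()
    if err != nil {
        return nil, errors.WithStack(err)
    }

    // LoadOrStore tolerates concurrent first requests: only one value is kept.
    raw, _ := tenantServers.LoadOrStore(tenant, server)

    server, ok := raw.(*rpc.Server)
    if !ok {
        return nil, errors.Errorf("unexpected cached value for tenant '%s'", tenant)
    }

    return server, nil
}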
View File

@ -0,0 +1,13 @@
package main
import (
"forge.cadoles.com/arcad/edge/cmd/storage-server/command"
"forge.cadoles.com/arcad/edge/cmd/storage-server/command/auth"
)
func main() {
command.Main(
command.Run(),
auth.Root(),
)
}

View File

@ -9,6 +9,7 @@ modd.conf
     prep: make build-client-sdk-test-app
     prep: make build
     daemon: make run-app
+    daemon: make run-storage-server
 }

 **/*.go {

View File

@ -8,6 +8,16 @@ func (o *AndOperator) Token() Token {
     return TokenAnd
 }

+func (o *AndOperator) AsMap() map[string]any {
+    children := make([]map[string]any, 0, len(o.children))
+    for _, c := range o.children {
+        children = append(children, c.AsMap())
+    }
+
+    return map[string]any{
+        string(TokenAnd): children,
+    }
+}
+
 func (o *AndOperator) Children() []Operator {
     return o.children
 }

View File

@ -12,6 +12,16 @@ func (o *EqOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *EqOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenEq): fields,
+    }
+}
+
 func NewEqOperator(fields map[string]interface{}) *EqOperator {
     return &EqOperator{fields}
 }

View File

@ -29,6 +29,15 @@ func NewFrom(raw map[string]interface{}) (*Filter, error) {
     return &Filter{op}, nil
 }

+func (f *Filter) AsMap() map[string]any {
+    root := f.Root()
+    if root == nil {
+        return nil
+    }
+
+    return f.Root().AsMap()
+}
+
 func toFieldOperator(v interface{}) (Operator, error) {
     vv, ok := v.(map[string]interface{})
     if !ok {
@ -93,8 +102,17 @@ func toFieldOperator(v interface{}) (Operator, error) {
 }

 func toAggregateOperator(token Token, v interface{}) (Operator, error) {
-    vv, ok := v.([]interface{})
-    if !ok {
+    var vv []interface{}
+
+    switch typed := v.(type) {
+    case []interface{}:
+        vv = typed
+    case []map[string]interface{}:
+        vv = make([]interface{}, 0, len(typed))
+        for _, item := range typed {
+            vv = append(vv, item)
+        }
+    default:
         return nil, errors.WithStack(ErrInvalidAggregationOperator)
     }

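The new AsMap() methods exist so a *filter.Filter can cross the RPC boundary as a plain map and be rebuilt with NewFrom() on the server side (see QueryDocuments further down). A rough round-trip sketch, assuming the token string literals are "and", "eq" and "gt" (those literals are not confirmed by this diff):

package main

import (
    "fmt"

    "forge.cadoles.com/arcad/edge/pkg/storage/filter"
)

func main() {
    // Hypothetical raw query: and(eq(name: "jdoe"), gt(age: 18)).
    raw := map[string]interface{}{
        "and": []interface{}{
            map[string]interface{}{"eq": map[string]interface{}{"name": "jdoe"}},
            map[string]interface{}{"gt": map[string]interface{}{"age": 18}},
        },
    }

    // Client side: parse the raw map into an operator tree...
    f, err := filter.NewFrom(raw)
    if err != nil {
        panic(err)
    }

    // ...and serialize it back to a map before sending it over RPC.
    asMap := f.AsMap()
    fmt.Printf("%+v\n", asMap)

    // Server side: rebuild the filter from the transmitted map.
    if _, err := filter.NewFrom(asMap); err != nil {
        panic(err)
    }
}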
View File

@ -12,6 +12,16 @@ func (o *GtOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *GtOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenGt): fields,
+    }
+}
+
 func NewGtOperator(fields OperatorFields) *GtOperator {
     return &GtOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *GteOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *GteOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenGte): fields,
+    }
+}
+
 func NewGteOperator(fields OperatorFields) *GteOperator {
     return &GteOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *InOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *InOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenIn): fields,
+    }
+}
+
 func NewInOperator(fields OperatorFields) *InOperator {
     return &InOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *LikeOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *LikeOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenLike): fields,
+    }
+}
+
 func NewLikeOperator(fields OperatorFields) *LikeOperator {
     return &LikeOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *LtOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *LtOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenLt): fields,
+    }
+}
+
 func NewLtOperator(fields OperatorFields) *LtOperator {
     return &LtOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *LteOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *LteOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenLte): fields,
+    }
+}
+
 func NewLteOperator(fields OperatorFields) *LteOperator {
     return &LteOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *NeqOperator) Fields() map[string]interface{} {
     return o.fields
 }

+func (o *NeqOperator) AsMap() map[string]any {
+    fields := make(map[string]any, len(o.fields))
+    for k, v := range o.fields {
+        fields[k] = v
+    }
+
+    return map[string]any{
+        string(TokenNeq): fields,
+    }
+}
+
 func NewNeqOperator(fields map[string]interface{}) *NeqOperator {
     return &NeqOperator{fields}
 }

View File

@ -12,6 +12,16 @@ func (o *NotOperator) Children() []Operator {
     return o.children
 }

+func (o *NotOperator) AsMap() map[string]any {
+    children := make([]map[string]any, 0, len(o.children))
+    for _, c := range o.children {
+        children = append(children, c.AsMap())
+    }
+
+    return map[string]any{
+        string(TokenNot): children,
+    }
+}
+
 func NewNotOperator(ops ...Operator) *NotOperator {
     return &NotOperator{ops}
 }

View File

@ -20,4 +20,15 @@ type OperatorFields map[string]interface{}
 type Operator interface {
     Token() Token
+    AsMap() map[string]any
+}
+
+type FieldOperator interface {
+    Operator
+    Fields() map[string]any
+}
+
+type AggregatorOperator interface {
+    Operator
+    Children() []Operator
 }

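The new FieldOperator and AggregatorOperator interfaces let callers walk an operator tree without switching on every concrete type. A possible use, built only from constructors that appear elsewhere in this commit (the output format is illustrative):

package main

import (
    "fmt"

    "forge.cadoles.com/arcad/edge/pkg/storage/filter"
)

// dump prints an operator tree, distinguishing aggregating operators (and, or,
// not) from field operators (eq, gt, ...) via the new interfaces.
func dump(op filter.Operator, indent string) {
    switch typed := op.(type) {
    case filter.AggregatorOperator:
        fmt.Printf("%s%v:\n", indent, typed.Token())
        for _, child := range typed.Children() {
            dump(child, indent+"  ")
        }
    case filter.FieldOperator:
        fmt.Printf("%s%v: %v\n", indent, typed.Token(), typed.Fields())
    }
}

func main() {
    op := filter.NewOrOperator(
        filter.NewEqOperator(map[string]interface{}{"name": "jdoe"}),
        filter.NewGtOperator(filter.OperatorFields{"age": 18}),
    )

    dump(op, "")
}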
View File

@ -12,6 +12,16 @@ func (o *OrOperator) Children() []Operator {
     return o.children
 }

+func (o *OrOperator) AsMap() map[string]any {
+    children := make([]map[string]any, 0, len(o.children))
+    for _, c := range o.children {
+        children = append(children, c.AsMap())
+    }
+
+    return map[string]any{
+        string(TokenOr): children,
+    }
+}
+
 func NewOrOperator(ops ...Operator) *OrOperator {
     return &OrOperator{ops}
 }

View File

@ -70,8 +70,8 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
 // Get implements storage.BlobBucket
 func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
     args := blob.GetBlobInfoArgs{
-        Name:   b.name,
+        BucketID: b.id,
         BlobID: id,
     }

     reply := blob.GetBlobInfoReply{}

View File

@ -1,6 +1,7 @@
 package client

 import (
+    "context"
     "fmt"
     "net/http/httptest"
     "os"
@ -18,9 +19,11 @@ import (
 func TestBlobStore(t *testing.T) {
     t.Parallel()

-    logger.SetLevel(logger.LevelDebug)
+    if testing.Verbose() {
+        logger.SetLevel(logger.LevelDebug)
+    }

-    httpServer, err := startNewServer()
+    httpServer, err := startNewBlobStoreServer()
     if err != nil {
         t.Fatalf("%+v", errors.WithStack(err))
     }
@ -41,13 +44,13 @@ func TestBlobStore(t *testing.T) {
     store := NewBlobStore(client)

-    testsuite.TestBlobStore(t, store)
+    testsuite.TestBlobStore(context.Background(), t, store)
 }

 func BenchmarkBlobStore(t *testing.B) {
     logger.SetLevel(logger.LevelError)

-    httpServer, err := startNewServer()
+    httpServer, err := startNewBlobStoreServer()
     if err != nil {
         t.Fatalf("%+v", errors.WithStack(err))
     }
@ -72,7 +75,7 @@ func BenchmarkBlobStore(t *testing.B) {
 }

-func getSQLiteBlobstore() (*sqlite.BlobStore, error) {
+func getSQLiteBlobStore() (*sqlite.BlobStore, error) {
     file := "./testdata/blobstore_test.sqlite"

     if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
@ -85,8 +88,8 @@ func getSQLiteBlobstore() (*sqlite.BlobStore, error) {
     return store, nil
 }

-func startNewServer() (*httptest.Server, error) {
-    store, err := getSQLiteBlobstore()
+func startNewBlobStoreServer() (*httptest.Server, error) {
+    store, err := getSQLiteBlobStore()
     if err != nil {
         return nil, errors.WithStack(err)
     }

View File

@ -2,35 +2,102 @@ package client
 import (
     "context"
+    "encoding/gob"
+    "time"

     "github.com/keegancsmith/rpc"
+    "github.com/pkg/errors"

     "forge.cadoles.com/arcad/edge/pkg/storage"
     "forge.cadoles.com/arcad/edge/pkg/storage/filter"
+    "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document"
 )

+func init() {
+    gob.Register(storage.Document{})
+    gob.Register(storage.DocumentID(""))
+    gob.Register(time.Time{})
+    gob.Register(map[string]interface{}{})
+    gob.Register([]interface{}{})
+    gob.Register([]map[string]interface{}{})
+}
+
 type DocumentStore struct {
     client *rpc.Client
 }

 // Delete implements storage.DocumentStore.
-func (*DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
-    panic("unimplemented")
+func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
+    args := document.DeleteDocumentArgs{
+        Collection: collection,
+        DocumentID: id,
+    }
+
+    reply := document.DeleteDocumentReply{}
+
+    if err := s.client.Call(ctx, "Service.DeleteDocument", args, &reply); err != nil {
+        return errors.WithStack(err)
+    }
+
+    return nil
 }

 // Get implements storage.DocumentStore.
-func (*DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
-    panic("unimplemented")
+func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
+    args := document.GetDocumentArgs{
+        Collection: collection,
+        DocumentID: id,
+    }
+
+    reply := document.GetDocumentReply{}
+
+    if err := s.client.Call(ctx, "Service.GetDocument", args, &reply); err != nil {
+        return nil, errors.WithStack(err)
+    }
+
+    return reply.Document, nil
 }

 // Query implements storage.DocumentStore.
-func (*DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
-    panic("unimplemented")
+func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
+    opts := &storage.QueryOptions{}
+    for _, fn := range funcs {
+        fn(opts)
+    }
+
+    args := document.QueryDocumentsArgs{
+        Collection: collection,
+        Filter:     nil,
+        Options:    opts,
+    }
+
+    if filter != nil {
+        args.Filter = filter.AsMap()
+    }
+
+    reply := document.QueryDocumentsReply{}
+
+    if err := s.client.Call(ctx, "Service.QueryDocuments", args, &reply); err != nil {
+        return nil, errors.WithStack(err)
+    }
+
+    return reply.Documents, nil
 }

 // Upsert implements storage.DocumentStore.
-func (*DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
-    panic("unimplemented")
+func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc storage.Document) (storage.Document, error) {
+    args := document.UpsertDocumentArgs{
+        Collection: collection,
+        Document:   doc,
+    }
+
+    reply := document.UpsertDocumentReply{}
+
+    if err := s.client.Call(ctx, "Service.UpsertDocument", args, &reply); err != nil {
+        return nil, errors.WithStack(err)
+    }
+
+    return reply.Document, nil
 }

 func NewDocumentStore(client *rpc.Client) *DocumentStore {

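The init() block above registers with encoding/gob every concrete type that may travel inside a storage.Document over the RPC connection. An application that stores its own concrete types inside documents would, under the same assumption, need to mirror those registrations; GeoPoint here is a made-up example type:

package myapp

import "encoding/gob"

// GeoPoint is a hypothetical value an application might store in a document.
type GeoPoint struct {
    Lat float64
    Lng float64
}

func init() {
    // Without this, gob fails to encode a document containing a GeoPoint when
    // the RPC-backed DocumentStore sends it to the storage server.
    gob.Register(GeoPoint{})
}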
View File

@ -0,0 +1,74 @@
package client
import (
"context"
"fmt"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestDocumentStore(t *testing.T) {
t.Parallel()
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
httpServer, err := startNewDocumentStoreServer()
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer httpServer.Close()
serverAddr := httpServer.Listener.Addr()
client, err := rpc.DialHTTPPath(
serverAddr.Network(),
serverAddr.String(),
"",
)
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer client.Close()
store := NewDocumentStore(client)
testsuite.TestDocumentStore(context.Background(), t, store)
}
func getSQLiteDocumentStore() (*sqlite.DocumentStore, error) {
file := "./testdata/documentstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, errors.WithStack(err)
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := sqlite.NewDocumentStore(dsn)
return store, nil
}
func startNewDocumentStoreServer() (*httptest.Server, error) {
store, err := getSQLiteDocumentStore()
if err != nil {
return nil, errors.WithStack(err)
}
server := server.NewDocumentStoreServer(store)
httpServer := httptest.NewServer(server)
return httpServer, nil
}

View File

@ -2,13 +2,18 @@ package blob
 import (
     "context"
+    "encoding/gob"
+    "time"

     "forge.cadoles.com/arcad/edge/pkg/storage"
     "github.com/pkg/errors"
 )

+func init() {
+    gob.Register(&BlobInfo{})
+}
+
 type GetBlobInfoArgs struct {
-    Name     string
     BlobID   storage.BlobID
     BucketID BucketID
 }
@ -29,8 +34,49 @@ func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply
     }

     *reply = GetBlobInfoReply{
-        BlobInfo: blobInfo,
+        BlobInfo: &BlobInfo{
+            Bucket_:      blobInfo.Bucket(),
+            ContentType_: blobInfo.ContentType(),
+            BlobID_:      blobInfo.ID(),
+            ModTime_:     blobInfo.ModTime(),
+            Size_:        blobInfo.Size(),
+        },
     }

     return nil
 }
+
+type BlobInfo struct {
+    Bucket_      string
+    ContentType_ string
+    BlobID_      storage.BlobID
+    ModTime_     time.Time
+    Size_        int64
+}
+
+// Bucket implements storage.BlobInfo.
+func (bi *BlobInfo) Bucket() string {
+    return bi.Bucket_
+}
+
+// ContentType implements storage.BlobInfo.
+func (bi *BlobInfo) ContentType() string {
+    return bi.ContentType_
+}
+
+// ID implements storage.BlobInfo.
+func (bi *BlobInfo) ID() storage.BlobID {
+    return bi.BlobID_
+}
+
+// ModTime implements storage.BlobInfo.
+func (bi *BlobInfo) ModTime() time.Time {
+    return bi.ModTime_
+}
+
+// Size implements storage.BlobInfo.
+func (bi *BlobInfo) Size() int64 {
+    return bi.Size_
+}
+
+var _ storage.BlobInfo = &BlobInfo{}

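The reply previously carried a storage.BlobInfo (an interface) directly; gob cannot encode an interface-typed field unless a registered concrete type sits behind it, hence the new BlobInfo struct and the gob.Register call above. A self-contained toy illustrating the same constraint outside this codebase (all names made up):

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
)

type Animal interface{ Sound() string }

type Dog struct{ Name string }

func (d Dog) Sound() string { return "woof" }

// Reply mimics GetBlobInfoReply: the interesting value sits behind an interface.
type Reply struct {
    Animal Animal
}

func main() {
    // Without this registration, Encode returns an error for the interface field.
    gob.Register(Dog{})

    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(Reply{Animal: Dog{Name: "rex"}}); err != nil {
        panic(err)
    }

    decoded := Reply{}
    if err := gob.NewDecoder(&buf).Decode(&decoded); err != nil {
        panic(err)
    }

    fmt.Println(decoded.Animal.Sound())
}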
View File

@ -0,0 +1,26 @@
package document
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type DeleteDocumentArgs struct {
Collection string
DocumentID storage.DocumentID
}
type DeleteDocumentReply struct {
}
func (s *Service) DeleteDocument(ctx context.Context, args *DeleteDocumentArgs, reply *DeleteDocumentReply) error {
if err := s.store.Delete(ctx, args.Collection, args.DocumentID); err != nil {
return errors.WithStack(err)
}
*reply = DeleteDocumentReply{}
return nil
}

View File

@ -0,0 +1,30 @@
package document
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type GetDocumentArgs struct {
Collection string
DocumentID storage.DocumentID
}
type GetDocumentReply struct {
Document storage.Document
}
func (s *Service) GetDocument(ctx context.Context, args *GetDocumentArgs, reply *GetDocumentReply) error {
document, err := s.store.Get(ctx, args.Collection, args.DocumentID)
if err != nil {
return errors.WithStack(err)
}
*reply = GetDocumentReply{
Document: document,
}
return nil
}

View File

@ -0,0 +1,53 @@
package document
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
"github.com/pkg/errors"
)
type QueryDocumentsArgs struct {
Collection string
Filter map[string]any
Options *storage.QueryOptions
}
type QueryDocumentsReply struct {
Documents []storage.Document
}
func (s *Service) QueryDocuments(ctx context.Context, args *QueryDocumentsArgs, reply *QueryDocumentsReply) error {
var (
argsFilter *filter.Filter
err error
)
if args.Filter != nil {
argsFilter, err = filter.NewFrom(args.Filter)
if err != nil {
return errors.WithStack(err)
}
}
documents, err := s.store.Query(ctx, args.Collection, argsFilter, withQueryOptions(args.Options))
if err != nil {
return errors.WithStack(err)
}
*reply = QueryDocumentsReply{
Documents: documents,
}
return nil
}
func withQueryOptions(opts *storage.QueryOptions) storage.QueryOptionFunc {
return func(o *storage.QueryOptions) {
o.Limit = opts.Limit
o.Offset = opts.Offset
o.OrderBy = opts.OrderBy
o.OrderDirection = opts.OrderDirection
}
}

View File

@ -0,0 +1,11 @@
package document
import "forge.cadoles.com/arcad/edge/pkg/storage"
type Service struct {
store storage.DocumentStore
}
func NewService(store storage.DocumentStore) *Service {
return &Service{store}
}

View File

@ -0,0 +1,30 @@
package document
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type UpsertDocumentArgs struct {
Collection string
Document storage.Document
}
type UpsertDocumentReply struct {
Document storage.Document
}
func (s *Service) UpsertDocument(ctx context.Context, args *UpsertDocumentArgs, reply *UpsertDocumentReply) error {
document, err := s.store.Upsert(ctx, args.Collection, args.Document)
if err != nil {
return errors.WithStack(err)
}
*reply = UpsertDocumentReply{
Document: document,
}
return nil
}

View File

@ -1,11 +0,0 @@
package server
import "forge.cadoles.com/arcad/edge/pkg/storage"
type DocumentStore struct {
store storage.DocumentStore
}
func NewDocumentStore(store storage.DocumentStore) *DocumentStore {
return &DocumentStore{store}
}

View File

@ -5,6 +5,7 @@ import (
     "forge.cadoles.com/arcad/edge/pkg/storage"
     "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
+    "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document"
 )

 func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
@ -15,6 +16,6 @@ func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
 func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server {
     server := rpc.NewServer()

-    server.Register(NewDocumentStore(store))
+    server.Register(document.NewService(store))

     return server
 }

View File

@ -1,6 +1,7 @@
 package sqlite

 import (
+    "context"
     "fmt"
     "os"
     "testing"
@ -13,7 +14,9 @@ import (
 func TestBlobStore(t *testing.T) {
     t.Parallel()

-    logger.SetLevel(logger.LevelDebug)
+    if testing.Verbose() {
+        logger.SetLevel(logger.LevelDebug)
+    }

     file := "./testdata/blobstore_test.sqlite"
@ -24,7 +27,7 @@ func TestBlobStore(t *testing.T) {
     dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
     store := NewBlobStore(dsn)

-    testsuite.TestBlobStore(t, store)
+    testsuite.TestBlobStore(context.Background(), t, store)
 }

 func BenchmarkBlobStore(t *testing.B) {

View File

@ -1,6 +1,7 @@
 package sqlite

 import (
+    "context"
     "fmt"
     "os"
     "testing"
@ -24,5 +25,5 @@ func TestDocumentStore(t *testing.T) {
     dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
     store := NewDocumentStore(dsn)

-    testsuite.TestDocumentStore(t, store)
+    testsuite.TestDocumentStore(context.Background(), t, store)
 }

View File

@ -1,13 +1,14 @@
 package testsuite

 import (
+    "context"
     "testing"

     "forge.cadoles.com/arcad/edge/pkg/storage"
 )

-func TestBlobStore(t *testing.T, store storage.BlobStore) {
+func TestBlobStore(ctx context.Context, t *testing.T, store storage.BlobStore) {
     t.Run("Ops", func(t *testing.T) {
-        testBlobStoreOps(t, store)
+        testBlobStoreOps(ctx, t, store)
     })
 }

View File

@ -131,14 +131,12 @@ var blobStoreTestCases = []blobStoreTestCase{
     },
 }

-func testBlobStoreOps(t *testing.T, store storage.BlobStore) {
+func testBlobStoreOps(ctx context.Context, t *testing.T, store storage.BlobStore) {
     for _, tc := range blobStoreTestCases {
         func(tc blobStoreTestCase) {
             t.Run(tc.Name, func(t *testing.T) {
                 t.Parallel()

-                ctx := context.Background()
-
                 if err := tc.Run(ctx, store); err != nil {
                     t.Errorf("%+v", errors.WithStack(err))
                 }

View File

@ -1,14 +1,14 @@
 package testsuite

 import (
+    "context"
     "testing"

     "forge.cadoles.com/arcad/edge/pkg/storage"
 )

-func TestDocumentStore(t *testing.T, store storage.DocumentStore) {
+func TestDocumentStore(ctx context.Context, t *testing.T, store storage.DocumentStore) {
     t.Run("Ops", func(t *testing.T) {
-        t.Parallel()
-        testDocumentStoreOps(t, store)
+        testDocumentStoreOps(ctx, t, store)
     })
 }

View File

@ -433,12 +433,12 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
     },
 }

-func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) {
+func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.DocumentStore) {
     for _, tc := range documentStoreOpsTestCases {
         func(tc documentStoreOpsTestCase) {
             t.Run(tc.Name, func(t *testing.T) {
                 t.Parallel()

-                if err := tc.Run(context.Background(), store); err != nil {
+                if err := tc.Run(ctx, store); err != nil {
                     t.Errorf("%+v", errors.WithStack(err))
                 }
             })