From 32aaf48792a042943ac48226cb2bd083047cfb02 Mon Sep 17 00:00:00 2001 From: William Petit Date: Tue, 12 Sep 2023 22:03:25 -0600 Subject: [PATCH] feat(storage): rpc based implementation --- .env.dist | 4 +- .gitignore | 3 +- Makefile | 16 +- cmd/cli/command/app/run.go | 55 ++-- cmd/storage-server/command/auth/root.go | 13 + cmd/storage-server/command/main.go | 48 ++++ cmd/storage-server/command/run.go | 192 ++++++++++++++ cmd/storage-server/main.go | 13 + go.mod | 2 + go.sum | 4 + modd.conf | 1 + pkg/storage/filter/and.go | 10 + pkg/storage/filter/eq.go | 10 + pkg/storage/filter/filter.go | 22 +- pkg/storage/filter/gt.go | 10 + pkg/storage/filter/gte.go | 10 + pkg/storage/filter/in.go | 10 + pkg/storage/filter/like.go | 10 + pkg/storage/filter/lt.go | 10 + pkg/storage/filter/lte.go | 10 + pkg/storage/filter/neq.go | 10 + pkg/storage/filter/not.go | 10 + pkg/storage/filter/operator.go | 11 + pkg/storage/filter/or.go | 10 + pkg/storage/registry/blob_store.go | 35 +++ pkg/storage/registry/document_store.go | 35 +++ pkg/storage/registry/error.go | 5 + pkg/storage/rpc/client/blob_bucket.go | 239 ++++++++++++++++++ pkg/storage/rpc/client/blob_info.go | 40 +++ pkg/storage/rpc/client/blob_store.go | 101 ++++++++ pkg/storage/rpc/client/blob_store_test.go | 87 +++++++ pkg/storage/rpc/client/document_store.go | 134 ++++++++++ pkg/storage/rpc/client/document_store_test.go | 67 +++++ pkg/storage/rpc/client/init.go | 9 + pkg/storage/rpc/client/testdata/.gitignore | 1 + pkg/storage/rpc/gob/blob_info.go | 42 +++ pkg/storage/rpc/gob/init.go | 18 ++ pkg/storage/rpc/registry/registry.go | 22 ++ pkg/storage/rpc/server/blob/close_bucket.go | 31 +++ pkg/storage/rpc/server/blob/close_reader.go | 31 +++ pkg/storage/rpc/server/blob/close_writer.go | 31 +++ pkg/storage/rpc/server/blob/delete_bucket.go | 22 ++ pkg/storage/rpc/server/blob/get_blob_info.go | 42 +++ .../rpc/server/blob/get_bucket_size.go | 33 +++ pkg/storage/rpc/server/blob/list_blob_info.go | 34 +++ 
pkg/storage/rpc/server/blob/list_buckets.go | 27 ++ .../rpc/server/blob/new_blob_reader.go | 57 +++++ .../rpc/server/blob/new_blob_writer.go | 57 +++++ pkg/storage/rpc/server/blob/open_bucket.go | 50 ++++ pkg/storage/rpc/server/blob/read_blob.go | 41 +++ pkg/storage/rpc/server/blob/seek_blob.go | 38 +++ pkg/storage/rpc/server/blob/service.go | 60 +++++ pkg/storage/rpc/server/blob/write_blob.go | 34 +++ .../rpc/server/document/delete_document.go | 26 ++ .../rpc/server/document/get_document.go | 30 +++ .../rpc/server/document/query_documents.go | 53 ++++ pkg/storage/rpc/server/document/service.go | 11 + .../rpc/server/document/upsert_document.go | 30 +++ pkg/storage/rpc/server/init.go | 5 + pkg/storage/rpc/server/server.go | 21 ++ pkg/storage/sqlite/blob_store_test.go | 22 +- pkg/storage/sqlite/document_store_test.go | 3 +- pkg/storage/sqlite/registry/registry.go | 51 ++++ pkg/storage/testsuite/blob_store.go | 6 +- pkg/storage/testsuite/blob_store_benchmark.go | 79 ++++++ pkg/storage/testsuite/blob_store_ops.go | 27 +- pkg/storage/testsuite/document_store.go | 6 +- pkg/storage/testsuite/document_store_ops.go | 4 +- 68 files changed, 2249 insertions(+), 42 deletions(-) create mode 100644 cmd/storage-server/command/auth/root.go create mode 100644 cmd/storage-server/command/main.go create mode 100644 cmd/storage-server/command/run.go create mode 100644 cmd/storage-server/main.go create mode 100644 pkg/storage/registry/blob_store.go create mode 100644 pkg/storage/registry/document_store.go create mode 100644 pkg/storage/registry/error.go create mode 100644 pkg/storage/rpc/client/blob_bucket.go create mode 100644 pkg/storage/rpc/client/blob_info.go create mode 100644 pkg/storage/rpc/client/blob_store.go create mode 100644 pkg/storage/rpc/client/blob_store_test.go create mode 100644 pkg/storage/rpc/client/document_store.go create mode 100644 pkg/storage/rpc/client/document_store_test.go create mode 100644 pkg/storage/rpc/client/init.go create mode 100644 
pkg/storage/rpc/client/testdata/.gitignore create mode 100644 pkg/storage/rpc/gob/blob_info.go create mode 100644 pkg/storage/rpc/gob/init.go create mode 100644 pkg/storage/rpc/registry/registry.go create mode 100644 pkg/storage/rpc/server/blob/close_bucket.go create mode 100644 pkg/storage/rpc/server/blob/close_reader.go create mode 100644 pkg/storage/rpc/server/blob/close_writer.go create mode 100644 pkg/storage/rpc/server/blob/delete_bucket.go create mode 100644 pkg/storage/rpc/server/blob/get_blob_info.go create mode 100644 pkg/storage/rpc/server/blob/get_bucket_size.go create mode 100644 pkg/storage/rpc/server/blob/list_blob_info.go create mode 100644 pkg/storage/rpc/server/blob/list_buckets.go create mode 100644 pkg/storage/rpc/server/blob/new_blob_reader.go create mode 100644 pkg/storage/rpc/server/blob/new_blob_writer.go create mode 100644 pkg/storage/rpc/server/blob/open_bucket.go create mode 100644 pkg/storage/rpc/server/blob/read_blob.go create mode 100644 pkg/storage/rpc/server/blob/seek_blob.go create mode 100644 pkg/storage/rpc/server/blob/service.go create mode 100644 pkg/storage/rpc/server/blob/write_blob.go create mode 100644 pkg/storage/rpc/server/document/delete_document.go create mode 100644 pkg/storage/rpc/server/document/get_document.go create mode 100644 pkg/storage/rpc/server/document/query_documents.go create mode 100644 pkg/storage/rpc/server/document/service.go create mode 100644 pkg/storage/rpc/server/document/upsert_document.go create mode 100644 pkg/storage/rpc/server/init.go create mode 100644 pkg/storage/rpc/server/server.go create mode 100644 pkg/storage/sqlite/registry/registry.go create mode 100644 pkg/storage/testsuite/blob_store_benchmark.go diff --git a/.env.dist b/.env.dist index 660424e..6f09b89 100644 --- a/.env.dist +++ b/.env.dist @@ -1 +1,3 @@ -RUN_APP_ARGS="" \ No newline at end of file +RUN_APP_ARGS="" +#EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%" 
+#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 665f9ac..8477fe8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ /tools *.sqlite /.gitea-release -/.edge \ No newline at end of file +/.edge +/data \ No newline at end of file diff --git a/Makefile b/Makefile index e2829a2..640113d 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,12 @@ DATE_VERSION := $(shell date +%Y.%-m.%-d) FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,) APP_PATH ?= misc/client-sdk-testsuite/dist RUN_APP_ARGS ?= +RUN_STORAGE_SERVER_ARGS ?= + SHELL := bash -build: build-edge-cli build-client-sdk-test-app + +build: build-cli build-storage-server build-client-sdk-test-app watch: tools/modd/bin/modd tools/modd/bin/modd @@ -27,12 +30,18 @@ test-go: lint: golangci-lint run --enable-all $(LINT_ARGS) -build-edge-cli: build-sdk +build-cli: build-sdk CGO_ENABLED=0 go build \ -v \ -o ./bin/cli \ ./cmd/cli +build-storage-server: build-sdk + CGO_ENABLED=0 go build \ + -v \ + -o ./bin/storage-server \ + ./cmd/storage-server + build-client-sdk-test-app: cd misc/client-sdk-testsuite && $(MAKE) dist @@ -68,6 +77,9 @@ node_modules: run-app: .env ( set -o allexport && source .env && set +o allexport && bin/cli app run -p $(APP_PATH) $$RUN_APP_ARGS ) +run-storage-server: .env + ( set -o allexport && source .env && set +o allexport && bin/storage-server run $$RUN_STORAGE_SERVER_ARGS ) + .env: cp .env.dist .env diff --git a/cmd/cli/command/app/run.go b/cmd/cli/command/app/run.go index 6ce6d44..d06eeb8 100644 --- a/cmd/cli/command/app/run.go +++ b/cmd/cli/command/app/run.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" "net" "net/http" "os" @@ -30,7 +29,7 @@ import ( shareModule "forge.cadoles.com/arcad/edge/pkg/module/share" shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite" "forge.cadoles.com/arcad/edge/pkg/storage" - 
storageSqlite "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/registry" "gitlab.com/wpetit/goweb/logger" "forge.cadoles.com/arcad/edge/pkg/bundle" @@ -45,6 +44,10 @@ import ( _ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id" _ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain" + + // Register storage schemes + _ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/registry" + _ "forge.cadoles.com/arcad/edge/pkg/storage/sqlite/registry" ) func RunCommand() *cli.Command { @@ -75,9 +78,16 @@ func RunCommand() *cli.Command { Value: 0, }, &cli.StringFlag{ - Name: "storage-file", - Usage: "use `FILE` for SQLite storage database", - Value: ".edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", + Name: "blobstore-dsn", + Usage: "use `DSN` for blob storage", + EnvVars: []string{"EDGE_BLOBSTORE_DSN"}, + Value: "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", + }, + &cli.StringFlag{ + Name: "documentstore-dsn", + Usage: "use `DSN` for document storage", + EnvVars: []string{"EDGE_DOCUMENTSTORE_DSN"}, + Value: "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", }, &cli.StringFlag{ Name: "shared-resources-file", @@ -96,7 +106,8 @@ func RunCommand() *cli.Command { logFormat := ctx.String("log-format") logLevel := ctx.Int("log-level") - storageFile := ctx.String("storage-file") + blobstoreDSN := ctx.String("blobstore-dsn") + documentstoreDSN := ctx.String("documentstore-dsn") accountsFile := ctx.String("accounts-file") sharedResourcesFile := ctx.String("shared-resources-file") @@ -144,7 +155,7 @@ func RunCommand() *cli.Command { appCtx := logger.With(cmdCtx, logger.F("address", address)) - if err := runApp(appCtx, path, address, storageFile, accountsFile, appsRepository, sharedResourcesFile); err != nil { + if err := runApp(appCtx, path, address, documentstoreDSN, blobstoreDSN, accountsFile, appsRepository, 
sharedResourcesFile); err != nil { logger.Error(appCtx, "could not run app", logger.E(errors.WithStack(err))) } }(p, port, idx) @@ -157,7 +168,7 @@ func RunCommand() *cli.Command { } } -func runApp(ctx context.Context, path string, address string, storageFile string, accountsFile string, appRepository appModule.Repository, sharedResourcesFile string) error { +func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, accountsFile string, appRepository appModule.Repository, sharedResourcesFile string) error { absPath, err := filepath.Abs(path) if err != nil { return errors.Wrapf(err, "could not resolve path '%s'", path) @@ -190,7 +201,7 @@ func runApp(ctx context.Context, path string, address string, storageFile string deps := &moduleDeps{} funcs := []ModuleDepFunc{ initMemoryBus, - initDatastores(storageFile, manifest.ID), + initDatastores(documentStoreDSN, blobStoreDSN, manifest.ID), initAccounts(accountsFile, manifest.ID), initShareRepository(sharedResourcesFile), initAppRepository(appRepository), @@ -323,10 +334,10 @@ func loadLocalAccounts(path string) ([]authHTTP.LocalAccount, error) { return nil, errors.WithStack(err) } - data, err := ioutil.ReadFile(path) + data, err := os.ReadFile(path) if err != nil { if errors.Is(err, os.ErrNotExist) { - if err := ioutil.WriteFile(path, defaultAccounts, 0o640); err != nil { + if err := os.WriteFile(path, defaultAccounts, 0o640); err != nil { return nil, errors.WithStack(err) } @@ -437,21 +448,25 @@ func initMemoryBus(deps *moduleDeps) error { return nil } -func initDatastores(storageFile string, appID app.ID) ModuleDepFunc { +func initDatastores(documentStoreDSN, blobStoreDSN string, appID app.ID) ModuleDepFunc { return func(deps *moduleDeps) error { - storageFile = injectAppID(storageFile, appID) + documentStoreDSN = injectAppID(documentStoreDSN, appID) - if err := ensureDir(storageFile); err != nil { - return errors.WithStack(err) - } - - db, err := storageSqlite.Open(storageFile) + documentStore, 
err := registry.NewDocumentStore(documentStoreDSN) if err != nil { return errors.WithStack(err) } - deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db) - deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db) + deps.DocumentStore = documentStore + + blobStoreDSN = injectAppID(blobStoreDSN, appID) + + blobStore, err := registry.NewBlobStore(blobStoreDSN) + if err != nil { + return errors.WithStack(err) + } + + deps.BlobStore = blobStore return nil } diff --git a/cmd/storage-server/command/auth/root.go b/cmd/storage-server/command/auth/root.go new file mode 100644 index 0000000..bdf044a --- /dev/null +++ b/cmd/storage-server/command/auth/root.go @@ -0,0 +1,13 @@ +package auth + +import ( + "github.com/urfave/cli/v2" +) + +func Root() *cli.Command { + return &cli.Command{ + Name: "auth", + Usage: "Auth related command", + Subcommands: []*cli.Command{}, + } +} diff --git a/cmd/storage-server/command/main.go b/cmd/storage-server/command/main.go new file mode 100644 index 0000000..791d8dc --- /dev/null +++ b/cmd/storage-server/command/main.go @@ -0,0 +1,48 @@ +package command + +import ( + "context" + "fmt" + "os" + "sort" + + "github.com/urfave/cli/v2" +) + +func Main(commands ...*cli.Command) { + ctx := context.Background() + + app := &cli.App{ + Name: "storage-server", + Usage: "Edge storage server", + Commands: commands, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "debug", + EnvVars: []string{"DEBUG"}, + Value: false, + }, + }, + } + + app.ExitErrHandler = func(ctx *cli.Context, err error) { + if err == nil { + return + } + + debug := ctx.Bool("debug") + + if !debug { + fmt.Printf("[ERROR] %v\n", err) + } else { + fmt.Printf("%+v", err) + } + } + + sort.Sort(cli.FlagsByName(app.Flags)) + sort.Sort(cli.CommandsByName(app.Commands)) + + if err := app.RunContext(ctx, os.Args); err != nil { + os.Exit(1) + } +} diff --git a/cmd/storage-server/command/run.go b/cmd/storage-server/command/run.go new file mode 100644 index 0000000..c2ee644 --- /dev/null +++ 
b/cmd/storage-server/command/run.go @@ -0,0 +1,192 @@ +package command + +import ( + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/keegancsmith/rpc" + "gitlab.com/wpetit/goweb/logger" + + "forge.cadoles.com/arcad/edge/pkg/storage/registry" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/pkg/errors" + "github.com/urfave/cli/v2" + + // Register storage schemes + _ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/registry" + _ "forge.cadoles.com/arcad/edge/pkg/storage/sqlite/registry" +) + +func Run() *cli.Command { + return &cli.Command{ + Name: "run", + Usage: "Run server", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "address", + Aliases: []string{"addr"}, + Value: ":3001", + }, + &cli.StringFlag{ + Name: "blobstore-dsn-pattern", + EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"}, + Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()), + }, + &cli.StringFlag{ + Name: "documentstore-dsn-pattern", + EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"}, + Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()), + }, + &cli.DurationFlag{ + Name: "cache-ttl", + EnvVars: []string{"STORAGE_SERVER_CACHE_TTL"}, + Value: time.Hour, + }, + &cli.IntFlag{ + Name: "cache-size", + EnvVars: []string{"STORAGE_SERVER_CACHE_SIZE"}, + Value: 32, + }, + }, + Action: func(ctx *cli.Context) error { + addr := ctx.String("address") + blobstoreDSNPattern := ctx.String("blobstore-dsn-pattern") + documentstoreDSNPattern := ctx.String("documentstore-dsn-pattern") + cacheSize := ctx.Int("cache-size") + cacheTTL := ctx.Duration("cache-ttl") + + router := chi.NewRouter() + + router.Use(middleware.RealIP) + 
router.Use(middleware.Logger) + + router.Handle("/blobstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tenant := r.URL.Query().Get("tenant") + if tenant == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + appID := r.URL.Query().Get("appId") + if appID == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + server, err := getBlobStoreStoreServer(cacheSize, cacheTTL, tenant, appID, blobstoreDSNPattern) + if err != nil { + logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + + return + } + + server.ServeHTTP(w, r) + })) + + router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tenant := r.URL.Query().Get("tenant") + if tenant == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + appID := r.URL.Query().Get("appId") + if appID == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + server, err := getDocumentStoreServer(cacheSize, cacheTTL, tenant, appID, documentstoreDSNPattern) + if err != nil { + logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + + return + } + + server.ServeHTTP(w, r) + })) + + if err := http.ListenAndServe(addr, router); err != nil { + return errors.WithStack(err) + } + + return nil + }, + } +} + +var ( + documentStoreCache *expirable.LRU[string, *rpc.Server] + initDocumentStoreCache sync.Once +) + +func getDocumentStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) { + 
initDocumentStoreCache.Do(func() { + documentStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL) + }) + + key := fmt.Sprintf("%s:%s", tenant, appID) + + documentStoreServer, _ := documentStoreCache.Get(key) + if documentStoreServer != nil { + return documentStoreServer, nil + } + + dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant) + dsn = strings.ReplaceAll(dsn, "%APPID%", appID) + + documentStore, err := registry.NewDocumentStore(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + documentStoreServer = server.NewDocumentStoreServer(documentStore) + + documentStoreCache.Add(key, documentStoreServer) + + return documentStoreServer, nil +} + +var ( + blobStoreCache *expirable.LRU[string, *rpc.Server] + initBlobStoreCache sync.Once +) + +func getBlobStoreStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) { + initBlobStoreCache.Do(func() { + blobStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL) + }) + + key := fmt.Sprintf("%s:%s", tenant, appID) + + blobStoreServer, _ := blobStoreCache.Get(key) + if blobStoreServer != nil { + return blobStoreServer, nil + } + + dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant) + dsn = strings.ReplaceAll(dsn, "%APPID%", appID) + + blobStore, err := registry.NewBlobStore(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + blobStoreServer = server.NewBlobStoreServer(blobStore) + + blobStoreCache.Add(key, blobStoreServer) + + return blobStoreServer, nil +} diff --git a/cmd/storage-server/main.go b/cmd/storage-server/main.go new file mode 100644 index 0000000..35ed74e --- /dev/null +++ b/cmd/storage-server/main.go @@ -0,0 +1,13 @@ +package main + +import ( + "forge.cadoles.com/arcad/edge/cmd/storage-server/command" + "forge.cadoles.com/arcad/edge/cmd/storage-server/command/auth" +) + +func main() { + command.Main( + command.Run(), + auth.Root(), + ) +} diff --git a/go.mod b/go.mod index 
19c0759..a206a84 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,8 @@ require ( github.com/go-playground/universal-translator v0.16.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect + github.com/hashicorp/golang-lru/v2 v2.0.6 // indirect + github.com/keegancsmith/rpc v1.3.0 // indirect github.com/leodido/go-urn v1.1.0 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect diff --git a/go.sum b/go.sum index e1a0564..d01e884 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/hashicorp/go.net v0.0.0-20151006203346-104dcad90073/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM= +github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/mdns v0.0.0-20151206042412-9d85cf22f9f8/go.mod h1:aa76Av3qgPeIQp9Y3qIkTBPieQYNkQ13Kxe7pze9Wb0= github.com/hashicorp/mdns v1.0.5 h1:1M5hW1cunYeoXOqHwEb/GBDDHAFo0Yqb/uz/beC6LbE= @@ -201,6 +203,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/keegancsmith/rpc v1.3.0 
h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk= +github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= diff --git a/modd.conf b/modd.conf index 5f5e28a..5f1a776 100644 --- a/modd.conf +++ b/modd.conf @@ -9,6 +9,7 @@ modd.conf prep: make build-client-sdk-test-app prep: make build daemon: make run-app + daemon: make run-storage-server } **/*.go { diff --git a/pkg/storage/filter/and.go b/pkg/storage/filter/and.go index c504797..b73e4ca 100644 --- a/pkg/storage/filter/and.go +++ b/pkg/storage/filter/and.go @@ -8,6 +8,16 @@ func (o *AndOperator) Token() Token { return TokenAnd } +func (o *AndOperator) AsMap() map[string]any { + children := make([]map[string]any, 0, len(o.children)) + for _, c := range o.children { + children = append(children, c.AsMap()) + } + return map[string]any{ + string(TokenAnd): children, + } +} + func (o *AndOperator) Children() []Operator { return o.children } diff --git a/pkg/storage/filter/eq.go b/pkg/storage/filter/eq.go index c7d0163..497f06b 100644 --- a/pkg/storage/filter/eq.go +++ b/pkg/storage/filter/eq.go @@ -12,6 +12,16 @@ func (o *EqOperator) Fields() map[string]interface{} { return o.fields } +func (o *EqOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenEq): fields, + } +} + func NewEqOperator(fields map[string]interface{}) *EqOperator { return &EqOperator{fields} } diff --git a/pkg/storage/filter/filter.go b/pkg/storage/filter/filter.go index 9a6acf3..87006eb 100644 --- a/pkg/storage/filter/filter.go +++ b/pkg/storage/filter/filter.go @@ -29,6 +29,15 @@ func NewFrom(raw map[string]interface{}) (*Filter, error) { return 
&Filter{op}, nil } +func (f *Filter) AsMap() map[string]any { + root := f.Root() + if root == nil { + return nil + } + + return f.Root().AsMap() +} + func toFieldOperator(v interface{}) (Operator, error) { vv, ok := v.(map[string]interface{}) if !ok { @@ -93,8 +102,17 @@ func toFieldOperator(v interface{}) (Operator, error) { } func toAggregateOperator(token Token, v interface{}) (Operator, error) { - vv, ok := v.([]interface{}) - if !ok { + var vv []interface{} + + switch typed := v.(type) { + case []interface{}: + vv = typed + case []map[string]interface{}: + vv = make([]interface{}, 0, len(typed)) + for _, item := range typed { + vv = append(vv, item) + } + default: return nil, errors.WithStack(ErrInvalidAggregationOperator) } diff --git a/pkg/storage/filter/gt.go b/pkg/storage/filter/gt.go index 32d1361..b72eddc 100644 --- a/pkg/storage/filter/gt.go +++ b/pkg/storage/filter/gt.go @@ -12,6 +12,16 @@ func (o *GtOperator) Fields() map[string]interface{} { return o.fields } +func (o *GtOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenGt): fields, + } +} + func NewGtOperator(fields OperatorFields) *GtOperator { return &GtOperator{fields} } diff --git a/pkg/storage/filter/gte.go b/pkg/storage/filter/gte.go index de9a330..1fef96e 100644 --- a/pkg/storage/filter/gte.go +++ b/pkg/storage/filter/gte.go @@ -12,6 +12,16 @@ func (o *GteOperator) Fields() map[string]interface{} { return o.fields } +func (o *GteOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenGte): fields, + } +} + func NewGteOperator(fields OperatorFields) *GteOperator { return &GteOperator{fields} } diff --git a/pkg/storage/filter/in.go b/pkg/storage/filter/in.go index 9672785..393074b 100644 --- a/pkg/storage/filter/in.go +++ b/pkg/storage/filter/in.go @@ -12,6 
+12,16 @@ func (o *InOperator) Fields() map[string]interface{} { return o.fields } +func (o *InOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenIn): fields, + } +} + func NewInOperator(fields OperatorFields) *InOperator { return &InOperator{fields} } diff --git a/pkg/storage/filter/like.go b/pkg/storage/filter/like.go index 2dd6cc0..80277bc 100644 --- a/pkg/storage/filter/like.go +++ b/pkg/storage/filter/like.go @@ -12,6 +12,16 @@ func (o *LikeOperator) Fields() map[string]interface{} { return o.fields } +func (o *LikeOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenLike): fields, + } +} + func NewLikeOperator(fields OperatorFields) *LikeOperator { return &LikeOperator{fields} } diff --git a/pkg/storage/filter/lt.go b/pkg/storage/filter/lt.go index a60fc60..a680530 100644 --- a/pkg/storage/filter/lt.go +++ b/pkg/storage/filter/lt.go @@ -12,6 +12,16 @@ func (o *LtOperator) Fields() map[string]interface{} { return o.fields } +func (o *LtOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenLt): fields, + } +} + func NewLtOperator(fields OperatorFields) *LtOperator { return &LtOperator{fields} } diff --git a/pkg/storage/filter/lte.go b/pkg/storage/filter/lte.go index 070635c..8f06e7d 100644 --- a/pkg/storage/filter/lte.go +++ b/pkg/storage/filter/lte.go @@ -12,6 +12,16 @@ func (o *LteOperator) Fields() map[string]interface{} { return o.fields } +func (o *LteOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenLte): fields, + } +} + func NewLteOperator(fields OperatorFields) *LteOperator { 
return &LteOperator{fields} } diff --git a/pkg/storage/filter/neq.go b/pkg/storage/filter/neq.go index bdba587..ce3aba5 100644 --- a/pkg/storage/filter/neq.go +++ b/pkg/storage/filter/neq.go @@ -12,6 +12,16 @@ func (o *NeqOperator) Fields() map[string]interface{} { return o.fields } +func (o *NeqOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenNeq): fields, + } +} + func NewNeqOperator(fields map[string]interface{}) *NeqOperator { return &NeqOperator{fields} } diff --git a/pkg/storage/filter/not.go b/pkg/storage/filter/not.go index 1b60af9..19a9b1f 100644 --- a/pkg/storage/filter/not.go +++ b/pkg/storage/filter/not.go @@ -12,6 +12,16 @@ func (o *NotOperator) Children() []Operator { return o.children } +func (o *NotOperator) AsMap() map[string]any { + children := make([]map[string]any, 0, len(o.children)) + for _, c := range o.children { + children = append(children, c.AsMap()) + } + return map[string]any{ + string(TokenNot): children, + } +} + func NewNotOperator(ops ...Operator) *NotOperator { return &NotOperator{ops} } diff --git a/pkg/storage/filter/operator.go b/pkg/storage/filter/operator.go index e4781b9..344b2da 100644 --- a/pkg/storage/filter/operator.go +++ b/pkg/storage/filter/operator.go @@ -20,4 +20,15 @@ type OperatorFields map[string]interface{} type Operator interface { Token() Token + AsMap() map[string]any +} + +type FieldOperator interface { + Operator + Fields() map[string]any +} + +type AggregatorOperator interface { + Operator + Children() []Operator } diff --git a/pkg/storage/filter/or.go b/pkg/storage/filter/or.go index 276f801..845d22a 100644 --- a/pkg/storage/filter/or.go +++ b/pkg/storage/filter/or.go @@ -12,6 +12,16 @@ func (o *OrOperator) Children() []Operator { return o.children } +func (o *OrOperator) AsMap() map[string]any { + children := make([]map[string]any, 0, len(o.children)) + for _, c := range o.children { 
+ children = append(children, c.AsMap()) + } + return map[string]any{ + string(TokenOr): children, + } +} + func NewOrOperator(ops ...Operator) *OrOperator { return &OrOperator{ops} } diff --git a/pkg/storage/registry/blob_store.go b/pkg/storage/registry/blob_store.go new file mode 100644 index 0000000..c18a57d --- /dev/null +++ b/pkg/storage/registry/blob_store.go @@ -0,0 +1,35 @@ +package registry + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +var blobStoreFactories = make(map[string]BlobStoreFactory, 0) + +type BlobStoreFactory func(url *url.URL) (storage.BlobStore, error) + +func AddBlobStoreFactory(scheme string, factory BlobStoreFactory) { + blobStoreFactories[scheme] = factory +} + +func NewBlobStore(dsn string) (storage.BlobStore, error) { + url, err := url.Parse(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + factory, exists := blobStoreFactories[url.Scheme] + if !exists { + return nil, errors.WithStack(ErrSchemeNotRegistered) + } + + store, err := factory(url) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil +} diff --git a/pkg/storage/registry/document_store.go b/pkg/storage/registry/document_store.go new file mode 100644 index 0000000..2ee9c8b --- /dev/null +++ b/pkg/storage/registry/document_store.go @@ -0,0 +1,35 @@ +package registry + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +var documentStoreFactories = make(map[string]DocumentStoreFactory, 0) + +type DocumentStoreFactory func(url *url.URL) (storage.DocumentStore, error) + +func AddDocumentStoreFactory(scheme string, factory DocumentStoreFactory) { + documentStoreFactories[scheme] = factory +} + +func NewDocumentStore(dsn string) (storage.DocumentStore, error) { + url, err := url.Parse(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + factory, exists := documentStoreFactories[url.Scheme] + if !exists { + return nil, 
errors.WithStack(ErrSchemeNotRegistered) + } + + store, err := factory(url) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil +} diff --git a/pkg/storage/registry/error.go b/pkg/storage/registry/error.go new file mode 100644 index 0000000..ec0d5b3 --- /dev/null +++ b/pkg/storage/registry/error.go @@ -0,0 +1,5 @@ +package registry + +import "errors" + +var ErrSchemeNotRegistered = errors.New("scheme was not registered") diff --git a/pkg/storage/rpc/client/blob_bucket.go b/pkg/storage/rpc/client/blob_bucket.go new file mode 100644 index 0000000..a3746cf --- /dev/null +++ b/pkg/storage/rpc/client/blob_bucket.go @@ -0,0 +1,239 @@ +package client + +import ( + "context" + "io" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" + "github.com/pkg/errors" +) + +type BlobBucket struct { + name string + id blob.BucketID + call CallFunc +} + +// Size implements storage.BlobBucket +func (b *BlobBucket) Size(ctx context.Context) (int64, error) { + args := blob.GetBucketSizeArgs{ + BucketID: b.id, + } + + reply := blob.GetBucketSizeReply{} + + if err := b.call(ctx, "Service.GetBucketSize", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Size, nil +} + +// Name implements storage.BlobBucket +func (b *BlobBucket) Name() string { + return b.name +} + +// Close implements storage.BlobBucket +func (b *BlobBucket) Close() error { + args := blob.CloseBucketArgs{ + BucketID: b.id, + } + + reply := blob.CloseBucketReply{} + + if err := b.call(context.Background(), "Service.CloseBucket", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Delete implements storage.BlobBucket +func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { + args := blob.DeleteBucketArgs{ + BucketName: b.name, + } + + reply := blob.DeleteBucketReply{} + + if err := b.call(context.Background(), "Service.DeleteBucket", args, &reply); err != 
nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+// Get implements storage.BlobBucket
+func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
+	args := blob.GetBlobInfoArgs{
+		BucketID: b.id,
+		BlobID:   id,
+	}
+
+	reply := blob.GetBlobInfoReply{}
+
+	if err := b.call(ctx, "Service.GetBlobInfo", args, &reply); err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return reply.BlobInfo, nil
+}
+
+// List implements storage.BlobBucket
+func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
+	args := blob.ListBlobInfoArgs{
+		BucketID: b.id,
+	}
+
+	reply := blob.ListBlobInfoReply{}
+
+	if err := b.call(ctx, "Service.ListBlobInfo", args, &reply); err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return reply.BlobInfos, nil
+}
+
+// NewReader implements storage.BlobBucket
+func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
+	args := blob.NewBlobReaderArgs{
+		BucketID: b.id,
+		BlobID:   id,
+	}
+
+	reply := blob.NewBlobReaderReply{}
+
+	if err := b.call(ctx, "Service.NewBlobReader", args, &reply); err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return &blobReaderCloser{
+		readerID: reply.ReaderID,
+		call:     b.call,
+	}, nil
+}
+
+// NewWriter implements storage.BlobBucket
+func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
+	args := blob.NewBlobWriterArgs{
+		BucketID: b.id,
+		BlobID:   id,
+	}
+
+	reply := blob.NewBlobWriterReply{}
+
+	if err := b.call(ctx, "Service.NewBlobWriter", args, &reply); err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return &blobWriterCloser{
+		blobID:   id,
+		writerID: reply.WriterID,
+		call:     b.call,
+	}, nil
+}
+
+type blobWriterCloser struct {
+	blobID   storage.BlobID
+	writerID blob.WriterID
+	call     CallFunc
+}
+
+// Write implements io.WriteCloser
+func (bwc *blobWriterCloser) Write(data
[]byte) (int, error) {
+	args := blob.WriteBlobArgs{
+		WriterID: bwc.writerID,
+		Data:     data,
+	}
+
+	reply := blob.WriteBlobReply{}
+
+	if err := bwc.call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	return reply.Written, nil
+}
+
+// Close implements io.WriteCloser
+func (bwc *blobWriterCloser) Close() error {
+	args := blob.CloseWriterArgs{
+		WriterID: bwc.writerID,
+	}
+
+	reply := blob.CloseWriterReply{}
+
+	if err := bwc.call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+type blobReaderCloser struct {
+	readerID blob.ReaderID
+	call     CallFunc
+}
+
+// Read implements io.ReadSeekCloser
+func (brc *blobReaderCloser) Read(p []byte) (int, error) {
+	args := blob.ReadBlobArgs{
+		ReaderID: brc.readerID,
+		Length:   len(p),
+	}
+
+	reply := blob.ReadBlobReply{}
+
+	if err := brc.call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	copy(p, reply.Data)
+
+	if reply.EOF {
+		return reply.Read, io.EOF
+	}
+
+	return reply.Read, nil
+}
+
+// Seek implements io.ReadSeekCloser
+func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
+	args := blob.SeekBlobArgs{
+		ReaderID: brc.readerID,
+		Offset:   offset,
+		Whence:   whence,
+	}
+
+	reply := blob.SeekBlobReply{}
+
+	if err := brc.call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	return reply.Read, nil
+}
+
+// Close implements io.ReadSeekCloser
+func (brc *blobReaderCloser) Close() error {
+	args := blob.CloseReaderArgs{
+		ReaderID: brc.readerID,
+	}
+
+	reply := blob.CloseReaderReply{}
+
+	if err := brc.call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+var (
+	_ storage.BlobBucket =
&BlobBucket{} + _ storage.BlobInfo = &BlobInfo{} + _ io.WriteCloser = &blobWriterCloser{} + _ io.ReadSeekCloser = &blobReaderCloser{} +) diff --git a/pkg/storage/rpc/client/blob_info.go b/pkg/storage/rpc/client/blob_info.go new file mode 100644 index 0000000..5cc9b24 --- /dev/null +++ b/pkg/storage/rpc/client/blob_info.go @@ -0,0 +1,40 @@ +package client + +import ( + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +type BlobInfo struct { + id storage.BlobID + bucket string + contentType string + modTime time.Time + size int64 +} + +// Bucket implements storage.BlobInfo +func (i *BlobInfo) Bucket() string { + return i.bucket +} + +// ID implements storage.BlobInfo +func (i *BlobInfo) ID() storage.BlobID { + return i.id +} + +// ContentType implements storage.BlobInfo +func (i *BlobInfo) ContentType() string { + return i.contentType +} + +// ModTime implements storage.BlobInfo +func (i *BlobInfo) ModTime() time.Time { + return i.modTime +} + +// Size implements storage.BlobInfo +func (i *BlobInfo) Size() int64 { + return i.size +} diff --git a/pkg/storage/rpc/client/blob_store.go b/pkg/storage/rpc/client/blob_store.go new file mode 100644 index 0000000..de09896 --- /dev/null +++ b/pkg/storage/rpc/client/blob_store.go @@ -0,0 +1,101 @@ +package client + +import ( + "context" + "net/url" + + "github.com/keegancsmith/rpc" + "gitlab.com/wpetit/goweb/logger" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" + "github.com/pkg/errors" +) + +type BlobStore struct { + serverURL *url.URL +} + +// DeleteBucket implements storage.BlobStore. +func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error { + args := &blob.DeleteBucketArgs{ + BucketName: name, + } + + if err := s.call(ctx, "Service.DeleteBucket", args, nil); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// ListBuckets implements storage.BlobStore. 
+func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { + args := &blob.ListBucketsArgs{} + + reply := blob.ListBucketsReply{} + + if err := s.call(ctx, "Service.ListBuckets", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Buckets, nil +} + +// OpenBucket implements storage.BlobStore. +func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) { + args := &blob.OpenBucketArgs{ + BucketName: name, + } + reply := &blob.OpenBucketReply{} + + if err := s.call(ctx, "Service.OpenBucket", args, reply); err != nil { + return nil, errors.WithStack(err) + } + + return &BlobBucket{ + name: name, + id: reply.BucketID, + call: s.call, + }, nil +} + +func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { + err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { + if err := client.Call(ctx, serviceMethod, args, reply); err != nil { + return errors.WithStack(err) + } + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { + client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := client.Close(); err != nil { + logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err))) + } + }() + + if err := fn(ctx, client); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func NewBlobStore(serverURL *url.URL) *BlobStore { + return &BlobStore{serverURL} +} + +var _ storage.BlobStore = &BlobStore{} diff --git a/pkg/storage/rpc/client/blob_store_test.go b/pkg/storage/rpc/client/blob_store_test.go new file mode 100644 index 0000000..76c208f --- /dev/null +++ b/pkg/storage/rpc/client/blob_store_test.go @@ -0,0 +1,87 @@ +package 
client + +import ( + "context" + "fmt" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server" + "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestBlobStore(t *testing.T) { + t.Parallel() + if testing.Verbose() { + logger.SetLevel(logger.LevelDebug) + } + + httpServer, err := startNewBlobStoreServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + serverURL := &url.URL{ + Host: serverAddr.String(), + } + + store := NewBlobStore(serverURL) + + testsuite.TestBlobStore(context.Background(), t, store) +} + +func BenchmarkBlobStore(t *testing.B) { + logger.SetLevel(logger.LevelError) + + httpServer, err := startNewBlobStoreServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + serverURL := &url.URL{ + Host: serverAddr.String(), + } + + store := NewBlobStore(serverURL) + + testsuite.BenchmarkBlobStore(t, store) + +} + +func getSQLiteBlobStore() (*sqlite.BlobStore, error) { + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, errors.WithStack(err) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := sqlite.NewBlobStore(dsn) + + return store, nil +} + +func startNewBlobStoreServer() (*httptest.Server, error) { + store, err := getSQLiteBlobStore() + if err != nil { + return nil, errors.WithStack(err) + } + + server := server.NewBlobStoreServer(store) + + httpServer := httptest.NewServer(server) + + return httpServer, nil +} diff --git a/pkg/storage/rpc/client/document_store.go b/pkg/storage/rpc/client/document_store.go new file mode 100644 
index 0000000..2063af4 --- /dev/null +++ b/pkg/storage/rpc/client/document_store.go @@ -0,0 +1,134 @@ +package client + +import ( + "context" + "net/url" + + "github.com/keegancsmith/rpc" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/filter" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document" +) + +type DocumentStore struct { + serverURL *url.URL +} + +// Delete implements storage.DocumentStore. +func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error { + args := document.DeleteDocumentArgs{ + Collection: collection, + DocumentID: id, + } + + reply := document.DeleteDocumentReply{} + + if err := s.call(ctx, "Service.DeleteDocument", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Get implements storage.DocumentStore. +func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) { + args := document.GetDocumentArgs{ + Collection: collection, + DocumentID: id, + } + + reply := document.GetDocumentReply{} + + if err := s.call(ctx, "Service.GetDocument", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Document, nil +} + +// Query implements storage.DocumentStore. 
+func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { + opts := &storage.QueryOptions{} + for _, fn := range funcs { + fn(opts) + } + + args := document.QueryDocumentsArgs{ + Collection: collection, + Filter: nil, + Options: opts, + } + + if filter != nil { + args.Filter = filter.AsMap() + } + + reply := document.QueryDocumentsReply{ + Documents: []storage.Document{}, + } + + if err := s.call(ctx, "Service.QueryDocuments", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Documents, nil +} + +// Upsert implements storage.DocumentStore. +func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc storage.Document) (storage.Document, error) { + args := document.UpsertDocumentArgs{ + Collection: collection, + Document: doc, + } + + reply := document.UpsertDocumentReply{} + + if err := s.call(ctx, "Service.UpsertDocument", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Document, nil +} + +func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { + err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { + if err := client.Call(ctx, serviceMethod, args, reply); err != nil { + return errors.WithStack(err) + } + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { + client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := client.Close(); err != nil { + logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err))) + } + }() + + if err := fn(ctx, client); err != nil { + return errors.WithStack(err) + } + + return nil +} + 
+func NewDocumentStore(url *url.URL) *DocumentStore { + return &DocumentStore{url} +} + +var _ storage.DocumentStore = &DocumentStore{} diff --git a/pkg/storage/rpc/client/document_store_test.go b/pkg/storage/rpc/client/document_store_test.go new file mode 100644 index 0000000..89914e6 --- /dev/null +++ b/pkg/storage/rpc/client/document_store_test.go @@ -0,0 +1,67 @@ +package client + +import ( + "context" + "fmt" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server" + "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestDocumentStore(t *testing.T) { + t.Parallel() + if testing.Verbose() { + logger.SetLevel(logger.LevelDebug) + } + + httpServer, err := startNewDocumentStoreServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + + serverURL := &url.URL{ + Host: serverAddr.String(), + } + + store := NewDocumentStore(serverURL) + + testsuite.TestDocumentStore(context.Background(), t, store) +} + +func getSQLiteDocumentStore() (*sqlite.DocumentStore, error) { + file := "./testdata/documentstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, errors.WithStack(err) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := sqlite.NewDocumentStore(dsn) + + return store, nil +} + +func startNewDocumentStoreServer() (*httptest.Server, error) { + store, err := getSQLiteDocumentStore() + if err != nil { + return nil, errors.WithStack(err) + } + + server := server.NewDocumentStoreServer(store) + + httpServer := httptest.NewServer(server) + + return httpServer, nil +} diff --git a/pkg/storage/rpc/client/init.go b/pkg/storage/rpc/client/init.go new file mode 100644 
index 0000000..f9a9cb6 --- /dev/null +++ b/pkg/storage/rpc/client/init.go @@ -0,0 +1,9 @@ +package client + +import ( + "context" + + _ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob" +) + +type CallFunc func(ctx context.Context, serviceMethod string, args any, reply any) error diff --git a/pkg/storage/rpc/client/testdata/.gitignore b/pkg/storage/rpc/client/testdata/.gitignore new file mode 100644 index 0000000..2a2bc40 --- /dev/null +++ b/pkg/storage/rpc/client/testdata/.gitignore @@ -0,0 +1 @@ +/*.sqlite* \ No newline at end of file diff --git a/pkg/storage/rpc/gob/blob_info.go b/pkg/storage/rpc/gob/blob_info.go new file mode 100644 index 0000000..a0c2e33 --- /dev/null +++ b/pkg/storage/rpc/gob/blob_info.go @@ -0,0 +1,42 @@ +package gob + +import ( + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +type BlobInfo struct { + Bucket_ string + ContentType_ string + BlobID_ storage.BlobID + ModTime_ time.Time + Size_ int64 +} + +// Bucket implements storage.BlobInfo. +func (bi *BlobInfo) Bucket() string { + return bi.Bucket_ +} + +// ContentType implements storage.BlobInfo. +func (bi *BlobInfo) ContentType() string { + return bi.ContentType_ +} + +// ID implements storage.BlobInfo. +func (bi *BlobInfo) ID() storage.BlobID { + return bi.BlobID_ +} + +// ModTime implements storage.BlobInfo. +func (bi *BlobInfo) ModTime() time.Time { + return bi.ModTime_ +} + +// Size implements storage.BlobInfo. 
+func (bi *BlobInfo) Size() int64 { + return bi.Size_ +} + +var _ storage.BlobInfo = &BlobInfo{} diff --git a/pkg/storage/rpc/gob/init.go b/pkg/storage/rpc/gob/init.go new file mode 100644 index 0000000..1666eeb --- /dev/null +++ b/pkg/storage/rpc/gob/init.go @@ -0,0 +1,18 @@ +package gob + +import ( + "encoding/gob" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +func init() { + gob.Register(storage.Document{}) + gob.Register(storage.DocumentID("")) + gob.Register(time.Time{}) + gob.Register(map[string]interface{}{}) + gob.Register([]interface{}{}) + gob.Register([]map[string]interface{}{}) + gob.Register(&BlobInfo{}) +} diff --git a/pkg/storage/rpc/registry/registry.go b/pkg/storage/rpc/registry/registry.go new file mode 100644 index 0000000..a54f02c --- /dev/null +++ b/pkg/storage/rpc/registry/registry.go @@ -0,0 +1,22 @@ +package registry + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/registry" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/client" +) + +func init() { + registry.AddDocumentStoreFactory("rpc", documentStoreFactory) + registry.AddBlobStoreFactory("rpc", blobStoreFactory) +} + +func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) { + return client.NewDocumentStore(url), nil +} + +func blobStoreFactory(url *url.URL) (storage.BlobStore, error) { + return client.NewBlobStore(url), nil +} diff --git a/pkg/storage/rpc/server/blob/close_bucket.go b/pkg/storage/rpc/server/blob/close_bucket.go new file mode 100644 index 0000000..64c403e --- /dev/null +++ b/pkg/storage/rpc/server/blob/close_bucket.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseBucketArgs struct { + BucketID BucketID +} + +type CloseBucketReply struct { +} + +func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + 
return errors.WithStack(err) + } + + if err := bucket.Close(); err != nil { + return errors.WithStack(err) + } + + s.buckets.Delete(args.BucketID) + + *reply = CloseBucketReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/blob/close_reader.go b/pkg/storage/rpc/server/blob/close_reader.go new file mode 100644 index 0000000..8f98134 --- /dev/null +++ b/pkg/storage/rpc/server/blob/close_reader.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseReaderArgs struct { + ReaderID ReaderID +} + +type CloseReaderReply struct { +} + +func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + if err := reader.Close(); err != nil { + return errors.WithStack(err) + } + + s.readers.Delete(args.ReaderID) + + *reply = CloseReaderReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/blob/close_writer.go b/pkg/storage/rpc/server/blob/close_writer.go new file mode 100644 index 0000000..8fe6ab1 --- /dev/null +++ b/pkg/storage/rpc/server/blob/close_writer.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseWriterArgs struct { + WriterID WriterID +} + +type CloseWriterReply struct { +} + +func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error { + writer, err := s.getOpenedWriter(args.WriterID) + if err != nil { + return errors.WithStack(err) + } + + if err := writer.Close(); err != nil { + return errors.WithStack(err) + } + + s.writers.Delete(args.WriterID) + + *reply = CloseWriterReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/blob/delete_bucket.go b/pkg/storage/rpc/server/blob/delete_bucket.go new file mode 100644 index 0000000..751d97a --- /dev/null +++ b/pkg/storage/rpc/server/blob/delete_bucket.go @@ -0,0 +1,22 @@ +package blob + +import ( + "context" 
+ + "github.com/pkg/errors" +) + +type DeleteBucketArgs struct { + BucketName string +} + +type DeleteBucketReply struct { +} + +func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error { + if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/get_blob_info.go b/pkg/storage/rpc/server/blob/get_blob_info.go new file mode 100644 index 0000000..cbbaf9a --- /dev/null +++ b/pkg/storage/rpc/server/blob/get_blob_info.go @@ -0,0 +1,42 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob" + "github.com/pkg/errors" +) + +type GetBlobInfoArgs struct { + BlobID storage.BlobID + BucketID BucketID +} + +type GetBlobInfoReply struct { + BlobInfo storage.BlobInfo +} + +func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + blobInfo, err := bucket.Get(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetBlobInfoReply{ + BlobInfo: &gob.BlobInfo{ + Bucket_: blobInfo.Bucket(), + ContentType_: blobInfo.ContentType(), + BlobID_: blobInfo.ID(), + ModTime_: blobInfo.ModTime(), + Size_: blobInfo.Size(), + }, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/get_bucket_size.go b/pkg/storage/rpc/server/blob/get_bucket_size.go new file mode 100644 index 0000000..5c02733 --- /dev/null +++ b/pkg/storage/rpc/server/blob/get_bucket_size.go @@ -0,0 +1,33 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type GetBucketSizeArgs struct { + BucketID BucketID +} + +type GetBucketSizeReply struct { + Size int64 +} + +func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error { + 
bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + size, err := bucket.Size(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetBucketSizeReply{ + Size: size, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/list_blob_info.go b/pkg/storage/rpc/server/blob/list_blob_info.go new file mode 100644 index 0000000..6614b0c --- /dev/null +++ b/pkg/storage/rpc/server/blob/list_blob_info.go @@ -0,0 +1,34 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type ListBlobInfoArgs struct { + BucketID BucketID +} + +type ListBlobInfoReply struct { + BlobInfos []storage.BlobInfo +} + +func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + blobInfos, err := bucket.List(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = ListBlobInfoReply{ + BlobInfos: blobInfos, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/list_buckets.go b/pkg/storage/rpc/server/blob/list_buckets.go new file mode 100644 index 0000000..52c8a10 --- /dev/null +++ b/pkg/storage/rpc/server/blob/list_buckets.go @@ -0,0 +1,27 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type ListBucketsArgs struct { +} + +type ListBucketsReply struct { + Buckets []string +} + +func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error { + buckets, err := s.store.ListBuckets(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = ListBucketsReply{ + Buckets: buckets, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/new_blob_reader.go b/pkg/storage/rpc/server/blob/new_blob_reader.go new file mode 100644 index 0000000..b1d1286 --- /dev/null +++ 
b/pkg/storage/rpc/server/blob/new_blob_reader.go
@@ -0,0 +1,57 @@
+package blob
+
+import (
+	"context"
+	"io"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage"
+	"github.com/pkg/errors"
+)
+
+type NewBlobReaderArgs struct {
+	BlobID   storage.BlobID
+	BucketID BucketID
+}
+
+type NewBlobReaderReply struct {
+	ReaderID ReaderID
+}
+
+func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error {
+	bucket, err := s.getOpenedBucket(args.BucketID)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	readerID, err := NewReaderID()
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	reader, err := bucket.NewReader(ctx, args.BlobID)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	s.readers.Store(readerID, reader)
+
+	*reply = NewBlobReaderReply{
+		ReaderID: readerID,
+	}
+
+	return nil
+}
+
+func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) {
+	raw, exists := s.readers.Load(id)
+	if !exists {
+		return nil, errors.Errorf("could not find reader '%s'", id)
+	}
+
+	reader, ok := raw.(io.ReadSeekCloser)
+	if !ok {
+		return nil, errors.Errorf("unexpected type '%T' for reader", raw)
+	}
+
+	return reader, nil
+}
diff --git a/pkg/storage/rpc/server/blob/new_blob_writer.go b/pkg/storage/rpc/server/blob/new_blob_writer.go
new file mode 100644
index 0000000..de25439
--- /dev/null
+++ b/pkg/storage/rpc/server/blob/new_blob_writer.go
@@ -0,0 +1,57 @@
+package blob
+
+import (
+	"context"
+	"io"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage"
+	"github.com/pkg/errors"
+)
+
+type NewBlobWriterArgs struct {
+	BlobID   storage.BlobID
+	BucketID BucketID
+}
+
+type NewBlobWriterReply struct {
+	WriterID WriterID
+}
+
+func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error {
+	bucket, err := s.getOpenedBucket(args.BucketID)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	writerID, err := NewWriterID()
+	if err != nil {
+		return
errors.WithStack(err) + } + + writer, err := bucket.NewWriter(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + s.writers.Store(writerID, writer) + + *reply = NewBlobWriterReply{ + WriterID: writerID, + } + + return nil +} + +func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) { + raw, exists := s.writers.Load(id) + if !exists { + return nil, errors.Errorf("could not find writer '%s'", id) + } + + writer, ok := raw.(io.WriteCloser) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for writer", raw) + } + + return writer, nil +} diff --git a/pkg/storage/rpc/server/blob/open_bucket.go b/pkg/storage/rpc/server/blob/open_bucket.go new file mode 100644 index 0000000..bf5ce28 --- /dev/null +++ b/pkg/storage/rpc/server/blob/open_bucket.go @@ -0,0 +1,50 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type OpenBucketArgs struct { + BucketName string +} + +type OpenBucketReply struct { + BucketID BucketID +} + +func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error { + bucket, err := s.store.OpenBucket(ctx, args.BucketName) + if err != nil { + return errors.WithStack(err) + } + + bucketID, err := NewBucketID() + if err != nil { + return errors.WithStack(err) + } + + s.buckets.Store(bucketID, bucket) + + *reply = OpenBucketReply{ + BucketID: bucketID, + } + + return nil +} + +func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) { + raw, exists := s.buckets.Load(id) + if !exists { + return nil, errors.WithStack(storage.ErrBucketClosed) + } + + bucket, ok := raw.(storage.BlobBucket) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw) + } + + return bucket, nil +} diff --git a/pkg/storage/rpc/server/blob/read_blob.go b/pkg/storage/rpc/server/blob/read_blob.go new file mode 100644 index 0000000..7a12bc1 --- /dev/null +++ 
b/pkg/storage/rpc/server/blob/read_blob.go @@ -0,0 +1,41 @@ +package blob + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type ReadBlobArgs struct { + ReaderID ReaderID + Length int +} + +type ReadBlobReply struct { + Data []byte + Read int + EOF bool +} + +func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + buff := make([]byte, args.Length) + + read, err := reader.Read(buff) + if err != nil && !errors.Is(err, io.EOF) { + return errors.WithStack(err) + } + + *reply = ReadBlobReply{ + Read: read, + Data: buff, + EOF: errors.Is(err, io.EOF), + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/seek_blob.go b/pkg/storage/rpc/server/blob/seek_blob.go new file mode 100644 index 0000000..902abc2 --- /dev/null +++ b/pkg/storage/rpc/server/blob/seek_blob.go @@ -0,0 +1,38 @@ +package blob + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type SeekBlobArgs struct { + ReaderID ReaderID + Offset int64 + Whence int +} + +type SeekBlobReply struct { + Read int64 + EOF bool +} + +func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + read, err := reader.Seek(args.Offset, args.Whence) + if err != nil && !errors.Is(err, io.EOF) { + return errors.WithStack(err) + } + + *reply = SeekBlobReply{ + Read: read, + EOF: errors.Is(err, io.EOF), + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/service.go b/pkg/storage/rpc/server/blob/service.go new file mode 100644 index 0000000..3ab7689 --- /dev/null +++ b/pkg/storage/rpc/server/blob/service.go @@ -0,0 +1,60 @@ +package blob + +import ( + "fmt" + "sync" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type BucketID string +type 
WriterID string +type ReaderID string + +type Service struct { + store storage.BlobStore + buckets sync.Map + writers sync.Map + readers sync.Map +} + +func NewService(store storage.BlobStore) *Service { + return &Service{ + store: store, + } +} + +func NewBucketID() (BucketID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := BucketID(fmt.Sprintf("bucket-%s", uuid.String())) + + return id, nil +} + +func NewWriterID() (WriterID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := WriterID(fmt.Sprintf("writer-%s", uuid.String())) + + return id, nil +} + +func NewReaderID() (ReaderID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := ReaderID(fmt.Sprintf("reader-%s", uuid.String())) + + return id, nil +} diff --git a/pkg/storage/rpc/server/blob/write_blob.go b/pkg/storage/rpc/server/blob/write_blob.go new file mode 100644 index 0000000..a3651a8 --- /dev/null +++ b/pkg/storage/rpc/server/blob/write_blob.go @@ -0,0 +1,34 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type WriteBlobArgs struct { + WriterID WriterID + Data []byte +} + +type WriteBlobReply struct { + Written int +} + +func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error { + writer, err := s.getOpenedWriter(args.WriterID) + if err != nil { + return errors.WithStack(err) + } + + written, err := writer.Write(args.Data) + if err != nil { + return errors.WithStack(err) + } + + *reply = WriteBlobReply{ + Written: written, + } + + return nil +} diff --git a/pkg/storage/rpc/server/document/delete_document.go b/pkg/storage/rpc/server/document/delete_document.go new file mode 100644 index 0000000..a351b71 --- /dev/null +++ b/pkg/storage/rpc/server/document/delete_document.go @@ -0,0 +1,26 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" 
+ "github.com/pkg/errors" +) + +type DeleteDocumentArgs struct { + Collection string + DocumentID storage.DocumentID +} + +type DeleteDocumentReply struct { +} + +func (s *Service) DeleteDocument(ctx context.Context, args DeleteDocumentArgs, reply *DeleteDocumentReply) error { + if err := s.store.Delete(ctx, args.Collection, args.DocumentID); err != nil { + return errors.WithStack(err) + } + + *reply = DeleteDocumentReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/document/get_document.go b/pkg/storage/rpc/server/document/get_document.go new file mode 100644 index 0000000..d08abe4 --- /dev/null +++ b/pkg/storage/rpc/server/document/get_document.go @@ -0,0 +1,30 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type GetDocumentArgs struct { + Collection string + DocumentID storage.DocumentID +} + +type GetDocumentReply struct { + Document storage.Document +} + +func (s *Service) GetDocument(ctx context.Context, args GetDocumentArgs, reply *GetDocumentReply) error { + document, err := s.store.Get(ctx, args.Collection, args.DocumentID) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetDocumentReply{ + Document: document, + } + + return nil +} diff --git a/pkg/storage/rpc/server/document/query_documents.go b/pkg/storage/rpc/server/document/query_documents.go new file mode 100644 index 0000000..f7d8e28 --- /dev/null +++ b/pkg/storage/rpc/server/document/query_documents.go @@ -0,0 +1,53 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/filter" + "github.com/pkg/errors" +) + +type QueryDocumentsArgs struct { + Collection string + Filter map[string]any + Options *storage.QueryOptions +} + +type QueryDocumentsReply struct { + Documents []storage.Document +} + +func (s *Service) QueryDocuments(ctx context.Context, args QueryDocumentsArgs, reply *QueryDocumentsReply) error { + var ( + 
argsFilter *filter.Filter + err error + ) + + if args.Filter != nil { + argsFilter, err = filter.NewFrom(args.Filter) + if err != nil { + return errors.WithStack(err) + } + } + + documents, err := s.store.Query(ctx, args.Collection, argsFilter, withQueryOptions(args.Options)) + if err != nil { + return errors.WithStack(err) + } + + *reply = QueryDocumentsReply{ + Documents: documents, + } + + return nil +} + +func withQueryOptions(opts *storage.QueryOptions) storage.QueryOptionFunc { + return func(o *storage.QueryOptions) { + o.Limit = opts.Limit + o.Offset = opts.Offset + o.OrderBy = opts.OrderBy + o.OrderDirection = opts.OrderDirection + } +} diff --git a/pkg/storage/rpc/server/document/service.go b/pkg/storage/rpc/server/document/service.go new file mode 100644 index 0000000..fc908a0 --- /dev/null +++ b/pkg/storage/rpc/server/document/service.go @@ -0,0 +1,11 @@ +package document + +import "forge.cadoles.com/arcad/edge/pkg/storage" + +type Service struct { + store storage.DocumentStore +} + +func NewService(store storage.DocumentStore) *Service { + return &Service{store} +} diff --git a/pkg/storage/rpc/server/document/upsert_document.go b/pkg/storage/rpc/server/document/upsert_document.go new file mode 100644 index 0000000..d1bcd5a --- /dev/null +++ b/pkg/storage/rpc/server/document/upsert_document.go @@ -0,0 +1,30 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type UpsertDocumentArgs struct { + Collection string + Document storage.Document +} + +type UpsertDocumentReply struct { + Document storage.Document +} + +func (s *Service) UpsertDocument(ctx context.Context, args UpsertDocumentArgs, reply *UpsertDocumentReply) error { + document, err := s.store.Upsert(ctx, args.Collection, args.Document) + if err != nil { + return errors.WithStack(err) + } + + *reply = UpsertDocumentReply{ + Document: document, + } + + return nil +} diff --git a/pkg/storage/rpc/server/init.go 
b/pkg/storage/rpc/server/init.go new file mode 100644 index 0000000..b101d8f --- /dev/null +++ b/pkg/storage/rpc/server/init.go @@ -0,0 +1,5 @@ +package server + +import ( + _ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob" +) diff --git a/pkg/storage/rpc/server/server.go b/pkg/storage/rpc/server/server.go new file mode 100644 index 0000000..76a8003 --- /dev/null +++ b/pkg/storage/rpc/server/server.go @@ -0,0 +1,21 @@ +package server + +import ( + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document" +) + +func NewBlobStoreServer(store storage.BlobStore) *rpc.Server { + server := rpc.NewServer() + server.Register(blob.NewService(store)) + return server +} + +func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server { + server := rpc.NewServer() + server.Register(document.NewService(store)) + return server +} diff --git a/pkg/storage/sqlite/blob_store_test.go b/pkg/storage/sqlite/blob_store_test.go index 7fabd7c..f3812f4 100644 --- a/pkg/storage/sqlite/blob_store_test.go +++ b/pkg/storage/sqlite/blob_store_test.go @@ -1,6 +1,7 @@ package sqlite import ( + "context" "fmt" "os" "testing" @@ -13,7 +14,9 @@ import ( func TestBlobStore(t *testing.T) { t.Parallel() - logger.SetLevel(logger.LevelDebug) + if testing.Verbose() { + logger.SetLevel(logger.LevelDebug) + } file := "./testdata/blobstore_test.sqlite" @@ -24,5 +27,20 @@ func TestBlobStore(t *testing.T) { dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) store := NewBlobStore(dsn) - testsuite.TestBlobStore(t, store) + testsuite.TestBlobStore(context.Background(), t, store) +} + +func BenchmarkBlobStore(t *testing.B) { + logger.SetLevel(logger.LevelError) + + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + 
t.Fatalf("%+v", errors.WithStack(err)) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := NewBlobStore(dsn) + + testsuite.BenchmarkBlobStore(t, store) } diff --git a/pkg/storage/sqlite/document_store_test.go b/pkg/storage/sqlite/document_store_test.go index a4a24de..1833b43 100644 --- a/pkg/storage/sqlite/document_store_test.go +++ b/pkg/storage/sqlite/document_store_test.go @@ -1,6 +1,7 @@ package sqlite import ( + "context" "fmt" "os" "testing" @@ -24,5 +25,5 @@ func TestDocumentStore(t *testing.T) { dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) store := NewDocumentStore(dsn) - testsuite.TestDocumentStore(t, store) + testsuite.TestDocumentStore(context.Background(), t, store) } diff --git a/pkg/storage/sqlite/registry/registry.go b/pkg/storage/sqlite/registry/registry.go new file mode 100644 index 0000000..0de74da --- /dev/null +++ b/pkg/storage/sqlite/registry/registry.go @@ -0,0 +1,51 @@ +package registry + +import ( + "net/url" + "os" + "path/filepath" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/registry" + "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "github.com/pkg/errors" +) + +func init() { + registry.AddDocumentStoreFactory("sqlite", documentStoreFactory) + registry.AddBlobStoreFactory("sqlite", blobStoreFactory) +} + +func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) { + dir := filepath.Dir(url.Host + url.Path) + + if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { + return nil, errors.WithStack(err) + } + + path := url.Host + url.Path + "?" 
+ url.RawQuery + + db, err := sqlite.Open(path) + if err != nil { + return nil, errors.WithStack(err) + } + + return sqlite.NewDocumentStoreWithDB(db), nil +} + +func blobStoreFactory(url *url.URL) (storage.BlobStore, error) { + dir := filepath.Dir(url.Host + url.Path) + + if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { + return nil, errors.WithStack(err) + } + + path := url.Host + url.Path + "?" + url.RawQuery + + db, err := sqlite.Open(path) + if err != nil { + return nil, errors.WithStack(err) + } + + return sqlite.NewBlobStoreWithDB(db), nil +} diff --git a/pkg/storage/testsuite/blob_store.go b/pkg/storage/testsuite/blob_store.go index b73c086..26d23fd 100644 --- a/pkg/storage/testsuite/blob_store.go +++ b/pkg/storage/testsuite/blob_store.go @@ -1,14 +1,14 @@ package testsuite import ( + "context" "testing" "forge.cadoles.com/arcad/edge/pkg/storage" ) -func TestBlobStore(t *testing.T, store storage.BlobStore) { +func TestBlobStore(ctx context.Context, t *testing.T, store storage.BlobStore) { t.Run("Ops", func(t *testing.T) { - t.Parallel() - testBlobStoreOps(t, store) + testBlobStoreOps(ctx, t, store) }) } diff --git a/pkg/storage/testsuite/blob_store_benchmark.go b/pkg/storage/testsuite/blob_store_benchmark.go new file mode 100644 index 0000000..3dc1486 --- /dev/null +++ b/pkg/storage/testsuite/blob_store_benchmark.go @@ -0,0 +1,79 @@ +package testsuite + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) { + t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) { + + for i := 0; i < t.N; i++ { + bucketName := fmt.Sprintf("bucket-%d", i) + if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + } + }) +} + +func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error { + ctx := context.Background() + + bucket, err 
:= store.OpenBucket(ctx, bucketName) + if err != nil { + return errors.WithStack(err) + } + + blobID := storage.NewBlobID() + + writer, err := bucket.NewWriter(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + data := []byte("foo") + + if _, err = writer.Write(data); err != nil { + return errors.WithStack(err) + } + + if err := writer.Close(); err != nil { + return errors.WithStack(err) + } + + reader, err := bucket.NewReader(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + var buf bytes.Buffer + + if _, err = io.Copy(&buf, reader); err != nil { + return errors.WithStack(err) + } + + if err := reader.Close(); err != nil { + return errors.WithStack(err) + } + + if err := bucket.Delete(ctx, blobID); err != nil { + return errors.WithStack(err) + } + + if err := bucket.Close(); err != nil { + return errors.WithStack(err) + } + + if err := store.DeleteBucket(ctx, bucketName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/testsuite/blob_store_ops.go b/pkg/storage/testsuite/blob_store_ops.go index 6f97eb3..323fdf2 100644 --- a/pkg/storage/testsuite/blob_store_ops.go +++ b/pkg/storage/testsuite/blob_store_ops.go @@ -17,9 +17,11 @@ type blobStoreTestCase struct { var blobStoreTestCases = []blobStoreTestCase{ { - Name: "Open new bucket", + Name: "Open then delete bucket", Run: func(ctx context.Context, store storage.BlobStore) error { - bucket, err := store.OpenBucket(ctx, "open-new-bucket") + bucketName := "open-new-bucket" + + bucket, err := store.OpenBucket(ctx, bucketName) if err != nil { return errors.WithStack(err) } @@ -50,6 +52,23 @@ var blobStoreTestCases = []blobStoreTestCase{ return errors.WithStack(err) } + if err := store.DeleteBucket(ctx, bucketName); err != nil { + return errors.WithStack(err) + } + + buckets, err := store.ListBuckets(ctx) + if err != nil { + return errors.WithStack(err) + } + + for _, b := range buckets { + if b != bucketName { + continue + } + + return 
errors.Errorf("bucket '%s' should be deleted", bucketName) + } + return nil }, }, @@ -112,14 +131,12 @@ var blobStoreTestCases = []blobStoreTestCase{ }, } -func testBlobStoreOps(t *testing.T, store storage.BlobStore) { +func testBlobStoreOps(ctx context.Context, t *testing.T, store storage.BlobStore) { for _, tc := range blobStoreTestCases { func(tc blobStoreTestCase) { t.Run(tc.Name, func(t *testing.T) { t.Parallel() - ctx := context.Background() - if err := tc.Run(ctx, store); err != nil { t.Errorf("%+v", errors.WithStack(err)) } diff --git a/pkg/storage/testsuite/document_store.go b/pkg/storage/testsuite/document_store.go index cdbea14..8e46376 100644 --- a/pkg/storage/testsuite/document_store.go +++ b/pkg/storage/testsuite/document_store.go @@ -1,14 +1,14 @@ package testsuite import ( + "context" "testing" "forge.cadoles.com/arcad/edge/pkg/storage" ) -func TestDocumentStore(t *testing.T, store storage.DocumentStore) { +func TestDocumentStore(ctx context.Context, t *testing.T, store storage.DocumentStore) { t.Run("Ops", func(t *testing.T) { - t.Parallel() - testDocumentStoreOps(t, store) + testDocumentStoreOps(ctx, t, store) }) } diff --git a/pkg/storage/testsuite/document_store_ops.go b/pkg/storage/testsuite/document_store_ops.go index 7281fed..1de8e46 100644 --- a/pkg/storage/testsuite/document_store_ops.go +++ b/pkg/storage/testsuite/document_store_ops.go @@ -433,12 +433,12 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{ }, } -func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) { +func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.DocumentStore) { for _, tc := range documentStoreOpsTestCases { func(tc documentStoreOpsTestCase) { t.Run(tc.Name, func(t *testing.T) { t.Parallel() - if err := tc.Run(context.Background(), store); err != nil { + if err := tc.Run(ctx, store); err != nil { t.Errorf("%+v", errors.WithStack(err)) } })