diff --git a/.env.dist b/.env.dist index 660424e..6f09b89 100644 --- a/.env.dist +++ b/.env.dist @@ -1 +1,3 @@ -RUN_APP_ARGS="" \ No newline at end of file +RUN_APP_ARGS="" +#EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%" +#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 665f9ac..8477fe8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ /tools *.sqlite /.gitea-release -/.edge \ No newline at end of file +/.edge +/data \ No newline at end of file diff --git a/Makefile b/Makefile index e2829a2..611d22e 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,12 @@ DATE_VERSION := $(shell date +%Y.%-m.%-d) FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,) APP_PATH ?= misc/client-sdk-testsuite/dist RUN_APP_ARGS ?= +RUN_STORAGE_SERVER_ARGS ?= + SHELL := bash -build: build-edge-cli build-client-sdk-test-app + +build: build-cli build-storage-server build-client-sdk-test-app watch: tools/modd/bin/modd tools/modd/bin/modd @@ -22,17 +25,23 @@ watch: tools/modd/bin/modd test: test-go test-go: - go test -v -count=1 $(GOTEST_ARGS) ./... + go test -count=1 $(GOTEST_ARGS) ./... lint: golangci-lint run --enable-all $(LINT_ARGS) -build-edge-cli: build-sdk +build-cli: build-sdk CGO_ENABLED=0 go build \ -v \ -o ./bin/cli \ ./cmd/cli +build-storage-server: build-sdk + CGO_ENABLED=0 go build \ + -v \ + -o ./bin/storage-server \ + ./cmd/storage-server + build-client-sdk-test-app: cd misc/client-sdk-testsuite && $(MAKE) dist @@ -68,6 +77,9 @@ node_modules: run-app: .env ( set -o allexport && source .env && set +o allexport && bin/cli app run -p $(APP_PATH) $$RUN_APP_ARGS ) +run-storage-server: .env + ( set -o allexport && source .env && set +o allexport && bin/storage-server run $$RUN_STORAGE_SERVER_ARGS ) + .env: cp .env.dist .env diff --git a/cmd/cli/command/app/run.go b/cmd/cli/command/app/run.go index 6ce6d44..05003e5 100644 --- a/cmd/cli/command/app/run.go +++ b/cmd/cli/command/app/run.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" "net" "net/http" "os" @@ -28,9 +27,7 @@ import ( "forge.cadoles.com/arcad/edge/pkg/module/fetch" netModule "forge.cadoles.com/arcad/edge/pkg/module/net" shareModule "forge.cadoles.com/arcad/edge/pkg/module/share" - shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite" "forge.cadoles.com/arcad/edge/pkg/storage" - storageSqlite "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" "gitlab.com/wpetit/goweb/logger" "forge.cadoles.com/arcad/edge/pkg/bundle" @@ -45,6 +42,12 @@ import ( _ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id" _ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain" + + // Register storage drivers + "forge.cadoles.com/arcad/edge/pkg/storage/driver" + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc" + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/share" ) func RunCommand() *cli.Command { @@ -75,14 +78,22 @@ func RunCommand() *cli.Command { Value: 0, }, &cli.StringFlag{ - Name: "storage-file", - Usage: "use `FILE` for SQLite storage database", - Value: ".edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", + Name: "blobstore-dsn", + Usage: "use `DSN` for blob storage", + EnvVars: []string{"EDGE_BLOBSTORE_DSN"}, + Value: "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", }, &cli.StringFlag{ - Name: "shared-resources-file", - Usage: "use `FILE` for SQLite shared resources database", - Value: ".edge/shared-resources.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", + Name: "documentstore-dsn", + Usage: "use `DSN` for document storage", + EnvVars: []string{"EDGE_DOCUMENTSTORE_DSN"}, + Value: "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", + }, + &cli.StringFlag{ + Name: "sharestore-dsn", + Usage: "use `DSN` for share storage", + EnvVars: []string{"EDGE_SHARESTORE_DSN"}, + Value: "sqlite://.edge/share.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000", }, &cli.StringFlag{ Name: "accounts-file", @@ -96,9 +107,10 @@ func RunCommand() *cli.Command { logFormat := ctx.String("log-format") logLevel := ctx.Int("log-level") - storageFile := ctx.String("storage-file") + blobstoreDSN := ctx.String("blobstore-dsn") + documentstoreDSN := ctx.String("documentstore-dsn") + shareStoreDSN := ctx.String("sharestore-dsn") accountsFile := ctx.String("accounts-file") - sharedResourcesFile := ctx.String("shared-resources-file") logger.SetFormat(logger.Format(logFormat)) logger.SetLevel(logger.Level(logLevel)) @@ -144,7 +156,7 @@ func RunCommand() *cli.Command { appCtx := logger.With(cmdCtx, logger.F("address", address)) - if err := runApp(appCtx, path, address, storageFile, accountsFile, appsRepository, sharedResourcesFile); err != nil { + if err := runApp(appCtx, path, address, documentstoreDSN, blobstoreDSN, shareStoreDSN, accountsFile, appsRepository); err != nil { logger.Error(appCtx, "could not run app", logger.E(errors.WithStack(err))) } }(p, port, idx) @@ -157,7 +169,7 @@ func RunCommand() *cli.Command { } } -func runApp(ctx context.Context, path string, address string, storageFile string, accountsFile string, appRepository appModule.Repository, sharedResourcesFile string) error { +func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository) error { absPath, err := filepath.Abs(path) if err != nil { return errors.Wrapf(err, "could not resolve path '%s'", path) @@ -190,9 +202,8 @@ func runApp(ctx context.Context, path string, address string, storageFile string deps := &moduleDeps{} funcs := []ModuleDepFunc{ initMemoryBus, - initDatastores(storageFile, manifest.ID), + initDatastores(documentStoreDSN, blobStoreDSN, shareStoreDSN, manifest.ID), initAccounts(accountsFile, manifest.ID), - initShareRepository(sharedResourcesFile), initAppRepository(appRepository), } @@ -243,13 +254,13 @@ func runApp(ctx context.Context, path string, address string, storageFile string } type moduleDeps struct { - AppID app.ID - Bus bus.Bus - DocumentStore storage.DocumentStore - BlobStore storage.BlobStore - AppRepository appModule.Repository - ShareRepository shareModule.Repository - Accounts []authHTTP.LocalAccount + AppID app.ID + Bus bus.Bus + DocumentStore storage.DocumentStore + BlobStore storage.BlobStore + AppRepository appModule.Repository + ShareStore share.Store + Accounts []authHTTP.LocalAccount } type ModuleDepFunc func(*moduleDeps) error @@ -269,7 +280,7 @@ func getServerModules(deps *moduleDeps) []app.ServerModuleFactory { ), appModule.ModuleFactory(deps.AppRepository), fetch.ModuleFactory(deps.Bus), - shareModule.ModuleFactory(deps.AppID, deps.ShareRepository), + shareModule.ModuleFactory(deps.AppID, deps.ShareStore), } } @@ -323,10 +334,10 @@ func loadLocalAccounts(path string) ([]authHTTP.LocalAccount, error) { return nil, errors.WithStack(err) } - data, err := ioutil.ReadFile(path) + data, err := os.ReadFile(path) if err != nil { if errors.Is(err, os.ErrNotExist) { - if err := ioutil.WriteFile(path, defaultAccounts, 0o640); err != nil { + if err := os.WriteFile(path, defaultAccounts, 0o640); err != nil { return nil, errors.WithStack(err) } @@ -437,21 +448,32 @@ func initMemoryBus(deps *moduleDeps) error { return nil } -func initDatastores(storageFile string, appID app.ID) ModuleDepFunc { +func initDatastores(documentStoreDSN, blobStoreDSN, shareStoreDSN string, appID app.ID) ModuleDepFunc { return func(deps *moduleDeps) error { - storageFile = injectAppID(storageFile, appID) + documentStoreDSN = injectAppID(documentStoreDSN, appID) - if err := ensureDir(storageFile); err != nil { - return errors.WithStack(err) - } - - db, err := storageSqlite.Open(storageFile) + documentStore, err := driver.NewDocumentStore(documentStoreDSN) if err != nil { return errors.WithStack(err) } - deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db) - deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db) + deps.DocumentStore = documentStore + + blobStoreDSN = injectAppID(blobStoreDSN, appID) + + blobStore, err := driver.NewBlobStore(blobStoreDSN) + if err != nil { + return errors.WithStack(err) + } + + deps.BlobStore = blobStore + + shareStore, err := driver.NewShareStore(shareStoreDSN) + if err != nil { + return errors.WithStack(err) + } + + deps.ShareStore = shareStore return nil } @@ -471,17 +493,3 @@ func initAccounts(accountsFile string, appID app.ID) ModuleDepFunc { return nil } } - -func initShareRepository(shareRepositoryFile string) ModuleDepFunc { - return func(deps *moduleDeps) error { - if err := ensureDir(shareRepositoryFile); err != nil { - return errors.WithStack(err) - } - - repo := shareSqlite.NewRepository(shareRepositoryFile) - - deps.ShareRepository = repo - - return nil - } -} diff --git a/cmd/storage-server/command/auth/root.go b/cmd/storage-server/command/auth/root.go new file mode 100644 index 0000000..bdf044a --- /dev/null +++ b/cmd/storage-server/command/auth/root.go @@ -0,0 +1,13 @@ +package auth + +import ( + "github.com/urfave/cli/v2" +) + +func Root() *cli.Command { + return &cli.Command{ + Name: "auth", + Usage: "Auth related command", + Subcommands: []*cli.Command{}, + } +} diff --git a/cmd/storage-server/command/main.go b/cmd/storage-server/command/main.go new file mode 100644 index 0000000..791d8dc --- /dev/null +++ b/cmd/storage-server/command/main.go @@ -0,0 +1,48 @@ +package command + +import ( + "context" + "fmt" + "os" + "sort" + + "github.com/urfave/cli/v2" +) + +func Main(commands ...*cli.Command) { + ctx := context.Background() + + app := &cli.App{ + Name: "storage-server", + Usage: "Edge storage server", + Commands: commands, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "debug", + EnvVars: []string{"DEBUG"}, + Value: false, + }, + }, + } + + app.ExitErrHandler = func(ctx *cli.Context, err error) { + if err == nil { + return + } + + debug := ctx.Bool("debug") + + if !debug { + fmt.Printf("[ERROR] %v\n", err) + } else { + fmt.Printf("%+v", err) + } + } + + sort.Sort(cli.FlagsByName(app.Flags)) + sort.Sort(cli.CommandsByName(app.Commands)) + + if err := app.RunContext(ctx, os.Args); err != nil { + os.Exit(1) + } +} diff --git a/cmd/storage-server/command/run.go b/cmd/storage-server/command/run.go new file mode 100644 index 0000000..bbcf49c --- /dev/null +++ b/cmd/storage-server/command/run.go @@ -0,0 +1,192 @@ +package command + +import ( + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/keegancsmith/rpc" + "gitlab.com/wpetit/goweb/logger" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/pkg/errors" + "github.com/urfave/cli/v2" + + // Register storage drivers + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc" + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" +) + +func Run() *cli.Command { + return &cli.Command{ + Name: "run", + Usage: "Run server", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "address", + Aliases: []string{"addr"}, + Value: ":3001", + }, + &cli.StringFlag{ + Name: "blobstore-dsn-pattern", + EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"}, + Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()), + }, + &cli.StringFlag{ + Name: "documentstore-dsn-pattern", + EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"}, + Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()), + }, + &cli.DurationFlag{ + Name: "cache-ttl", + EnvVars: []string{"STORAGE_SERVER_CACHE_TTL"}, + Value: time.Hour, + }, + &cli.IntFlag{ + Name: "cache-size", + EnvVars: []string{"STORAGE_SERVER_CACHE_SIZE"}, + Value: 32, + }, + }, + Action: func(ctx *cli.Context) error { + addr := ctx.String("address") + blobstoreDSNPattern := ctx.String("blobstore-dsn-pattern") + documentstoreDSNPattern := ctx.String("documentstore-dsn-pattern") + cacheSize := ctx.Int("cache-size") + cacheTTL := ctx.Duration("cache-ttl") + + router := chi.NewRouter() + + router.Use(middleware.RealIP) + router.Use(middleware.Logger) + + router.Handle("/blobstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tenant := r.URL.Query().Get("tenant") + if tenant == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + appID := r.URL.Query().Get("appId") + if tenant == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + server, err := getBlobStoreStoreServer(cacheSize, cacheTTL, tenant, appID, blobstoreDSNPattern) + if err != nil { + logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + + return + } + + server.ServeHTTP(w, r) + })) + + router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tenant := r.URL.Query().Get("tenant") + if tenant == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + appID := r.URL.Query().Get("appId") + if tenant == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + + return + } + + server, err := getDocumentStoreServer(cacheSize, cacheTTL, tenant, appID, documentstoreDSNPattern) + if err != nil { + logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + + return + } + + server.ServeHTTP(w, r) + })) + + if err := http.ListenAndServe(addr, router); err != nil { + return errors.WithStack(err) + } + + return nil + }, + } +} + +var ( + documentStoreCache *expirable.LRU[string, *rpc.Server] + initDocumentStoreCache sync.Once +) + +func getDocumentStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) { + initDocumentStoreCache.Do(func() { + documentStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL) + }) + + key := fmt.Sprintf("%s:%s", tenant, appID) + + documentStoreServer, _ := documentStoreCache.Get(key) + if documentStoreServer != nil { + return documentStoreServer, nil + } + + dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant) + dsn = strings.ReplaceAll(dsn, "%APPID%", appID) + + documentStore, err := driver.NewDocumentStore(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + documentStoreServer = server.NewDocumentStoreServer(documentStore) + + documentStoreCache.Add(key, documentStoreServer) + + return documentStoreServer, nil +} + +var ( + blobStoreCache *expirable.LRU[string, *rpc.Server] + initBlobStoreCache sync.Once +) + +func getBlobStoreStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) { + initBlobStoreCache.Do(func() { + blobStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL) + }) + + key := fmt.Sprintf("%s:%s", tenant, appID) + + blobStoreServer, _ := blobStoreCache.Get(key) + if blobStoreServer != nil { + return blobStoreServer, nil + } + + dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant) + dsn = strings.ReplaceAll(dsn, "%APPID%", appID) + + blobStore, err := driver.NewBlobStore(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + blobStoreServer = server.NewBlobStoreServer(blobStore) + + blobStoreCache.Add(key, blobStoreServer) + + return blobStoreServer, nil +} diff --git a/cmd/storage-server/main.go b/cmd/storage-server/main.go new file mode 100644 index 0000000..35ed74e --- /dev/null +++ b/cmd/storage-server/main.go @@ -0,0 +1,13 @@ +package main + +import ( + "forge.cadoles.com/arcad/edge/cmd/storage-server/command" + "forge.cadoles.com/arcad/edge/cmd/storage-server/command/auth" +) + +func main() { + command.Main( + command.Run(), + auth.Root(), + ) +} diff --git a/go.mod b/go.mod index 19c0759..a206a84 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,8 @@ require ( github.com/go-playground/universal-translator v0.16.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect + github.com/hashicorp/golang-lru/v2 v2.0.6 // indirect + github.com/keegancsmith/rpc v1.3.0 // indirect github.com/leodido/go-urn v1.1.0 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect diff --git a/go.sum b/go.sum index e1a0564..d01e884 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/hashicorp/go.net v0.0.0-20151006203346-104dcad90073/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM= +github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/mdns v0.0.0-20151206042412-9d85cf22f9f8/go.mod h1:aa76Av3qgPeIQp9Y3qIkTBPieQYNkQ13Kxe7pze9Wb0= github.com/hashicorp/mdns v1.0.5 h1:1M5hW1cunYeoXOqHwEb/GBDDHAFo0Yqb/uz/beC6LbE= @@ -201,6 +203,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/keegancsmith/rpc v1.3.0 h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk= +github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= diff --git a/modd.conf b/modd.conf index 5f5e28a..5f1a776 100644 --- a/modd.conf +++ b/modd.conf @@ -9,6 +9,7 @@ modd.conf prep: make build-client-sdk-test-app prep: make build daemon: make run-app + daemon: make run-storage-server } **/*.go { diff --git a/pkg/module/blob/module_test.go b/pkg/module/blob/module_test.go index e7bd736..aed1a23 100644 --- a/pkg/module/blob/module_test.go +++ b/pkg/module/blob/module_test.go @@ -1,14 +1,14 @@ package blob import ( - "io/ioutil" + "os" "testing" "cdr.dev/slog" "forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/bus/memory" "forge.cadoles.com/arcad/edge/pkg/module" - "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) @@ -27,7 +27,7 @@ func TestBlobModule(t *testing.T) { ModuleFactory(bus, store), ) - data, err := ioutil.ReadFile("testdata/blob.js") + data, err := os.ReadFile("testdata/blob.js") if err != nil { t.Fatal(err) } diff --git a/pkg/module/share/module.go b/pkg/module/share/module.go index 9b50077..8eebcc9 100644 --- a/pkg/module/share/module.go +++ b/pkg/module/share/module.go @@ -5,18 +5,19 @@ import ( "forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/module/util" + "forge.cadoles.com/arcad/edge/pkg/storage/share" "github.com/dop251/goja" "github.com/pkg/errors" ) const ( - AnyType ValueType = "*" - AnyName string = "*" + AnyType share.ValueType = "*" + AnyName string = "*" ) type Module struct { - appID app.ID - repository Repository + appID app.ID + store share.Store } func (m *Module) Name() string { @@ -48,19 +49,19 @@ func (m *Module) Export(export *goja.Object) { panic(errors.Wrap(err, "could not set 'ANY_NAME' property")) } - if err := export.Set("TYPE_TEXT", TypeText); err != nil { + if err := export.Set("TYPE_TEXT", share.TypeText); err != nil { panic(errors.Wrap(err, "could not set 'TYPE_TEXT' property")) } - if err := export.Set("TYPE_NUMBER", TypeNumber); err != nil { + if err := export.Set("TYPE_NUMBER", share.TypeNumber); err != nil { panic(errors.Wrap(err, "could not set 'TYPE_NUMBER' property")) } - if err := export.Set("TYPE_BOOL", TypeBool); err != nil { + if err := export.Set("TYPE_BOOL", share.TypeBool); err != nil { panic(errors.Wrap(err, "could not set 'TYPE_BOOL' property")) } - if err := export.Set("TYPE_PATH", TypePath); err != nil { + if err := export.Set("TYPE_PATH", share.TypePath); err != nil { panic(errors.Wrap(err, "could not set 'TYPE_PATH' property")) } } @@ -69,20 +70,20 @@ func (m *Module) upsertResource(call goja.FunctionCall, rt *goja.Runtime) goja.V ctx := util.AssertContext(call.Argument(0), rt) resourceID := assertResourceID(call.Argument(1), rt) - var attributes []Attribute + var attributes []share.Attribute if len(call.Arguments) > 2 { attributes = assertAttributes(call.Arguments[2:], rt) } else { - attributes = make([]Attribute, 0) + attributes = make([]share.Attribute, 0) } for _, attr := range attributes { - if err := AssertType(attr.Value(), attr.Type()); err != nil { + if err := share.AssertType(attr.Value(), attr.Type()); err != nil { panic(rt.ToValue(errors.WithStack(err))) } } - resource, err := m.repository.UpdateAttributes(ctx, m.appID, resourceID, attributes...) + resource, err := m.store.UpdateAttributes(ctx, m.appID, resourceID, attributes...) if err != nil { panic(rt.ToValue(errors.WithStack(err))) } @@ -101,7 +102,7 @@ func (m *Module) deleteAttributes(call goja.FunctionCall, rt *goja.Runtime) goja names = make([]string, 0) } - err := m.repository.DeleteAttributes(ctx, m.appID, resourceID, names...) + err := m.store.DeleteAttributes(ctx, m.appID, resourceID, names...) if err != nil { panic(rt.ToValue(errors.WithStack(err))) } @@ -112,23 +113,23 @@ func (m *Module) deleteAttributes(call goja.FunctionCall, rt *goja.Runtime) goja func (m *Module) findResources(call goja.FunctionCall, rt *goja.Runtime) goja.Value { ctx := util.AssertContext(call.Argument(0), rt) - funcs := make([]FindResourcesOptionFunc, 0) + funcs := make([]share.FindResourcesOptionFunc, 0) if len(call.Arguments) > 1 { name := util.AssertString(call.Argument(1), rt) if name != AnyName { - funcs = append(funcs, WithName(name)) + funcs = append(funcs, share.WithName(name)) } } if len(call.Arguments) > 2 { valueType := assertValueType(call.Argument(2), rt) if valueType != AnyType { - funcs = append(funcs, WithType(valueType)) + funcs = append(funcs, share.WithType(valueType)) } } - resources, err := m.repository.FindResources(ctx, funcs...) + resources, err := m.store.FindResources(ctx, funcs...) if err != nil { panic(rt.ToValue(errors.WithStack(err))) } @@ -140,7 +141,7 @@ func (m *Module) deleteResource(call goja.FunctionCall, rt *goja.Runtime) goja.V ctx := util.AssertContext(call.Argument(0), rt) resourceID := assertResourceID(call.Argument(1), rt) - err := m.repository.DeleteResource(ctx, m.appID, resourceID) + err := m.store.DeleteResource(ctx, m.appID, resourceID) if err != nil { panic(rt.ToValue(errors.WithStack(err))) } @@ -148,29 +149,29 @@ func (m *Module) deleteResource(call goja.FunctionCall, rt *goja.Runtime) goja.V return nil } -func ModuleFactory(appID app.ID, repository Repository) app.ServerModuleFactory { +func ModuleFactory(appID app.ID, store share.Store) app.ServerModuleFactory { return func(server *app.Server) app.ServerModule { return &Module{ - appID: appID, - repository: repository, + appID: appID, + store: store, } } } -func assertResourceID(v goja.Value, r *goja.Runtime) ResourceID { +func assertResourceID(v goja.Value, r *goja.Runtime) share.ResourceID { value := v.Export() switch typ := value.(type) { case string: - return ResourceID(typ) - case ResourceID: + return share.ResourceID(typ) + case share.ResourceID: return typ default: panic(r.ToValue(errors.Errorf("expected value to be a string or ResourceID, got '%T'", value))) } } -func assertAttributes(values []goja.Value, r *goja.Runtime) []Attribute { - attributes := make([]Attribute, len(values)) +func assertAttributes(values []goja.Value, r *goja.Runtime) []share.Attribute { + attributes := make([]share.Attribute, len(values)) for idx, val := range values { export := val.Export() @@ -195,12 +196,12 @@ func assertAttributes(values []goja.Value, r *goja.Runtime) []Attribute { panic(r.ToValue(errors.Errorf("could not find 'type' property on attribute '%v'", export))) } - var valueType ValueType + var valueType share.ValueType switch typ := rawType.(type) { - case ValueType: + case share.ValueType: valueType = typ case string: - valueType = ValueType(typ) + valueType = share.ValueType(typ) default: panic(r.ToValue(errors.Errorf("unexpected value for attribute property 'type': expected 'string' or 'ValueType', got '%T'", rawType))) @@ -211,7 +212,7 @@ func assertAttributes(values []goja.Value, r *goja.Runtime) []Attribute { panic(r.ToValue(errors.Errorf("could not find 'value' property on attribute '%v'", export))) } - attributes[idx] = NewBaseAttribute( + attributes[idx] = share.NewBaseAttribute( name, valueType, value, @@ -232,12 +233,12 @@ func assertStrings(values []goja.Value, r *goja.Runtime) []string { return strings } -func assertValueType(v goja.Value, r *goja.Runtime) ValueType { +func assertValueType(v goja.Value, r *goja.Runtime) share.ValueType { value := v.Export() switch typ := value.(type) { case string: - return ValueType(typ) - case ValueType: + return share.ValueType(typ) + case share.ValueType: return typ default: panic(r.ToValue(errors.Errorf("expected value to be a string or ValueType, got '%T'", value))) @@ -245,7 +246,7 @@ func assertValueType(v goja.Value, r *goja.Runtime) ValueType { } type gojaResource struct { - ID ResourceID `goja:"id" json:"id"` + ID share.ResourceID `goja:"id" json:"id"` Origin app.ID `goja:"origin" json:"origin"` Attributes []*gojaAttribute `goja:"attributes" json:"attributes"` } @@ -254,7 +255,7 @@ func (r *gojaResource) Has(call goja.FunctionCall, rt *goja.Runtime) goja.Value name := util.AssertString(call.Argument(0), rt) valueType := assertValueType(call.Argument(1), rt) - hasAttr := HasAttribute(toResource(r), name, valueType) + hasAttr := share.HasAttribute(toResource(r), name, valueType) return rt.ToValue(hasAttr) } @@ -268,7 +269,7 @@ func (r *gojaResource) Get(call goja.FunctionCall, rt *goja.Runtime) goja.Value defaultValue = call.Argument(2).Export() } - attr := GetAttribute(toResource(r), name, valueType) + attr := share.GetAttribute(toResource(r), name, valueType) if attr == nil { return rt.ToValue(defaultValue) @@ -278,14 +279,14 @@ func (r *gojaResource) Get(call goja.FunctionCall, rt *goja.Runtime) goja.Value } type gojaAttribute struct { - Name string `goja:"name" json:"name"` - Type ValueType `goja:"type" json:"type"` - Value any `goja:"value" json:"value"` - CreatedAt time.Time `goja:"createdAt" json:"createdAt"` - UpdatedAt time.Time `goja:"updatedAt" json:"updatedAt"` + Name string `goja:"name" json:"name"` + Type share.ValueType `goja:"type" json:"type"` + Value any `goja:"value" json:"value"` + CreatedAt time.Time `goja:"createdAt" json:"createdAt"` + UpdatedAt time.Time `goja:"updatedAt" json:"updatedAt"` } -func toGojaResource(res Resource) *gojaResource { +func toGojaResource(res share.Resource) *gojaResource { attributes := make([]*gojaAttribute, len(res.Attributes())) for idx, attr := range res.Attributes() { @@ -305,7 +306,7 @@ func toGojaResource(res Resource) *gojaResource { } } -func toGojaResources(resources []Resource) []*gojaResource { +func toGojaResources(resources []share.Resource) []*gojaResource { gojaResources := make([]*gojaResource, len(resources)) for idx, res := range resources { gojaResources[idx] = toGojaResource(res) @@ -313,19 +314,19 @@ func toGojaResources(resources []Resource) []*gojaResource { return gojaResources } -func toResource(res *gojaResource) Resource { - return NewBaseResource( +func toResource(res *gojaResource) share.Resource { + return share.NewBaseResource( res.Origin, res.ID, toAttributes(res.Attributes)..., ) } -func toAttributes(gojaAttributes []*gojaAttribute) []Attribute { - attributes := make([]Attribute, len(gojaAttributes)) +func toAttributes(gojaAttributes []*gojaAttribute) []share.Attribute { + attributes := make([]share.Attribute, len(gojaAttributes)) for idx, gojaAttr := range gojaAttributes { - attr := NewBaseAttribute( + attr := share.NewBaseAttribute( gojaAttr.Name, gojaAttr.Type, gojaAttr.Value, diff --git a/pkg/module/share/testsuite/module.go b/pkg/module/share/module_test.go similarity index 70% rename from pkg/module/share/testsuite/module.go rename to pkg/module/share/module_test.go index 6737440..9a70419 100644 --- a/pkg/module/share/testsuite/module.go +++ b/pkg/module/share/module_test.go @@ -1,21 +1,23 @@ -package testsuite +package share import ( "context" - "io/fs" + "os" "testing" "forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/module" - "forge.cadoles.com/arcad/edge/pkg/module/share" + "forge.cadoles.com/arcad/edge/pkg/storage/driver" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" + + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" ) -func TestModule(t *testing.T, newRepo NewTestRepoFunc) { +func TestModule(t *testing.T) { logger.SetLevel(logger.LevelDebug) - repo, err := newRepo("module") + store, err := driver.NewShareStore("sqlite://testdata/test_share_module.sqlite") if err != nil { t.Fatalf("%+v", errors.WithStack(err)) } @@ -23,10 +25,10 @@ func TestModule(t *testing.T, newRepo NewTestRepoFunc) { server := app.NewServer( module.ContextModuleFactory(), module.ConsoleModuleFactory(), - share.ModuleFactory("test.app.edge", repo), + ModuleFactory("test.app.edge", store), ) - data, err := fs.ReadFile(testData, "testdata/share.js") + data, err := os.ReadFile("testdata/share.js") if err != nil { t.Fatalf("%+v", errors.WithStack(err)) } diff --git a/pkg/module/share/sqlite/module_test.go b/pkg/module/share/sqlite/module_test.go deleted file mode 100644 index 4b498fc..0000000 --- a/pkg/module/share/sqlite/module_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package sqlite - -import ( - "testing" - - "forge.cadoles.com/arcad/edge/pkg/module/share/testsuite" - "gitlab.com/wpetit/goweb/logger" -) - -func TestModule(t *testing.T) { - logger.SetLevel(logger.LevelDebug) - testsuite.TestModule(t, newTestRepo) -} diff --git a/pkg/module/share/sqlite/testdata/.gitignore b/pkg/module/share/sqlite/testdata/.gitignore deleted file mode 100644 index 885029a..0000000 --- a/pkg/module/share/sqlite/testdata/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.sqlite* \ No newline at end of file diff --git a/pkg/module/share/testsuite/testdata/share.js b/pkg/module/share/testdata/share.js similarity index 100% rename from pkg/module/share/testsuite/testdata/share.js rename to pkg/module/share/testdata/share.js diff --git a/pkg/module/share/testsuite/repository.go b/pkg/module/share/testsuite/repository.go deleted file mode 100644 index 46867d1..0000000 --- a/pkg/module/share/testsuite/repository.go +++ /dev/null @@ -1,16 +0,0 @@ -package testsuite - -import ( - "testing" - - "forge.cadoles.com/arcad/edge/pkg/module/share" -) - -type NewTestRepoFunc func(testname string) (share.Repository, error) - -func TestRepository(t *testing.T, newRepo NewTestRepoFunc) { - t.Run("Cases", func(t *testing.T) { - t.Parallel() - runRepositoryTests(t, newRepo) - }) -} diff --git a/pkg/module/store/module_test.go b/pkg/module/store/module_test.go index b48ccb7..1e46137 100644 --- a/pkg/module/store/module_test.go +++ b/pkg/module/store/module_test.go @@ -2,12 +2,12 @@ package store import ( "context" - "io/ioutil" + "os" "testing" "forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/module" - "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) @@ -22,7 +22,7 @@ func TestStoreModule(t *testing.T) { ModuleFactory(store), ) - data, err := ioutil.ReadFile("testdata/store.js") + data, err := os.ReadFile("testdata/store.js") if err != nil { t.Fatalf("%+v", errors.WithStack(err)) } diff --git a/pkg/storage/driver/blob_store.go b/pkg/storage/driver/blob_store.go new file mode 100644 index 0000000..c7d1df8 --- /dev/null +++ b/pkg/storage/driver/blob_store.go @@ -0,0 +1,35 @@ +package driver + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +var blobStoreFactories = make(map[string]BlobStoreFactory, 0) + +type BlobStoreFactory func(url *url.URL) (storage.BlobStore, error) + +func RegisterBlobStoreFactory(scheme string, factory BlobStoreFactory) { + blobStoreFactories[scheme] = factory +} + +func NewBlobStore(dsn string) (storage.BlobStore, error) { + url, err := url.Parse(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + factory, exists := blobStoreFactories[url.Scheme] + if !exists { + return nil, errors.WithStack(ErrSchemeNotRegistered) + } + + store, err := factory(url) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil +} diff --git a/pkg/storage/driver/document_store.go b/pkg/storage/driver/document_store.go new file mode 100644 index 0000000..4a6b269 --- /dev/null +++ b/pkg/storage/driver/document_store.go @@ -0,0 +1,35 @@ +package driver + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +var documentStoreFactories = make(map[string]DocumentStoreFactory, 0) + +type DocumentStoreFactory func(url *url.URL) (storage.DocumentStore, error) + +func RegisterDocumentStoreFactory(scheme string, factory DocumentStoreFactory) { + documentStoreFactories[scheme] = factory +} + +func NewDocumentStore(dsn string) (storage.DocumentStore, error) { + url, err := url.Parse(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + factory, exists := documentStoreFactories[url.Scheme] + if !exists { + return nil, errors.WithStack(ErrSchemeNotRegistered) + } + + store, err := factory(url) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil +} diff --git a/pkg/storage/driver/error.go b/pkg/storage/driver/error.go new file mode 100644 index 0000000..35e31e5 --- /dev/null +++ b/pkg/storage/driver/error.go @@ -0,0 +1,5 @@ +package driver + +import "errors" + +var ErrSchemeNotRegistered = errors.New("scheme was not registered") diff --git a/pkg/storage/driver/rpc/client/blob_bucket.go b/pkg/storage/driver/rpc/client/blob_bucket.go new file mode 100644 index 0000000..437ea6c --- /dev/null +++ b/pkg/storage/driver/rpc/client/blob_bucket.go @@ -0,0 +1,239 @@ +package client + +import ( + "context" + "io" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob" + "github.com/pkg/errors" +) + +type BlobBucket struct { + name string + id blob.BucketID + call CallFunc +} + +// Size implements storage.BlobBucket +func (b *BlobBucket) Size(ctx context.Context) (int64, error) { + args := blob.GetBucketSizeArgs{ + BucketID: b.id, + } + + reply := blob.GetBucketSizeReply{} + + if err := b.call(ctx, "Service.GetBucketSize", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Size, nil +} + +// Name implements storage.BlobBucket +func (b *BlobBucket) Name() string { + return b.name +} + +// Close implements storage.BlobBucket +func (b *BlobBucket) Close() error { + args := blob.CloseBucketArgs{ + BucketID: b.id, + } + + reply := blob.CloseBucketReply{} + + if err := b.call(context.Background(), "Service.CloseBucket", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Delete implements storage.BlobBucket +func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { + args := blob.DeleteBucketArgs{ + BucketName: b.name, + } + + reply := blob.DeleteBucketReply{} + + if err := b.call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Get implements storage.BlobBucket +func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) { + args := blob.GetBlobInfoArgs{ + BucketID: b.id, + BlobID: id, + } + + reply := blob.GetBlobInfoReply{} + + if err := b.call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.BlobInfo, nil +} + +// List implements storage.BlobBucket +func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { + args := blob.ListBlobInfoArgs{ + BucketID: b.id, + } + + reply := blob.ListBlobInfoReply{} + + if err := b.call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.BlobInfos, nil +} + +// NewReader implements storage.BlobBucket +func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) { + args := blob.NewBlobReaderArgs{ + BucketID: b.id, + BlobID: id, + } + + reply := blob.NewBlobReaderReply{} + + if err := b.call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return &blobReaderCloser{ + readerID: reply.ReaderID, + call: b.call, + }, nil +} + +// NewWriter implements storage.BlobBucket +func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) { + args := blob.NewBlobWriterArgs{ + BucketID: b.id, + BlobID: id, + } + + reply := blob.NewBlobWriterReply{} + + if err := b.call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return &blobWriterCloser{ + blobID: id, + writerID: reply.WriterID, + call: b.call, + }, nil +} + +type blobWriterCloser struct { + blobID storage.BlobID + writerID blob.WriterID + call CallFunc +} + +// Write implements io.WriteCloser +func (bwc *blobWriterCloser) Write(data []byte) (int, error) { + args := blob.WriteBlobArgs{ + WriterID: bwc.writerID, + Data: data, + } + + reply := blob.WriteBlobReply{} + + if err := bwc.call(context.Background(), "Service.WriteBlob", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Written, nil +} + +// Close implements io.WriteCloser +func (bwc *blobWriterCloser) Close() error { + args := blob.CloseWriterArgs{ + WriterID: bwc.writerID, + } + + reply := blob.CloseBucketReply{} + + if err := bwc.call(context.Background(), "Service.CloseWriter", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +type blobReaderCloser struct { + readerID blob.ReaderID + call func(ctx context.Context, serviceMethod string, args any, reply any) error +} + +// Read implements io.ReadSeekCloser +func (brc *blobReaderCloser) Read(p []byte) (int, error) { + args := blob.ReadBlobArgs{ + ReaderID: brc.readerID, + Length: len(p), + } + + reply := blob.ReadBlobReply{} + + if err := brc.call(context.Background(), "Service.ReadBlob", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + copy(p, reply.Data) + + if reply.EOF { + return reply.Read, io.EOF + } + + return reply.Read, nil +} + +// Seek implements io.ReadSeekCloser +func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) { + args := blob.SeekBlobArgs{ + ReaderID: brc.readerID, + Offset: offset, + Whence: whence, + } + + reply := blob.SeekBlobReply{} + + if err := brc.call(context.Background(), "Service.SeekBlob", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Read, nil +} + +// Close implements io.ReadSeekCloser +func (brc *blobReaderCloser) Close() error { + args := blob.CloseReaderArgs{ + ReaderID: brc.readerID, + } + + reply := blob.CloseReaderReply{} + + if err := brc.call(context.Background(), "Service.CloseReader", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +var ( + _ storage.BlobBucket = &BlobBucket{} + _ storage.BlobInfo = &BlobInfo{} + _ io.WriteCloser = &blobWriterCloser{} + _ io.ReadSeekCloser = &blobReaderCloser{} +) diff --git a/pkg/storage/driver/rpc/client/blob_info.go b/pkg/storage/driver/rpc/client/blob_info.go new file mode 100644 index 0000000..5cc9b24 --- /dev/null +++ b/pkg/storage/driver/rpc/client/blob_info.go @@ -0,0 +1,40 @@ +package client + +import ( + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +type BlobInfo struct { + id storage.BlobID + bucket string + contentType string + modTime time.Time + size int64 +} + +// Bucket implements storage.BlobInfo +func (i *BlobInfo) Bucket() string { + return i.bucket +} + +// ID implements storage.BlobInfo +func (i *BlobInfo) ID() storage.BlobID { + return i.id +} + +// ContentType implements storage.BlobInfo +func (i *BlobInfo) ContentType() string { + return i.contentType +} + +// ModTime implements storage.BlobInfo +func (i *BlobInfo) ModTime() time.Time { + return i.modTime +} + +// Size implements storage.BlobInfo +func (i *BlobInfo) Size() int64 { + return i.size +} diff --git a/pkg/storage/driver/rpc/client/blob_store.go b/pkg/storage/driver/rpc/client/blob_store.go new file mode 100644 index 0000000..cf31b04 --- /dev/null +++ b/pkg/storage/driver/rpc/client/blob_store.go @@ -0,0 +1,101 @@ +package client + +import ( + "context" + "net/url" + + "github.com/keegancsmith/rpc" + "gitlab.com/wpetit/goweb/logger" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob" + "github.com/pkg/errors" +) + +type BlobStore struct { + serverURL *url.URL +} + +// DeleteBucket implements storage.BlobStore. +func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error { + args := &blob.DeleteBucketArgs{ + BucketName: name, + } + + if err := s.call(ctx, "Service.DeleteBucket", args, nil); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// ListBuckets implements storage.BlobStore. +func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { + args := &blob.ListBucketsArgs{} + + reply := blob.ListBucketsReply{} + + if err := s.call(ctx, "Service.ListBuckets", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Buckets, nil +} + +// OpenBucket implements storage.BlobStore. +func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) { + args := &blob.OpenBucketArgs{ + BucketName: name, + } + reply := &blob.OpenBucketReply{} + + if err := s.call(ctx, "Service.OpenBucket", args, reply); err != nil { + return nil, errors.WithStack(err) + } + + return &BlobBucket{ + name: name, + id: reply.BucketID, + call: s.call, + }, nil +} + +func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { + err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { + if err := client.Call(ctx, serviceMethod, args, reply); err != nil { + return errors.WithStack(err) + } + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { + client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := client.Close(); err != nil { + logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err))) + } + }() + + if err := fn(ctx, client); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func NewBlobStore(serverURL *url.URL) *BlobStore { + return &BlobStore{serverURL} +} + +var _ storage.BlobStore = &BlobStore{} diff --git a/pkg/storage/driver/rpc/client/blob_store_test.go b/pkg/storage/driver/rpc/client/blob_store_test.go new file mode 100644 index 0000000..8bdbdf3 --- /dev/null +++ b/pkg/storage/driver/rpc/client/blob_store_test.go @@ -0,0 +1,87 @@ +package client + +import ( + "context" + "fmt" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestBlobStore(t *testing.T) { + t.Parallel() + if testing.Verbose() { + logger.SetLevel(logger.LevelDebug) + } + + httpServer, err := startNewBlobStoreServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + serverURL := &url.URL{ + Host: serverAddr.String(), + } + + store := NewBlobStore(serverURL) + + testsuite.TestBlobStore(context.Background(), t, store) +} + +func BenchmarkBlobStore(t *testing.B) { + logger.SetLevel(logger.LevelError) + + httpServer, err := startNewBlobStoreServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + serverURL := &url.URL{ + Host: serverAddr.String(), + } + + store := NewBlobStore(serverURL) + + testsuite.BenchmarkBlobStore(t, store) + +} + +func getSQLiteBlobStore() (*sqlite.BlobStore, error) { + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, errors.WithStack(err) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := sqlite.NewBlobStore(dsn) + + return store, nil +} + +func startNewBlobStoreServer() (*httptest.Server, error) { + store, err := getSQLiteBlobStore() + if err != nil { + return nil, errors.WithStack(err) + } + + server := server.NewBlobStoreServer(store) + + httpServer := httptest.NewServer(server) + + return httpServer, nil +} diff --git a/pkg/storage/driver/rpc/client/document_store.go b/pkg/storage/driver/rpc/client/document_store.go new file mode 100644 index 0000000..46282f2 --- /dev/null +++ b/pkg/storage/driver/rpc/client/document_store.go @@ -0,0 +1,134 @@ +package client + +import ( + "context" + "net/url" + + "github.com/keegancsmith/rpc" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document" + "forge.cadoles.com/arcad/edge/pkg/storage/filter" +) + +type DocumentStore struct { + serverURL *url.URL +} + +// Delete implements storage.DocumentStore. +func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error { + args := document.DeleteDocumentArgs{ + Collection: collection, + DocumentID: id, + } + + reply := document.DeleteDocumentReply{} + + if err := s.call(ctx, "Service.DeleteDocument", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Get implements storage.DocumentStore. +func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) { + args := document.GetDocumentArgs{ + Collection: collection, + DocumentID: id, + } + + reply := document.GetDocumentReply{} + + if err := s.call(ctx, "Service.GetDocument", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Document, nil +} + +// Query implements storage.DocumentStore. +func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { + opts := &storage.QueryOptions{} + for _, fn := range funcs { + fn(opts) + } + + args := document.QueryDocumentsArgs{ + Collection: collection, + Filter: nil, + Options: opts, + } + + if filter != nil { + args.Filter = filter.AsMap() + } + + reply := document.QueryDocumentsReply{ + Documents: []storage.Document{}, + } + + if err := s.call(ctx, "Service.QueryDocuments", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Documents, nil +} + +// Upsert implements storage.DocumentStore. +func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc storage.Document) (storage.Document, error) { + args := document.UpsertDocumentArgs{ + Collection: collection, + Document: doc, + } + + reply := document.UpsertDocumentReply{} + + if err := s.call(ctx, "Service.UpsertDocument", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Document, nil +} + +func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { + err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { + if err := client.Call(ctx, serviceMethod, args, reply); err != nil { + return errors.WithStack(err) + } + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { + client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := client.Close(); err != nil { + logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err))) + } + }() + + if err := fn(ctx, client); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func NewDocumentStore(url *url.URL) *DocumentStore { + return &DocumentStore{url} +} + +var _ storage.DocumentStore = &DocumentStore{} diff --git a/pkg/storage/driver/rpc/client/document_store_test.go b/pkg/storage/driver/rpc/client/document_store_test.go new file mode 100644 index 0000000..f029c51 --- /dev/null +++ b/pkg/storage/driver/rpc/client/document_store_test.go @@ -0,0 +1,67 @@ +package client + +import ( + "context" + "fmt" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestDocumentStore(t *testing.T) { + t.Parallel() + if testing.Verbose() { + logger.SetLevel(logger.LevelDebug) + } + + httpServer, err := startNewDocumentStoreServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + + serverURL := &url.URL{ + Host: serverAddr.String(), + } + + store := NewDocumentStore(serverURL) + + testsuite.TestDocumentStore(context.Background(), t, store) +} + +func getSQLiteDocumentStore() (*sqlite.DocumentStore, error) { + file := "./testdata/documentstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, errors.WithStack(err) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := sqlite.NewDocumentStore(dsn) + + return store, nil +} + +func startNewDocumentStoreServer() (*httptest.Server, error) { + store, err := getSQLiteDocumentStore() + if err != nil { + return nil, errors.WithStack(err) + } + + server := server.NewDocumentStoreServer(store) + + httpServer := httptest.NewServer(server) + + return httpServer, nil +} diff --git a/pkg/storage/driver/rpc/client/init.go b/pkg/storage/driver/rpc/client/init.go new file mode 100644 index 0000000..5a6ba64 --- /dev/null +++ b/pkg/storage/driver/rpc/client/init.go @@ -0,0 +1,9 @@ +package client + +import ( + "context" + + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/gob" +) + +type CallFunc func(ctx context.Context, serviceMethod string, args any, reply any) error diff --git a/pkg/storage/sqlite/testdata/.gitignore b/pkg/storage/driver/rpc/client/testdata/.gitignore similarity index 100% rename from pkg/storage/sqlite/testdata/.gitignore rename to pkg/storage/driver/rpc/client/testdata/.gitignore diff --git a/pkg/storage/driver/rpc/driver.go b/pkg/storage/driver/rpc/driver.go new file mode 100644 index 0000000..fc8d961 --- /dev/null +++ b/pkg/storage/driver/rpc/driver.go @@ -0,0 +1,22 @@ +package rpc + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/client" +) + +func init() { + driver.RegisterDocumentStoreFactory("rpc", documentStoreFactory) + driver.RegisterBlobStoreFactory("rpc", blobStoreFactory) +} + +func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) { + return client.NewDocumentStore(url), nil +} + +func blobStoreFactory(url *url.URL) (storage.BlobStore, error) { + return client.NewBlobStore(url), nil +} diff --git a/pkg/storage/driver/rpc/gob/blob_info.go b/pkg/storage/driver/rpc/gob/blob_info.go new file mode 100644 index 0000000..a0c2e33 --- /dev/null +++ b/pkg/storage/driver/rpc/gob/blob_info.go @@ -0,0 +1,42 @@ +package gob + +import ( + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +type BlobInfo struct { + Bucket_ string + ContentType_ string + BlobID_ storage.BlobID + ModTime_ time.Time + Size_ int64 +} + +// Bucket implements storage.BlobInfo. +func (bi *BlobInfo) Bucket() string { + return bi.Bucket_ +} + +// ContentType implements storage.BlobInfo. +func (bi *BlobInfo) ContentType() string { + return bi.ContentType_ +} + +// ID implements storage.BlobInfo. +func (bi *BlobInfo) ID() storage.BlobID { + return bi.BlobID_ +} + +// ModTime implements storage.BlobInfo. +func (bi *BlobInfo) ModTime() time.Time { + return bi.ModTime_ +} + +// Size implements storage.BlobInfo. +func (bi *BlobInfo) Size() int64 { + return bi.Size_ +} + +var _ storage.BlobInfo = &BlobInfo{} diff --git a/pkg/storage/driver/rpc/gob/init.go b/pkg/storage/driver/rpc/gob/init.go new file mode 100644 index 0000000..1666eeb --- /dev/null +++ b/pkg/storage/driver/rpc/gob/init.go @@ -0,0 +1,18 @@ +package gob + +import ( + "encoding/gob" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +func init() { + gob.Register(storage.Document{}) + gob.Register(storage.DocumentID("")) + gob.Register(time.Time{}) + gob.Register(map[string]interface{}{}) + gob.Register([]interface{}{}) + gob.Register([]map[string]interface{}{}) + gob.Register(&BlobInfo{}) +} diff --git a/pkg/storage/driver/rpc/server/blob/close_bucket.go b/pkg/storage/driver/rpc/server/blob/close_bucket.go new file mode 100644 index 0000000..64c403e --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/close_bucket.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseBucketArgs struct { + BucketID BucketID +} + +type CloseBucketReply struct { +} + +func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + if err := bucket.Close(); err != nil { + return errors.WithStack(err) + } + + s.buckets.Delete(args.BucketID) + + *reply = CloseBucketReply{} + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/close_reader.go b/pkg/storage/driver/rpc/server/blob/close_reader.go new file mode 100644 index 0000000..8f98134 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/close_reader.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseReaderArgs struct { + ReaderID ReaderID +} + +type CloseReaderReply struct { +} + +func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + if err := reader.Close(); err != nil { + return errors.WithStack(err) + } + + s.readers.Delete(args.ReaderID) + + *reply = CloseReaderReply{} + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/close_writer.go b/pkg/storage/driver/rpc/server/blob/close_writer.go new file mode 100644 index 0000000..8fe6ab1 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/close_writer.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseWriterArgs struct { + WriterID WriterID +} + +type CloseWriterReply struct { +} + +func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error { + writer, err := s.getOpenedWriter(args.WriterID) + if err != nil { + return errors.WithStack(err) + } + + if err := writer.Close(); err != nil { + return errors.WithStack(err) + } + + s.writers.Delete(args.WriterID) + + *reply = CloseWriterReply{} + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/delete_bucket.go b/pkg/storage/driver/rpc/server/blob/delete_bucket.go new file mode 100644 index 0000000..751d97a --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/delete_bucket.go @@ -0,0 +1,22 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type DeleteBucketArgs struct { + BucketName string +} + +type DeleteBucketReply struct { +} + +func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error { + if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/get_blob_info.go b/pkg/storage/driver/rpc/server/blob/get_blob_info.go new file mode 100644 index 0000000..13b6c14 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/get_blob_info.go @@ -0,0 +1,42 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/gob" + "github.com/pkg/errors" +) + +type GetBlobInfoArgs struct { + BlobID storage.BlobID + BucketID BucketID +} + +type GetBlobInfoReply struct { + BlobInfo storage.BlobInfo +} + +func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + blobInfo, err := bucket.Get(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetBlobInfoReply{ + BlobInfo: &gob.BlobInfo{ + Bucket_: blobInfo.Bucket(), + ContentType_: blobInfo.ContentType(), + BlobID_: blobInfo.ID(), + ModTime_: blobInfo.ModTime(), + Size_: blobInfo.Size(), + }, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/get_bucket_size.go b/pkg/storage/driver/rpc/server/blob/get_bucket_size.go new file mode 100644 index 0000000..5c02733 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/get_bucket_size.go @@ -0,0 +1,33 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type GetBucketSizeArgs struct { + BucketID BucketID +} + +type GetBucketSizeReply struct { + Size int64 +} + +func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + size, err := bucket.Size(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetBucketSizeReply{ + Size: size, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/list_blob_info.go b/pkg/storage/driver/rpc/server/blob/list_blob_info.go new file mode 100644 index 0000000..6614b0c --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/list_blob_info.go @@ -0,0 +1,34 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type ListBlobInfoArgs struct { + BucketID BucketID +} + +type ListBlobInfoReply struct { + BlobInfos []storage.BlobInfo +} + +func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + blobInfos, err := bucket.List(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = ListBlobInfoReply{ + BlobInfos: blobInfos, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/list_buckets.go b/pkg/storage/driver/rpc/server/blob/list_buckets.go new file mode 100644 index 0000000..52c8a10 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/list_buckets.go @@ -0,0 +1,27 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type ListBucketsArgs struct { +} + +type ListBucketsReply struct { + Buckets []string +} + +func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error { + buckets, err := s.store.ListBuckets(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = ListBucketsReply{ + Buckets: buckets, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/new_blob_reader.go b/pkg/storage/driver/rpc/server/blob/new_blob_reader.go new file mode 100644 index 0000000..b1d1286 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/new_blob_reader.go @@ -0,0 +1,57 @@ +package blob + +import ( + "context" + "io" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type NewBlobReaderArgs struct { + BlobID storage.BlobID + BucketID BucketID +} + +type NewBlobReaderReply struct { + ReaderID ReaderID +} + +func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + readerID, err := NewReaderID() + if err != nil { + return errors.WithStack(err) + } + + reader, err := bucket.NewReader(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + s.readers.Store(readerID, reader) + + *reply = NewBlobReaderReply{ + ReaderID: readerID, + } + + return nil +} + +func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) { + raw, exists := s.readers.Load(id) + if !exists { + return nil, errors.Errorf("could not find writer '%s'", id) + } + + reader, ok := raw.(io.ReadSeekCloser) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for writer", raw) + } + + return reader, nil +} diff --git a/pkg/storage/driver/rpc/server/blob/new_blob_writer.go b/pkg/storage/driver/rpc/server/blob/new_blob_writer.go new file mode 100644 index 0000000..de25439 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/new_blob_writer.go @@ -0,0 +1,57 @@ +package blob + +import ( + "context" + "io" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type NewBlobWriterArgs struct { + BlobID storage.BlobID + BucketID BucketID +} + +type NewBlobWriterReply struct { + WriterID WriterID +} + +func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + writerID, err := NewWriterID() + if err != nil { + return errors.WithStack(err) + } + + writer, err := bucket.NewWriter(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + s.writers.Store(writerID, writer) + + *reply = NewBlobWriterReply{ + WriterID: writerID, + } + + return nil +} + +func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) { + raw, exists := s.writers.Load(id) + if !exists { + return nil, errors.Errorf("could not find writer '%s'", id) + } + + writer, ok := raw.(io.WriteCloser) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for writer", raw) + } + + return writer, nil +} diff --git a/pkg/storage/driver/rpc/server/blob/open_bucket.go b/pkg/storage/driver/rpc/server/blob/open_bucket.go new file mode 100644 index 0000000..bf5ce28 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/open_bucket.go @@ -0,0 +1,50 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type OpenBucketArgs struct { + BucketName string +} + +type OpenBucketReply struct { + BucketID BucketID +} + +func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error { + bucket, err := s.store.OpenBucket(ctx, args.BucketName) + if err != nil { + return errors.WithStack(err) + } + + bucketID, err := NewBucketID() + if err != nil { + return errors.WithStack(err) + } + + s.buckets.Store(bucketID, bucket) + + *reply = OpenBucketReply{ + BucketID: bucketID, + } + + return nil +} + +func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) { + raw, exists := s.buckets.Load(id) + if !exists { + return nil, errors.WithStack(storage.ErrBucketClosed) + } + + bucket, ok := raw.(storage.BlobBucket) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw) + } + + return bucket, nil +} diff --git a/pkg/storage/driver/rpc/server/blob/read_blob.go b/pkg/storage/driver/rpc/server/blob/read_blob.go new file mode 100644 index 0000000..7a12bc1 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/read_blob.go @@ -0,0 +1,41 @@ +package blob + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type ReadBlobArgs struct { + ReaderID ReaderID + Length int +} + +type ReadBlobReply struct { + Data []byte + Read int + EOF bool +} + +func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + buff := make([]byte, args.Length) + + read, err := reader.Read(buff) + if err != nil && !errors.Is(err, io.EOF) { + return errors.WithStack(err) + } + + *reply = ReadBlobReply{ + Read: read, + Data: buff, + EOF: errors.Is(err, io.EOF), + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/seek_blob.go b/pkg/storage/driver/rpc/server/blob/seek_blob.go new file mode 100644 index 0000000..902abc2 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/seek_blob.go @@ -0,0 +1,38 @@ +package blob + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type SeekBlobArgs struct { + ReaderID ReaderID + Offset int64 + Whence int +} + +type SeekBlobReply struct { + Read int64 + EOF bool +} + +func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + read, err := reader.Seek(args.Offset, args.Whence) + if err != nil && !errors.Is(err, io.EOF) { + return errors.WithStack(err) + } + + *reply = SeekBlobReply{ + Read: read, + EOF: errors.Is(err, io.EOF), + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/blob/service.go b/pkg/storage/driver/rpc/server/blob/service.go new file mode 100644 index 0000000..3ab7689 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/service.go @@ -0,0 +1,60 @@ +package blob + +import ( + "fmt" + "sync" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type BucketID string +type WriterID string +type ReaderID string + +type Service struct { + store storage.BlobStore + buckets sync.Map + writers sync.Map + readers sync.Map +} + +func NewService(store storage.BlobStore) *Service { + return &Service{ + store: store, + } +} + +func NewBucketID() (BucketID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := BucketID(fmt.Sprintf("bucket-%s", uuid.String())) + + return id, nil +} + +func NewWriterID() (WriterID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := WriterID(fmt.Sprintf("writer-%s", uuid.String())) + + return id, nil +} + +func NewReaderID() (ReaderID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := ReaderID(fmt.Sprintf("reader-%s", uuid.String())) + + return id, nil +} diff --git a/pkg/storage/driver/rpc/server/blob/write_blob.go b/pkg/storage/driver/rpc/server/blob/write_blob.go new file mode 100644 index 0000000..a3651a8 --- /dev/null +++ b/pkg/storage/driver/rpc/server/blob/write_blob.go @@ -0,0 +1,34 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type WriteBlobArgs struct { + WriterID WriterID + Data []byte +} + +type WriteBlobReply struct { + Written int +} + +func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error { + writer, err := s.getOpenedWriter(args.WriterID) + if err != nil { + return errors.WithStack(err) + } + + written, err := writer.Write(args.Data) + if err != nil { + return errors.WithStack(err) + } + + *reply = WriteBlobReply{ + Written: written, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/document/delete_document.go b/pkg/storage/driver/rpc/server/document/delete_document.go new file mode 100644 index 0000000..a351b71 --- /dev/null +++ b/pkg/storage/driver/rpc/server/document/delete_document.go @@ -0,0 +1,26 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type DeleteDocumentArgs struct { + Collection string + DocumentID storage.DocumentID +} + +type DeleteDocumentReply struct { +} + +func (s *Service) DeleteDocument(ctx context.Context, args DeleteDocumentArgs, reply *DeleteDocumentReply) error { + if err := s.store.Delete(ctx, args.Collection, args.DocumentID); err != nil { + return errors.WithStack(err) + } + + *reply = DeleteDocumentReply{} + + return nil +} diff --git a/pkg/storage/driver/rpc/server/document/get_document.go b/pkg/storage/driver/rpc/server/document/get_document.go new file mode 100644 index 0000000..d08abe4 --- /dev/null +++ b/pkg/storage/driver/rpc/server/document/get_document.go @@ -0,0 +1,30 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type GetDocumentArgs struct { + Collection string + DocumentID storage.DocumentID +} + +type GetDocumentReply struct { + Document storage.Document +} + +func (s *Service) GetDocument(ctx context.Context, args GetDocumentArgs, reply *GetDocumentReply) error { + document, err := s.store.Get(ctx, args.Collection, args.DocumentID) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetDocumentReply{ + Document: document, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/document/query_documents.go b/pkg/storage/driver/rpc/server/document/query_documents.go new file mode 100644 index 0000000..f7d8e28 --- /dev/null +++ b/pkg/storage/driver/rpc/server/document/query_documents.go @@ -0,0 +1,53 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/filter" + "github.com/pkg/errors" +) + +type QueryDocumentsArgs struct { + Collection string + Filter map[string]any + Options *storage.QueryOptions +} + +type QueryDocumentsReply struct { + Documents []storage.Document +} + +func (s *Service) QueryDocuments(ctx context.Context, args QueryDocumentsArgs, reply *QueryDocumentsReply) error { + var ( + argsFilter *filter.Filter + err error + ) + + if args.Filter != nil { + argsFilter, err = filter.NewFrom(args.Filter) + if err != nil { + return errors.WithStack(err) + } + } + + documents, err := s.store.Query(ctx, args.Collection, argsFilter, withQueryOptions(args.Options)) + if err != nil { + return errors.WithStack(err) + } + + *reply = QueryDocumentsReply{ + Documents: documents, + } + + return nil +} + +func withQueryOptions(opts *storage.QueryOptions) storage.QueryOptionFunc { + return func(o *storage.QueryOptions) { + o.Limit = opts.Limit + o.Offset = opts.Offset + o.OrderBy = opts.OrderBy + o.OrderDirection = opts.OrderDirection + } +} diff --git a/pkg/storage/driver/rpc/server/document/service.go b/pkg/storage/driver/rpc/server/document/service.go new file mode 100644 index 0000000..fc908a0 --- /dev/null +++ b/pkg/storage/driver/rpc/server/document/service.go @@ -0,0 +1,11 @@ +package document + +import "forge.cadoles.com/arcad/edge/pkg/storage" + +type Service struct { + store storage.DocumentStore +} + +func NewService(store storage.DocumentStore) *Service { + return &Service{store} +} diff --git a/pkg/storage/driver/rpc/server/document/upsert_document.go b/pkg/storage/driver/rpc/server/document/upsert_document.go new file mode 100644 index 0000000..d1bcd5a --- /dev/null +++ b/pkg/storage/driver/rpc/server/document/upsert_document.go @@ -0,0 +1,30 @@ +package document + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type UpsertDocumentArgs struct { + Collection string + Document storage.Document +} + +type UpsertDocumentReply struct { + Document storage.Document +} + +func (s *Service) UpsertDocument(ctx context.Context, args UpsertDocumentArgs, reply *UpsertDocumentReply) error { + document, err := s.store.Upsert(ctx, args.Collection, args.Document) + if err != nil { + return errors.WithStack(err) + } + + *reply = UpsertDocumentReply{ + Document: document, + } + + return nil +} diff --git a/pkg/storage/driver/rpc/server/init.go b/pkg/storage/driver/rpc/server/init.go new file mode 100644 index 0000000..c5e8842 --- /dev/null +++ b/pkg/storage/driver/rpc/server/init.go @@ -0,0 +1,5 @@ +package server + +import ( + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/gob" +) diff --git a/pkg/storage/driver/rpc/server/server.go b/pkg/storage/driver/rpc/server/server.go new file mode 100644 index 0000000..a3645ed --- /dev/null +++ b/pkg/storage/driver/rpc/server/server.go @@ -0,0 +1,21 @@ +package server + +import ( + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document" +) + +func NewBlobStoreServer(store storage.BlobStore) *rpc.Server { + server := rpc.NewServer() + server.Register(blob.NewService(store)) + return server +} + +func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server { + server := rpc.NewServer() + server.Register(document.NewService(store)) + return server +} diff --git a/pkg/storage/driver/share_store.go b/pkg/storage/driver/share_store.go new file mode 100644 index 0000000..983a00b --- /dev/null +++ b/pkg/storage/driver/share_store.go @@ -0,0 +1,35 @@ +package driver + +import ( + "net/url" + + "forge.cadoles.com/arcad/edge/pkg/storage/share" + "github.com/pkg/errors" +) + +var shareStoreFactories = make(map[string]ShareStoreFactory, 0) + +type ShareStoreFactory func(url *url.URL) (share.Store, error) + +func RegisterShareStoreFactory(scheme string, factory ShareStoreFactory) { + shareStoreFactories[scheme] = factory +} + +func NewShareStore(dsn string) (share.Store, error) { + url, err := url.Parse(dsn) + if err != nil { + return nil, errors.WithStack(err) + } + + factory, exists := shareStoreFactories[url.Scheme] + if !exists { + return nil, errors.WithStack(ErrSchemeNotRegistered) + } + + store, err := factory(url) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil +} diff --git a/pkg/storage/sqlite/blob_bucket.go b/pkg/storage/driver/sqlite/blob_bucket.go similarity index 100% rename from pkg/storage/sqlite/blob_bucket.go rename to pkg/storage/driver/sqlite/blob_bucket.go diff --git a/pkg/storage/sqlite/blob_info.go b/pkg/storage/driver/sqlite/blob_info.go similarity index 100% rename from pkg/storage/sqlite/blob_info.go rename to pkg/storage/driver/sqlite/blob_info.go diff --git a/pkg/storage/sqlite/blob_store.go b/pkg/storage/driver/sqlite/blob_store.go similarity index 100% rename from pkg/storage/sqlite/blob_store.go rename to pkg/storage/driver/sqlite/blob_store.go diff --git a/pkg/storage/driver/sqlite/blob_store_test.go b/pkg/storage/driver/sqlite/blob_store_test.go new file mode 100644 index 0000000..f3812f4 --- /dev/null +++ b/pkg/storage/driver/sqlite/blob_store_test.go @@ -0,0 +1,46 @@ +package sqlite + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestBlobStore(t *testing.T) { + t.Parallel() + if testing.Verbose() { + logger.SetLevel(logger.LevelDebug) + } + + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + t.Fatalf("%+v", errors.WithStack(err)) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := NewBlobStore(dsn) + + testsuite.TestBlobStore(context.Background(), t, store) +} + +func BenchmarkBlobStore(t *testing.B) { + logger.SetLevel(logger.LevelError) + + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + t.Fatalf("%+v", errors.WithStack(err)) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := NewBlobStore(dsn) + + testsuite.BenchmarkBlobStore(t, store) +} diff --git a/pkg/storage/sqlite/document_store.go b/pkg/storage/driver/sqlite/document_store.go similarity index 97% rename from pkg/storage/sqlite/document_store.go rename to pkg/storage/driver/sqlite/document_store.go index d7bbc40..6f6f3e5 100644 --- a/pkg/storage/sqlite/document_store.go +++ b/pkg/storage/driver/sqlite/document_store.go @@ -276,7 +276,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e return nil } -func ensureTables(ctx context.Context, db *sql.DB) error { +func ensureDocumentTables(ctx context.Context, db *sql.DB) error { err := WithTx(ctx, db, func(tx *sql.Tx) error { query := ` CREATE TABLE IF NOT EXISTS documents ( @@ -344,7 +344,7 @@ func withLimitOffsetClause(query string, args []any, limit int, offset int) (str } func NewDocumentStore(path string) *DocumentStore { - getDB := NewGetDBFunc(path, ensureTables) + getDB := NewGetDBFunc(path, ensureDocumentTables) return &DocumentStore{ getDB: getDB, @@ -352,7 +352,7 @@ func NewDocumentStore(path string) *DocumentStore { } func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore { - getDB := NewGetDBFuncFromDB(db, ensureTables) + getDB := NewGetDBFuncFromDB(db, ensureDocumentTables) return &DocumentStore{ getDB: getDB, diff --git a/pkg/storage/sqlite/document_store_test.go b/pkg/storage/driver/sqlite/document_store_test.go similarity index 89% rename from pkg/storage/sqlite/document_store_test.go rename to pkg/storage/driver/sqlite/document_store_test.go index a4a24de..1833b43 100644 --- a/pkg/storage/sqlite/document_store_test.go +++ b/pkg/storage/driver/sqlite/document_store_test.go @@ -1,6 +1,7 @@ package sqlite import ( + "context" "fmt" "os" "testing" @@ -24,5 +25,5 @@ func TestDocumentStore(t *testing.T) { dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) store := NewDocumentStore(dsn) - testsuite.TestDocumentStore(t, store) + testsuite.TestDocumentStore(context.Background(), t, store) } diff --git a/pkg/storage/driver/sqlite/driver.go b/pkg/storage/driver/sqlite/driver.go new file mode 100644 index 0000000..97dae13 --- /dev/null +++ b/pkg/storage/driver/sqlite/driver.go @@ -0,0 +1,75 @@ +package sqlite + +import ( + "net/url" + "os" + "path/filepath" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver" + "forge.cadoles.com/arcad/edge/pkg/storage/share" + "github.com/pkg/errors" +) + +func init() { + driver.RegisterDocumentStoreFactory("sqlite", documentStoreFactory) + driver.RegisterBlobStoreFactory("sqlite", blobStoreFactory) + driver.RegisterShareStoreFactory("sqlite", shareStoreFactory) +} + +func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) { + dir := filepath.Dir(url.Host + url.Path) + + if dir != ":memory:" { + if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { + return nil, errors.WithStack(err) + } + } + + path := url.Host + url.Path + "?" + url.RawQuery + + db, err := Open(path) + if err != nil { + return nil, errors.WithStack(err) + } + + return NewDocumentStoreWithDB(db), nil +} + +func blobStoreFactory(url *url.URL) (storage.BlobStore, error) { + dir := filepath.Dir(url.Host + url.Path) + + if dir != ":memory:" { + if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { + return nil, errors.WithStack(err) + } + } + + path := url.Host + url.Path + "?" + url.RawQuery + + db, err := Open(path) + if err != nil { + return nil, errors.WithStack(err) + } + + return NewBlobStoreWithDB(db), nil +} + +func shareStoreFactory(url *url.URL) (share.Store, error) { + dir := filepath.Dir(url.Host + url.Path) + + if dir != ":memory:" { + if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { + return nil, errors.WithStack(err) + } + } + + path := url.Host + url.Path + "?" + url.RawQuery + + db, err := Open(path) + if err != nil { + return nil, errors.WithStack(err) + } + + return NewShareStoreWithDB(db), nil +} diff --git a/pkg/storage/sqlite/filter.go b/pkg/storage/driver/sqlite/filter.go similarity index 100% rename from pkg/storage/sqlite/filter.go rename to pkg/storage/driver/sqlite/filter.go diff --git a/pkg/storage/sqlite/json.go b/pkg/storage/driver/sqlite/json.go similarity index 100% rename from pkg/storage/sqlite/json.go rename to pkg/storage/driver/sqlite/json.go diff --git a/pkg/module/share/sqlite/repository.go b/pkg/storage/driver/sqlite/share_store.go similarity index 84% rename from pkg/module/share/sqlite/repository.go rename to pkg/storage/driver/sqlite/share_store.go index cb36d71..4fff4dd 100644 --- a/pkg/module/share/sqlite/repository.go +++ b/pkg/storage/driver/sqlite/share_store.go @@ -7,19 +7,18 @@ import ( "time" "forge.cadoles.com/arcad/edge/pkg/app" - "forge.cadoles.com/arcad/edge/pkg/module/share" - "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/share" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) -type Repository struct { - getDB sqlite.GetDBFunc +type ShareStore struct { + getDB GetDBFunc } // DeleteAttributes implements share.Repository -func (r *Repository) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error { - err := r.withTx(ctx, func(tx *sql.Tx) error { +func (s *ShareStore) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error { + err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` DELETE FROM resources WHERE origin = $1 AND resource_id = $2 @@ -76,8 +75,8 @@ func (r *Repository) DeleteAttributes(ctx context.Context, origin app.ID, resour } // DeleteResource implements share.Repository -func (r *Repository) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error { - err := r.withTx(ctx, func(tx *sql.Tx) error { +func (s *ShareStore) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error { + err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` DELETE FROM resources WHERE origin = $1 AND resource_id = $2 @@ -115,12 +114,12 @@ func (r *Repository) DeleteResource(ctx context.Context, origin app.ID, resource } // FindResources implements share.Repository -func (r *Repository) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) { +func (s *ShareStore) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) { opts := share.FillFindResourcesOptions(funcs...) var resources []share.Resource - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` SELECT main.origin, main.resource_id, @@ -222,14 +221,14 @@ func (r *Repository) FindResources(ctx context.Context, funcs ...share.FindResou } // GetResource implements share.Repository -func (r *Repository) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) { +func (s *ShareStore) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) { var ( resource *share.BaseResource err error ) - err = r.withTx(ctx, func(tx *sql.Tx) error { - resource, err = r.getResourceWithinTx(ctx, tx, origin, resourceID) + err = s.withTx(ctx, func(tx *sql.Tx) error { + resource, err = s.getResourceWithinTx(ctx, tx, origin, resourceID) if err != nil { return errors.WithStack(err) } @@ -244,13 +243,13 @@ func (r *Repository) GetResource(ctx context.Context, origin app.ID, resourceID } // UpdateAttributes implements share.Repository -func (r *Repository) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) { +func (s *ShareStore) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) { if len(attributes) == 0 { return nil, errors.WithStack(share.ErrAttributeRequired) } var resource *share.BaseResource - err := r.withTx(ctx, func(tx *sql.Tx) error { + err := s.withTx(ctx, func(tx *sql.Tx) error { query := ` INSERT INTO resources (origin, resource_id, name, type, value, created_at, updated_at) VALUES($1, $2, $3, $4, $5, $6, $6) @@ -289,7 +288,7 @@ func (r *Repository) UpdateAttributes(ctx context.Context, origin app.ID, resour } } - resource, err = r.getResourceWithinTx(ctx, tx, origin, resourceID) + resource, err = s.getResourceWithinTx(ctx, tx, origin, resourceID) if err != nil { return errors.WithStack(err) } @@ -303,7 +302,7 @@ func (r *Repository) UpdateAttributes(ctx context.Context, origin app.ID, resour return resource, nil } -func (r *Repository) getResourceWithinTx(ctx context.Context, tx *sql.Tx, origin app.ID, resourceID share.ResourceID) (*share.BaseResource, error) { +func (s *ShareStore) getResourceWithinTx(ctx context.Context, tx *sql.Tx, origin app.ID, resourceID share.ResourceID) (*share.BaseResource, error) { query := ` SELECT name, type, value, created_at, updated_at FROM resources @@ -361,23 +360,23 @@ func (r *Repository) getResourceWithinTx(ctx context.Context, tx *sql.Tx, origin return resource, nil } -func (r *Repository) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error { +func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error { var db *sql.DB - db, err := r.getDB(ctx) + db, err := s.getDB(ctx) if err != nil { return errors.WithStack(err) } - if err := sqlite.WithTx(ctx, db, fn); err != nil { + if err := WithTx(ctx, db, fn); err != nil { return errors.WithStack(err) } return nil } -func ensureTables(ctx context.Context, db *sql.DB) error { - err := sqlite.WithTx(ctx, db, func(tx *sql.Tx) error { +func ensureShareTables(ctx context.Context, db *sql.DB) error { + err := WithTx(ctx, db, func(tx *sql.Tx) error { query := ` CREATE TABLE IF NOT EXISTS resources ( resource_id TEXT NOT NULL, @@ -410,20 +409,20 @@ func ensureTables(ctx context.Context, db *sql.DB) error { return nil } -func NewRepository(path string) *Repository { - getDB := sqlite.NewGetDBFunc(path, ensureTables) +func NewShareStore(path string) *ShareStore { + getDB := NewGetDBFunc(path, ensureShareTables) - return &Repository{ + return &ShareStore{ getDB: getDB, } } -func NewRepositoryWithDB(db *sql.DB) *Repository { - getDB := sqlite.NewGetDBFuncFromDB(db, ensureTables) +func NewShareStoreWithDB(db *sql.DB) *ShareStore { + getDB := NewGetDBFuncFromDB(db, ensureShareTables) - return &Repository{ + return &ShareStore{ getDB: getDB, } } -var _ share.Repository = &Repository{} +var _ share.Store = &ShareStore{} diff --git a/pkg/module/share/sqlite/repository_test.go b/pkg/storage/driver/sqlite/share_store_test.go similarity index 68% rename from pkg/module/share/sqlite/repository_test.go rename to pkg/storage/driver/sqlite/share_store_test.go index dd6d4dc..4d1ff27 100644 --- a/pkg/module/share/sqlite/repository_test.go +++ b/pkg/storage/driver/sqlite/share_store_test.go @@ -7,18 +7,18 @@ import ( "testing" "time" - "forge.cadoles.com/arcad/edge/pkg/module/share" - "forge.cadoles.com/arcad/edge/pkg/module/share/testsuite" + "forge.cadoles.com/arcad/edge/pkg/storage/share" + "forge.cadoles.com/arcad/edge/pkg/storage/share/testsuite" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) func TestRepository(t *testing.T) { logger.SetLevel(logger.LevelDebug) - testsuite.TestRepository(t, newTestRepo) + testsuite.TestStore(t, newTestStore) } -func newTestRepo(testName string) (share.Repository, error) { +func newTestStore(testName string) (share.Store, error) { filename := strings.ToLower(strings.ReplaceAll(testName, " ", "_")) file := fmt.Sprintf("./testdata/%s.sqlite", filename) @@ -27,7 +27,7 @@ func newTestRepo(testName string) (share.Repository, error) { } dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) - repo := NewRepository(dsn) + store := NewShareStore(dsn) - return repo, nil + return store, nil } diff --git a/pkg/storage/sqlite/sql.go b/pkg/storage/driver/sqlite/sql.go similarity index 100% rename from pkg/storage/sqlite/sql.go rename to pkg/storage/driver/sqlite/sql.go diff --git a/pkg/storage/driver/sqlite/testdata/.gitignore b/pkg/storage/driver/sqlite/testdata/.gitignore new file mode 100644 index 0000000..2a2bc40 --- /dev/null +++ b/pkg/storage/driver/sqlite/testdata/.gitignore @@ -0,0 +1 @@ +/*.sqlite* \ No newline at end of file diff --git a/pkg/storage/filter/and.go b/pkg/storage/filter/and.go index c504797..b73e4ca 100644 --- a/pkg/storage/filter/and.go +++ b/pkg/storage/filter/and.go @@ -8,6 +8,16 @@ func (o *AndOperator) Token() Token { return TokenAnd } +func (o *AndOperator) AsMap() map[string]any { + children := make([]map[string]any, 0, len(o.children)) + for _, c := range o.children { + children = append(children, c.AsMap()) + } + return map[string]any{ + string(TokenAnd): children, + } +} + func (o *AndOperator) Children() []Operator { return o.children } diff --git a/pkg/storage/filter/eq.go b/pkg/storage/filter/eq.go index c7d0163..497f06b 100644 --- a/pkg/storage/filter/eq.go +++ b/pkg/storage/filter/eq.go @@ -12,6 +12,16 @@ func (o *EqOperator) Fields() map[string]interface{} { return o.fields } +func (o *EqOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenEq): fields, + } +} + func NewEqOperator(fields map[string]interface{}) *EqOperator { return &EqOperator{fields} } diff --git a/pkg/storage/filter/filter.go b/pkg/storage/filter/filter.go index 9a6acf3..87006eb 100644 --- a/pkg/storage/filter/filter.go +++ b/pkg/storage/filter/filter.go @@ -29,6 +29,15 @@ func NewFrom(raw map[string]interface{}) (*Filter, error) { return &Filter{op}, nil } +func (f *Filter) AsMap() map[string]any { + root := f.Root() + if root == nil { + return nil + } + + return f.Root().AsMap() +} + func toFieldOperator(v interface{}) (Operator, error) { vv, ok := v.(map[string]interface{}) if !ok { @@ -93,8 +102,17 @@ func toFieldOperator(v interface{}) (Operator, error) { } func toAggregateOperator(token Token, v interface{}) (Operator, error) { - vv, ok := v.([]interface{}) - if !ok { + var vv []interface{} + + switch typed := v.(type) { + case []interface{}: + vv = typed + case []map[string]interface{}: + vv = make([]interface{}, 0, len(typed)) + for _, item := range typed { + vv = append(vv, item) + } + default: return nil, errors.WithStack(ErrInvalidAggregationOperator) } diff --git a/pkg/storage/filter/gt.go b/pkg/storage/filter/gt.go index 32d1361..b72eddc 100644 --- a/pkg/storage/filter/gt.go +++ b/pkg/storage/filter/gt.go @@ -12,6 +12,16 @@ func (o *GtOperator) Fields() map[string]interface{} { return o.fields } +func (o *GtOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenGt): fields, + } +} + func NewGtOperator(fields OperatorFields) *GtOperator { return &GtOperator{fields} } diff --git a/pkg/storage/filter/gte.go b/pkg/storage/filter/gte.go index de9a330..1fef96e 100644 --- a/pkg/storage/filter/gte.go +++ b/pkg/storage/filter/gte.go @@ -12,6 +12,16 @@ func (o *GteOperator) Fields() map[string]interface{} { return o.fields } +func (o *GteOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenGte): fields, + } +} + func NewGteOperator(fields OperatorFields) *GteOperator { return &GteOperator{fields} } diff --git a/pkg/storage/filter/in.go b/pkg/storage/filter/in.go index 9672785..393074b 100644 --- a/pkg/storage/filter/in.go +++ b/pkg/storage/filter/in.go @@ -12,6 +12,16 @@ func (o *InOperator) Fields() map[string]interface{} { return o.fields } +func (o *InOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenIn): fields, + } +} + func NewInOperator(fields OperatorFields) *InOperator { return &InOperator{fields} } diff --git a/pkg/storage/filter/like.go b/pkg/storage/filter/like.go index 2dd6cc0..80277bc 100644 --- a/pkg/storage/filter/like.go +++ b/pkg/storage/filter/like.go @@ -12,6 +12,16 @@ func (o *LikeOperator) Fields() map[string]interface{} { return o.fields } +func (o *LikeOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenLike): fields, + } +} + func NewLikeOperator(fields OperatorFields) *LikeOperator { return &LikeOperator{fields} } diff --git a/pkg/storage/filter/lt.go b/pkg/storage/filter/lt.go index a60fc60..a680530 100644 --- a/pkg/storage/filter/lt.go +++ b/pkg/storage/filter/lt.go @@ -12,6 +12,16 @@ func (o *LtOperator) Fields() map[string]interface{} { return o.fields } +func (o *LtOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenLt): fields, + } +} + func NewLtOperator(fields OperatorFields) *LtOperator { return &LtOperator{fields} } diff --git a/pkg/storage/filter/lte.go b/pkg/storage/filter/lte.go index 070635c..8f06e7d 100644 --- a/pkg/storage/filter/lte.go +++ b/pkg/storage/filter/lte.go @@ -12,6 +12,16 @@ func (o *LteOperator) Fields() map[string]interface{} { return o.fields } +func (o *LteOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenLte): fields, + } +} + func NewLteOperator(fields OperatorFields) *LteOperator { return &LteOperator{fields} } diff --git a/pkg/storage/filter/neq.go b/pkg/storage/filter/neq.go index bdba587..ce3aba5 100644 --- a/pkg/storage/filter/neq.go +++ b/pkg/storage/filter/neq.go @@ -12,6 +12,16 @@ func (o *NeqOperator) Fields() map[string]interface{} { return o.fields } +func (o *NeqOperator) AsMap() map[string]any { + fields := make(map[string]any, len(o.fields)) + for k, v := range o.fields { + fields[k] = v + } + return map[string]any{ + string(TokenNeq): fields, + } +} + func NewNeqOperator(fields map[string]interface{}) *NeqOperator { return &NeqOperator{fields} } diff --git a/pkg/storage/filter/not.go b/pkg/storage/filter/not.go index 1b60af9..19a9b1f 100644 --- a/pkg/storage/filter/not.go +++ b/pkg/storage/filter/not.go @@ -12,6 +12,16 @@ func (o *NotOperator) Children() []Operator { return o.children } +func (o *NotOperator) AsMap() map[string]any { + children := make([]map[string]any, 0, len(o.children)) + for _, c := range o.children { + children = append(children, c.AsMap()) + } + return map[string]any{ + string(TokenNot): children, + } +} + func NewNotOperator(ops ...Operator) *NotOperator { return &NotOperator{ops} } diff --git a/pkg/storage/filter/operator.go b/pkg/storage/filter/operator.go index e4781b9..344b2da 100644 --- a/pkg/storage/filter/operator.go +++ b/pkg/storage/filter/operator.go @@ -20,4 +20,15 @@ type OperatorFields map[string]interface{} type Operator interface { Token() Token + AsMap() map[string]any +} + +type FieldOperator interface { + Operator + Fields() map[string]any +} + +type AggregatorOperator interface { + Operator + Children() []Operator } diff --git a/pkg/storage/filter/or.go b/pkg/storage/filter/or.go index 276f801..845d22a 100644 --- a/pkg/storage/filter/or.go +++ b/pkg/storage/filter/or.go @@ -12,6 +12,16 @@ func (o *OrOperator) Children() []Operator { return o.children } +func (o *OrOperator) AsMap() map[string]any { + children := make([]map[string]any, 0, len(o.children)) + for _, c := range o.children { + children = append(children, c.AsMap()) + } + return map[string]any{ + string(TokenOr): children, + } +} + func NewOrOperator(ops ...Operator) *OrOperator { return &OrOperator{ops} } diff --git a/pkg/module/share/error.go b/pkg/storage/share/error.go similarity index 100% rename from pkg/module/share/error.go rename to pkg/storage/share/error.go diff --git a/pkg/module/share/resource.go b/pkg/storage/share/resource.go similarity index 100% rename from pkg/module/share/resource.go rename to pkg/storage/share/resource.go diff --git a/pkg/module/share/options.go b/pkg/storage/share/shared_resource_options.go similarity index 100% rename from pkg/module/share/options.go rename to pkg/storage/share/shared_resource_options.go diff --git a/pkg/module/share/repository.go b/pkg/storage/share/shared_resource_store.go similarity index 96% rename from pkg/module/share/repository.go rename to pkg/storage/share/shared_resource_store.go index c5f4428..42df213 100644 --- a/pkg/module/share/repository.go +++ b/pkg/storage/share/shared_resource_store.go @@ -23,7 +23,7 @@ type Attribute interface { CreatedAt() time.Time } -type Repository interface { +type Store interface { DeleteResource(ctx context.Context, origin app.ID, resourceID ResourceID) error FindResources(ctx context.Context, funcs ...FindResourcesOptionFunc) ([]Resource, error) GetResource(ctx context.Context, origin app.ID, resourceID ResourceID) (Resource, error) diff --git a/pkg/storage/share/testsuite/store.go b/pkg/storage/share/testsuite/store.go new file mode 100644 index 0000000..47b6827 --- /dev/null +++ b/pkg/storage/share/testsuite/store.go @@ -0,0 +1,16 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/share" +) + +type NewTestStoreFunc func(testname string) (share.Store, error) + +func TestStore(t *testing.T, newStore NewTestStoreFunc) { + t.Run("Cases", func(t *testing.T) { + t.Parallel() + runRepositoryTests(t, newStore) + }) +} diff --git a/pkg/module/share/testsuite/repository_cases.go b/pkg/storage/share/testsuite/store_cases.go similarity index 77% rename from pkg/module/share/testsuite/repository_cases.go rename to pkg/storage/share/testsuite/store_cases.go index f8b1f83..876bce7 100644 --- a/pkg/module/share/testsuite/repository_cases.go +++ b/pkg/storage/share/testsuite/store_cases.go @@ -8,26 +8,26 @@ import ( "time" "forge.cadoles.com/arcad/edge/pkg/app" - "forge.cadoles.com/arcad/edge/pkg/module/share" + "forge.cadoles.com/arcad/edge/pkg/storage/share" "github.com/pkg/errors" ) type repositoryTestCase struct { Name string Skip bool - Run func(ctx context.Context, t *testing.T, repo share.Repository) error + Run func(ctx context.Context, t *testing.T, store share.Store) error } var repositoryTestCases = []repositoryTestCase{ { Name: "Update resource attributes", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { origin := app.ID("test") resourceID := share.ResourceID("test") // Try to create resource without attributes - _, err := repo.UpdateAttributes(ctx, origin, resourceID) + _, err := store.UpdateAttributes(ctx, origin, resourceID) if err == nil { return errors.New("err should not be nil") } @@ -43,7 +43,7 @@ var repositoryTestCases = []repositoryTestCase{ share.NewBaseAttribute("my_bool_attr", share.TypeBool, true), } - resource, err := repo.UpdateAttributes(ctx, origin, resourceID, attributes...) + resource, err := store.UpdateAttributes(ctx, origin, resourceID, attributes...) if err != nil { return errors.WithStack(err) } @@ -71,12 +71,12 @@ var repositoryTestCases = []repositoryTestCase{ { Name: "Find resources by attribute name", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { - if err := loadTestData(ctx, "testdata/find_resources_by_attribute_name.json", repo); err != nil { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { + if err := loadTestData(ctx, "testdata/find_resources_by_attribute_name.json", store); err != nil { return errors.WithStack(err) } - resources, err := repo.FindResources(ctx, share.WithName("my_number")) + resources, err := store.FindResources(ctx, share.WithName("my_number")) if err != nil { return errors.WithStack(err) } @@ -96,12 +96,12 @@ var repositoryTestCases = []repositoryTestCase{ { Name: "Find resources by attribute type", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { - if err := loadTestData(ctx, "testdata/find_resources_by_attribute_type.json", repo); err != nil { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { + if err := loadTestData(ctx, "testdata/find_resources_by_attribute_type.json", store); err != nil { return errors.WithStack(err) } - resources, err := repo.FindResources(ctx, share.WithType(share.TypePath)) + resources, err := store.FindResources(ctx, share.WithType(share.TypePath)) if err != nil { return errors.WithStack(err) } @@ -121,12 +121,12 @@ var repositoryTestCases = []repositoryTestCase{ { Name: "Find resources by attribute type and name", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { - if err := loadTestData(ctx, "testdata/find_resources_by_attribute_type_and_name.json", repo); err != nil { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { + if err := loadTestData(ctx, "testdata/find_resources_by_attribute_type_and_name.json", store); err != nil { return errors.WithStack(err) } - resources, err := repo.FindResources(ctx, share.WithType(share.TypeText), share.WithName("my_attr")) + resources, err := store.FindResources(ctx, share.WithType(share.TypeText), share.WithName("my_attr")) if err != nil { return errors.WithStack(err) } @@ -146,15 +146,15 @@ var repositoryTestCases = []repositoryTestCase{ { Name: "Get resource", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { - if err := loadTestData(ctx, "testdata/get_resource.json", repo); err != nil { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { + if err := loadTestData(ctx, "testdata/get_resource.json", store); err != nil { return errors.WithStack(err) } origin := app.ID("app1.edge.app") resourceID := share.ResourceID("res-1") - resource, err := repo.GetResource(ctx, origin, resourceID) + resource, err := store.GetResource(ctx, origin, resourceID) if err != nil { return errors.WithStack(err) } @@ -172,7 +172,7 @@ var repositoryTestCases = []repositoryTestCase{ return errors.Errorf("resource.ID(): expected '%v', got '%v'", e, g) } - resource, err = repo.GetResource(ctx, origin, "unexistant-id") + resource, err = store.GetResource(ctx, origin, "unexistant-id") if err == nil { return errors.New("err should not be nil") } @@ -187,8 +187,8 @@ var repositoryTestCases = []repositoryTestCase{ { Name: "Delete resource", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { - if err := loadTestData(ctx, "testdata/delete_resource.json", repo); err != nil { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { + if err := loadTestData(ctx, "testdata/delete_resource.json", store); err != nil { return errors.WithStack(err) } @@ -196,11 +196,11 @@ var repositoryTestCases = []repositoryTestCase{ resourceID := share.ResourceID("res-1") // It should delete an existing resource - if err := repo.DeleteResource(ctx, origin, resourceID); err != nil { + if err := store.DeleteResource(ctx, origin, resourceID); err != nil { return errors.WithStack(err) } - _, err := repo.GetResource(ctx, origin, resourceID) + _, err := store.GetResource(ctx, origin, resourceID) if err == nil { return errors.New("err should not be nil") } @@ -211,7 +211,7 @@ var repositoryTestCases = []repositoryTestCase{ } // It should not delete an unexistant resource - err = repo.DeleteResource(ctx, origin, resourceID) + err = store.DeleteResource(ctx, origin, resourceID) if err == nil { return errors.New("err should not be nil") } @@ -223,7 +223,7 @@ var repositoryTestCases = []repositoryTestCase{ otherOrigin := app.ID("app2.edge.app") // It should not delete a resource with the same id and another origin - resource, err := repo.GetResource(ctx, otherOrigin, resourceID) + resource, err := store.GetResource(ctx, otherOrigin, resourceID) if err != nil { return errors.New("err should not be nil") } @@ -238,8 +238,8 @@ var repositoryTestCases = []repositoryTestCase{ { Name: "Delete attributes", Skip: false, - Run: func(ctx context.Context, t *testing.T, repo share.Repository) error { - if err := loadTestData(ctx, "testdata/delete_attributes.json", repo); err != nil { + Run: func(ctx context.Context, t *testing.T, store share.Store) error { + if err := loadTestData(ctx, "testdata/delete_attributes.json", store); err != nil { return errors.WithStack(err) } @@ -247,11 +247,11 @@ var repositoryTestCases = []repositoryTestCase{ resourceID := share.ResourceID("res-1") // It should delete specified attributes - if err := repo.DeleteAttributes(ctx, origin, resourceID, "my_text", "my_bool"); err != nil { + if err := store.DeleteAttributes(ctx, origin, resourceID, "my_text", "my_bool"); err != nil { return errors.WithStack(err) } - resource, err := repo.GetResource(ctx, origin, resourceID) + resource, err := store.GetResource(ctx, origin, resourceID) if err != nil { return errors.WithStack(err) } @@ -270,7 +270,7 @@ var repositoryTestCases = []repositoryTestCase{ }, } -func runRepositoryTests(t *testing.T, newRepo NewTestRepoFunc) { +func runRepositoryTests(t *testing.T, newRepo NewTestStoreFunc) { for _, tc := range repositoryTestCases { func(tc repositoryTestCase) { t.Run(tc.Name, func(t *testing.T) { @@ -311,7 +311,7 @@ type jsonAttribute struct { UpdatedAt time.Time `json:"updatedAt"` } -func loadTestData(ctx context.Context, jsonFile string, repo share.Repository) error { +func loadTestData(ctx context.Context, jsonFile string, store share.Store) error { data, err := testData.ReadFile(jsonFile) if err != nil { return errors.WithStack(err) @@ -334,7 +334,7 @@ func loadTestData(ctx context.Context, jsonFile string, repo share.Repository) e ) } - _, err := repo.UpdateAttributes(ctx, app.ID(res.Origin), share.ResourceID(res.ID), attributes...) + _, err := store.UpdateAttributes(ctx, app.ID(res.Origin), share.ResourceID(res.ID), attributes...) if err != nil { return errors.WithStack(err) } diff --git a/pkg/module/share/testsuite/testdata.go b/pkg/storage/share/testsuite/testdata.go similarity index 100% rename from pkg/module/share/testsuite/testdata.go rename to pkg/storage/share/testsuite/testdata.go diff --git a/pkg/module/share/testsuite/testdata/delete_attributes.json b/pkg/storage/share/testsuite/testdata/delete_attributes.json similarity index 100% rename from pkg/module/share/testsuite/testdata/delete_attributes.json rename to pkg/storage/share/testsuite/testdata/delete_attributes.json diff --git a/pkg/module/share/testsuite/testdata/delete_resource.json b/pkg/storage/share/testsuite/testdata/delete_resource.json similarity index 100% rename from pkg/module/share/testsuite/testdata/delete_resource.json rename to pkg/storage/share/testsuite/testdata/delete_resource.json diff --git a/pkg/module/share/testsuite/testdata/find_resources_by_attribute_name.json b/pkg/storage/share/testsuite/testdata/find_resources_by_attribute_name.json similarity index 100% rename from pkg/module/share/testsuite/testdata/find_resources_by_attribute_name.json rename to pkg/storage/share/testsuite/testdata/find_resources_by_attribute_name.json diff --git a/pkg/module/share/testsuite/testdata/find_resources_by_attribute_type.json b/pkg/storage/share/testsuite/testdata/find_resources_by_attribute_type.json similarity index 100% rename from pkg/module/share/testsuite/testdata/find_resources_by_attribute_type.json rename to pkg/storage/share/testsuite/testdata/find_resources_by_attribute_type.json diff --git a/pkg/module/share/testsuite/testdata/find_resources_by_attribute_type_and_name.json b/pkg/storage/share/testsuite/testdata/find_resources_by_attribute_type_and_name.json similarity index 100% rename from pkg/module/share/testsuite/testdata/find_resources_by_attribute_type_and_name.json rename to pkg/storage/share/testsuite/testdata/find_resources_by_attribute_type_and_name.json diff --git a/pkg/module/share/testsuite/testdata/get_resource.json b/pkg/storage/share/testsuite/testdata/get_resource.json similarity index 100% rename from pkg/module/share/testsuite/testdata/get_resource.json rename to pkg/storage/share/testsuite/testdata/get_resource.json diff --git a/pkg/module/share/value_type.go b/pkg/storage/share/value_type.go similarity index 100% rename from pkg/module/share/value_type.go rename to pkg/storage/share/value_type.go diff --git a/pkg/module/share/value_type_test.go b/pkg/storage/share/value_type_test.go similarity index 100% rename from pkg/module/share/value_type_test.go rename to pkg/storage/share/value_type_test.go diff --git a/pkg/storage/sqlite/blob_store_test.go b/pkg/storage/sqlite/blob_store_test.go deleted file mode 100644 index 7fabd7c..0000000 --- a/pkg/storage/sqlite/blob_store_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package sqlite - -import ( - "fmt" - "os" - "testing" - "time" - - "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" - "github.com/pkg/errors" - "gitlab.com/wpetit/goweb/logger" -) - -func TestBlobStore(t *testing.T) { - t.Parallel() - logger.SetLevel(logger.LevelDebug) - - file := "./testdata/blobstore_test.sqlite" - - if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { - t.Fatalf("%+v", errors.WithStack(err)) - } - - dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) - store := NewBlobStore(dsn) - - testsuite.TestBlobStore(t, store) -} diff --git a/pkg/storage/testsuite/blob_store.go b/pkg/storage/testsuite/blob_store.go index b73c086..26d23fd 100644 --- a/pkg/storage/testsuite/blob_store.go +++ b/pkg/storage/testsuite/blob_store.go @@ -1,14 +1,14 @@ package testsuite import ( + "context" "testing" "forge.cadoles.com/arcad/edge/pkg/storage" ) -func TestBlobStore(t *testing.T, store storage.BlobStore) { +func TestBlobStore(ctx context.Context, t *testing.T, store storage.BlobStore) { t.Run("Ops", func(t *testing.T) { - t.Parallel() - testBlobStoreOps(t, store) + testBlobStoreOps(ctx, t, store) }) } diff --git a/pkg/storage/testsuite/blob_store_benchmark.go b/pkg/storage/testsuite/blob_store_benchmark.go new file mode 100644 index 0000000..3dc1486 --- /dev/null +++ b/pkg/storage/testsuite/blob_store_benchmark.go @@ -0,0 +1,79 @@ +package testsuite + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) { + t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) { + + for i := 0; i < t.N; i++ { + bucketName := fmt.Sprintf("bucket-%d", i) + if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + } + }) +} + +func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error { + ctx := context.Background() + + bucket, err := store.OpenBucket(ctx, bucketName) + if err != nil { + return errors.WithStack(err) + } + + blobID := storage.NewBlobID() + + writer, err := bucket.NewWriter(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + data := []byte("foo") + + if _, err = writer.Write(data); err != nil { + return errors.WithStack(err) + } + + if err := writer.Close(); err != nil { + return errors.WithStack(err) + } + + reader, err := bucket.NewReader(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + var buf bytes.Buffer + + if _, err = io.Copy(&buf, reader); err != nil { + return errors.WithStack(err) + } + + if err := reader.Close(); err != nil { + return errors.WithStack(err) + } + + if err := bucket.Delete(ctx, blobID); err != nil { + return errors.WithStack(err) + } + + if err := bucket.Close(); err != nil { + return errors.WithStack(err) + } + + if err := store.DeleteBucket(ctx, bucketName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/testsuite/blob_store_ops.go b/pkg/storage/testsuite/blob_store_ops.go index 6f97eb3..323fdf2 100644 --- a/pkg/storage/testsuite/blob_store_ops.go +++ b/pkg/storage/testsuite/blob_store_ops.go @@ -17,9 +17,11 @@ type blobStoreTestCase struct { var blobStoreTestCases = []blobStoreTestCase{ { - Name: "Open new bucket", + Name: "Open then delete bucket", Run: func(ctx context.Context, store storage.BlobStore) error { - bucket, err := store.OpenBucket(ctx, "open-new-bucket") + bucketName := "open-new-bucket" + + bucket, err := store.OpenBucket(ctx, bucketName) if err != nil { return errors.WithStack(err) } @@ -50,6 +52,23 @@ var blobStoreTestCases = []blobStoreTestCase{ return errors.WithStack(err) } + if err := store.DeleteBucket(ctx, bucketName); err != nil { + return errors.WithStack(err) + } + + buckets, err := store.ListBuckets(ctx) + if err != nil { + return errors.WithStack(err) + } + + for _, b := range buckets { + if b != bucketName { + continue + } + + return errors.Errorf("bucket '%s' should be deleted", bucketName) + } + return nil }, }, @@ -112,14 +131,12 @@ var blobStoreTestCases = []blobStoreTestCase{ }, } -func testBlobStoreOps(t *testing.T, store storage.BlobStore) { +func testBlobStoreOps(ctx context.Context, t *testing.T, store storage.BlobStore) { for _, tc := range blobStoreTestCases { func(tc blobStoreTestCase) { t.Run(tc.Name, func(t *testing.T) { t.Parallel() - ctx := context.Background() - if err := tc.Run(ctx, store); err != nil { t.Errorf("%+v", errors.WithStack(err)) } diff --git a/pkg/storage/testsuite/document_store.go b/pkg/storage/testsuite/document_store.go index cdbea14..8e46376 100644 --- a/pkg/storage/testsuite/document_store.go +++ b/pkg/storage/testsuite/document_store.go @@ -1,14 +1,14 @@ package testsuite import ( + "context" "testing" "forge.cadoles.com/arcad/edge/pkg/storage" ) -func TestDocumentStore(t *testing.T, store storage.DocumentStore) { +func TestDocumentStore(ctx context.Context, t *testing.T, store storage.DocumentStore) { t.Run("Ops", func(t *testing.T) { - t.Parallel() - testDocumentStoreOps(t, store) + testDocumentStoreOps(ctx, t, store) }) } diff --git a/pkg/storage/testsuite/document_store_ops.go b/pkg/storage/testsuite/document_store_ops.go index 7281fed..1de8e46 100644 --- a/pkg/storage/testsuite/document_store_ops.go +++ b/pkg/storage/testsuite/document_store_ops.go @@ -433,12 +433,12 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{ }, } -func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) { +func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.DocumentStore) { for _, tc := range documentStoreOpsTestCases { func(tc documentStoreOpsTestCase) { t.Run(tc.Name, func(t *testing.T) { t.Parallel() - if err := tc.Run(context.Background(), store); err != nil { + if err := tc.Run(ctx, store); err != nil { t.Errorf("%+v", errors.WithStack(err)) } })