feat(storage): rpc based implementation
arcad/edge/pipeline/pr-master This commit looks good
Details
arcad/edge/pipeline/pr-master This commit looks good
Details
This commit is contained in:
parent
c3535a4a9b
commit
776d0cc3cd
|
@ -4,4 +4,5 @@
|
||||||
/tools
|
/tools
|
||||||
*.sqlite
|
*.sqlite
|
||||||
/.gitea-release
|
/.gitea-release
|
||||||
/.edge
|
/.edge
|
||||||
|
/data
|
16
Makefile
16
Makefile
|
@ -11,9 +11,12 @@ DATE_VERSION := $(shell date +%Y.%-m.%-d)
|
||||||
FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,)
|
FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,)
|
||||||
APP_PATH ?= misc/client-sdk-testsuite/dist
|
APP_PATH ?= misc/client-sdk-testsuite/dist
|
||||||
RUN_APP_ARGS ?=
|
RUN_APP_ARGS ?=
|
||||||
|
RUN_STORAGE_SERVER_ARGS ?=
|
||||||
|
|
||||||
SHELL := bash
|
SHELL := bash
|
||||||
|
|
||||||
build: build-edge-cli build-client-sdk-test-app
|
|
||||||
|
build: build-cli build-storage-server build-client-sdk-test-app
|
||||||
|
|
||||||
watch: tools/modd/bin/modd
|
watch: tools/modd/bin/modd
|
||||||
tools/modd/bin/modd
|
tools/modd/bin/modd
|
||||||
|
@ -27,12 +30,18 @@ test-go:
|
||||||
lint:
|
lint:
|
||||||
golangci-lint run --enable-all $(LINT_ARGS)
|
golangci-lint run --enable-all $(LINT_ARGS)
|
||||||
|
|
||||||
build-edge-cli: build-sdk
|
build-cli: build-sdk
|
||||||
CGO_ENABLED=0 go build \
|
CGO_ENABLED=0 go build \
|
||||||
-v \
|
-v \
|
||||||
-o ./bin/cli \
|
-o ./bin/cli \
|
||||||
./cmd/cli
|
./cmd/cli
|
||||||
|
|
||||||
|
build-storage-server: build-sdk
|
||||||
|
CGO_ENABLED=0 go build \
|
||||||
|
-v \
|
||||||
|
-o ./bin/storage-server \
|
||||||
|
./cmd/storage-server
|
||||||
|
|
||||||
build-client-sdk-test-app:
|
build-client-sdk-test-app:
|
||||||
cd misc/client-sdk-testsuite && $(MAKE) dist
|
cd misc/client-sdk-testsuite && $(MAKE) dist
|
||||||
|
|
||||||
|
@ -68,6 +77,9 @@ node_modules:
|
||||||
run-app: .env
|
run-app: .env
|
||||||
( set -o allexport && source .env && set +o allexport && bin/cli app run -p $(APP_PATH) $$RUN_APP_ARGS )
|
( set -o allexport && source .env && set +o allexport && bin/cli app run -p $(APP_PATH) $$RUN_APP_ARGS )
|
||||||
|
|
||||||
|
run-storage-server: .env
|
||||||
|
( set -o allexport && source .env && set +o allexport && bin/storage-server run $$RUN_STORAGE_SERVER_ARGS )
|
||||||
|
|
||||||
.env:
|
.env:
|
||||||
cp .env.dist .env
|
cp .env.dist .env
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||||
|
@ -30,7 +32,7 @@ import (
|
||||||
shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
|
shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
|
||||||
shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite"
|
shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
storageSqlite "forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/client"
|
||||||
"gitlab.com/wpetit/goweb/logger"
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/bundle"
|
"forge.cadoles.com/arcad/edge/pkg/bundle"
|
||||||
|
@ -439,19 +441,41 @@ func initMemoryBus(deps *moduleDeps) error {
|
||||||
|
|
||||||
func initDatastores(storageFile string, appID app.ID) ModuleDepFunc {
|
func initDatastores(storageFile string, appID app.ID) ModuleDepFunc {
|
||||||
return func(deps *moduleDeps) error {
|
return func(deps *moduleDeps) error {
|
||||||
storageFile = injectAppID(storageFile, appID)
|
// storageFile = injectAppID(storageFile, appID)
|
||||||
|
|
||||||
if err := ensureDir(storageFile); err != nil {
|
blobStoreClient, err := rpc.DialHTTPPath(
|
||||||
return errors.WithStack(err)
|
"tcp",
|
||||||
}
|
"localhost:3001",
|
||||||
|
injectAppID("/blobstore?tenant=%APPID%", appID),
|
||||||
db, err := storageSqlite.Open(storageFile)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db)
|
deps.BlobStore = client.NewBlobStore(blobStoreClient)
|
||||||
deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db)
|
|
||||||
|
documentStoreClient, err := rpc.DialHTTPPath(
|
||||||
|
"tcp",
|
||||||
|
"localhost:3001",
|
||||||
|
injectAppID("/documentstore?tenant=%APPID%", appID),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
deps.DocumentStore = client.NewDocumentStore(documentStoreClient)
|
||||||
|
|
||||||
|
// if err := ensureDir(storageFile); err != nil {
|
||||||
|
// return errors.WithStack(err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// db, err := storageSqlite.Open(storageFile)
|
||||||
|
// if err != nil {
|
||||||
|
// return errors.WithStack(err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db)
|
||||||
|
// deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Root() *cli.Command {
|
||||||
|
return &cli.Command{
|
||||||
|
Name: "auth",
|
||||||
|
Usage: "Auth related command",
|
||||||
|
Subcommands: []*cli.Command{},
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
package command
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Main(commands ...*cli.Command) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
app := &cli.App{
|
||||||
|
Name: "storage-server",
|
||||||
|
Usage: "Edge storage server",
|
||||||
|
Commands: commands,
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "debug",
|
||||||
|
EnvVars: []string{"DEBUG"},
|
||||||
|
Value: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
app.ExitErrHandler = func(ctx *cli.Context, err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
debug := ctx.Bool("debug")
|
||||||
|
|
||||||
|
if !debug {
|
||||||
|
fmt.Printf("[ERROR] %v\n", err)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("%+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(cli.FlagsByName(app.Flags))
|
||||||
|
sort.Sort(cli.CommandsByName(app.Commands))
|
||||||
|
|
||||||
|
if err := app.RunContext(ctx, os.Args); err != nil {
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,145 @@
|
||||||
|
package command
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Run() *cli.Command {
|
||||||
|
return &cli.Command{
|
||||||
|
Name: "run",
|
||||||
|
Usage: "Run server",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "address",
|
||||||
|
Aliases: []string{"addr"},
|
||||||
|
Value: ":3001",
|
||||||
|
},
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "data-dir",
|
||||||
|
Value: "./data",
|
||||||
|
},
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "dsn-query-string",
|
||||||
|
Value: fmt.Sprintf("_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: func(ctx *cli.Context) error {
|
||||||
|
addr := ctx.String("address")
|
||||||
|
dataDir := ctx.String("data-dir")
|
||||||
|
dsnQueryString := ctx.String("dsn-query-string")
|
||||||
|
|
||||||
|
router := chi.NewRouter()
|
||||||
|
|
||||||
|
router.Use(middleware.RealIP)
|
||||||
|
router.Use(middleware.Logger)
|
||||||
|
|
||||||
|
router.Handle("/blobstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
tenant := r.URL.Query().Get("tenant")
|
||||||
|
if tenant == "" {
|
||||||
|
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
server, err := getBlobStoreStoreServer(dataDir, tenant, dsnQueryString)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||||
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
server.ServeHTTP(w, r)
|
||||||
|
|
||||||
|
}))
|
||||||
|
|
||||||
|
router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
tenant := r.URL.Query().Get("tenant")
|
||||||
|
if tenant == "" {
|
||||||
|
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
server, err := getDocumentStoreServer(dataDir, tenant, dsnQueryString)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||||
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
server.ServeHTTP(w, r)
|
||||||
|
}))
|
||||||
|
|
||||||
|
if err := http.ListenAndServe(addr, router); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var documentStoreTenants sync.Map
|
||||||
|
|
||||||
|
func getDocumentStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Server, error) {
|
||||||
|
dir := filepath.Join(dataDir, tenant)
|
||||||
|
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
file := filepath.Join(dir, "documentstore.sqlite")
|
||||||
|
dsn := fmt.Sprintf("%s?%s", file, dsnQueryString)
|
||||||
|
|
||||||
|
documentStore := sqlite.NewDocumentStore(dsn)
|
||||||
|
documentStoreServer := server.NewDocumentStoreServer(documentStore)
|
||||||
|
|
||||||
|
rawDocumentStoreServer, _ := documentStoreTenants.LoadOrStore(tenant, documentStoreServer)
|
||||||
|
|
||||||
|
documentStoreServer, ok := rawDocumentStoreServer.(*rpc.Server)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected document store server value of type '%T'", rawDocumentStoreServer)
|
||||||
|
}
|
||||||
|
|
||||||
|
return documentStoreServer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var blobStoreTenants sync.Map
|
||||||
|
|
||||||
|
func getBlobStoreStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Server, error) {
|
||||||
|
dir := filepath.Join(dataDir, tenant)
|
||||||
|
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
file := filepath.Join(dir, "blobstore.sqlite")
|
||||||
|
dsn := fmt.Sprintf("%s?%s", file, dsnQueryString)
|
||||||
|
|
||||||
|
blobStore := sqlite.NewBlobStore(dsn)
|
||||||
|
blobStoreServer := server.NewBlobStoreServer(blobStore)
|
||||||
|
|
||||||
|
rawBlobStoreServer, _ := documentStoreTenants.LoadOrStore(tenant, blobStoreServer)
|
||||||
|
|
||||||
|
blobStoreServer, ok := rawBlobStoreServer.(*rpc.Server)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected document store server value of type '%T'", rawBlobStoreServer)
|
||||||
|
}
|
||||||
|
|
||||||
|
return blobStoreServer, nil
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"forge.cadoles.com/arcad/edge/cmd/storage-server/command"
|
||||||
|
"forge.cadoles.com/arcad/edge/cmd/storage-server/command/auth"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
command.Main(
|
||||||
|
command.Run(),
|
||||||
|
auth.Root(),
|
||||||
|
)
|
||||||
|
}
|
1
go.mod
1
go.mod
|
@ -15,6 +15,7 @@ require (
|
||||||
github.com/go-playground/universal-translator v0.16.0 // indirect
|
github.com/go-playground/universal-translator v0.16.0 // indirect
|
||||||
github.com/goccy/go-json v0.9.11 // indirect
|
github.com/goccy/go-json v0.9.11 // indirect
|
||||||
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
||||||
|
github.com/keegancsmith/rpc v1.3.0 // indirect
|
||||||
github.com/leodido/go-urn v1.1.0 // indirect
|
github.com/leodido/go-urn v1.1.0 // indirect
|
||||||
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
||||||
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -201,6 +201,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
|
||||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||||
|
github.com/keegancsmith/rpc v1.3.0 h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk=
|
||||||
|
github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
|
|
|
@ -9,6 +9,7 @@ modd.conf
|
||||||
prep: make build-client-sdk-test-app
|
prep: make build-client-sdk-test-app
|
||||||
prep: make build
|
prep: make build
|
||||||
daemon: make run-app
|
daemon: make run-app
|
||||||
|
daemon: make run-storage-server
|
||||||
}
|
}
|
||||||
|
|
||||||
**/*.go {
|
**/*.go {
|
||||||
|
|
|
@ -8,6 +8,16 @@ func (o *AndOperator) Token() Token {
|
||||||
return TokenAnd
|
return TokenAnd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *AndOperator) AsMap() map[string]any {
|
||||||
|
children := make([]map[string]any, 0, len(o.children))
|
||||||
|
for _, c := range o.children {
|
||||||
|
children = append(children, c.AsMap())
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenAnd): children,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (o *AndOperator) Children() []Operator {
|
func (o *AndOperator) Children() []Operator {
|
||||||
return o.children
|
return o.children
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *EqOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *EqOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenEq): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewEqOperator(fields map[string]interface{}) *EqOperator {
|
func NewEqOperator(fields map[string]interface{}) *EqOperator {
|
||||||
return &EqOperator{fields}
|
return &EqOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,15 @@ func NewFrom(raw map[string]interface{}) (*Filter, error) {
|
||||||
return &Filter{op}, nil
|
return &Filter{op}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Filter) AsMap() map[string]any {
|
||||||
|
root := f.Root()
|
||||||
|
if root == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.Root().AsMap()
|
||||||
|
}
|
||||||
|
|
||||||
func toFieldOperator(v interface{}) (Operator, error) {
|
func toFieldOperator(v interface{}) (Operator, error) {
|
||||||
vv, ok := v.(map[string]interface{})
|
vv, ok := v.(map[string]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -93,8 +102,17 @@ func toFieldOperator(v interface{}) (Operator, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func toAggregateOperator(token Token, v interface{}) (Operator, error) {
|
func toAggregateOperator(token Token, v interface{}) (Operator, error) {
|
||||||
vv, ok := v.([]interface{})
|
var vv []interface{}
|
||||||
if !ok {
|
|
||||||
|
switch typed := v.(type) {
|
||||||
|
case []interface{}:
|
||||||
|
vv = typed
|
||||||
|
case []map[string]interface{}:
|
||||||
|
vv = make([]interface{}, 0, len(typed))
|
||||||
|
for _, item := range typed {
|
||||||
|
vv = append(vv, item)
|
||||||
|
}
|
||||||
|
default:
|
||||||
return nil, errors.WithStack(ErrInvalidAggregationOperator)
|
return nil, errors.WithStack(ErrInvalidAggregationOperator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *GtOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *GtOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenGt): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewGtOperator(fields OperatorFields) *GtOperator {
|
func NewGtOperator(fields OperatorFields) *GtOperator {
|
||||||
return &GtOperator{fields}
|
return &GtOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *GteOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *GteOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenGte): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewGteOperator(fields OperatorFields) *GteOperator {
|
func NewGteOperator(fields OperatorFields) *GteOperator {
|
||||||
return &GteOperator{fields}
|
return &GteOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *InOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *InOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenIn): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewInOperator(fields OperatorFields) *InOperator {
|
func NewInOperator(fields OperatorFields) *InOperator {
|
||||||
return &InOperator{fields}
|
return &InOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *LikeOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *LikeOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenLike): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewLikeOperator(fields OperatorFields) *LikeOperator {
|
func NewLikeOperator(fields OperatorFields) *LikeOperator {
|
||||||
return &LikeOperator{fields}
|
return &LikeOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *LtOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *LtOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenLt): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewLtOperator(fields OperatorFields) *LtOperator {
|
func NewLtOperator(fields OperatorFields) *LtOperator {
|
||||||
return &LtOperator{fields}
|
return &LtOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *LteOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *LteOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenLte): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewLteOperator(fields OperatorFields) *LteOperator {
|
func NewLteOperator(fields OperatorFields) *LteOperator {
|
||||||
return &LteOperator{fields}
|
return &LteOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *NeqOperator) Fields() map[string]interface{} {
|
||||||
return o.fields
|
return o.fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *NeqOperator) AsMap() map[string]any {
|
||||||
|
fields := make(map[string]any, len(o.fields))
|
||||||
|
for k, v := range o.fields {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenNeq): fields,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewNeqOperator(fields map[string]interface{}) *NeqOperator {
|
func NewNeqOperator(fields map[string]interface{}) *NeqOperator {
|
||||||
return &NeqOperator{fields}
|
return &NeqOperator{fields}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *NotOperator) Children() []Operator {
|
||||||
return o.children
|
return o.children
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *NotOperator) AsMap() map[string]any {
|
||||||
|
children := make([]map[string]any, 0, len(o.children))
|
||||||
|
for _, c := range o.children {
|
||||||
|
children = append(children, c.AsMap())
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenNot): children,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewNotOperator(ops ...Operator) *NotOperator {
|
func NewNotOperator(ops ...Operator) *NotOperator {
|
||||||
return &NotOperator{ops}
|
return &NotOperator{ops}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,4 +20,15 @@ type OperatorFields map[string]interface{}
|
||||||
|
|
||||||
type Operator interface {
|
type Operator interface {
|
||||||
Token() Token
|
Token() Token
|
||||||
|
AsMap() map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
|
type FieldOperator interface {
|
||||||
|
Operator
|
||||||
|
Fields() map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
|
type AggregatorOperator interface {
|
||||||
|
Operator
|
||||||
|
Children() []Operator
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,16 @@ func (o *OrOperator) Children() []Operator {
|
||||||
return o.children
|
return o.children
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *OrOperator) AsMap() map[string]any {
|
||||||
|
children := make([]map[string]any, 0, len(o.children))
|
||||||
|
for _, c := range o.children {
|
||||||
|
children = append(children, c.AsMap())
|
||||||
|
}
|
||||||
|
return map[string]any{
|
||||||
|
string(TokenOr): children,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewOrOperator(ops ...Operator) *OrOperator {
|
func NewOrOperator(ops ...Operator) *OrOperator {
|
||||||
return &OrOperator{ops}
|
return &OrOperator{ops}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobBucket struct {
|
||||||
|
name string
|
||||||
|
id blob.BucketID
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
||||||
|
args := blob.GetBucketSizeArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.GetBucketSizeReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(ctx, "Service.GetBucketSize", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Name() string {
|
||||||
|
return b.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Close() error {
|
||||||
|
args := blob.CloseBucketArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.CloseBucketReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.CloseBucket", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||||
|
args := blob.DeleteBucketArgs{
|
||||||
|
BucketName: b.name,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.DeleteBucketReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
||||||
|
args := blob.GetBlobInfoArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
BlobID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.GetBlobInfoReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.BlobInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||||
|
args := blob.ListBlobInfoArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.ListBlobInfoReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.BlobInfos, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReader implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
|
||||||
|
args := blob.NewBlobReaderArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
BlobID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.NewBlobReaderReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &blobReaderCloser{
|
||||||
|
readerID: reply.ReaderID,
|
||||||
|
client: b.client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWriter implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
|
||||||
|
args := blob.NewBlobWriterArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
BlobID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.NewBlobWriterReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &blobWriterCloser{
|
||||||
|
blobID: id,
|
||||||
|
writerID: reply.WriterID,
|
||||||
|
client: b.client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blobWriterCloser struct {
|
||||||
|
blobID storage.BlobID
|
||||||
|
writerID blob.WriterID
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write implements io.WriteCloser
|
||||||
|
func (bwc *blobWriterCloser) Write(data []byte) (int, error) {
|
||||||
|
args := blob.WriteBlobArgs{
|
||||||
|
WriterID: bwc.writerID,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.WriteBlobReply{}
|
||||||
|
|
||||||
|
if err := bwc.client.Call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Written, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.WriteCloser
|
||||||
|
func (bwc *blobWriterCloser) Close() error {
|
||||||
|
args := blob.CloseWriterArgs{
|
||||||
|
WriterID: bwc.writerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.CloseBucketReply{}
|
||||||
|
|
||||||
|
if err := bwc.client.Call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blobReaderCloser struct {
|
||||||
|
readerID blob.ReaderID
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read implements io.ReadSeekCloser
|
||||||
|
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
|
||||||
|
args := blob.ReadBlobArgs{
|
||||||
|
ReaderID: brc.readerID,
|
||||||
|
Length: len(p),
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.ReadBlobReply{}
|
||||||
|
|
||||||
|
if err := brc.client.Call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(p, reply.Data)
|
||||||
|
|
||||||
|
if reply.EOF {
|
||||||
|
return reply.Read, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Read, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek implements io.ReadSeekCloser
|
||||||
|
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
args := blob.SeekBlobArgs{
|
||||||
|
ReaderID: brc.readerID,
|
||||||
|
Offset: offset,
|
||||||
|
Whence: whence,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.SeekBlobReply{}
|
||||||
|
|
||||||
|
if err := brc.client.Call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Read, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.ReadSeekCloser
|
||||||
|
func (brc *blobReaderCloser) Close() error {
|
||||||
|
args := blob.CloseReaderArgs{
|
||||||
|
ReaderID: brc.readerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.CloseReaderReply{}
|
||||||
|
|
||||||
|
if err := brc.client.Call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ storage.BlobBucket = &BlobBucket{}
|
||||||
|
_ storage.BlobInfo = &BlobInfo{}
|
||||||
|
_ io.WriteCloser = &blobWriterCloser{}
|
||||||
|
_ io.ReadSeekCloser = &blobReaderCloser{}
|
||||||
|
)
|
|
@ -0,0 +1,40 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobInfo struct {
|
||||||
|
id storage.BlobID
|
||||||
|
bucket string
|
||||||
|
contentType string
|
||||||
|
modTime time.Time
|
||||||
|
size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bucket implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) Bucket() string {
|
||||||
|
return i.bucket
|
||||||
|
}
|
||||||
|
|
||||||
|
// ID implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) ID() storage.BlobID {
|
||||||
|
return i.id
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContentType implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) ContentType() string {
|
||||||
|
return i.contentType
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) ModTime() time.Time {
|
||||||
|
return i.modTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) Size() int64 {
|
||||||
|
return i.size
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobStore struct {
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBucket implements storage.BlobStore.
|
||||||
|
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||||||
|
args := &blob.DeleteBucketArgs{
|
||||||
|
BucketName: name,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.DeleteBucket", args, nil); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListBuckets implements storage.BlobStore.
|
||||||
|
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||||
|
args := &blob.ListBucketsArgs{}
|
||||||
|
|
||||||
|
reply := blob.ListBucketsReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.ListBuckets", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Buckets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenBucket implements storage.BlobStore.
|
||||||
|
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||||||
|
args := &blob.OpenBucketArgs{
|
||||||
|
BucketName: name,
|
||||||
|
}
|
||||||
|
reply := &blob.OpenBucketReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.OpenBucket", args, reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &BlobBucket{
|
||||||
|
name: name,
|
||||||
|
id: reply.BucketID,
|
||||||
|
client: s.client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBlobStore(client *rpc.Client) *BlobStore {
|
||||||
|
return &BlobStore{client}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ storage.BlobStore = &BlobStore{}
|
|
@ -0,0 +1,102 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlobStore(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
if testing.Verbose() {
|
||||||
|
logger.SetLevel(logger.LevelDebug)
|
||||||
|
}
|
||||||
|
|
||||||
|
httpServer, err := startNewBlobStoreServer()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
serverAddr := httpServer.Listener.Addr()
|
||||||
|
|
||||||
|
client, err := rpc.DialHTTPPath(
|
||||||
|
serverAddr.Network(),
|
||||||
|
serverAddr.String(),
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
store := NewBlobStore(client)
|
||||||
|
|
||||||
|
testsuite.TestBlobStore(context.Background(), t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkBlobStore(t *testing.B) {
|
||||||
|
logger.SetLevel(logger.LevelError)
|
||||||
|
|
||||||
|
httpServer, err := startNewBlobStoreServer()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
serverAddr := httpServer.Listener.Addr()
|
||||||
|
|
||||||
|
client, err := rpc.DialHTTPPath(
|
||||||
|
serverAddr.Network(),
|
||||||
|
serverAddr.String(),
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
store := NewBlobStore(client)
|
||||||
|
|
||||||
|
testsuite.BenchmarkBlobStore(t, store)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSQLiteBlobStore() (*sqlite.BlobStore, error) {
|
||||||
|
file := "./testdata/blobstore_test.sqlite"
|
||||||
|
|
||||||
|
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
|
store := sqlite.NewBlobStore(dsn)
|
||||||
|
|
||||||
|
return store, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startNewBlobStoreServer() (*httptest.Server, error) {
|
||||||
|
store, err := getSQLiteBlobStore()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
server := server.NewBlobStoreServer(store)
|
||||||
|
|
||||||
|
httpServer := httptest.NewServer(server)
|
||||||
|
|
||||||
|
return httpServer, nil
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/gob"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
gob.Register(storage.Document{})
|
||||||
|
gob.Register(storage.DocumentID(""))
|
||||||
|
gob.Register(time.Time{})
|
||||||
|
gob.Register(map[string]interface{}{})
|
||||||
|
gob.Register([]interface{}{})
|
||||||
|
gob.Register([]map[string]interface{}{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type DocumentStore struct {
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete implements storage.DocumentStore.
|
||||||
|
func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
|
||||||
|
args := document.DeleteDocumentArgs{
|
||||||
|
Collection: collection,
|
||||||
|
DocumentID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := document.DeleteDocumentReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.DeleteDocument", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get implements storage.DocumentStore.
|
||||||
|
func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
|
||||||
|
args := document.GetDocumentArgs{
|
||||||
|
Collection: collection,
|
||||||
|
DocumentID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := document.GetDocumentReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.GetDocument", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Document, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query implements storage.DocumentStore.
|
||||||
|
func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
|
||||||
|
opts := &storage.QueryOptions{}
|
||||||
|
for _, fn := range funcs {
|
||||||
|
fn(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
args := document.QueryDocumentsArgs{
|
||||||
|
Collection: collection,
|
||||||
|
Filter: nil,
|
||||||
|
Options: opts,
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter != nil {
|
||||||
|
args.Filter = filter.AsMap()
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := document.QueryDocumentsReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.QueryDocuments", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Documents, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upsert implements storage.DocumentStore.
|
||||||
|
func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc storage.Document) (storage.Document, error) {
|
||||||
|
args := document.UpsertDocumentArgs{
|
||||||
|
Collection: collection,
|
||||||
|
Document: doc,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := document.UpsertDocumentReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.UpsertDocument", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Document, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDocumentStore(client *rpc.Client) *DocumentStore {
|
||||||
|
return &DocumentStore{client}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ storage.DocumentStore = &DocumentStore{}
|
|
@ -0,0 +1,74 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDocumentStore(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
if testing.Verbose() {
|
||||||
|
logger.SetLevel(logger.LevelDebug)
|
||||||
|
}
|
||||||
|
|
||||||
|
httpServer, err := startNewDocumentStoreServer()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
serverAddr := httpServer.Listener.Addr()
|
||||||
|
|
||||||
|
client, err := rpc.DialHTTPPath(
|
||||||
|
serverAddr.Network(),
|
||||||
|
serverAddr.String(),
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
store := NewDocumentStore(client)
|
||||||
|
|
||||||
|
testsuite.TestDocumentStore(context.Background(), t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSQLiteDocumentStore() (*sqlite.DocumentStore, error) {
|
||||||
|
file := "./testdata/documentstore_test.sqlite"
|
||||||
|
|
||||||
|
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
|
store := sqlite.NewDocumentStore(dsn)
|
||||||
|
|
||||||
|
return store, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startNewDocumentStoreServer() (*httptest.Server, error) {
|
||||||
|
store, err := getSQLiteDocumentStore()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
server := server.NewDocumentStoreServer(store)
|
||||||
|
|
||||||
|
httpServer := httptest.NewServer(server)
|
||||||
|
|
||||||
|
return httpServer, nil
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
/*.sqlite*
|
|
@ -0,0 +1,31 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseBucketArgs struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseBucketReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.buckets.Delete(args.BucketID)
|
||||||
|
|
||||||
|
*reply = CloseBucketReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseReaderArgs struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseReaderReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error {
|
||||||
|
reader, err := s.getOpenedReader(args.ReaderID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.readers.Delete(args.ReaderID)
|
||||||
|
|
||||||
|
*reply = CloseReaderReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseWriterArgs struct {
|
||||||
|
WriterID WriterID
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseWriterReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error {
|
||||||
|
writer, err := s.getOpenedWriter(args.WriterID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writer.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.writers.Delete(args.WriterID)
|
||||||
|
|
||||||
|
*reply = CloseWriterReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DeleteBucketArgs struct {
|
||||||
|
BucketName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteBucketReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error {
|
||||||
|
if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/gob"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
gob.Register(&BlobInfo{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetBlobInfoArgs struct {
|
||||||
|
BlobID storage.BlobID
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetBlobInfoReply struct {
|
||||||
|
BlobInfo storage.BlobInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobInfo, err := bucket.Get(ctx, args.BlobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = GetBlobInfoReply{
|
||||||
|
BlobInfo: &BlobInfo{
|
||||||
|
Bucket_: blobInfo.Bucket(),
|
||||||
|
ContentType_: blobInfo.ContentType(),
|
||||||
|
BlobID_: blobInfo.ID(),
|
||||||
|
ModTime_: blobInfo.ModTime(),
|
||||||
|
Size_: blobInfo.Size(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type BlobInfo struct {
|
||||||
|
Bucket_ string
|
||||||
|
ContentType_ string
|
||||||
|
BlobID_ storage.BlobID
|
||||||
|
ModTime_ time.Time
|
||||||
|
Size_ int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bucket implements storage.BlobInfo.
|
||||||
|
func (bi *BlobInfo) Bucket() string {
|
||||||
|
return bi.Bucket_
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContentType implements storage.BlobInfo.
|
||||||
|
func (bi *BlobInfo) ContentType() string {
|
||||||
|
return bi.ContentType_
|
||||||
|
}
|
||||||
|
|
||||||
|
// ID implements storage.BlobInfo.
|
||||||
|
func (bi *BlobInfo) ID() storage.BlobID {
|
||||||
|
return bi.BlobID_
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime implements storage.BlobInfo.
|
||||||
|
func (bi *BlobInfo) ModTime() time.Time {
|
||||||
|
return bi.ModTime_
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size implements storage.BlobInfo.
|
||||||
|
func (bi *BlobInfo) Size() int64 {
|
||||||
|
return bi.Size_
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ storage.BlobInfo = &BlobInfo{}
|
|
@ -0,0 +1,33 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetBucketSizeArgs struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetBucketSizeReply struct {
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := bucket.Size(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = GetBucketSizeReply{
|
||||||
|
Size: size,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ListBlobInfoArgs struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListBlobInfoReply struct {
|
||||||
|
BlobInfos []storage.BlobInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobInfos, err := bucket.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = ListBlobInfoReply{
|
||||||
|
BlobInfos: blobInfos,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ListBucketsArgs struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListBucketsReply struct {
|
||||||
|
Buckets []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error {
|
||||||
|
buckets, err := s.store.ListBuckets(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = ListBucketsReply{
|
||||||
|
Buckets: buckets,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NewBlobReaderArgs struct {
|
||||||
|
BlobID storage.BlobID
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type NewBlobReaderReply struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
readerID, err := NewReaderID()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := bucket.NewReader(ctx, args.BlobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.readers.Store(readerID, reader)
|
||||||
|
|
||||||
|
*reply = NewBlobReaderReply{
|
||||||
|
ReaderID: readerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) {
|
||||||
|
raw, exists := s.readers.Load(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, errors.Errorf("could not find writer '%s'", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, ok := raw.(io.ReadSeekCloser)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reader, nil
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NewBlobWriterArgs struct {
|
||||||
|
BlobID storage.BlobID
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type NewBlobWriterReply struct {
|
||||||
|
WriterID WriterID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writerID, err := NewWriterID()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, err := bucket.NewWriter(ctx, args.BlobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.writers.Store(writerID, writer)
|
||||||
|
|
||||||
|
*reply = NewBlobWriterReply{
|
||||||
|
WriterID: writerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) {
|
||||||
|
raw, exists := s.writers.Load(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, errors.Errorf("could not find writer '%s'", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, ok := raw.(io.WriteCloser)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return writer, nil
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type OpenBucketArgs struct {
|
||||||
|
BucketName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type OpenBucketReply struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error {
|
||||||
|
bucket, err := s.store.OpenBucket(ctx, args.BucketName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketID, err := NewBucketID()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.buckets.Store(bucketID, bucket)
|
||||||
|
|
||||||
|
*reply = OpenBucketReply{
|
||||||
|
BucketID: bucketID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) {
|
||||||
|
raw, exists := s.buckets.Load(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, errors.WithStack(storage.ErrBucketClosed)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, ok := raw.(storage.BlobBucket)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return bucket, nil
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReadBlobArgs struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
Length int
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReadBlobReply struct {
|
||||||
|
Data []byte
|
||||||
|
Read int
|
||||||
|
EOF bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error {
|
||||||
|
reader, err := s.getOpenedReader(args.ReaderID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buff := make([]byte, args.Length)
|
||||||
|
|
||||||
|
read, err := reader.Read(buff)
|
||||||
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = ReadBlobReply{
|
||||||
|
Read: read,
|
||||||
|
Data: buff,
|
||||||
|
EOF: errors.Is(err, io.EOF),
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SeekBlobArgs struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
Offset int64
|
||||||
|
Whence int
|
||||||
|
}
|
||||||
|
|
||||||
|
type SeekBlobReply struct {
|
||||||
|
Read int64
|
||||||
|
EOF bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error {
|
||||||
|
reader, err := s.getOpenedReader(args.ReaderID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
read, err := reader.Seek(args.Offset, args.Whence)
|
||||||
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = SeekBlobReply{
|
||||||
|
Read: read,
|
||||||
|
EOF: errors.Is(err, io.EOF),
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BucketID string
|
||||||
|
type WriterID string
|
||||||
|
type ReaderID string
|
||||||
|
|
||||||
|
type Service struct {
|
||||||
|
store storage.BlobStore
|
||||||
|
buckets sync.Map
|
||||||
|
writers sync.Map
|
||||||
|
readers sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(store storage.BlobStore) *Service {
|
||||||
|
return &Service{
|
||||||
|
store: store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBucketID() (BucketID, error) {
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := BucketID(fmt.Sprintf("bucket-%s", uuid.String()))
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWriterID() (WriterID, error) {
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := WriterID(fmt.Sprintf("writer-%s", uuid.String()))
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReaderID() (ReaderID, error) {
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := ReaderID(fmt.Sprintf("reader-%s", uuid.String()))
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WriteBlobArgs struct {
|
||||||
|
WriterID WriterID
|
||||||
|
Data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteBlobReply struct {
|
||||||
|
Written int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error {
|
||||||
|
writer, err := s.getOpenedWriter(args.WriterID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
written, err := writer.Write(args.Data)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = WriteBlobReply{
|
||||||
|
Written: written,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package document
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DeleteDocumentArgs struct {
|
||||||
|
Collection string
|
||||||
|
DocumentID storage.DocumentID
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteDocumentReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) DeleteDocument(ctx context.Context, args *DeleteDocumentArgs, reply *DeleteDocumentReply) error {
|
||||||
|
if err := s.store.Delete(ctx, args.Collection, args.DocumentID); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = DeleteDocumentReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package document
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetDocumentArgs struct {
|
||||||
|
Collection string
|
||||||
|
DocumentID storage.DocumentID
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetDocumentReply struct {
|
||||||
|
Document storage.Document
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) GetDocument(ctx context.Context, args *GetDocumentArgs, reply *GetDocumentReply) error {
|
||||||
|
document, err := s.store.Get(ctx, args.Collection, args.DocumentID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = GetDocumentReply{
|
||||||
|
Document: document,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
package document
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type QueryDocumentsArgs struct {
|
||||||
|
Collection string
|
||||||
|
Filter map[string]any
|
||||||
|
Options *storage.QueryOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryDocumentsReply struct {
|
||||||
|
Documents []storage.Document
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) QueryDocuments(ctx context.Context, args *QueryDocumentsArgs, reply *QueryDocumentsReply) error {
|
||||||
|
var (
|
||||||
|
argsFilter *filter.Filter
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if args.Filter != nil {
|
||||||
|
argsFilter, err = filter.NewFrom(args.Filter)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
documents, err := s.store.Query(ctx, args.Collection, argsFilter, withQueryOptions(args.Options))
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = QueryDocumentsReply{
|
||||||
|
Documents: documents,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func withQueryOptions(opts *storage.QueryOptions) storage.QueryOptionFunc {
|
||||||
|
return func(o *storage.QueryOptions) {
|
||||||
|
o.Limit = opts.Limit
|
||||||
|
o.Offset = opts.Offset
|
||||||
|
o.OrderBy = opts.OrderBy
|
||||||
|
o.OrderDirection = opts.OrderDirection
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
package document
|
||||||
|
|
||||||
|
import "forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
|
||||||
|
type Service struct {
|
||||||
|
store storage.DocumentStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(store storage.DocumentStore) *Service {
|
||||||
|
return &Service{store}
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package document
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type UpsertDocumentArgs struct {
|
||||||
|
Collection string
|
||||||
|
Document storage.Document
|
||||||
|
}
|
||||||
|
|
||||||
|
type UpsertDocumentReply struct {
|
||||||
|
Document storage.Document
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) UpsertDocument(ctx context.Context, args *UpsertDocumentArgs, reply *UpsertDocumentReply) error {
|
||||||
|
document, err := s.store.Upsert(ctx, args.Collection, args.Document)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = UpsertDocumentReply{
|
||||||
|
Document: document,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
|
||||||
|
server := rpc.NewServer()
|
||||||
|
server.Register(blob.NewService(store))
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server {
|
||||||
|
server := rpc.NewServer()
|
||||||
|
server.Register(document.NewService(store))
|
||||||
|
return server
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package sqlite
|
package sqlite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -13,7 +14,9 @@ import (
|
||||||
|
|
||||||
func TestBlobStore(t *testing.T) {
|
func TestBlobStore(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
logger.SetLevel(logger.LevelDebug)
|
if testing.Verbose() {
|
||||||
|
logger.SetLevel(logger.LevelDebug)
|
||||||
|
}
|
||||||
|
|
||||||
file := "./testdata/blobstore_test.sqlite"
|
file := "./testdata/blobstore_test.sqlite"
|
||||||
|
|
||||||
|
@ -24,5 +27,20 @@ func TestBlobStore(t *testing.T) {
|
||||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
store := NewBlobStore(dsn)
|
store := NewBlobStore(dsn)
|
||||||
|
|
||||||
testsuite.TestBlobStore(t, store)
|
testsuite.TestBlobStore(context.Background(), t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkBlobStore(t *testing.B) {
|
||||||
|
logger.SetLevel(logger.LevelError)
|
||||||
|
|
||||||
|
file := "./testdata/blobstore_test.sqlite"
|
||||||
|
|
||||||
|
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
|
store := NewBlobStore(dsn)
|
||||||
|
|
||||||
|
testsuite.BenchmarkBlobStore(t, store)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package sqlite
|
package sqlite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -24,5 +25,5 @@ func TestDocumentStore(t *testing.T) {
|
||||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
store := NewDocumentStore(dsn)
|
store := NewDocumentStore(dsn)
|
||||||
|
|
||||||
testsuite.TestDocumentStore(t, store)
|
testsuite.TestDocumentStore(context.Background(), t, store)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
package testsuite
|
package testsuite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBlobStore(t *testing.T, store storage.BlobStore) {
|
func TestBlobStore(ctx context.Context, t *testing.T, store storage.BlobStore) {
|
||||||
t.Run("Ops", func(t *testing.T) {
|
t.Run("Ops", func(t *testing.T) {
|
||||||
t.Parallel()
|
testBlobStoreOps(ctx, t, store)
|
||||||
testBlobStoreOps(t, store)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
package testsuite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
|
||||||
|
t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
|
||||||
|
|
||||||
|
for i := 0; i < t.N; i++ {
|
||||||
|
bucketName := fmt.Sprintf("bucket-%d", i)
|
||||||
|
if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobID := storage.NewBlobID()
|
||||||
|
|
||||||
|
writer, err := bucket.NewWriter(ctx, blobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data := []byte("foo")
|
||||||
|
|
||||||
|
if _, err = writer.Write(data); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writer.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := bucket.NewReader(ctx, blobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
|
||||||
|
if _, err = io.Copy(&buf, reader); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Delete(ctx, blobID); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.DeleteBucket(ctx, bucketName); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -17,9 +17,11 @@ type blobStoreTestCase struct {
|
||||||
|
|
||||||
var blobStoreTestCases = []blobStoreTestCase{
|
var blobStoreTestCases = []blobStoreTestCase{
|
||||||
{
|
{
|
||||||
Name: "Open new bucket",
|
Name: "Open then delete bucket",
|
||||||
Run: func(ctx context.Context, store storage.BlobStore) error {
|
Run: func(ctx context.Context, store storage.BlobStore) error {
|
||||||
bucket, err := store.OpenBucket(ctx, "open-new-bucket")
|
bucketName := "open-new-bucket"
|
||||||
|
|
||||||
|
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
@ -50,6 +52,23 @@ var blobStoreTestCases = []blobStoreTestCase{
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := store.DeleteBucket(ctx, bucketName); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buckets, err := store.ListBuckets(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, b := range buckets {
|
||||||
|
if b != bucketName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Errorf("bucket '%s' should be deleted", bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -112,14 +131,12 @@ var blobStoreTestCases = []blobStoreTestCase{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func testBlobStoreOps(t *testing.T, store storage.BlobStore) {
|
func testBlobStoreOps(ctx context.Context, t *testing.T, store storage.BlobStore) {
|
||||||
for _, tc := range blobStoreTestCases {
|
for _, tc := range blobStoreTestCases {
|
||||||
func(tc blobStoreTestCase) {
|
func(tc blobStoreTestCase) {
|
||||||
t.Run(tc.Name, func(t *testing.T) {
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
if err := tc.Run(ctx, store); err != nil {
|
if err := tc.Run(ctx, store); err != nil {
|
||||||
t.Errorf("%+v", errors.WithStack(err))
|
t.Errorf("%+v", errors.WithStack(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
package testsuite
|
package testsuite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDocumentStore(t *testing.T, store storage.DocumentStore) {
|
func TestDocumentStore(ctx context.Context, t *testing.T, store storage.DocumentStore) {
|
||||||
t.Run("Ops", func(t *testing.T) {
|
t.Run("Ops", func(t *testing.T) {
|
||||||
t.Parallel()
|
testDocumentStoreOps(ctx, t, store)
|
||||||
testDocumentStoreOps(t, store)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -433,12 +433,12 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func testDocumentStoreOps(t *testing.T, store storage.DocumentStore) {
|
func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.DocumentStore) {
|
||||||
for _, tc := range documentStoreOpsTestCases {
|
for _, tc := range documentStoreOpsTestCases {
|
||||||
func(tc documentStoreOpsTestCase) {
|
func(tc documentStoreOpsTestCase) {
|
||||||
t.Run(tc.Name, func(t *testing.T) {
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
if err := tc.Run(context.Background(), store); err != nil {
|
if err := tc.Run(ctx, store); err != nil {
|
||||||
t.Errorf("%+v", errors.WithStack(err))
|
t.Errorf("%+v", errors.WithStack(err))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue