Compare commits
1 Commits
19539ec34a
...
32aaf48792
Author | SHA1 | Date |
---|---|---|
wpetit | 32aaf48792 |
|
@ -1 +1,3 @@
|
|||
RUN_APP_ARGS=""
|
||||
RUN_APP_ARGS=""
|
||||
#EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
|
||||
#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%"
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -13,8 +12,6 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||
|
@ -32,7 +29,7 @@ import (
|
|||
shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
|
||||
shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/client"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/registry"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bundle"
|
||||
|
@ -47,6 +44,10 @@ import (
|
|||
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"
|
||||
|
||||
// Register storage schemes
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/registry"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/sqlite/registry"
|
||||
)
|
||||
|
||||
func RunCommand() *cli.Command {
|
||||
|
@ -77,9 +78,16 @@ func RunCommand() *cli.Command {
|
|||
Value: 0,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "storage-file",
|
||||
Usage: "use `FILE` for SQLite storage database",
|
||||
Value: ".edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
||||
Name: "blobstore-dsn",
|
||||
Usage: "use `DSN` for blob storage",
|
||||
EnvVars: []string{"EDGE_BLOBSTORE_DSN"},
|
||||
Value: "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "documentstore-dsn",
|
||||
Usage: "use `DSN` for document storage",
|
||||
EnvVars: []string{"EDGE_DOCUMENTSTORE_DSN"},
|
||||
Value: "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "shared-resources-file",
|
||||
|
@ -98,7 +106,8 @@ func RunCommand() *cli.Command {
|
|||
|
||||
logFormat := ctx.String("log-format")
|
||||
logLevel := ctx.Int("log-level")
|
||||
storageFile := ctx.String("storage-file")
|
||||
blobstoreDSN := ctx.String("blobstore-dsn")
|
||||
documentstoreDSN := ctx.String("documentstore-dsn")
|
||||
accountsFile := ctx.String("accounts-file")
|
||||
sharedResourcesFile := ctx.String("shared-resources-file")
|
||||
|
||||
|
@ -146,7 +155,7 @@ func RunCommand() *cli.Command {
|
|||
|
||||
appCtx := logger.With(cmdCtx, logger.F("address", address))
|
||||
|
||||
if err := runApp(appCtx, path, address, storageFile, accountsFile, appsRepository, sharedResourcesFile); err != nil {
|
||||
if err := runApp(appCtx, path, address, documentstoreDSN, blobstoreDSN, accountsFile, appsRepository, sharedResourcesFile); err != nil {
|
||||
logger.Error(appCtx, "could not run app", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}(p, port, idx)
|
||||
|
@ -159,7 +168,7 @@ func RunCommand() *cli.Command {
|
|||
}
|
||||
}
|
||||
|
||||
func runApp(ctx context.Context, path string, address string, storageFile string, accountsFile string, appRepository appModule.Repository, sharedResourcesFile string) error {
|
||||
func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, accountsFile string, appRepository appModule.Repository, sharedResourcesFile string) error {
|
||||
absPath, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not resolve path '%s'", path)
|
||||
|
@ -192,7 +201,7 @@ func runApp(ctx context.Context, path string, address string, storageFile string
|
|||
deps := &moduleDeps{}
|
||||
funcs := []ModuleDepFunc{
|
||||
initMemoryBus,
|
||||
initDatastores(storageFile, manifest.ID),
|
||||
initDatastores(documentStoreDSN, blobStoreDSN, manifest.ID),
|
||||
initAccounts(accountsFile, manifest.ID),
|
||||
initShareRepository(sharedResourcesFile),
|
||||
initAppRepository(appRepository),
|
||||
|
@ -325,10 +334,10 @@ func loadLocalAccounts(path string) ([]authHTTP.LocalAccount, error) {
|
|||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
data, err := ioutil.ReadFile(path)
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
if err := ioutil.WriteFile(path, defaultAccounts, 0o640); err != nil {
|
||||
if err := os.WriteFile(path, defaultAccounts, 0o640); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -439,43 +448,25 @@ func initMemoryBus(deps *moduleDeps) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func initDatastores(storageFile string, appID app.ID) ModuleDepFunc {
|
||||
func initDatastores(documentStoreDSN, blobStoreDSN string, appID app.ID) ModuleDepFunc {
|
||||
return func(deps *moduleDeps) error {
|
||||
// storageFile = injectAppID(storageFile, appID)
|
||||
documentStoreDSN = injectAppID(documentStoreDSN, appID)
|
||||
|
||||
blobStoreClient, err := rpc.DialHTTPPath(
|
||||
"tcp",
|
||||
"localhost:3001",
|
||||
injectAppID("/blobstore?tenant=%APPID%", appID),
|
||||
)
|
||||
documentStore, err := registry.NewDocumentStore(documentStoreDSN)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
deps.BlobStore = client.NewBlobStore(blobStoreClient)
|
||||
deps.DocumentStore = documentStore
|
||||
|
||||
documentStoreClient, err := rpc.DialHTTPPath(
|
||||
"tcp",
|
||||
"localhost:3001",
|
||||
injectAppID("/documentstore?tenant=%APPID%", appID),
|
||||
)
|
||||
blobStoreDSN = injectAppID(blobStoreDSN, appID)
|
||||
|
||||
blobStore, err := registry.NewBlobStore(blobStoreDSN)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
deps.DocumentStore = client.NewDocumentStore(documentStoreClient)
|
||||
|
||||
// if err := ensureDir(storageFile); err != nil {
|
||||
// return errors.WithStack(err)
|
||||
// }
|
||||
|
||||
// db, err := storageSqlite.Open(storageFile)
|
||||
// if err != nil {
|
||||
// return errors.WithStack(err)
|
||||
// }
|
||||
|
||||
// deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db)
|
||||
// deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db)
|
||||
deps.BlobStore = blobStore
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,20 +3,24 @@ package command
|
|||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/keegancsmith/rpc"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/registry"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
// Register storage schemes
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/registry"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/sqlite/registry"
|
||||
)
|
||||
|
||||
func Run() *cli.Command {
|
||||
|
@ -30,18 +34,32 @@ func Run() *cli.Command {
|
|||
Value: ":3001",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "data-dir",
|
||||
Value: "./data",
|
||||
Name: "blobstore-dsn-pattern",
|
||||
EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "dsn-query-string",
|
||||
Value: fmt.Sprintf("_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
Name: "documentstore-dsn-pattern",
|
||||
EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.DurationFlag{
|
||||
Name: "cache-ttl",
|
||||
EnvVars: []string{"STORAGE_SERVER_CACHE_TTL"},
|
||||
Value: time.Hour,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "cache-size",
|
||||
EnvVars: []string{"STORAGE_SERVER_CACHE_SIZE"},
|
||||
Value: 32,
|
||||
},
|
||||
},
|
||||
Action: func(ctx *cli.Context) error {
|
||||
addr := ctx.String("address")
|
||||
dataDir := ctx.String("data-dir")
|
||||
dsnQueryString := ctx.String("dsn-query-string")
|
||||
blobstoreDSNPattern := ctx.String("blobstore-dsn-pattern")
|
||||
documentstoreDSNPattern := ctx.String("documentstore-dsn-pattern")
|
||||
cacheSize := ctx.Int("cache-size")
|
||||
cacheTTL := ctx.Duration("cache-ttl")
|
||||
|
||||
router := chi.NewRouter()
|
||||
|
||||
|
@ -56,7 +74,14 @@ func Run() *cli.Command {
|
|||
return
|
||||
}
|
||||
|
||||
server, err := getBlobStoreStoreServer(dataDir, tenant, dsnQueryString)
|
||||
appID := r.URL.Query().Get("appId")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server, err := getBlobStoreStoreServer(cacheSize, cacheTTL, tenant, appID, blobstoreDSNPattern)
|
||||
if err != nil {
|
||||
logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
|
@ -65,7 +90,6 @@ func Run() *cli.Command {
|
|||
}
|
||||
|
||||
server.ServeHTTP(w, r)
|
||||
|
||||
}))
|
||||
|
||||
router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -76,7 +100,14 @@ func Run() *cli.Command {
|
|||
return
|
||||
}
|
||||
|
||||
server, err := getDocumentStoreServer(dataDir, tenant, dsnQueryString)
|
||||
appID := r.URL.Query().Get("appId")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server, err := getDocumentStoreServer(cacheSize, cacheTTL, tenant, appID, documentstoreDSNPattern)
|
||||
if err != nil {
|
||||
logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
|
@ -96,50 +127,66 @@ func Run() *cli.Command {
|
|||
}
|
||||
}
|
||||
|
||||
var documentStoreTenants sync.Map
|
||||
var (
|
||||
documentStoreCache *expirable.LRU[string, *rpc.Server]
|
||||
initDocumentStoreCache sync.Once
|
||||
)
|
||||
|
||||
func getDocumentStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Server, error) {
|
||||
dir := filepath.Join(dataDir, tenant)
|
||||
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
|
||||
func getDocumentStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
|
||||
initDocumentStoreCache.Do(func() {
|
||||
documentStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
|
||||
})
|
||||
|
||||
key := fmt.Sprintf("%s:%s", tenant, appID)
|
||||
|
||||
documentStoreServer, _ := documentStoreCache.Get(key)
|
||||
if documentStoreServer != nil {
|
||||
return documentStoreServer, nil
|
||||
}
|
||||
|
||||
dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
|
||||
dsn = strings.ReplaceAll(dsn, "%APPID%", appID)
|
||||
|
||||
documentStore, err := registry.NewDocumentStore(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
file := filepath.Join(dir, "documentstore.sqlite")
|
||||
dsn := fmt.Sprintf("%s?%s", file, dsnQueryString)
|
||||
documentStoreServer = server.NewDocumentStoreServer(documentStore)
|
||||
|
||||
documentStore := sqlite.NewDocumentStore(dsn)
|
||||
documentStoreServer := server.NewDocumentStoreServer(documentStore)
|
||||
|
||||
rawDocumentStoreServer, _ := documentStoreTenants.LoadOrStore(tenant, documentStoreServer)
|
||||
|
||||
documentStoreServer, ok := rawDocumentStoreServer.(*rpc.Server)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("unexpected document store server value of type '%T'", rawDocumentStoreServer)
|
||||
}
|
||||
documentStoreCache.Add(key, documentStoreServer)
|
||||
|
||||
return documentStoreServer, nil
|
||||
}
|
||||
|
||||
var blobStoreTenants sync.Map
|
||||
var (
|
||||
blobStoreCache *expirable.LRU[string, *rpc.Server]
|
||||
initBlobStoreCache sync.Once
|
||||
)
|
||||
|
||||
func getBlobStoreStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Server, error) {
|
||||
dir := filepath.Join(dataDir, tenant)
|
||||
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
|
||||
func getBlobStoreStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
|
||||
initBlobStoreCache.Do(func() {
|
||||
blobStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
|
||||
})
|
||||
|
||||
key := fmt.Sprintf("%s:%s", tenant, appID)
|
||||
|
||||
blobStoreServer, _ := blobStoreCache.Get(key)
|
||||
if blobStoreServer != nil {
|
||||
return blobStoreServer, nil
|
||||
}
|
||||
|
||||
dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
|
||||
dsn = strings.ReplaceAll(dsn, "%APPID%", appID)
|
||||
|
||||
blobStore, err := registry.NewBlobStore(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
file := filepath.Join(dir, "blobstore.sqlite")
|
||||
dsn := fmt.Sprintf("%s?%s", file, dsnQueryString)
|
||||
blobStoreServer = server.NewBlobStoreServer(blobStore)
|
||||
|
||||
blobStore := sqlite.NewBlobStore(dsn)
|
||||
blobStoreServer := server.NewBlobStoreServer(blobStore)
|
||||
|
||||
rawBlobStoreServer, _ := blobStoreTenants.LoadOrStore(tenant, blobStoreServer)
|
||||
|
||||
blobStoreServer, ok := rawBlobStoreServer.(*rpc.Server)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("unexpected document store server value of type '%T'", rawBlobStoreServer)
|
||||
}
|
||||
blobStoreCache.Add(key, blobStoreServer)
|
||||
|
||||
return blobStoreServer, nil
|
||||
}
|
||||
|
|
1
go.mod
1
go.mod
|
@ -15,6 +15,7 @@ require (
|
|||
github.com/go-playground/universal-translator v0.16.0 // indirect
|
||||
github.com/goccy/go-json v0.9.11 // indirect
|
||||
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6 // indirect
|
||||
github.com/keegancsmith/rpc v1.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.1.0 // indirect
|
||||
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -188,6 +188,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
|
|||
github.com/hashicorp/go.net v0.0.0-20151006203346-104dcad90073/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
|
||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||
github.com/hashicorp/mdns v0.0.0-20151206042412-9d85cf22f9f8/go.mod h1:aa76Av3qgPeIQp9Y3qIkTBPieQYNkQ13Kxe7pze9Wb0=
|
||||
github.com/hashicorp/mdns v1.0.5 h1:1M5hW1cunYeoXOqHwEb/GBDDHAFo0Yqb/uz/beC6LbE=
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var blobStoreFactories = make(map[string]BlobStoreFactory, 0)
|
||||
|
||||
type BlobStoreFactory func(url *url.URL) (storage.BlobStore, error)
|
||||
|
||||
func AddBlobStoreFactory(scheme string, factory BlobStoreFactory) {
|
||||
blobStoreFactories[scheme] = factory
|
||||
}
|
||||
|
||||
func NewBlobStore(dsn string) (storage.BlobStore, error) {
|
||||
url, err := url.Parse(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
factory, exists := blobStoreFactories[url.Scheme]
|
||||
if !exists {
|
||||
return nil, errors.WithStack(ErrSchemeNotRegistered)
|
||||
}
|
||||
|
||||
store, err := factory(url)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var documentStoreFactories = make(map[string]DocumentStoreFactory, 0)
|
||||
|
||||
type DocumentStoreFactory func(url *url.URL) (storage.DocumentStore, error)
|
||||
|
||||
func AddDocumentStoreFactory(scheme string, factory DocumentStoreFactory) {
|
||||
documentStoreFactories[scheme] = factory
|
||||
}
|
||||
|
||||
func NewDocumentStore(dsn string) (storage.DocumentStore, error) {
|
||||
url, err := url.Parse(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
factory, exists := documentStoreFactories[url.Scheme]
|
||||
if !exists {
|
||||
return nil, errors.WithStack(ErrSchemeNotRegistered)
|
||||
}
|
||||
|
||||
store, err := factory(url)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package registry
|
||||
|
||||
import "errors"
|
||||
|
||||
var ErrSchemeNotRegistered = errors.New("scheme was not registered")
|
|
@ -4,17 +4,15 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type BlobBucket struct {
|
||||
name string
|
||||
id blob.BucketID
|
||||
client *rpc.Client
|
||||
name string
|
||||
id blob.BucketID
|
||||
call CallFunc
|
||||
}
|
||||
|
||||
// Size implements storage.BlobBucket
|
||||
|
@ -25,7 +23,7 @@ func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
|||
|
||||
reply := blob.GetBucketSizeReply{}
|
||||
|
||||
if err := b.client.Call(ctx, "Service.GetBucketSize", args, &reply); err != nil {
|
||||
if err := b.call(ctx, "Service.GetBucketSize", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -45,7 +43,7 @@ func (b *BlobBucket) Close() error {
|
|||
|
||||
reply := blob.CloseBucketReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.CloseBucket", args, &reply); err != nil {
|
||||
if err := b.call(context.Background(), "Service.CloseBucket", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -60,7 +58,7 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
|||
|
||||
reply := blob.DeleteBucketReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil {
|
||||
if err := b.call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -76,7 +74,7 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
|
|||
|
||||
reply := blob.GetBlobInfoReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil {
|
||||
if err := b.call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -91,7 +89,7 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
|||
|
||||
reply := blob.ListBlobInfoReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil {
|
||||
if err := b.call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -107,13 +105,13 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS
|
|||
|
||||
reply := blob.NewBlobReaderReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil {
|
||||
if err := b.call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &blobReaderCloser{
|
||||
readerID: reply.ReaderID,
|
||||
client: b.client,
|
||||
call: b.call,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -126,21 +124,21 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write
|
|||
|
||||
reply := blob.NewBlobWriterReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil {
|
||||
if err := b.call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &blobWriterCloser{
|
||||
blobID: id,
|
||||
writerID: reply.WriterID,
|
||||
client: b.client,
|
||||
call: b.call,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type blobWriterCloser struct {
|
||||
blobID storage.BlobID
|
||||
writerID blob.WriterID
|
||||
client *rpc.Client
|
||||
call CallFunc
|
||||
}
|
||||
|
||||
// Write implements io.WriteCloser
|
||||
|
@ -152,7 +150,7 @@ func (bwc *blobWriterCloser) Write(data []byte) (int, error) {
|
|||
|
||||
reply := blob.WriteBlobReply{}
|
||||
|
||||
if err := bwc.client.Call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
|
||||
if err := bwc.call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -167,7 +165,7 @@ func (bwc *blobWriterCloser) Close() error {
|
|||
|
||||
reply := blob.CloseBucketReply{}
|
||||
|
||||
if err := bwc.client.Call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
|
||||
if err := bwc.call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -176,7 +174,7 @@ func (bwc *blobWriterCloser) Close() error {
|
|||
|
||||
type blobReaderCloser struct {
|
||||
readerID blob.ReaderID
|
||||
client *rpc.Client
|
||||
call func(ctx context.Context, serviceMethod string, args any, reply any) error
|
||||
}
|
||||
|
||||
// Read implements io.ReadSeekCloser
|
||||
|
@ -188,7 +186,7 @@ func (brc *blobReaderCloser) Read(p []byte) (int, error) {
|
|||
|
||||
reply := blob.ReadBlobReply{}
|
||||
|
||||
if err := brc.client.Call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
|
||||
if err := brc.call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -211,7 +209,7 @@ func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
|
|||
|
||||
reply := blob.SeekBlobReply{}
|
||||
|
||||
if err := brc.client.Call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
|
||||
if err := brc.call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -226,7 +224,7 @@ func (brc *blobReaderCloser) Close() error {
|
|||
|
||||
reply := blob.CloseReaderReply{}
|
||||
|
||||
if err := brc.client.Call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
|
||||
if err := brc.call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,8 +2,10 @@ package client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||
|
@ -11,7 +13,7 @@ import (
|
|||
)
|
||||
|
||||
type BlobStore struct {
|
||||
client *rpc.Client
|
||||
serverURL *url.URL
|
||||
}
|
||||
|
||||
// DeleteBucket implements storage.BlobStore.
|
||||
|
@ -20,7 +22,7 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
|||
BucketName: name,
|
||||
}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.DeleteBucket", args, nil); err != nil {
|
||||
if err := s.call(ctx, "Service.DeleteBucket", args, nil); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -33,7 +35,7 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
|||
|
||||
reply := blob.ListBucketsReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.ListBuckets", args, &reply); err != nil {
|
||||
if err := s.call(ctx, "Service.ListBuckets", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -47,19 +49,53 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
|
|||
}
|
||||
reply := &blob.OpenBucketReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.OpenBucket", args, reply); err != nil {
|
||||
if err := s.call(ctx, "Service.OpenBucket", args, reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &BlobBucket{
|
||||
name: name,
|
||||
id: reply.BucketID,
|
||||
client: s.client,
|
||||
name: name,
|
||||
id: reply.BucketID,
|
||||
call: s.call,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewBlobStore(client *rpc.Client) *BlobStore {
|
||||
return &BlobStore{client}
|
||||
func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
||||
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := client.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(ctx, client); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBlobStore(serverURL *url.URL) *BlobStore {
|
||||
return &BlobStore{serverURL}
|
||||
}
|
||||
|
||||
var _ storage.BlobStore = &BlobStore{}
|
||||
|
|
|
@ -4,12 +4,11 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
|
@ -31,18 +30,11 @@ func TestBlobStore(t *testing.T) {
|
|||
defer httpServer.Close()
|
||||
|
||||
serverAddr := httpServer.Listener.Addr()
|
||||
|
||||
client, err := rpc.DialHTTPPath(
|
||||
serverAddr.Network(),
|
||||
serverAddr.String(),
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
serverURL := &url.URL{
|
||||
Host: serverAddr.String(),
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
store := NewBlobStore(client)
|
||||
store := NewBlobStore(serverURL)
|
||||
|
||||
testsuite.TestBlobStore(context.Background(), t, store)
|
||||
}
|
||||
|
@ -58,18 +50,11 @@ func BenchmarkBlobStore(t *testing.B) {
|
|||
defer httpServer.Close()
|
||||
|
||||
serverAddr := httpServer.Listener.Addr()
|
||||
|
||||
client, err := rpc.DialHTTPPath(
|
||||
serverAddr.Network(),
|
||||
serverAddr.String(),
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
serverURL := &url.URL{
|
||||
Host: serverAddr.String(),
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
store := NewBlobStore(client)
|
||||
store := NewBlobStore(serverURL)
|
||||
|
||||
testsuite.BenchmarkBlobStore(t, store)
|
||||
|
||||
|
|
|
@ -2,9 +2,11 @@ package client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||
|
@ -12,7 +14,7 @@ import (
|
|||
)
|
||||
|
||||
type DocumentStore struct {
|
||||
client *rpc.Client
|
||||
serverURL *url.URL
|
||||
}
|
||||
|
||||
// Delete implements storage.DocumentStore.
|
||||
|
@ -24,7 +26,7 @@ func (s *DocumentStore) Delete(ctx context.Context, collection string, id storag
|
|||
|
||||
reply := document.DeleteDocumentReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.DeleteDocument", args, &reply); err != nil {
|
||||
if err := s.call(ctx, "Service.DeleteDocument", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -40,7 +42,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
|
|||
|
||||
reply := document.GetDocumentReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.GetDocument", args, &reply); err != nil {
|
||||
if err := s.call(ctx, "Service.GetDocument", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -68,7 +70,7 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
|
|||
Documents: []storage.Document{},
|
||||
}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.QueryDocuments", args, &reply); err != nil {
|
||||
if err := s.call(ctx, "Service.QueryDocuments", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -84,15 +86,49 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc stora
|
|||
|
||||
reply := document.UpsertDocumentReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.UpsertDocument", args, &reply); err != nil {
|
||||
if err := s.call(ctx, "Service.UpsertDocument", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Document, nil
|
||||
}
|
||||
|
||||
func NewDocumentStore(client *rpc.Client) *DocumentStore {
|
||||
return &DocumentStore{client}
|
||||
func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
||||
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := client.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(ctx, client); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewDocumentStore(url *url.URL) *DocumentStore {
|
||||
return &DocumentStore{url}
|
||||
}
|
||||
|
||||
var _ storage.DocumentStore = &DocumentStore{}
|
||||
|
|
|
@ -4,12 +4,11 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
|
@ -32,17 +31,11 @@ func TestDocumentStore(t *testing.T) {
|
|||
|
||||
serverAddr := httpServer.Listener.Addr()
|
||||
|
||||
client, err := rpc.DialHTTPPath(
|
||||
serverAddr.Network(),
|
||||
serverAddr.String(),
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
serverURL := &url.URL{
|
||||
Host: serverAddr.String(),
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
store := NewDocumentStore(client)
|
||||
store := NewDocumentStore(serverURL)
|
||||
|
||||
testsuite.TestDocumentStore(context.Background(), t, store)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
|
||||
)
|
||||
|
||||
type CallFunc func(ctx context.Context, serviceMethod string, args any, reply any) error
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/registry"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/client"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registry.AddDocumentStoreFactory("rpc", documentStoreFactory)
|
||||
registry.AddBlobStoreFactory("rpc", blobStoreFactory)
|
||||
}
|
||||
|
||||
func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
|
||||
return client.NewDocumentStore(url), nil
|
||||
}
|
||||
|
||||
func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
|
||||
return client.NewBlobStore(url), nil
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/registry"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registry.AddDocumentStoreFactory("sqlite", documentStoreFactory)
|
||||
registry.AddBlobStoreFactory("sqlite", blobStoreFactory)
|
||||
}
|
||||
|
||||
func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
|
||||
dir := filepath.Dir(url.Host + url.Path)
|
||||
|
||||
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
path := url.Host + url.Path + "?" + url.RawQuery
|
||||
|
||||
db, err := sqlite.Open(path)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return sqlite.NewDocumentStoreWithDB(db), nil
|
||||
}
|
||||
|
||||
func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
|
||||
dir := filepath.Dir(url.Host + url.Path)
|
||||
|
||||
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
path := url.Host + url.Path + "?" + url.RawQuery
|
||||
|
||||
db, err := sqlite.Open(path)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return sqlite.NewBlobStoreWithDB(db), nil
|
||||
}
|
Loading…
Reference in New Issue