Compare commits
1 Commits
34b85e6a96
...
8e574c299b
Author | SHA1 | Date | |
---|---|---|---|
8e574c299b |
@ -11,16 +11,18 @@ import (
|
||||
"github.com/keegancsmith/rpc"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
// Register storage drivers
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
)
|
||||
|
||||
func Run() *cli.Command {
|
||||
@ -43,6 +45,11 @@ func Run() *cli.Command {
|
||||
EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "sharestore-dsn-pattern",
|
||||
EnvVars: []string{"STORAGE_SERVER_SHARESTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.DurationFlag{
|
||||
Name: "cache-ttl",
|
||||
EnvVars: []string{"STORAGE_SERVER_CACHE_TTL"},
|
||||
@ -56,67 +63,47 @@ func Run() *cli.Command {
|
||||
},
|
||||
Action: func(ctx *cli.Context) error {
|
||||
addr := ctx.String("address")
|
||||
blobstoreDSNPattern := ctx.String("blobstore-dsn-pattern")
|
||||
documentstoreDSNPattern := ctx.String("documentstore-dsn-pattern")
|
||||
blobStoreDSNPattern := ctx.String("blobstore-dsn-pattern")
|
||||
documentStoreDSNPattern := ctx.String("documentstore-dsn-pattern")
|
||||
shareStoreDSNPattern := ctx.String("sharestore-dsn-pattern")
|
||||
cacheSize := ctx.Int("cache-size")
|
||||
cacheTTL := ctx.Duration("cache-ttl")
|
||||
|
||||
router := chi.NewRouter()
|
||||
|
||||
getBlobStoreServer := createGetCachedStoreServer(
|
||||
func(dsn string) (storage.BlobStore, error) {
|
||||
return driver.NewBlobStore(dsn)
|
||||
},
|
||||
func(store storage.BlobStore) *rpc.Server {
|
||||
return server.NewBlobStoreServer(store)
|
||||
},
|
||||
)
|
||||
|
||||
getShareStoreServer := createGetCachedStoreServer(
|
||||
func(dsn string) (share.Store, error) {
|
||||
return driver.NewShareStore(dsn)
|
||||
},
|
||||
func(store share.Store) *rpc.Server {
|
||||
return server.NewShareStoreServer(store)
|
||||
},
|
||||
)
|
||||
|
||||
getDocumentStoreServer := createGetCachedStoreServer(
|
||||
func(dsn string) (storage.DocumentStore, error) {
|
||||
return driver.NewDocumentStore(dsn)
|
||||
},
|
||||
func(store storage.DocumentStore) *rpc.Server {
|
||||
return server.NewDocumentStoreServer(store)
|
||||
},
|
||||
)
|
||||
|
||||
router.Use(middleware.RealIP)
|
||||
router.Use(middleware.Logger)
|
||||
|
||||
router.Handle("/blobstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tenant := r.URL.Query().Get("tenant")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
appID := r.URL.Query().Get("appId")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server, err := getBlobStoreStoreServer(cacheSize, cacheTTL, tenant, appID, blobstoreDSNPattern)
|
||||
if err != nil {
|
||||
logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server.ServeHTTP(w, r)
|
||||
}))
|
||||
|
||||
router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tenant := r.URL.Query().Get("tenant")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
appID := r.URL.Query().Get("appId")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server, err := getDocumentStoreServer(cacheSize, cacheTTL, tenant, appID, documentstoreDSNPattern)
|
||||
if err != nil {
|
||||
logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server.ServeHTTP(w, r)
|
||||
}))
|
||||
router.Handle("/blobstore", createStoreHandler(getBlobStoreServer, blobStoreDSNPattern, cacheSize, cacheTTL))
|
||||
router.Handle("/documentstore", createStoreHandler(getDocumentStoreServer, documentStoreDSNPattern, cacheSize, cacheTTL))
|
||||
router.Handle("/sharestore", createStoreHandler(getShareStoreServer, shareStoreDSNPattern, cacheSize, cacheTTL))
|
||||
|
||||
if err := http.ListenAndServe(addr, router); err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -127,66 +114,66 @@ func Run() *cli.Command {
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
documentStoreCache *expirable.LRU[string, *rpc.Server]
|
||||
initDocumentStoreCache sync.Once
|
||||
)
|
||||
type getRPCServerFunc func(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error)
|
||||
|
||||
func getDocumentStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
|
||||
initDocumentStoreCache.Do(func() {
|
||||
documentStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
|
||||
})
|
||||
func createGetCachedStoreServer[T any](storeFactory func(dsn string) (T, error), serverFactory func(store T) *rpc.Server) getRPCServerFunc {
|
||||
var (
|
||||
cache *expirable.LRU[string, *rpc.Server]
|
||||
initCache sync.Once
|
||||
)
|
||||
|
||||
key := fmt.Sprintf("%s:%s", tenant, appID)
|
||||
return func(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
|
||||
initCache.Do(func() {
|
||||
cache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
|
||||
})
|
||||
|
||||
documentStoreServer, _ := documentStoreCache.Get(key)
|
||||
if documentStoreServer != nil {
|
||||
return documentStoreServer, nil
|
||||
key := fmt.Sprintf("%s:%s", tenant, appID)
|
||||
|
||||
storeServer, _ := cache.Get(key)
|
||||
if storeServer != nil {
|
||||
return storeServer, nil
|
||||
}
|
||||
|
||||
dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
|
||||
dsn = strings.ReplaceAll(dsn, "%APPID%", appID)
|
||||
|
||||
store, err := storeFactory(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
storeServer = serverFactory(store)
|
||||
|
||||
cache.Add(key, storeServer)
|
||||
|
||||
return storeServer, nil
|
||||
}
|
||||
|
||||
dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
|
||||
dsn = strings.ReplaceAll(dsn, "%APPID%", appID)
|
||||
|
||||
documentStore, err := driver.NewDocumentStore(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
documentStoreServer = server.NewDocumentStoreServer(documentStore)
|
||||
|
||||
documentStoreCache.Add(key, documentStoreServer)
|
||||
|
||||
return documentStoreServer, nil
|
||||
}
|
||||
|
||||
var (
|
||||
blobStoreCache *expirable.LRU[string, *rpc.Server]
|
||||
initBlobStoreCache sync.Once
|
||||
)
|
||||
func createStoreHandler(getStoreServer getRPCServerFunc, dsnPattern string, cacheSize int, cacheTTL time.Duration) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tenant := r.URL.Query().Get("tenant")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
func getBlobStoreStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
|
||||
initBlobStoreCache.Do(func() {
|
||||
blobStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
|
||||
return
|
||||
}
|
||||
|
||||
appID := r.URL.Query().Get("appId")
|
||||
if tenant == "" {
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server, err := getStoreServer(cacheSize, cacheTTL, tenant, appID, dsnPattern)
|
||||
if err != nil {
|
||||
logger.Error(r.Context(), "could not retrieve store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
server.ServeHTTP(w, r)
|
||||
})
|
||||
|
||||
key := fmt.Sprintf("%s:%s", tenant, appID)
|
||||
|
||||
blobStoreServer, _ := blobStoreCache.Get(key)
|
||||
if blobStoreServer != nil {
|
||||
return blobStoreServer, nil
|
||||
}
|
||||
|
||||
dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
|
||||
dsn = strings.ReplaceAll(dsn, "%APPID%", appID)
|
||||
|
||||
blobStore, err := driver.NewBlobStore(dsn)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
blobStoreServer = server.NewBlobStoreServer(blobStore)
|
||||
|
||||
blobStoreCache.Add(key, blobStoreServer)
|
||||
|
||||
return blobStoreServer, nil
|
||||
}
|
||||
|
17
pkg/storage/driver/rpc/client/error.go
Normal file
17
pkg/storage/driver/rpc/client/error.go
Normal file
@ -0,0 +1,17 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func remapShareError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case share.ErrAttributeRequired.Error():
|
||||
return share.ErrAttributeRequired
|
||||
case share.ErrNotFound.Error():
|
||||
return share.ErrNotFound
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
150
pkg/storage/driver/rpc/client/share_store.go
Normal file
150
pkg/storage/driver/rpc/client/share_store.go
Normal file
@ -0,0 +1,150 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
server "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/share"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/keegancsmith/rpc"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type ShareStore struct {
|
||||
serverURL *url.URL
|
||||
}
|
||||
|
||||
// DeleteAttributes implements share.Store.
|
||||
func (s *ShareStore) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error {
|
||||
args := server.DeleteAttributesArgs{
|
||||
Origin: origin,
|
||||
ResourceID: resourceID,
|
||||
Names: names,
|
||||
}
|
||||
|
||||
reply := server.DeleteAttributesArgs{}
|
||||
|
||||
if err := s.call(ctx, "Service.DeleteAttributes", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteResource implements share.Store.
|
||||
func (s *ShareStore) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error {
|
||||
args := server.DeleteResourceArgs{
|
||||
Origin: origin,
|
||||
ResourceID: resourceID,
|
||||
}
|
||||
|
||||
reply := server.DeleteResourceReply{}
|
||||
|
||||
if err := s.call(ctx, "Service.DeleteResource", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindResources implements share.Store.
|
||||
func (s *ShareStore) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) {
|
||||
options := share.NewFindResourcesOptions(funcs...)
|
||||
|
||||
args := server.FindResourcesArgs{
|
||||
Options: options,
|
||||
}
|
||||
|
||||
reply := server.FindResourcesReply{}
|
||||
|
||||
if err := s.call(ctx, "Service.FindResources", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
resources := make([]share.Resource, len(reply.Resources))
|
||||
for idx, res := range reply.Resources {
|
||||
resources[idx] = res
|
||||
}
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
// GetResource implements share.Store.
|
||||
func (s *ShareStore) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) {
|
||||
args := server.GetResourceArgs{
|
||||
Origin: origin,
|
||||
ResourceID: resourceID,
|
||||
}
|
||||
|
||||
reply := server.GetResourceReply{}
|
||||
|
||||
if err := s.call(ctx, "Service.GetResource", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Resource, nil
|
||||
}
|
||||
|
||||
// UpdateAttributes implements share.Store.
|
||||
func (s *ShareStore) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) {
|
||||
serializableAttributes := make([]*server.SerializableAttribute, len(attributes))
|
||||
for attrIdx, attr := range attributes {
|
||||
serializableAttributes[attrIdx] = server.FromAttribute(attr)
|
||||
}
|
||||
|
||||
args := server.UpdateAttributesArgs{
|
||||
Origin: origin,
|
||||
ResourceID: resourceID,
|
||||
Attributes: serializableAttributes,
|
||||
}
|
||||
|
||||
reply := server.UpdateAttributesReply{}
|
||||
|
||||
if err := s.call(ctx, "Service.UpdateAttributes", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Resource, nil
|
||||
}
|
||||
|
||||
func (s *ShareStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(remapShareError(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ShareStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
||||
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := client.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(ctx, client); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewShareStore(url *url.URL) *ShareStore {
|
||||
return &ShareStore{url}
|
||||
}
|
||||
|
||||
var _ share.Store = &ShareStore{}
|
67
pkg/storage/driver/rpc/client/share_store_test.go
Normal file
67
pkg/storage/driver/rpc/client/share_store_test.go
Normal file
@ -0,0 +1,67 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share/testsuite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestShareStore(t *testing.T) {
|
||||
t.Parallel()
|
||||
if testing.Verbose() {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
}
|
||||
|
||||
testsuite.TestStore(t, func(testName string) (share.Store, error) {
|
||||
httpServer, err := startNewShareStoreServer(testName)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
serverAddr := httpServer.Listener.Addr()
|
||||
serverURL := &url.URL{
|
||||
Host: serverAddr.String(),
|
||||
}
|
||||
|
||||
return NewShareStore(serverURL), nil
|
||||
})
|
||||
}
|
||||
|
||||
func getSQLiteShareStore(testName string) (*sqlite.ShareStore, error) {
|
||||
filename := strings.ToLower(strings.ReplaceAll(testName, " ", "_"))
|
||||
|
||||
file := fmt.Sprintf("./testdata/sharestore_test_%s.sqlite", filename)
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
store := sqlite.NewShareStore(dsn)
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func startNewShareStoreServer(testName string) (*httptest.Server, error) {
|
||||
store, err := getSQLiteShareStore(testName)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
server := server.NewShareStoreServer(store)
|
||||
|
||||
httpServer := httptest.NewServer(server)
|
||||
|
||||
return httpServer, nil
|
||||
}
|
@ -6,6 +6,8 @@ import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document"
|
||||
shareService "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/share"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
)
|
||||
|
||||
func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
|
||||
@ -19,3 +21,9 @@ func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server {
|
||||
server.Register(document.NewService(store))
|
||||
return server
|
||||
}
|
||||
|
||||
func NewShareStoreServer(store share.Store) *rpc.Server {
|
||||
server := rpc.NewServer()
|
||||
server.Register(shareService.NewService(store))
|
||||
return server
|
||||
}
|
||||
|
28
pkg/storage/driver/rpc/server/share/delete_attributes.go
Normal file
28
pkg/storage/driver/rpc/server/share/delete_attributes.go
Normal file
@ -0,0 +1,28 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type DeleteAttributesArgs struct {
|
||||
Origin app.ID
|
||||
ResourceID share.ResourceID
|
||||
Names []string
|
||||
}
|
||||
|
||||
type DeleteAttributesReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) DeleteAttributes(ctx context.Context, args DeleteAttributesArgs, reply *DeleteAttributesReply) error {
|
||||
if err := s.store.DeleteAttributes(ctx, args.Origin, args.ResourceID, args.Names...); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = DeleteAttributesReply{}
|
||||
|
||||
return nil
|
||||
}
|
27
pkg/storage/driver/rpc/server/share/delete_resource.go
Normal file
27
pkg/storage/driver/rpc/server/share/delete_resource.go
Normal file
@ -0,0 +1,27 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type DeleteResourceArgs struct {
|
||||
Origin app.ID
|
||||
ResourceID share.ResourceID
|
||||
}
|
||||
|
||||
type DeleteResourceReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) DeleteResource(ctx context.Context, args DeleteResourceArgs, reply *DeleteResourceReply) error {
|
||||
if err := s.store.DeleteResource(ctx, args.Origin, args.ResourceID); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = DeleteResourceReply{}
|
||||
|
||||
return nil
|
||||
}
|
41
pkg/storage/driver/rpc/server/share/find_resources.go
Normal file
41
pkg/storage/driver/rpc/server/share/find_resources.go
Normal file
@ -0,0 +1,41 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type FindResourcesArgs struct {
|
||||
Options *share.FindResourcesOptions
|
||||
}
|
||||
|
||||
type FindResourcesReply struct {
|
||||
Resources []*SerializableResource
|
||||
}
|
||||
|
||||
func (s *Service) FindResources(ctx context.Context, args FindResourcesArgs, reply *FindResourcesReply) error {
|
||||
resources, err := s.store.FindResources(ctx, withFindResourcesOptions(args.Options))
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
serializableResources := make([]*SerializableResource, len(resources))
|
||||
for resIdx, r := range resources {
|
||||
serializableResources[resIdx] = FromResource(r)
|
||||
}
|
||||
|
||||
*reply = FindResourcesReply{
|
||||
Resources: serializableResources,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func withFindResourcesOptions(opts *share.FindResourcesOptions) share.FindResourcesOptionFunc {
|
||||
return func(o *share.FindResourcesOptions) {
|
||||
o.Name = opts.Name
|
||||
o.ValueType = opts.ValueType
|
||||
}
|
||||
}
|
31
pkg/storage/driver/rpc/server/share/get_resource.go
Normal file
31
pkg/storage/driver/rpc/server/share/get_resource.go
Normal file
@ -0,0 +1,31 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type GetResourceArgs struct {
|
||||
Origin app.ID
|
||||
ResourceID share.ResourceID
|
||||
}
|
||||
|
||||
type GetResourceReply struct {
|
||||
Resource *SerializableResource
|
||||
}
|
||||
|
||||
func (s *Service) GetResource(ctx context.Context, args GetResourceArgs, reply *GetResourceReply) error {
|
||||
resource, err := s.store.GetResource(ctx, args.Origin, args.ResourceID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = GetResourceReply{
|
||||
Resource: FromResource(resource),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
94
pkg/storage/driver/rpc/server/share/serializable.go
Normal file
94
pkg/storage/driver/rpc/server/share/serializable.go
Normal file
@ -0,0 +1,94 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
)
|
||||
|
||||
func FromResource(res share.Resource) *SerializableResource {
|
||||
serializableAttributes := make([]*SerializableAttribute, len(res.Attributes()))
|
||||
for attrIdx, attr := range res.Attributes() {
|
||||
serializableAttributes[attrIdx] = FromAttribute(attr)
|
||||
}
|
||||
|
||||
return &SerializableResource{
|
||||
ID_: res.ID(),
|
||||
Origin_: res.Origin(),
|
||||
Attributes_: serializableAttributes,
|
||||
}
|
||||
}
|
||||
|
||||
func FromAttribute(attr share.Attribute) *SerializableAttribute {
|
||||
return &SerializableAttribute{
|
||||
Name_: attr.Name(),
|
||||
Value_: attr.Value(),
|
||||
Type_: attr.Type(),
|
||||
UpdatedAt_: attr.UpdatedAt(),
|
||||
CreatedAt_: attr.CreatedAt(),
|
||||
}
|
||||
}
|
||||
|
||||
type SerializableResource struct {
|
||||
ID_ share.ResourceID
|
||||
Origin_ app.ID
|
||||
Attributes_ []*SerializableAttribute
|
||||
}
|
||||
|
||||
// Attributes implements share.Resource.
|
||||
func (r *SerializableResource) Attributes() []share.Attribute {
|
||||
attributes := make([]share.Attribute, len(r.Attributes_))
|
||||
for idx, attr := range r.Attributes_ {
|
||||
attributes[idx] = attr
|
||||
}
|
||||
|
||||
return attributes
|
||||
}
|
||||
|
||||
// ID implements share.Resource.
|
||||
func (r *SerializableResource) ID() share.ResourceID {
|
||||
return r.ID_
|
||||
}
|
||||
|
||||
// Origin implements share.Resource.
|
||||
func (r *SerializableResource) Origin() app.ID {
|
||||
return r.Origin_
|
||||
}
|
||||
|
||||
var _ share.Resource = &SerializableResource{}
|
||||
|
||||
type SerializableAttribute struct {
|
||||
Name_ string
|
||||
Value_ any
|
||||
Type_ share.ValueType
|
||||
UpdatedAt_ time.Time
|
||||
CreatedAt_ time.Time
|
||||
}
|
||||
|
||||
// CreatedAt implements share.Attribute.
|
||||
func (a *SerializableAttribute) CreatedAt() time.Time {
|
||||
return a.CreatedAt_
|
||||
}
|
||||
|
||||
// Name implements share.Attribute.
|
||||
func (a *SerializableAttribute) Name() string {
|
||||
return a.Name_
|
||||
}
|
||||
|
||||
// Type implements share.Attribute.
|
||||
func (a *SerializableAttribute) Type() share.ValueType {
|
||||
return a.Type_
|
||||
}
|
||||
|
||||
// UpdatedAt implements share.Attribute.
|
||||
func (a *SerializableAttribute) UpdatedAt() time.Time {
|
||||
return a.UpdatedAt_
|
||||
}
|
||||
|
||||
// Value implements share.Attribute.
|
||||
func (a *SerializableAttribute) Value() any {
|
||||
return a.Value_
|
||||
}
|
||||
|
||||
var _ share.Attribute = &SerializableAttribute{}
|
13
pkg/storage/driver/rpc/server/share/service.go
Normal file
13
pkg/storage/driver/rpc/server/share/service.go
Normal file
@ -0,0 +1,13 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
store share.Store
|
||||
}
|
||||
|
||||
func NewService(store share.Store) *Service {
|
||||
return &Service{store}
|
||||
}
|
37
pkg/storage/driver/rpc/server/share/update_attributes.go
Normal file
37
pkg/storage/driver/rpc/server/share/update_attributes.go
Normal file
@ -0,0 +1,37 @@
|
||||
package share
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type UpdateAttributesArgs struct {
|
||||
Origin app.ID
|
||||
ResourceID share.ResourceID
|
||||
Attributes []*SerializableAttribute
|
||||
}
|
||||
|
||||
type UpdateAttributesReply struct {
|
||||
Resource *SerializableResource
|
||||
}
|
||||
|
||||
func (s *Service) UpdateAttributes(ctx context.Context, args UpdateAttributesArgs, reply *UpdateAttributesReply) error {
|
||||
attributes := make([]share.Attribute, len(args.Attributes))
|
||||
for idx, attr := range args.Attributes {
|
||||
attributes[idx] = attr
|
||||
}
|
||||
|
||||
resource, err := s.store.UpdateAttributes(ctx, args.Origin, args.ResourceID, attributes...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = UpdateAttributesReply{
|
||||
Resource: FromResource(resource),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -115,7 +115,7 @@ func (s *ShareStore) DeleteResource(ctx context.Context, origin app.ID, resource
|
||||
|
||||
// FindResources implements share.Repository
|
||||
func (s *ShareStore) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) {
|
||||
opts := share.FillFindResourcesOptions(funcs...)
|
||||
opts := share.NewFindResourcesOptions(funcs...)
|
||||
|
||||
var resources []share.Resource
|
||||
|
||||
|
@ -7,7 +7,7 @@ type FindResourcesOptions struct {
|
||||
ValueType *ValueType
|
||||
}
|
||||
|
||||
func FillFindResourcesOptions(funcs ...FindResourcesOptionFunc) *FindResourcesOptions {
|
||||
func NewFindResourcesOptions(funcs ...FindResourcesOptionFunc) *FindResourcesOptions {
|
||||
opts := &FindResourcesOptions{}
|
||||
|
||||
for _, fn := range funcs {
|
||||
|
@ -10,7 +10,6 @@ type NewTestStoreFunc func(testname string) (share.Store, error)
|
||||
|
||||
func TestStore(t *testing.T, newStore NewTestStoreFunc) {
|
||||
t.Run("Cases", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
runRepositoryTests(t, newStore)
|
||||
})
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ var repositoryTestCases = []repositoryTestCase{
|
||||
}
|
||||
|
||||
if !errors.Is(err, share.ErrAttributeRequired) {
|
||||
return errors.Errorf("err: expected share.ErrAttributeRequired, got '%+v'", err)
|
||||
return errors.Errorf("err: expected share.ErrAttributeRequired, got '%v'", err)
|
||||
}
|
||||
|
||||
attributes := []share.Attribute{
|
||||
@ -274,8 +274,6 @@ func runRepositoryTests(t *testing.T, newRepo NewTestStoreFunc) {
|
||||
for _, tc := range repositoryTestCases {
|
||||
func(tc repositoryTestCase) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if tc.Skip {
|
||||
t.SkipNow()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user