William Petit
8e574c299b
All checks were successful
arcad/edge/pipeline/pr-master This commit looks good
151 lines
3.9 KiB
Go
151 lines
3.9 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"net/url"
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/app"
|
|
server "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/share"
|
|
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
|
"github.com/keegancsmith/rpc"
|
|
"github.com/pkg/errors"
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
)
|
|
|
|
type ShareStore struct {
|
|
serverURL *url.URL
|
|
}
|
|
|
|
// DeleteAttributes implements share.Store.
|
|
func (s *ShareStore) DeleteAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, names ...string) error {
|
|
args := server.DeleteAttributesArgs{
|
|
Origin: origin,
|
|
ResourceID: resourceID,
|
|
Names: names,
|
|
}
|
|
|
|
reply := server.DeleteAttributesArgs{}
|
|
|
|
if err := s.call(ctx, "Service.DeleteAttributes", args, &reply); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteResource implements share.Store.
|
|
func (s *ShareStore) DeleteResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) error {
|
|
args := server.DeleteResourceArgs{
|
|
Origin: origin,
|
|
ResourceID: resourceID,
|
|
}
|
|
|
|
reply := server.DeleteResourceReply{}
|
|
|
|
if err := s.call(ctx, "Service.DeleteResource", args, &reply); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// FindResources implements share.Store.
|
|
func (s *ShareStore) FindResources(ctx context.Context, funcs ...share.FindResourcesOptionFunc) ([]share.Resource, error) {
|
|
options := share.NewFindResourcesOptions(funcs...)
|
|
|
|
args := server.FindResourcesArgs{
|
|
Options: options,
|
|
}
|
|
|
|
reply := server.FindResourcesReply{}
|
|
|
|
if err := s.call(ctx, "Service.FindResources", args, &reply); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
resources := make([]share.Resource, len(reply.Resources))
|
|
for idx, res := range reply.Resources {
|
|
resources[idx] = res
|
|
}
|
|
|
|
return resources, nil
|
|
}
|
|
|
|
// GetResource implements share.Store.
|
|
func (s *ShareStore) GetResource(ctx context.Context, origin app.ID, resourceID share.ResourceID) (share.Resource, error) {
|
|
args := server.GetResourceArgs{
|
|
Origin: origin,
|
|
ResourceID: resourceID,
|
|
}
|
|
|
|
reply := server.GetResourceReply{}
|
|
|
|
if err := s.call(ctx, "Service.GetResource", args, &reply); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return reply.Resource, nil
|
|
}
|
|
|
|
// UpdateAttributes implements share.Store.
|
|
func (s *ShareStore) UpdateAttributes(ctx context.Context, origin app.ID, resourceID share.ResourceID, attributes ...share.Attribute) (share.Resource, error) {
|
|
serializableAttributes := make([]*server.SerializableAttribute, len(attributes))
|
|
for attrIdx, attr := range attributes {
|
|
serializableAttributes[attrIdx] = server.FromAttribute(attr)
|
|
}
|
|
|
|
args := server.UpdateAttributesArgs{
|
|
Origin: origin,
|
|
ResourceID: resourceID,
|
|
Attributes: serializableAttributes,
|
|
}
|
|
|
|
reply := server.UpdateAttributesReply{}
|
|
|
|
if err := s.call(ctx, "Service.UpdateAttributes", args, &reply); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return reply.Resource, nil
|
|
}
|
|
|
|
func (s *ShareStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
|
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
|
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
|
return errors.WithStack(remapShareError(err))
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *ShareStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
|
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := client.Close(); err != nil {
|
|
logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err)))
|
|
}
|
|
}()
|
|
|
|
if err := fn(ctx, client); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewShareStore(url *url.URL) *ShareStore {
|
|
return &ShareStore{url}
|
|
}
|
|
|
|
var _ share.Store = &ShareStore{}
|