edge/pkg/storage/driver/rpc/client/blob_store.go

102 lines
2.3 KiB
Go
Raw Permalink Normal View History

package client
import (
"context"
"net/url"
"github.com/keegancsmith/rpc"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob"
"github.com/pkg/errors"
)
// BlobStore is a storage.BlobStore implementation that forwards each
// operation to a remote RPC server.
type BlobStore struct {
// serverURL locates the RPC server; its Host, Path and RawQuery are read
// when dialing.
serverURL *url.URL
}
// DeleteBucket asks the remote blob service to delete the bucket called name.
// It implements storage.BlobStore.
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
	request := blob.DeleteBucketArgs{BucketName: name}

	// No reply value is read by this method, so nil is passed as the reply.
	err := s.call(ctx, "Service.DeleteBucket", &request, nil)
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// ListBuckets returns the bucket names reported by the remote blob service.
// It implements storage.BlobStore.
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
	var (
		request blob.ListBucketsArgs
		reply   blob.ListBucketsReply
	)

	err := s.call(ctx, "Service.ListBuckets", &request, &reply)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return reply.Buckets, nil
}
// OpenBucket asks the remote blob service to open the bucket called name and
// returns a client-side handle for it. It implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
	var reply blob.OpenBucketReply

	request := blob.OpenBucketArgs{BucketName: name}
	if err := s.call(ctx, "Service.OpenBucket", &request, &reply); err != nil {
		return nil, errors.WithStack(err)
	}

	// The handle carries the bucket ID taken from the reply together with this
	// store's call method.
	bucket := &BlobBucket{
		name: name,
		id:   reply.BucketID,
		call: s.call,
	}

	return bucket, nil
}
// call invokes serviceMethod on the RPC server with args and stores the
// result in reply, using a client obtained from withClient for the duration
// of the invocation.
func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
	invoke := func(ctx context.Context, client *rpc.Client) error {
		if callErr := client.Call(ctx, serviceMethod, args, reply); callErr != nil {
			return errors.WithStack(callErr)
		}

		return nil
	}

	if err := s.withClient(ctx, invoke); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// withClient dials the RPC server described by s.serverURL, runs fn with the
// resulting client and closes the client before returning.
//
// A new connection is dialed on every invocation. ctx is handed to fn and to
// the logger, but it is not passed to the dial call itself.
//
// It returns an error when the store has no server URL, when dialing fails or
// when fn fails. A failure to close the client is logged and not returned.
func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
	// Report a missing URL as an error instead of panicking on a nil pointer.
	if s.serverURL == nil {
		return errors.New("rpc server url is nil")
	}

	// Only append the query separator when there is a query to send, so that
	// the request target is "/path" and not "/path?".
	path := s.serverURL.Path
	if query := s.serverURL.RawQuery; query != "" {
		path += "?" + query
	}

	client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, path)
	if err != nil {
		return errors.WithStack(err)
	}

	defer func() {
		if err := client.Close(); err != nil {
			logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
		}
	}()

	if err := fn(ctx, client); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// NewBlobStore returns a BlobStore that addresses the RPC server located at
// serverURL. No connection is opened by this constructor.
func NewBlobStore(serverURL *url.URL) *BlobStore {
	store := &BlobStore{
		serverURL: serverURL,
	}

	return store
}
// Compile-time check that *BlobStore satisfies storage.BlobStore.
var _ storage.BlobStore = (*BlobStore)(nil)