102 lines
2.3 KiB
Go
102 lines
2.3 KiB
Go
|
package client
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"net/url"
|
||
|
|
||
|
"github.com/keegancsmith/rpc"
|
||
|
"gitlab.com/wpetit/goweb/logger"
|
||
|
|
||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||
|
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob"
|
||
|
"github.com/pkg/errors"
|
||
|
)
|
||
|
|
||
|
// BlobStore is a client-side blob store that forwards its operations to a
// remote RPC server.
type BlobStore struct {
	// serverURL is the address of the RPC server; its host, path and query
	// are used when dialing.
	serverURL *url.URL
}
|
||
|
|
||
|
// DeleteBucket implements storage.BlobStore.
|
||
|
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||
|
args := &blob.DeleteBucketArgs{
|
||
|
BucketName: name,
|
||
|
}
|
||
|
|
||
|
if err := s.call(ctx, "Service.DeleteBucket", args, nil); err != nil {
|
||
|
return errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// ListBuckets implements storage.BlobStore.
|
||
|
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||
|
args := &blob.ListBucketsArgs{}
|
||
|
|
||
|
reply := blob.ListBucketsReply{}
|
||
|
|
||
|
if err := s.call(ctx, "Service.ListBuckets", args, &reply); err != nil {
|
||
|
return nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
return reply.Buckets, nil
|
||
|
}
|
||
|
|
||
|
// OpenBucket implements storage.BlobStore.
|
||
|
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||
|
args := &blob.OpenBucketArgs{
|
||
|
BucketName: name,
|
||
|
}
|
||
|
reply := &blob.OpenBucketReply{}
|
||
|
|
||
|
if err := s.call(ctx, "Service.OpenBucket", args, reply); err != nil {
|
||
|
return nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
return &BlobBucket{
|
||
|
name: name,
|
||
|
id: reply.BucketID,
|
||
|
call: s.call,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||
|
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||
|
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||
|
return errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
if err != nil {
|
||
|
return errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
||
|
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
||
|
if err != nil {
|
||
|
return errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
if err := client.Close(); err != nil {
|
||
|
logger.Error(ctx, "could not close rpc client", logger.E(errors.WithStack(err)))
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
if err := fn(ctx, client); err != nil {
|
||
|
return errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func NewBlobStore(serverURL *url.URL) *BlobStore {
|
||
|
return &BlobStore{serverURL}
|
||
|
}
|
||
|
|
||
|
// Compile-time check that *BlobStore satisfies storage.BlobStore.
var _ storage.BlobStore = (*BlobStore)(nil)
|