feat(storage): rpc based implementation
arcad/edge/pipeline/pr-master This commit looks good
Details
arcad/edge/pipeline/pr-master This commit looks good
Details
This commit is contained in:
parent
c3535a4a9b
commit
c74c6c9548
1
go.mod
1
go.mod
|
@ -15,6 +15,7 @@ require (
|
|||
github.com/go-playground/universal-translator v0.16.0 // indirect
|
||||
github.com/goccy/go-json v0.9.11 // indirect
|
||||
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
||||
github.com/keegancsmith/rpc v1.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.1.0 // indirect
|
||||
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
||||
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -201,6 +201,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
|
|||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/keegancsmith/rpc v1.3.0 h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk=
|
||||
github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
|
|
|
@ -0,0 +1,241 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type BlobBucket struct {
|
||||
name string
|
||||
id blob.BucketID
|
||||
client *rpc.Client
|
||||
}
|
||||
|
||||
// Size implements storage.BlobBucket
|
||||
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
||||
args := blob.GetBucketSizeArgs{
|
||||
BucketID: b.id,
|
||||
}
|
||||
|
||||
reply := blob.GetBucketSizeReply{}
|
||||
|
||||
if err := b.client.Call(ctx, "Service.GetBucketSize", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Size, nil
|
||||
}
|
||||
|
||||
// Name implements storage.BlobBucket
|
||||
func (b *BlobBucket) Name() string {
|
||||
return b.name
|
||||
}
|
||||
|
||||
// Close implements storage.BlobBucket
|
||||
func (b *BlobBucket) Close() error {
|
||||
args := blob.CloseBucketArgs{
|
||||
BucketID: b.id,
|
||||
}
|
||||
|
||||
reply := blob.CloseBucketReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.CloseBucket", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete implements storage.BlobBucket
|
||||
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||
args := blob.DeleteBucketArgs{
|
||||
BucketName: b.name,
|
||||
}
|
||||
|
||||
reply := blob.DeleteBucketReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements storage.BlobBucket
|
||||
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
||||
args := blob.GetBlobInfoArgs{
|
||||
Name: b.name,
|
||||
BlobID: id,
|
||||
}
|
||||
|
||||
reply := blob.GetBlobInfoReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.BlobInfo, nil
|
||||
}
|
||||
|
||||
// List implements storage.BlobBucket
|
||||
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||
args := blob.ListBlobInfoArgs{
|
||||
BucketID: b.id,
|
||||
}
|
||||
|
||||
reply := blob.ListBlobInfoReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.BlobInfos, nil
|
||||
}
|
||||
|
||||
// NewReader implements storage.BlobBucket
|
||||
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
|
||||
args := blob.NewBlobReaderArgs{
|
||||
BucketID: b.id,
|
||||
BlobID: id,
|
||||
}
|
||||
|
||||
reply := blob.NewBlobReaderReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &blobReaderCloser{
|
||||
readerID: reply.ReaderID,
|
||||
client: b.client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewWriter implements storage.BlobBucket
|
||||
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
|
||||
args := blob.NewBlobWriterArgs{
|
||||
BucketID: b.id,
|
||||
BlobID: id,
|
||||
}
|
||||
|
||||
reply := blob.NewBlobWriterReply{}
|
||||
|
||||
if err := b.client.Call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &blobWriterCloser{
|
||||
blobID: id,
|
||||
writerID: reply.WriterID,
|
||||
client: b.client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type blobWriterCloser struct {
|
||||
blobID storage.BlobID
|
||||
writerID blob.WriterID
|
||||
client *rpc.Client
|
||||
}
|
||||
|
||||
// Write implements io.WriteCloser
|
||||
func (bwc *blobWriterCloser) Write(data []byte) (int, error) {
|
||||
args := blob.WriteBlobArgs{
|
||||
WriterID: bwc.writerID,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
reply := blob.WriteBlobReply{}
|
||||
|
||||
if err := bwc.client.Call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Written, nil
|
||||
}
|
||||
|
||||
// Close implements io.WriteCloser
|
||||
func (bwc *blobWriterCloser) Close() error {
|
||||
args := blob.CloseWriterArgs{
|
||||
WriterID: bwc.writerID,
|
||||
}
|
||||
|
||||
reply := blob.CloseBucketReply{}
|
||||
|
||||
if err := bwc.client.Call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type blobReaderCloser struct {
|
||||
readerID blob.ReaderID
|
||||
client *rpc.Client
|
||||
}
|
||||
|
||||
// Read implements io.ReadSeekCloser
|
||||
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
|
||||
args := blob.ReadBlobArgs{
|
||||
ReaderID: brc.readerID,
|
||||
Length: len(p),
|
||||
}
|
||||
|
||||
reply := blob.ReadBlobReply{}
|
||||
|
||||
if err := brc.client.Call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
copy(p, reply.Data)
|
||||
|
||||
if reply.EOF {
|
||||
return reply.Read, io.EOF
|
||||
}
|
||||
|
||||
return reply.Read, nil
|
||||
}
|
||||
|
||||
// Seek implements io.ReadSeekCloser
|
||||
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
|
||||
args := blob.SeekBlobArgs{
|
||||
ReaderID: brc.readerID,
|
||||
Offset: offset,
|
||||
Whence: whence,
|
||||
}
|
||||
|
||||
reply := blob.SeekBlobReply{}
|
||||
|
||||
if err := brc.client.Call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Read, nil
|
||||
}
|
||||
|
||||
// Close implements io.ReadSeekCloser
|
||||
func (brc *blobReaderCloser) Close() error {
|
||||
args := blob.CloseReaderArgs{
|
||||
ReaderID: brc.readerID,
|
||||
}
|
||||
|
||||
reply := blob.CloseReaderReply{}
|
||||
|
||||
if err := brc.client.Call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
_ storage.BlobBucket = &BlobBucket{}
|
||||
_ storage.BlobInfo = &BlobInfo{}
|
||||
_ io.WriteCloser = &blobWriterCloser{}
|
||||
_ io.ReadSeekCloser = &blobReaderCloser{}
|
||||
)
|
|
@ -0,0 +1,40 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
)
|
||||
|
||||
type BlobInfo struct {
|
||||
id storage.BlobID
|
||||
bucket string
|
||||
contentType string
|
||||
modTime time.Time
|
||||
size int64
|
||||
}
|
||||
|
||||
// Bucket implements storage.BlobInfo
|
||||
func (i *BlobInfo) Bucket() string {
|
||||
return i.bucket
|
||||
}
|
||||
|
||||
// ID implements storage.BlobInfo
|
||||
func (i *BlobInfo) ID() storage.BlobID {
|
||||
return i.id
|
||||
}
|
||||
|
||||
// ContentType implements storage.BlobInfo
|
||||
func (i *BlobInfo) ContentType() string {
|
||||
return i.contentType
|
||||
}
|
||||
|
||||
// ModTime implements storage.BlobInfo
|
||||
func (i *BlobInfo) ModTime() time.Time {
|
||||
return i.modTime
|
||||
}
|
||||
|
||||
// Size implements storage.BlobInfo
|
||||
func (i *BlobInfo) Size() int64 {
|
||||
return i.size
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type BlobStore struct {
|
||||
client *rpc.Client
|
||||
}
|
||||
|
||||
// DeleteBucket implements storage.BlobStore.
|
||||
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||||
args := &blob.DeleteBucketArgs{
|
||||
BucketName: name,
|
||||
}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.DeleteBucket", args, nil); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListBuckets implements storage.BlobStore.
|
||||
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||
args := &blob.ListBucketsArgs{}
|
||||
|
||||
reply := blob.ListBucketsReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.ListBuckets", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reply.Buckets, nil
|
||||
}
|
||||
|
||||
// OpenBucket implements storage.BlobStore.
|
||||
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||||
args := &blob.OpenBucketArgs{
|
||||
BucketName: name,
|
||||
}
|
||||
reply := &blob.OpenBucketReply{}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.OpenBucket", args, reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &BlobBucket{
|
||||
name: name,
|
||||
id: reply.BucketID,
|
||||
client: s.client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewBlobStore(client *rpc.Client) *BlobStore {
|
||||
return &BlobStore{client}
|
||||
}
|
||||
|
||||
var _ storage.BlobStore = &BlobStore{}
|
|
@ -0,0 +1,99 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestBlobStore(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
|
||||
httpServer, err := startNewServer()
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
defer httpServer.Close()
|
||||
|
||||
serverAddr := httpServer.Listener.Addr()
|
||||
|
||||
client, err := rpc.DialHTTPPath(
|
||||
serverAddr.Network(),
|
||||
serverAddr.String(),
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
store := NewBlobStore(client)
|
||||
|
||||
testsuite.TestBlobStore(t, store)
|
||||
}
|
||||
|
||||
func BenchmarkBlobStore(t *testing.B) {
|
||||
logger.SetLevel(logger.LevelError)
|
||||
|
||||
httpServer, err := startNewServer()
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
defer httpServer.Close()
|
||||
|
||||
serverAddr := httpServer.Listener.Addr()
|
||||
|
||||
client, err := rpc.DialHTTPPath(
|
||||
serverAddr.Network(),
|
||||
serverAddr.String(),
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
store := NewBlobStore(client)
|
||||
|
||||
testsuite.BenchmarkBlobStore(t, store)
|
||||
|
||||
}
|
||||
|
||||
func getSQLiteBlobstore() (*sqlite.BlobStore, error) {
|
||||
file := "./testdata/blobstore_test.sqlite"
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
store := sqlite.NewBlobStore(dsn)
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func startNewServer() (*httptest.Server, error) {
|
||||
store, err := getSQLiteBlobstore()
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
server := server.NewBlobStoreServer(store)
|
||||
|
||||
httpServer := httptest.NewServer(server)
|
||||
|
||||
return httpServer, nil
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||
)
|
||||
|
||||
type DocumentStore struct {
|
||||
client *rpc.Client
|
||||
}
|
||||
|
||||
// Delete implements storage.DocumentStore.
|
||||
func (*DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Get implements storage.DocumentStore.
|
||||
func (*DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Query implements storage.DocumentStore.
|
||||
func (*DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Upsert implements storage.DocumentStore.
|
||||
func (*DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func NewDocumentStore(client *rpc.Client) *DocumentStore {
|
||||
return &DocumentStore{client}
|
||||
}
|
||||
|
||||
var _ storage.DocumentStore = &DocumentStore{}
|
|
@ -0,0 +1 @@
|
|||
/*.sqlite*
|
|
@ -0,0 +1,31 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type CloseBucketArgs struct {
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
type CloseBucketReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error {
|
||||
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := bucket.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.buckets.Delete(args.BucketID)
|
||||
|
||||
*reply = CloseBucketReply{}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type CloseReaderArgs struct {
|
||||
ReaderID ReaderID
|
||||
}
|
||||
|
||||
type CloseReaderReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error {
|
||||
reader, err := s.getOpenedReader(args.ReaderID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := reader.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.readers.Delete(args.ReaderID)
|
||||
|
||||
*reply = CloseReaderReply{}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type CloseWriterArgs struct {
|
||||
WriterID WriterID
|
||||
}
|
||||
|
||||
type CloseWriterReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error {
|
||||
writer, err := s.getOpenedWriter(args.WriterID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.writers.Delete(args.WriterID)
|
||||
|
||||
*reply = CloseWriterReply{}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type DeleteBucketArgs struct {
|
||||
BucketName string
|
||||
}
|
||||
|
||||
type DeleteBucketReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error {
|
||||
if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type GetBlobInfoArgs struct {
|
||||
Name string
|
||||
BlobID storage.BlobID
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
type GetBlobInfoReply struct {
|
||||
BlobInfo storage.BlobInfo
|
||||
}
|
||||
|
||||
func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error {
|
||||
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
blobInfo, err := bucket.Get(ctx, args.BlobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = GetBlobInfoReply{
|
||||
BlobInfo: blobInfo,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type GetBucketSizeArgs struct {
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
type GetBucketSizeReply struct {
|
||||
Size int64
|
||||
}
|
||||
|
||||
func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error {
|
||||
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
size, err := bucket.Size(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = GetBucketSizeReply{
|
||||
Size: size,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ListBlobInfoArgs struct {
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
type ListBlobInfoReply struct {
|
||||
BlobInfos []storage.BlobInfo
|
||||
}
|
||||
|
||||
func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error {
|
||||
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
blobInfos, err := bucket.List(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = ListBlobInfoReply{
|
||||
BlobInfos: blobInfos,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ListBucketsArgs struct {
|
||||
}
|
||||
|
||||
type ListBucketsReply struct {
|
||||
Buckets []string
|
||||
}
|
||||
|
||||
func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error {
|
||||
buckets, err := s.store.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = ListBucketsReply{
|
||||
Buckets: buckets,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type NewBlobReaderArgs struct {
|
||||
BlobID storage.BlobID
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
type NewBlobReaderReply struct {
|
||||
ReaderID ReaderID
|
||||
}
|
||||
|
||||
func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error {
|
||||
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
readerID, err := NewReaderID()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader, err := bucket.NewReader(ctx, args.BlobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.readers.Store(readerID, reader)
|
||||
|
||||
*reply = NewBlobReaderReply{
|
||||
ReaderID: readerID,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) {
|
||||
raw, exists := s.readers.Load(id)
|
||||
if !exists {
|
||||
return nil, errors.Errorf("could not find writer '%s'", id)
|
||||
}
|
||||
|
||||
reader, ok := raw.(io.ReadSeekCloser)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
|
||||
}
|
||||
|
||||
return reader, nil
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type NewBlobWriterArgs struct {
|
||||
BlobID storage.BlobID
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
type NewBlobWriterReply struct {
|
||||
WriterID WriterID
|
||||
}
|
||||
|
||||
func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error {
|
||||
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
writerID, err := NewWriterID()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, args.BlobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.writers.Store(writerID, writer)
|
||||
|
||||
*reply = NewBlobWriterReply{
|
||||
WriterID: writerID,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) {
|
||||
raw, exists := s.writers.Load(id)
|
||||
if !exists {
|
||||
return nil, errors.Errorf("could not find writer '%s'", id)
|
||||
}
|
||||
|
||||
writer, ok := raw.(io.WriteCloser)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
|
||||
}
|
||||
|
||||
return writer, nil
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type OpenBucketArgs struct {
|
||||
BucketName string
|
||||
}
|
||||
|
||||
type OpenBucketReply struct {
|
||||
BucketID BucketID
|
||||
}
|
||||
|
||||
func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error {
|
||||
bucket, err := s.store.OpenBucket(ctx, args.BucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
bucketID, err := NewBucketID()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.buckets.Store(bucketID, bucket)
|
||||
|
||||
*reply = OpenBucketReply{
|
||||
BucketID: bucketID,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) {
|
||||
raw, exists := s.buckets.Load(id)
|
||||
if !exists {
|
||||
return nil, errors.WithStack(storage.ErrBucketClosed)
|
||||
}
|
||||
|
||||
bucket, ok := raw.(storage.BlobBucket)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw)
|
||||
}
|
||||
|
||||
return bucket, nil
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ReadBlobArgs struct {
|
||||
ReaderID ReaderID
|
||||
Length int
|
||||
}
|
||||
|
||||
type ReadBlobReply struct {
|
||||
Data []byte
|
||||
Read int
|
||||
EOF bool
|
||||
}
|
||||
|
||||
func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error {
|
||||
reader, err := s.getOpenedReader(args.ReaderID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
buff := make([]byte, args.Length)
|
||||
|
||||
read, err := reader.Read(buff)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = ReadBlobReply{
|
||||
Read: read,
|
||||
Data: buff,
|
||||
EOF: errors.Is(err, io.EOF),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type SeekBlobArgs struct {
|
||||
ReaderID ReaderID
|
||||
Offset int64
|
||||
Whence int
|
||||
}
|
||||
|
||||
type SeekBlobReply struct {
|
||||
Read int64
|
||||
EOF bool
|
||||
}
|
||||
|
||||
func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error {
|
||||
reader, err := s.getOpenedReader(args.ReaderID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
read, err := reader.Seek(args.Offset, args.Whence)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = SeekBlobReply{
|
||||
Read: read,
|
||||
EOF: errors.Is(err, io.EOF),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type BucketID string
|
||||
type WriterID string
|
||||
type ReaderID string
|
||||
|
||||
type Service struct {
|
||||
store storage.BlobStore
|
||||
buckets sync.Map
|
||||
writers sync.Map
|
||||
readers sync.Map
|
||||
}
|
||||
|
||||
func NewService(store storage.BlobStore) *Service {
|
||||
return &Service{
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
func NewBucketID() (BucketID, error) {
|
||||
uuid, err := uuid.NewUUID()
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
id := BucketID(fmt.Sprintf("bucket-%s", uuid.String()))
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func NewWriterID() (WriterID, error) {
|
||||
uuid, err := uuid.NewUUID()
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
id := WriterID(fmt.Sprintf("writer-%s", uuid.String()))
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func NewReaderID() (ReaderID, error) {
|
||||
uuid, err := uuid.NewUUID()
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
id := ReaderID(fmt.Sprintf("reader-%s", uuid.String()))
|
||||
|
||||
return id, nil
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type WriteBlobArgs struct {
|
||||
WriterID WriterID
|
||||
Data []byte
|
||||
}
|
||||
|
||||
type WriteBlobReply struct {
|
||||
Written int
|
||||
}
|
||||
|
||||
func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error {
|
||||
writer, err := s.getOpenedWriter(args.WriterID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
written, err := writer.Write(args.Data)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
*reply = WriteBlobReply{
|
||||
Written: written,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
package server
|
||||
|
||||
import "forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
|
||||
type DocumentStore struct {
|
||||
store storage.DocumentStore
|
||||
}
|
||||
|
||||
func NewDocumentStore(store storage.DocumentStore) *DocumentStore {
|
||||
return &DocumentStore{store}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"github.com/keegancsmith/rpc"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||
)
|
||||
|
||||
func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
|
||||
server := rpc.NewServer()
|
||||
server.Register(blob.NewService(store))
|
||||
return server
|
||||
}
|
||||
|
||||
func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server {
|
||||
server := rpc.NewServer()
|
||||
server.Register(NewDocumentStore(store))
|
||||
return server
|
||||
}
|
|
@ -26,3 +26,18 @@ func TestBlobStore(t *testing.T) {
|
|||
|
||||
testsuite.TestBlobStore(t, store)
|
||||
}
|
||||
|
||||
func BenchmarkBlobStore(t *testing.B) {
|
||||
logger.SetLevel(logger.LevelError)
|
||||
|
||||
file := "./testdata/blobstore_test.sqlite"
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
store := NewBlobStore(dsn)
|
||||
|
||||
testsuite.BenchmarkBlobStore(t, store)
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
|
||||
func TestBlobStore(t *testing.T, store storage.BlobStore) {
|
||||
t.Run("Ops", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testBlobStoreOps(t, store)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
package testsuite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
|
||||
t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
|
||||
|
||||
for i := 0; i < t.N; i++ {
|
||||
bucketName := fmt.Sprintf("bucket-%d", i)
|
||||
if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
blobID := storage.NewBlobID()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
data := []byte("foo")
|
||||
|
||||
if _, err = writer.Write(data); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader, err := bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
if _, err = io.Copy(&buf, reader); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := reader.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := bucket.Delete(ctx, blobID); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := bucket.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := store.DeleteBucket(ctx, bucketName); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -17,9 +17,11 @@ type blobStoreTestCase struct {
|
|||
|
||||
var blobStoreTestCases = []blobStoreTestCase{
|
||||
{
|
||||
Name: "Open new bucket",
|
||||
Name: "Open then delete bucket",
|
||||
Run: func(ctx context.Context, store storage.BlobStore) error {
|
||||
bucket, err := store.OpenBucket(ctx, "open-new-bucket")
|
||||
bucketName := "open-new-bucket"
|
||||
|
||||
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -50,6 +52,23 @@ var blobStoreTestCases = []blobStoreTestCase{
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := store.DeleteBucket(ctx, bucketName); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
buckets, err := store.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
for _, b := range buckets {
|
||||
if b != bucketName {
|
||||
continue
|
||||
}
|
||||
|
||||
return errors.Errorf("bucket '%s' should be deleted", bucketName)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue