From c74c6c954819e56e6b98f4b303ca63a7baf61737 Mon Sep 17 00:00:00 2001 From: William Petit Date: Tue, 12 Sep 2023 22:03:25 -0600 Subject: [PATCH] feat(storage): rpc based implementation --- go.mod | 1 + go.sum | 2 + pkg/storage/rpc/client/blob_bucket.go | 241 ++++++++++++++++++ pkg/storage/rpc/client/blob_info.go | 40 +++ pkg/storage/rpc/client/blob_store.go | 65 +++++ pkg/storage/rpc/client/blob_store_test.go | 99 +++++++ pkg/storage/rpc/client/document_store.go | 40 +++ pkg/storage/rpc/client/testdata/.gitignore | 1 + pkg/storage/rpc/server/blob/close_bucket.go | 31 +++ pkg/storage/rpc/server/blob/close_reader.go | 31 +++ pkg/storage/rpc/server/blob/close_writer.go | 31 +++ pkg/storage/rpc/server/blob/delete_bucket.go | 22 ++ pkg/storage/rpc/server/blob/get_blob_info.go | 36 +++ .../rpc/server/blob/get_bucket_size.go | 33 +++ pkg/storage/rpc/server/blob/list_blob_info.go | 34 +++ pkg/storage/rpc/server/blob/list_buckets.go | 27 ++ .../rpc/server/blob/new_blob_reader.go | 57 +++++ .../rpc/server/blob/new_blob_writer.go | 57 +++++ pkg/storage/rpc/server/blob/open_bucket.go | 50 ++++ pkg/storage/rpc/server/blob/read_blob.go | 41 +++ pkg/storage/rpc/server/blob/seek_blob.go | 38 +++ pkg/storage/rpc/server/blob/service.go | 60 +++++ pkg/storage/rpc/server/blob/write_blob.go | 34 +++ pkg/storage/rpc/server/document_store.go | 11 + pkg/storage/rpc/server/server.go | 20 ++ pkg/storage/sqlite/blob_store_test.go | 15 ++ pkg/storage/testsuite/blob_store.go | 1 - pkg/storage/testsuite/blob_store_benchmark.go | 79 ++++++ pkg/storage/testsuite/blob_store_ops.go | 23 +- 29 files changed, 1217 insertions(+), 3 deletions(-) create mode 100644 pkg/storage/rpc/client/blob_bucket.go create mode 100644 pkg/storage/rpc/client/blob_info.go create mode 100644 pkg/storage/rpc/client/blob_store.go create mode 100644 pkg/storage/rpc/client/blob_store_test.go create mode 100644 pkg/storage/rpc/client/document_store.go create mode 100644 pkg/storage/rpc/client/testdata/.gitignore create mode 100644 pkg/storage/rpc/server/blob/close_bucket.go create mode 100644 pkg/storage/rpc/server/blob/close_reader.go create mode 100644 pkg/storage/rpc/server/blob/close_writer.go create mode 100644 pkg/storage/rpc/server/blob/delete_bucket.go create mode 100644 pkg/storage/rpc/server/blob/get_blob_info.go create mode 100644 pkg/storage/rpc/server/blob/get_bucket_size.go create mode 100644 pkg/storage/rpc/server/blob/list_blob_info.go create mode 100644 pkg/storage/rpc/server/blob/list_buckets.go create mode 100644 pkg/storage/rpc/server/blob/new_blob_reader.go create mode 100644 pkg/storage/rpc/server/blob/new_blob_writer.go create mode 100644 pkg/storage/rpc/server/blob/open_bucket.go create mode 100644 pkg/storage/rpc/server/blob/read_blob.go create mode 100644 pkg/storage/rpc/server/blob/seek_blob.go create mode 100644 pkg/storage/rpc/server/blob/service.go create mode 100644 pkg/storage/rpc/server/blob/write_blob.go create mode 100644 pkg/storage/rpc/server/document_store.go create mode 100644 pkg/storage/rpc/server/server.go create mode 100644 pkg/storage/testsuite/blob_store_benchmark.go diff --git a/go.mod b/go.mod index 19c0759..220dab5 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/go-playground/universal-translator v0.16.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect + github.com/keegancsmith/rpc v1.3.0 // indirect github.com/leodido/go-urn v1.1.0 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect diff --git a/go.sum b/go.sum index e1a0564..d39b980 100644 --- a/go.sum +++ b/go.sum @@ -201,6 +201,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/keegancsmith/rpc v1.3.0 h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk= +github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= diff --git a/pkg/storage/rpc/client/blob_bucket.go b/pkg/storage/rpc/client/blob_bucket.go new file mode 100644 index 0000000..f7c8bf4 --- /dev/null +++ b/pkg/storage/rpc/client/blob_bucket.go @@ -0,0 +1,241 @@ +package client + +import ( + "context" + "io" + + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" + "github.com/pkg/errors" +) + +type BlobBucket struct { + name string + id blob.BucketID + client *rpc.Client +} + +// Size implements storage.BlobBucket +func (b *BlobBucket) Size(ctx context.Context) (int64, error) { + args := blob.GetBucketSizeArgs{ + BucketID: b.id, + } + + reply := blob.GetBucketSizeReply{} + + if err := b.client.Call(ctx, "Service.GetBucketSize", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Size, nil +} + +// Name implements storage.BlobBucket +func (b *BlobBucket) Name() string { + return b.name +} + +// Close implements storage.BlobBucket +func (b *BlobBucket) Close() error { + args := blob.CloseBucketArgs{ + BucketID: b.id, + } + + reply := blob.CloseBucketReply{} + + if err := b.client.Call(context.Background(), "Service.CloseBucket", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Delete implements storage.BlobBucket +func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { + args := blob.DeleteBucketArgs{ + BucketName: b.name, + } + + reply := blob.DeleteBucketReply{} + + if err := b.client.Call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// Get implements storage.BlobBucket +func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) { + args := blob.GetBlobInfoArgs{ + Name: b.name, + BlobID: id, + } + + reply := blob.GetBlobInfoReply{} + + if err := b.client.Call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.BlobInfo, nil +} + +// List implements storage.BlobBucket +func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { + args := blob.ListBlobInfoArgs{ + BucketID: b.id, + } + + reply := blob.ListBlobInfoReply{} + + if err := b.client.Call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.BlobInfos, nil +} + +// NewReader implements storage.BlobBucket +func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) { + args := blob.NewBlobReaderArgs{ + BucketID: b.id, + BlobID: id, + } + + reply := blob.NewBlobReaderReply{} + + if err := b.client.Call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return &blobReaderCloser{ + readerID: reply.ReaderID, + client: b.client, + }, nil +} + +// NewWriter implements storage.BlobBucket +func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) { + args := blob.NewBlobWriterArgs{ + BucketID: b.id, + BlobID: id, + } + + reply := blob.NewBlobWriterReply{} + + if err := b.client.Call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return &blobWriterCloser{ + blobID: id, + writerID: reply.WriterID, + client: b.client, + }, nil +} + +type blobWriterCloser struct { + blobID storage.BlobID + writerID blob.WriterID + client *rpc.Client +} + +// Write implements io.WriteCloser +func (bwc *blobWriterCloser) Write(data []byte) (int, error) { + args := blob.WriteBlobArgs{ + WriterID: bwc.writerID, + Data: data, + } + + reply := blob.WriteBlobReply{} + + if err := bwc.client.Call(context.Background(), "Service.WriteBlob", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Written, nil +} + +// Close implements io.WriteCloser +func (bwc *blobWriterCloser) Close() error { + args := blob.CloseWriterArgs{ + WriterID: bwc.writerID, + } + + reply := blob.CloseBucketReply{} + + if err := bwc.client.Call(context.Background(), "Service.CloseWriter", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +type blobReaderCloser struct { + readerID blob.ReaderID + client *rpc.Client +} + +// Read implements io.ReadSeekCloser +func (brc *blobReaderCloser) Read(p []byte) (int, error) { + args := blob.ReadBlobArgs{ + ReaderID: brc.readerID, + Length: len(p), + } + + reply := blob.ReadBlobReply{} + + if err := brc.client.Call(context.Background(), "Service.ReadBlob", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + copy(p, reply.Data) + + if reply.EOF { + return reply.Read, io.EOF + } + + return reply.Read, nil +} + +// Seek implements io.ReadSeekCloser +func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) { + args := blob.SeekBlobArgs{ + ReaderID: brc.readerID, + Offset: offset, + Whence: whence, + } + + reply := blob.SeekBlobReply{} + + if err := brc.client.Call(context.Background(), "Service.SeekBlob", args, &reply); err != nil { + return 0, errors.WithStack(err) + } + + return reply.Read, nil +} + +// Close implements io.ReadSeekCloser +func (brc *blobReaderCloser) Close() error { + args := blob.CloseReaderArgs{ + ReaderID: brc.readerID, + } + + reply := blob.CloseReaderReply{} + + if err := brc.client.Call(context.Background(), "Service.CloseReader", args, &reply); err != nil { + return errors.WithStack(err) + } + + return nil +} + +var ( + _ storage.BlobBucket = &BlobBucket{} + _ storage.BlobInfo = &BlobInfo{} + _ io.WriteCloser = &blobWriterCloser{} + _ io.ReadSeekCloser = &blobReaderCloser{} +) diff --git a/pkg/storage/rpc/client/blob_info.go b/pkg/storage/rpc/client/blob_info.go new file mode 100644 index 0000000..5cc9b24 --- /dev/null +++ b/pkg/storage/rpc/client/blob_info.go @@ -0,0 +1,40 @@ +package client + +import ( + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" +) + +type BlobInfo struct { + id storage.BlobID + bucket string + contentType string + modTime time.Time + size int64 +} + +// Bucket implements storage.BlobInfo +func (i *BlobInfo) Bucket() string { + return i.bucket +} + +// ID implements storage.BlobInfo +func (i *BlobInfo) ID() storage.BlobID { + return i.id +} + +// ContentType implements storage.BlobInfo +func (i *BlobInfo) ContentType() string { + return i.contentType +} + +// ModTime implements storage.BlobInfo +func (i *BlobInfo) ModTime() time.Time { + return i.modTime +} + +// Size implements storage.BlobInfo +func (i *BlobInfo) Size() int64 { + return i.size +} diff --git a/pkg/storage/rpc/client/blob_store.go b/pkg/storage/rpc/client/blob_store.go new file mode 100644 index 0000000..ddea622 --- /dev/null +++ b/pkg/storage/rpc/client/blob_store.go @@ -0,0 +1,65 @@ +package client + +import ( + "context" + + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" + "github.com/pkg/errors" +) + +type BlobStore struct { + client *rpc.Client +} + +// DeleteBucket implements storage.BlobStore. +func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error { + args := &blob.DeleteBucketArgs{ + BucketName: name, + } + + if err := s.client.Call(ctx, "Service.DeleteBucket", args, nil); err != nil { + return errors.WithStack(err) + } + + return nil +} + +// ListBuckets implements storage.BlobStore. +func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { + args := &blob.ListBucketsArgs{} + + reply := blob.ListBucketsReply{} + + if err := s.client.Call(ctx, "Service.ListBuckets", args, &reply); err != nil { + return nil, errors.WithStack(err) + } + + return reply.Buckets, nil +} + +// OpenBucket implements storage.BlobStore. +func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) { + args := &blob.OpenBucketArgs{ + BucketName: name, + } + reply := &blob.OpenBucketReply{} + + if err := s.client.Call(ctx, "Service.OpenBucket", args, reply); err != nil { + return nil, errors.WithStack(err) + } + + return &BlobBucket{ + name: name, + id: reply.BucketID, + client: s.client, + }, nil +} + +func NewBlobStore(client *rpc.Client) *BlobStore { + return &BlobStore{client} +} + +var _ storage.BlobStore = &BlobStore{} diff --git a/pkg/storage/rpc/client/blob_store_test.go b/pkg/storage/rpc/client/blob_store_test.go new file mode 100644 index 0000000..a4f199e --- /dev/null +++ b/pkg/storage/rpc/client/blob_store_test.go @@ -0,0 +1,99 @@ +package client + +import ( + "fmt" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server" + "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestBlobStore(t *testing.T) { + t.Parallel() + logger.SetLevel(logger.LevelDebug) + + httpServer, err := startNewServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + + client, err := rpc.DialHTTPPath( + serverAddr.Network(), + serverAddr.String(), + "", + ) + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + defer client.Close() + + store := NewBlobStore(client) + + testsuite.TestBlobStore(t, store) +} + +func BenchmarkBlobStore(t *testing.B) { + logger.SetLevel(logger.LevelError) + + httpServer, err := startNewServer() + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + defer httpServer.Close() + + serverAddr := httpServer.Listener.Addr() + + client, err := rpc.DialHTTPPath( + serverAddr.Network(), + serverAddr.String(), + "", + ) + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + defer client.Close() + + store := NewBlobStore(client) + + testsuite.BenchmarkBlobStore(t, store) + +} + +func getSQLiteBlobstore() (*sqlite.BlobStore, error) { + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, errors.WithStack(err) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := sqlite.NewBlobStore(dsn) + + return store, nil +} + +func startNewServer() (*httptest.Server, error) { + store, err := getSQLiteBlobstore() + if err != nil { + return nil, errors.WithStack(err) + } + + server := server.NewBlobStoreServer(store) + + httpServer := httptest.NewServer(server) + + return httpServer, nil +} diff --git a/pkg/storage/rpc/client/document_store.go b/pkg/storage/rpc/client/document_store.go new file mode 100644 index 0000000..dec9e63 --- /dev/null +++ b/pkg/storage/rpc/client/document_store.go @@ -0,0 +1,40 @@ +package client + +import ( + "context" + + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/filter" +) + +type DocumentStore struct { + client *rpc.Client +} + +// Delete implements storage.DocumentStore. +func (*DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error { + panic("unimplemented") +} + +// Get implements storage.DocumentStore. +func (*DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) { + panic("unimplemented") +} + +// Query implements storage.DocumentStore. +func (*DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { + panic("unimplemented") +} + +// Upsert implements storage.DocumentStore. +func (*DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) { + panic("unimplemented") +} + +func NewDocumentStore(client *rpc.Client) *DocumentStore { + return &DocumentStore{client} +} + +var _ storage.DocumentStore = &DocumentStore{} diff --git a/pkg/storage/rpc/client/testdata/.gitignore b/pkg/storage/rpc/client/testdata/.gitignore new file mode 100644 index 0000000..2a2bc40 --- /dev/null +++ b/pkg/storage/rpc/client/testdata/.gitignore @@ -0,0 +1 @@ +/*.sqlite* \ No newline at end of file diff --git a/pkg/storage/rpc/server/blob/close_bucket.go b/pkg/storage/rpc/server/blob/close_bucket.go new file mode 100644 index 0000000..64c403e --- /dev/null +++ b/pkg/storage/rpc/server/blob/close_bucket.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseBucketArgs struct { + BucketID BucketID +} + +type CloseBucketReply struct { +} + +func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + if err := bucket.Close(); err != nil { + return errors.WithStack(err) + } + + s.buckets.Delete(args.BucketID) + + *reply = CloseBucketReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/blob/close_reader.go b/pkg/storage/rpc/server/blob/close_reader.go new file mode 100644 index 0000000..8f98134 --- /dev/null +++ b/pkg/storage/rpc/server/blob/close_reader.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseReaderArgs struct { + ReaderID ReaderID +} + +type CloseReaderReply struct { +} + +func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + if err := reader.Close(); err != nil { + return errors.WithStack(err) + } + + s.readers.Delete(args.ReaderID) + + *reply = CloseReaderReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/blob/close_writer.go b/pkg/storage/rpc/server/blob/close_writer.go new file mode 100644 index 0000000..8fe6ab1 --- /dev/null +++ b/pkg/storage/rpc/server/blob/close_writer.go @@ -0,0 +1,31 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type CloseWriterArgs struct { + WriterID WriterID +} + +type CloseWriterReply struct { +} + +func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error { + writer, err := s.getOpenedWriter(args.WriterID) + if err != nil { + return errors.WithStack(err) + } + + if err := writer.Close(); err != nil { + return errors.WithStack(err) + } + + s.writers.Delete(args.WriterID) + + *reply = CloseWriterReply{} + + return nil +} diff --git a/pkg/storage/rpc/server/blob/delete_bucket.go b/pkg/storage/rpc/server/blob/delete_bucket.go new file mode 100644 index 0000000..751d97a --- /dev/null +++ b/pkg/storage/rpc/server/blob/delete_bucket.go @@ -0,0 +1,22 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type DeleteBucketArgs struct { + BucketName string +} + +type DeleteBucketReply struct { +} + +func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error { + if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/get_blob_info.go b/pkg/storage/rpc/server/blob/get_blob_info.go new file mode 100644 index 0000000..1f3a031 --- /dev/null +++ b/pkg/storage/rpc/server/blob/get_blob_info.go @@ -0,0 +1,36 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type GetBlobInfoArgs struct { + Name string + BlobID storage.BlobID + BucketID BucketID +} + +type GetBlobInfoReply struct { + BlobInfo storage.BlobInfo +} + +func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + blobInfo, err := bucket.Get(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetBlobInfoReply{ + BlobInfo: blobInfo, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/get_bucket_size.go b/pkg/storage/rpc/server/blob/get_bucket_size.go new file mode 100644 index 0000000..5c02733 --- /dev/null +++ b/pkg/storage/rpc/server/blob/get_bucket_size.go @@ -0,0 +1,33 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type GetBucketSizeArgs struct { + BucketID BucketID +} + +type GetBucketSizeReply struct { + Size int64 +} + +func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + size, err := bucket.Size(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = GetBucketSizeReply{ + Size: size, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/list_blob_info.go b/pkg/storage/rpc/server/blob/list_blob_info.go new file mode 100644 index 0000000..6614b0c --- /dev/null +++ b/pkg/storage/rpc/server/blob/list_blob_info.go @@ -0,0 +1,34 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type ListBlobInfoArgs struct { + BucketID BucketID +} + +type ListBlobInfoReply struct { + BlobInfos []storage.BlobInfo +} + +func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + blobInfos, err := bucket.List(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = ListBlobInfoReply{ + BlobInfos: blobInfos, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/list_buckets.go b/pkg/storage/rpc/server/blob/list_buckets.go new file mode 100644 index 0000000..52c8a10 --- /dev/null +++ b/pkg/storage/rpc/server/blob/list_buckets.go @@ -0,0 +1,27 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type ListBucketsArgs struct { +} + +type ListBucketsReply struct { + Buckets []string +} + +func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error { + buckets, err := s.store.ListBuckets(ctx) + if err != nil { + return errors.WithStack(err) + } + + *reply = ListBucketsReply{ + Buckets: buckets, + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/new_blob_reader.go b/pkg/storage/rpc/server/blob/new_blob_reader.go new file mode 100644 index 0000000..b1d1286 --- /dev/null +++ b/pkg/storage/rpc/server/blob/new_blob_reader.go @@ -0,0 +1,57 @@ +package blob + +import ( + "context" + "io" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type NewBlobReaderArgs struct { + BlobID storage.BlobID + BucketID BucketID +} + +type NewBlobReaderReply struct { + ReaderID ReaderID +} + +func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + readerID, err := NewReaderID() + if err != nil { + return errors.WithStack(err) + } + + reader, err := bucket.NewReader(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + s.readers.Store(readerID, reader) + + *reply = NewBlobReaderReply{ + ReaderID: readerID, + } + + return nil +} + +func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) { + raw, exists := s.readers.Load(id) + if !exists { + return nil, errors.Errorf("could not find writer '%s'", id) + } + + reader, ok := raw.(io.ReadSeekCloser) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for writer", raw) + } + + return reader, nil +} diff --git a/pkg/storage/rpc/server/blob/new_blob_writer.go b/pkg/storage/rpc/server/blob/new_blob_writer.go new file mode 100644 index 0000000..de25439 --- /dev/null +++ b/pkg/storage/rpc/server/blob/new_blob_writer.go @@ -0,0 +1,57 @@ +package blob + +import ( + "context" + "io" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type NewBlobWriterArgs struct { + BlobID storage.BlobID + BucketID BucketID +} + +type NewBlobWriterReply struct { + WriterID WriterID +} + +func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error { + bucket, err := s.getOpenedBucket(args.BucketID) + if err != nil { + return errors.WithStack(err) + } + + writerID, err := NewWriterID() + if err != nil { + return errors.WithStack(err) + } + + writer, err := bucket.NewWriter(ctx, args.BlobID) + if err != nil { + return errors.WithStack(err) + } + + s.writers.Store(writerID, writer) + + *reply = NewBlobWriterReply{ + WriterID: writerID, + } + + return nil +} + +func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) { + raw, exists := s.writers.Load(id) + if !exists { + return nil, errors.Errorf("could not find writer '%s'", id) + } + + writer, ok := raw.(io.WriteCloser) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for writer", raw) + } + + return writer, nil +} diff --git a/pkg/storage/rpc/server/blob/open_bucket.go b/pkg/storage/rpc/server/blob/open_bucket.go new file mode 100644 index 0000000..bf5ce28 --- /dev/null +++ b/pkg/storage/rpc/server/blob/open_bucket.go @@ -0,0 +1,50 @@ +package blob + +import ( + "context" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +type OpenBucketArgs struct { + BucketName string +} + +type OpenBucketReply struct { + BucketID BucketID +} + +func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error { + bucket, err := s.store.OpenBucket(ctx, args.BucketName) + if err != nil { + return errors.WithStack(err) + } + + bucketID, err := NewBucketID() + if err != nil { + return errors.WithStack(err) + } + + s.buckets.Store(bucketID, bucket) + + *reply = OpenBucketReply{ + BucketID: bucketID, + } + + return nil +} + +func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) { + raw, exists := s.buckets.Load(id) + if !exists { + return nil, errors.WithStack(storage.ErrBucketClosed) + } + + bucket, ok := raw.(storage.BlobBucket) + if !ok { + return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw) + } + + return bucket, nil +} diff --git a/pkg/storage/rpc/server/blob/read_blob.go b/pkg/storage/rpc/server/blob/read_blob.go new file mode 100644 index 0000000..7a12bc1 --- /dev/null +++ b/pkg/storage/rpc/server/blob/read_blob.go @@ -0,0 +1,41 @@ +package blob + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type ReadBlobArgs struct { + ReaderID ReaderID + Length int +} + +type ReadBlobReply struct { + Data []byte + Read int + EOF bool +} + +func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + buff := make([]byte, args.Length) + + read, err := reader.Read(buff) + if err != nil && !errors.Is(err, io.EOF) { + return errors.WithStack(err) + } + + *reply = ReadBlobReply{ + Read: read, + Data: buff, + EOF: errors.Is(err, io.EOF), + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/seek_blob.go b/pkg/storage/rpc/server/blob/seek_blob.go new file mode 100644 index 0000000..902abc2 --- /dev/null +++ b/pkg/storage/rpc/server/blob/seek_blob.go @@ -0,0 +1,38 @@ +package blob + +import ( + "context" + "io" + + "github.com/pkg/errors" +) + +type SeekBlobArgs struct { + ReaderID ReaderID + Offset int64 + Whence int +} + +type SeekBlobReply struct { + Read int64 + EOF bool +} + +func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error { + reader, err := s.getOpenedReader(args.ReaderID) + if err != nil { + return errors.WithStack(err) + } + + read, err := reader.Seek(args.Offset, args.Whence) + if err != nil && !errors.Is(err, io.EOF) { + return errors.WithStack(err) + } + + *reply = SeekBlobReply{ + Read: read, + EOF: errors.Is(err, io.EOF), + } + + return nil +} diff --git a/pkg/storage/rpc/server/blob/service.go b/pkg/storage/rpc/server/blob/service.go new file mode 100644 index 0000000..3ab7689 --- /dev/null +++ b/pkg/storage/rpc/server/blob/service.go @@ -0,0 +1,60 @@ +package blob + +import ( + "fmt" + "sync" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type BucketID string +type WriterID string +type ReaderID string + +type Service struct { + store storage.BlobStore + buckets sync.Map + writers sync.Map + readers sync.Map +} + +func NewService(store storage.BlobStore) *Service { + return &Service{ + store: store, + } +} + +func NewBucketID() (BucketID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := BucketID(fmt.Sprintf("bucket-%s", uuid.String())) + + return id, nil +} + +func NewWriterID() (WriterID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := WriterID(fmt.Sprintf("writer-%s", uuid.String())) + + return id, nil +} + +func NewReaderID() (ReaderID, error) { + uuid, err := uuid.NewUUID() + if err != nil { + return "", errors.WithStack(err) + } + + id := ReaderID(fmt.Sprintf("reader-%s", uuid.String())) + + return id, nil +} diff --git a/pkg/storage/rpc/server/blob/write_blob.go b/pkg/storage/rpc/server/blob/write_blob.go new file mode 100644 index 0000000..a3651a8 --- /dev/null +++ b/pkg/storage/rpc/server/blob/write_blob.go @@ -0,0 +1,34 @@ +package blob + +import ( + "context" + + "github.com/pkg/errors" +) + +type WriteBlobArgs struct { + WriterID WriterID + Data []byte +} + +type WriteBlobReply struct { + Written int +} + +func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error { + writer, err := s.getOpenedWriter(args.WriterID) + if err != nil { + return errors.WithStack(err) + } + + written, err := writer.Write(args.Data) + if err != nil { + return errors.WithStack(err) + } + + *reply = WriteBlobReply{ + Written: written, + } + + return nil +} diff --git a/pkg/storage/rpc/server/document_store.go b/pkg/storage/rpc/server/document_store.go new file mode 100644 index 0000000..4a87f88 --- /dev/null +++ b/pkg/storage/rpc/server/document_store.go @@ -0,0 +1,11 @@ +package server + +import "forge.cadoles.com/arcad/edge/pkg/storage" + +type DocumentStore struct { + store storage.DocumentStore +} + +func NewDocumentStore(store storage.DocumentStore) *DocumentStore { + return &DocumentStore{store} +} diff --git a/pkg/storage/rpc/server/server.go b/pkg/storage/rpc/server/server.go new file mode 100644 index 0000000..2da0327 --- /dev/null +++ b/pkg/storage/rpc/server/server.go @@ -0,0 +1,20 @@ +package server + +import ( + "github.com/keegancsmith/rpc" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" +) + +func NewBlobStoreServer(store storage.BlobStore) *rpc.Server { + server := rpc.NewServer() + server.Register(blob.NewService(store)) + return server +} + +func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server { + server := rpc.NewServer() + server.Register(NewDocumentStore(store)) + return server +} diff --git a/pkg/storage/sqlite/blob_store_test.go b/pkg/storage/sqlite/blob_store_test.go index 7fabd7c..b00a417 100644 --- a/pkg/storage/sqlite/blob_store_test.go +++ b/pkg/storage/sqlite/blob_store_test.go @@ -26,3 +26,18 @@ func TestBlobStore(t *testing.T) { testsuite.TestBlobStore(t, store) } + +func BenchmarkBlobStore(t *testing.B) { + logger.SetLevel(logger.LevelError) + + file := "./testdata/blobstore_test.sqlite" + + if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { + t.Fatalf("%+v", errors.WithStack(err)) + } + + dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) + store := NewBlobStore(dsn) + + testsuite.BenchmarkBlobStore(t, store) +} diff --git a/pkg/storage/testsuite/blob_store.go b/pkg/storage/testsuite/blob_store.go index b73c086..8b6fe43 100644 --- a/pkg/storage/testsuite/blob_store.go +++ b/pkg/storage/testsuite/blob_store.go @@ -8,7 +8,6 @@ import ( func TestBlobStore(t *testing.T, store storage.BlobStore) { t.Run("Ops", func(t *testing.T) { - t.Parallel() testBlobStoreOps(t, store) }) } diff --git a/pkg/storage/testsuite/blob_store_benchmark.go b/pkg/storage/testsuite/blob_store_benchmark.go new file mode 100644 index 0000000..3dc1486 --- /dev/null +++ b/pkg/storage/testsuite/blob_store_benchmark.go @@ -0,0 +1,79 @@ +package testsuite + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/pkg/errors" +) + +func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) { + t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) { + + for i := 0; i < t.N; i++ { + bucketName := fmt.Sprintf("bucket-%d", i) + if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + } + }) +} + +func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error { + ctx := context.Background() + + bucket, err := store.OpenBucket(ctx, bucketName) + if err != nil { + return errors.WithStack(err) + } + + blobID := storage.NewBlobID() + + writer, err := bucket.NewWriter(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + data := []byte("foo") + + if _, err = writer.Write(data); err != nil { + return errors.WithStack(err) + } + + if err := writer.Close(); err != nil { + return errors.WithStack(err) + } + + reader, err := bucket.NewReader(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + var buf bytes.Buffer + + if _, err = io.Copy(&buf, reader); err != nil { + return errors.WithStack(err) + } + + if err := reader.Close(); err != nil { + return errors.WithStack(err) + } + + if err := bucket.Delete(ctx, blobID); err != nil { + return errors.WithStack(err) + } + + if err := bucket.Close(); err != nil { + return errors.WithStack(err) + } + + if err := store.DeleteBucket(ctx, bucketName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/pkg/storage/testsuite/blob_store_ops.go b/pkg/storage/testsuite/blob_store_ops.go index 6f97eb3..0eeadf5 100644 --- a/pkg/storage/testsuite/blob_store_ops.go +++ b/pkg/storage/testsuite/blob_store_ops.go @@ -17,9 +17,11 @@ type blobStoreTestCase struct { var blobStoreTestCases = []blobStoreTestCase{ { - Name: "Open new bucket", + Name: "Open then delete bucket", Run: func(ctx context.Context, store storage.BlobStore) error { - bucket, err := store.OpenBucket(ctx, "open-new-bucket") + bucketName := "open-new-bucket" + + bucket, err := store.OpenBucket(ctx, bucketName) if err != nil { return errors.WithStack(err) } @@ -50,6 +52,23 @@ var blobStoreTestCases = []blobStoreTestCase{ return errors.WithStack(err) } + if err := store.DeleteBucket(ctx, bucketName); err != nil { + return errors.WithStack(err) + } + + buckets, err := store.ListBuckets(ctx) + if err != nil { + return errors.WithStack(err) + } + + for _, b := range buckets { + if b != bucketName { + continue + } + + return errors.Errorf("bucket '%s' should be deleted", bucketName) + } + return nil }, },