feat(storage): rpc based implementation
arcad/edge/pipeline/head There was a failure building this commit Details
arcad/edge/pipeline/pr-master There was a failure building this commit Details

This commit is contained in:
wpetit 2023-09-12 22:03:25 -06:00
parent 17808d14c9
commit 05a9861e6f
12 changed files with 412 additions and 1 deletions

1
go.mod
View File

@ -15,6 +15,7 @@ require (
github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect github.com/goccy/go-json v0.9.11 // indirect
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
github.com/keegancsmith/rpc v1.3.0 // indirect
github.com/leodido/go-urn v1.1.0 // indirect github.com/leodido/go-urn v1.1.0 // indirect
github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect

2
go.sum
View File

@ -201,6 +201,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/keegancsmith/rpc v1.3.0 h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk=
github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=

View File

@ -0,0 +1,121 @@
package client
import (
"bytes"
"context"
"io"
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type BlobBucket struct {
name string
closed bool
client *rpc.Client
}
// Size implements storage.BlobBucket
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
return 0, nil
}
// Name implements storage.BlobBucket
func (b *BlobBucket) Name() string {
return b.name
}
// Close implements storage.BlobBucket
func (b *BlobBucket) Close() error {
return nil
}
// Delete implements storage.BlobBucket
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
return nil
}
// Get implements storage.BlobBucket
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
return nil, nil
}
// List implements storage.BlobBucket
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
return nil, nil
}
// NewReader implements storage.BlobBucket
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if b.closed {
return nil, errors.WithStack(storage.ErrBucketClosed)
}
return &blobReaderCloser{
id: id,
bucket: b.name,
client: b.client,
}, nil
}
// NewWriter implements storage.BlobBucket
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
if b.closed {
return nil, errors.WithStack(storage.ErrBucketClosed)
}
return &blobWriterCloser{
id: id,
bucket: b.name,
buf: bytes.Buffer{},
client: b.client,
}, nil
}
type blobWriterCloser struct {
id storage.BlobID
bucket string
client *rpc.Client
buf bytes.Buffer
closed bool
}
// Write implements io.WriteCloser
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
return 0, nil
}
// Close implements io.WriteCloser
func (wbc *blobWriterCloser) Close() error {
return nil
}
type blobReaderCloser struct {
id storage.BlobID
bucket string
client *rpc.Client
}
// Read implements io.ReadSeekCloser
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
return 0, nil
}
// Seek implements io.ReadSeekCloser
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}
// Close implements io.ReadSeekCloser
func (brc *blobReaderCloser) Close() error {
return nil
}
var (
_ storage.BlobBucket = &BlobBucket{}
_ storage.BlobInfo = &BlobInfo{}
_ io.WriteCloser = &blobWriterCloser{}
_ io.ReadSeekCloser = &blobReaderCloser{}
)

View File

@ -0,0 +1,40 @@
package client
import (
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
)
type BlobInfo struct {
id storage.BlobID
bucket string
contentType string
modTime time.Time
size int64
}
// Bucket implements storage.BlobInfo
func (i *BlobInfo) Bucket() string {
return i.bucket
}
// ID implements storage.BlobInfo
func (i *BlobInfo) ID() storage.BlobID {
return i.id
}
// ContentType implements storage.BlobInfo
func (i *BlobInfo) ContentType() string {
return i.contentType
}
// ModTime implements storage.BlobInfo
func (i *BlobInfo) ModTime() time.Time {
return i.modTime
}
// Size implements storage.BlobInfo
func (i *BlobInfo) Size() int64 {
return i.size
}

View File

@ -0,0 +1,57 @@
package client
import (
"context"
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
"github.com/pkg/errors"
)
type BlobStore struct {
client *rpc.Client
}
// DeleteBucket implements storage.BlobStore.
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
args := &server.DeleteBucketArgs{
Name: name,
}
if err := s.client.Call(ctx, "BlobStore.DeleteBucket", args, nil); err != nil {
return errors.WithStack(err)
}
return nil
}
// ListBuckets implements storage.BlobStore.
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
panic("unimplemented")
}
// OpenBucket implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
args := &server.OpenBucketArgs{
Name: name,
}
reply := &server.OpenBucketReply{}
if err := s.client.Call(ctx, "BlobStore.OpenBucket", args, reply); err != nil {
return nil, errors.WithStack(err)
}
return &BlobBucket{
name: reply.BucketName,
closed: false,
client: s.client,
}, nil
}
func NewBlobStore(client *rpc.Client) *BlobStore {
return &BlobStore{client}
}
var _ storage.BlobStore = &BlobStore{}

View File

@ -0,0 +1,66 @@
package client
import (
"fmt"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestBlobStore(t *testing.T) {
t.Parallel()
logger.SetLevel(logger.LevelDebug)
httpServer := startNewServer(t)
defer httpServer.Close()
serverAddr := httpServer.Listener.Addr()
client, err := rpc.DialHTTPPath(
serverAddr.Network(),
serverAddr.String(),
"",
)
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer client.Close()
store := NewBlobStore(client)
testsuite.TestBlobStore(t, store)
}
func getSQLiteBlobstore(t *testing.T) *sqlite.BlobStore {
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := sqlite.NewBlobStore(dsn)
return store
}
func startNewServer(t *testing.T) *httptest.Server {
store := getSQLiteBlobstore(t)
server := server.NewBlobStoreServer(store)
httpServer := httptest.NewServer(server)
httpServerAddr := httpServer.Listener.Addr().String()
t.Logf("Test HTTP RPC server listening on %s", httpServerAddr)
return httpServer
}

View File

@ -0,0 +1,40 @@
package client
import (
"context"
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
)
type DocumentStore struct {
client *rpc.Client
}
// Delete implements storage.DocumentStore.
func (*DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
panic("unimplemented")
}
// Get implements storage.DocumentStore.
func (*DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
panic("unimplemented")
}
// Query implements storage.DocumentStore.
func (*DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
panic("unimplemented")
}
// Upsert implements storage.DocumentStore.
func (*DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
panic("unimplemented")
}
func NewDocumentStore(client *rpc.Client) *DocumentStore {
return &DocumentStore{client}
}
var _ storage.DocumentStore = &DocumentStore{}

View File

@ -0,0 +1 @@
/*.sqlite*

View File

@ -0,0 +1,54 @@
package server
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobStore struct {
store storage.BlobStore
}
type DeleteBucketArgs struct {
Name string
}
type DeleteBucketReply struct {
}
func (s *BlobStore) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error {
return nil
}
type OpenBucketArgs struct {
Name string
}
type OpenBucketReply struct {
BucketName string
}
func (s *BlobStore) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error {
bucket, err := s.store.OpenBucket(ctx, args.Name)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}()
*reply = OpenBucketReply{
BucketName: bucket.Name(),
}
return nil
}
func NewBlobStore(store storage.BlobStore) *BlobStore {
return &BlobStore{store}
}

View File

@ -0,0 +1,11 @@
package server
import "forge.cadoles.com/arcad/edge/pkg/storage"
type DocumentStore struct {
store storage.DocumentStore
}
func NewDocumentStore(store storage.DocumentStore) *DocumentStore {
return &DocumentStore{store}
}

View File

@ -0,0 +1,19 @@
package server
import (
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage"
)
func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
server := rpc.NewServer()
server.Register(NewBlobStore(store))
return server
}
func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server {
server := rpc.NewServer()
server.Register(NewDocumentStore(store))
return server
}

View File

@ -8,7 +8,6 @@ import (
func TestBlobStore(t *testing.T, store storage.BlobStore) { func TestBlobStore(t *testing.T, store storage.BlobStore) {
t.Run("Ops", func(t *testing.T) { t.Run("Ops", func(t *testing.T) {
t.Parallel()
testBlobStoreOps(t, store) testBlobStoreOps(t, store)
}) })
} }