Compare commits

..

1 Commits

Author SHA1 Message Date
19539ec34a feat(storage): rpc based implementation
All checks were successful
arcad/edge/pipeline/pr-master This commit looks good
2023-09-26 22:28:25 -06:00
11 changed files with 80 additions and 59 deletions

View File

@ -134,7 +134,7 @@ func getBlobStoreStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Serve
blobStore := sqlite.NewBlobStore(dsn)
blobStoreServer := server.NewBlobStoreServer(blobStore)
rawBlobStoreServer, _ := documentStoreTenants.LoadOrStore(tenant, blobStoreServer)
rawBlobStoreServer, _ := blobStoreTenants.LoadOrStore(tenant, blobStoreServer)
blobStoreServer, ok := rawBlobStoreServer.(*rpc.Server)
if !ok {

View File

@ -2,8 +2,6 @@ package client
import (
"context"
"encoding/gob"
"time"
"github.com/keegancsmith/rpc"
"github.com/pkg/errors"
@ -13,15 +11,6 @@ import (
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document"
)
func init() {
gob.Register(storage.Document{})
gob.Register(storage.DocumentID(""))
gob.Register(time.Time{})
gob.Register(map[string]interface{}{})
gob.Register([]interface{}{})
gob.Register([]map[string]interface{}{})
}
type DocumentStore struct {
client *rpc.Client
}
@ -75,7 +64,9 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
args.Filter = filter.AsMap()
}
reply := document.QueryDocumentsReply{}
reply := document.QueryDocumentsReply{
Documents: []storage.Document{},
}
if err := s.client.Call(ctx, "Service.QueryDocuments", args, &reply); err != nil {
return nil, errors.WithStack(err)

View File

@ -0,0 +1,5 @@
package client
import (
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
)

View File

@ -0,0 +1,42 @@
package gob
import (
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
)
type BlobInfo struct {
Bucket_ string
ContentType_ string
BlobID_ storage.BlobID
ModTime_ time.Time
Size_ int64
}
// Bucket implements storage.BlobInfo.
func (bi *BlobInfo) Bucket() string {
return bi.Bucket_
}
// ContentType implements storage.BlobInfo.
func (bi *BlobInfo) ContentType() string {
return bi.ContentType_
}
// ID implements storage.BlobInfo.
func (bi *BlobInfo) ID() storage.BlobID {
return bi.BlobID_
}
// ModTime implements storage.BlobInfo.
func (bi *BlobInfo) ModTime() time.Time {
return bi.ModTime_
}
// Size implements storage.BlobInfo.
func (bi *BlobInfo) Size() int64 {
return bi.Size_
}
var _ storage.BlobInfo = &BlobInfo{}

View File

@ -0,0 +1,18 @@
package gob
import (
"encoding/gob"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
)
func init() {
gob.Register(storage.Document{})
gob.Register(storage.DocumentID(""))
gob.Register(time.Time{})
gob.Register(map[string]interface{}{})
gob.Register([]interface{}{})
gob.Register([]map[string]interface{}{})
gob.Register(&BlobInfo{})
}

View File

@ -2,17 +2,12 @@ package blob
import (
"context"
"encoding/gob"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
"github.com/pkg/errors"
)
func init() {
gob.Register(&BlobInfo{})
}
type GetBlobInfoArgs struct {
BlobID storage.BlobID
BucketID BucketID
@ -34,7 +29,7 @@ func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply
}
*reply = GetBlobInfoReply{
BlobInfo: &BlobInfo{
BlobInfo: &gob.BlobInfo{
Bucket_: blobInfo.Bucket(),
ContentType_: blobInfo.ContentType(),
BlobID_: blobInfo.ID(),
@ -45,38 +40,3 @@ func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply
return nil
}
type BlobInfo struct {
Bucket_ string
ContentType_ string
BlobID_ storage.BlobID
ModTime_ time.Time
Size_ int64
}
// Bucket implements storage.BlobInfo.
func (bi *BlobInfo) Bucket() string {
return bi.Bucket_
}
// ContentType implements storage.BlobInfo.
func (bi *BlobInfo) ContentType() string {
return bi.ContentType_
}
// ID implements storage.BlobInfo.
func (bi *BlobInfo) ID() storage.BlobID {
return bi.BlobID_
}
// ModTime implements storage.BlobInfo.
func (bi *BlobInfo) ModTime() time.Time {
return bi.ModTime_
}
// Size implements storage.BlobInfo.
func (bi *BlobInfo) Size() int64 {
return bi.Size_
}
var _ storage.BlobInfo = &BlobInfo{}

View File

@ -15,7 +15,7 @@ type DeleteDocumentArgs struct {
type DeleteDocumentReply struct {
}
func (s *Service) DeleteDocument(ctx context.Context, args *DeleteDocumentArgs, reply *DeleteDocumentReply) error {
func (s *Service) DeleteDocument(ctx context.Context, args DeleteDocumentArgs, reply *DeleteDocumentReply) error {
if err := s.store.Delete(ctx, args.Collection, args.DocumentID); err != nil {
return errors.WithStack(err)
}

View File

@ -16,7 +16,7 @@ type GetDocumentReply struct {
Document storage.Document
}
func (s *Service) GetDocument(ctx context.Context, args *GetDocumentArgs, reply *GetDocumentReply) error {
func (s *Service) GetDocument(ctx context.Context, args GetDocumentArgs, reply *GetDocumentReply) error {
document, err := s.store.Get(ctx, args.Collection, args.DocumentID)
if err != nil {
return errors.WithStack(err)

View File

@ -18,7 +18,7 @@ type QueryDocumentsReply struct {
Documents []storage.Document
}
func (s *Service) QueryDocuments(ctx context.Context, args *QueryDocumentsArgs, reply *QueryDocumentsReply) error {
func (s *Service) QueryDocuments(ctx context.Context, args QueryDocumentsArgs, reply *QueryDocumentsReply) error {
var (
argsFilter *filter.Filter
err error

View File

@ -16,7 +16,7 @@ type UpsertDocumentReply struct {
Document storage.Document
}
func (s *Service) UpsertDocument(ctx context.Context, args *UpsertDocumentArgs, reply *UpsertDocumentReply) error {
func (s *Service) UpsertDocument(ctx context.Context, args UpsertDocumentArgs, reply *UpsertDocumentReply) error {
document, err := s.store.Upsert(ctx, args.Collection, args.Document)
if err != nil {
return errors.WithStack(err)

View File

@ -0,0 +1,5 @@
package server
import (
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
)