Compare commits
1 Commits
776d0cc3cd
...
19539ec34a
Author | SHA1 | Date | |
---|---|---|---|
19539ec34a |
@ -134,7 +134,7 @@ func getBlobStoreStoreServer(dataDir, tenant, dsnQueryString string) (*rpc.Serve
|
||||
blobStore := sqlite.NewBlobStore(dsn)
|
||||
blobStoreServer := server.NewBlobStoreServer(blobStore)
|
||||
|
||||
rawBlobStoreServer, _ := documentStoreTenants.LoadOrStore(tenant, blobStoreServer)
|
||||
rawBlobStoreServer, _ := blobStoreTenants.LoadOrStore(tenant, blobStoreServer)
|
||||
|
||||
blobStoreServer, ok := rawBlobStoreServer.(*rpc.Server)
|
||||
if !ok {
|
||||
|
@ -2,8 +2,6 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"time"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
"github.com/pkg/errors"
|
||||
@ -13,15 +11,6 @@ import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/document"
|
||||
)
|
||||
|
||||
func init() {
|
||||
gob.Register(storage.Document{})
|
||||
gob.Register(storage.DocumentID(""))
|
||||
gob.Register(time.Time{})
|
||||
gob.Register(map[string]interface{}{})
|
||||
gob.Register([]interface{}{})
|
||||
gob.Register([]map[string]interface{}{})
|
||||
}
|
||||
|
||||
type DocumentStore struct {
|
||||
client *rpc.Client
|
||||
}
|
||||
@ -75,7 +64,9 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
|
||||
args.Filter = filter.AsMap()
|
||||
}
|
||||
|
||||
reply := document.QueryDocumentsReply{}
|
||||
reply := document.QueryDocumentsReply{
|
||||
Documents: []storage.Document{},
|
||||
}
|
||||
|
||||
if err := s.client.Call(ctx, "Service.QueryDocuments", args, &reply); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
5
pkg/storage/rpc/client/init.go
Normal file
5
pkg/storage/rpc/client/init.go
Normal file
@ -0,0 +1,5 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
|
||||
)
|
42
pkg/storage/rpc/gob/blob_info.go
Normal file
42
pkg/storage/rpc/gob/blob_info.go
Normal file
@ -0,0 +1,42 @@
|
||||
package gob
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
)
|
||||
|
||||
type BlobInfo struct {
|
||||
Bucket_ string
|
||||
ContentType_ string
|
||||
BlobID_ storage.BlobID
|
||||
ModTime_ time.Time
|
||||
Size_ int64
|
||||
}
|
||||
|
||||
// Bucket implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) Bucket() string {
|
||||
return bi.Bucket_
|
||||
}
|
||||
|
||||
// ContentType implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) ContentType() string {
|
||||
return bi.ContentType_
|
||||
}
|
||||
|
||||
// ID implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) ID() storage.BlobID {
|
||||
return bi.BlobID_
|
||||
}
|
||||
|
||||
// ModTime implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) ModTime() time.Time {
|
||||
return bi.ModTime_
|
||||
}
|
||||
|
||||
// Size implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) Size() int64 {
|
||||
return bi.Size_
|
||||
}
|
||||
|
||||
var _ storage.BlobInfo = &BlobInfo{}
|
18
pkg/storage/rpc/gob/init.go
Normal file
18
pkg/storage/rpc/gob/init.go
Normal file
@ -0,0 +1,18 @@
|
||||
package gob
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
)
|
||||
|
||||
func init() {
|
||||
gob.Register(storage.Document{})
|
||||
gob.Register(storage.DocumentID(""))
|
||||
gob.Register(time.Time{})
|
||||
gob.Register(map[string]interface{}{})
|
||||
gob.Register([]interface{}{})
|
||||
gob.Register([]map[string]interface{}{})
|
||||
gob.Register(&BlobInfo{})
|
||||
}
|
@ -2,17 +2,12 @@ package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
gob.Register(&BlobInfo{})
|
||||
}
|
||||
|
||||
type GetBlobInfoArgs struct {
|
||||
BlobID storage.BlobID
|
||||
BucketID BucketID
|
||||
@ -34,7 +29,7 @@ func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply
|
||||
}
|
||||
|
||||
*reply = GetBlobInfoReply{
|
||||
BlobInfo: &BlobInfo{
|
||||
BlobInfo: &gob.BlobInfo{
|
||||
Bucket_: blobInfo.Bucket(),
|
||||
ContentType_: blobInfo.ContentType(),
|
||||
BlobID_: blobInfo.ID(),
|
||||
@ -45,38 +40,3 @@ func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type BlobInfo struct {
|
||||
Bucket_ string
|
||||
ContentType_ string
|
||||
BlobID_ storage.BlobID
|
||||
ModTime_ time.Time
|
||||
Size_ int64
|
||||
}
|
||||
|
||||
// Bucket implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) Bucket() string {
|
||||
return bi.Bucket_
|
||||
}
|
||||
|
||||
// ContentType implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) ContentType() string {
|
||||
return bi.ContentType_
|
||||
}
|
||||
|
||||
// ID implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) ID() storage.BlobID {
|
||||
return bi.BlobID_
|
||||
}
|
||||
|
||||
// ModTime implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) ModTime() time.Time {
|
||||
return bi.ModTime_
|
||||
}
|
||||
|
||||
// Size implements storage.BlobInfo.
|
||||
func (bi *BlobInfo) Size() int64 {
|
||||
return bi.Size_
|
||||
}
|
||||
|
||||
var _ storage.BlobInfo = &BlobInfo{}
|
||||
|
@ -15,7 +15,7 @@ type DeleteDocumentArgs struct {
|
||||
type DeleteDocumentReply struct {
|
||||
}
|
||||
|
||||
func (s *Service) DeleteDocument(ctx context.Context, args *DeleteDocumentArgs, reply *DeleteDocumentReply) error {
|
||||
func (s *Service) DeleteDocument(ctx context.Context, args DeleteDocumentArgs, reply *DeleteDocumentReply) error {
|
||||
if err := s.store.Delete(ctx, args.Collection, args.DocumentID); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ type GetDocumentReply struct {
|
||||
Document storage.Document
|
||||
}
|
||||
|
||||
func (s *Service) GetDocument(ctx context.Context, args *GetDocumentArgs, reply *GetDocumentReply) error {
|
||||
func (s *Service) GetDocument(ctx context.Context, args GetDocumentArgs, reply *GetDocumentReply) error {
|
||||
document, err := s.store.Get(ctx, args.Collection, args.DocumentID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -18,7 +18,7 @@ type QueryDocumentsReply struct {
|
||||
Documents []storage.Document
|
||||
}
|
||||
|
||||
func (s *Service) QueryDocuments(ctx context.Context, args *QueryDocumentsArgs, reply *QueryDocumentsReply) error {
|
||||
func (s *Service) QueryDocuments(ctx context.Context, args QueryDocumentsArgs, reply *QueryDocumentsReply) error {
|
||||
var (
|
||||
argsFilter *filter.Filter
|
||||
err error
|
||||
|
@ -16,7 +16,7 @@ type UpsertDocumentReply struct {
|
||||
Document storage.Document
|
||||
}
|
||||
|
||||
func (s *Service) UpsertDocument(ctx context.Context, args *UpsertDocumentArgs, reply *UpsertDocumentReply) error {
|
||||
func (s *Service) UpsertDocument(ctx context.Context, args UpsertDocumentArgs, reply *UpsertDocumentReply) error {
|
||||
document, err := s.store.Upsert(ctx, args.Collection, args.Document)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
5
pkg/storage/rpc/server/init.go
Normal file
5
pkg/storage/rpc/server/init.go
Normal file
@ -0,0 +1,5 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/gob"
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user