package client import ( "context" "net/url" "github.com/keegancsmith/rpc" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document" "forge.cadoles.com/arcad/edge/pkg/storage/filter" ) type DocumentStore struct { serverURL *url.URL } // Delete implements storage.DocumentStore. func (s *DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error { args := document.DeleteDocumentArgs{ Collection: collection, DocumentID: id, } reply := document.DeleteDocumentReply{} if err := s.call(ctx, "Service.DeleteDocument", args, &reply); err != nil { return errors.WithStack(err) } return nil } // Get implements storage.DocumentStore. func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) { args := document.GetDocumentArgs{ Collection: collection, DocumentID: id, } reply := document.GetDocumentReply{} if err := s.call(ctx, "Service.GetDocument", args, &reply); err != nil { return nil, errors.WithStack(err) } return reply.Document, nil } // Query implements storage.DocumentStore. func (s *DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) { opts := &storage.QueryOptions{} for _, fn := range funcs { fn(opts) } args := document.QueryDocumentsArgs{ Collection: collection, Filter: nil, Options: opts, } if filter != nil { args.Filter = filter.AsMap() } reply := document.QueryDocumentsReply{ Documents: []storage.Document{}, } if err := s.call(ctx, "Service.QueryDocuments", args, &reply); err != nil { return nil, errors.WithStack(err) } return reply.Documents, nil } // Upsert implements storage.DocumentStore. func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc storage.Document) (storage.Document, error) { args := document.UpsertDocumentArgs{ Collection: collection, Document: doc, } reply := document.UpsertDocumentReply{} if err := s.call(ctx, "Service.UpsertDocument", args, &reply); err != nil { return nil, errors.WithStack(err) } return reply.Document, nil } func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { if err := client.Call(ctx, serviceMethod, args, reply); err != nil { return errors.WithStack(remapDocumentError(err)) } return nil }) if err != nil { return errors.WithStack(err) } return nil } func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) if err != nil { return errors.WithStack(err) } defer func() { if err := client.Close(); err != nil { logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err))) } }() if err := fn(ctx, client); err != nil { return errors.WithStack(err) } return nil } func NewDocumentStore(url *url.URL) *DocumentStore { return &DocumentStore{url} } var _ storage.DocumentStore = &DocumentStore{}