From 0577762be9fa18641ab4a5e222e3eb0facd34b57 Mon Sep 17 00:00:00 2001 From: William Petit Date: Thu, 23 Mar 2023 19:01:20 +0100 Subject: [PATCH] feat(module,blob): implements full api --- cmd/cli/command/app/run.go | 3 +- doc/apps/server-api/blob.md | 24 +- pkg/http/blob.go | 9 +- pkg/module/blob.go | 282 --------------- pkg/module/blob/blob_info.go | 21 ++ pkg/module/{ => blob}/blob_message.go | 2 +- pkg/module/blob/module.go | 499 ++++++++++++++++++++++++++ pkg/module/blob/module_test.go | 44 +++ pkg/module/blob/testdata/blob.js | 79 ++++ pkg/module/util/assert.go | 2 +- pkg/storage/sqlite/blob_bucket.go | 33 +- pkg/storage/sqlite/blob_store.go | 2 +- 12 files changed, 700 insertions(+), 300 deletions(-) delete mode 100644 pkg/module/blob.go create mode 100644 pkg/module/blob/blob_info.go rename pkg/module/{ => blob}/blob_message.go (99%) create mode 100644 pkg/module/blob/module.go create mode 100644 pkg/module/blob/module_test.go create mode 100644 pkg/module/blob/testdata/blob.js diff --git a/cmd/cli/command/app/run.go b/cmd/cli/command/app/run.go index 153586d..e28805b 100644 --- a/cmd/cli/command/app/run.go +++ b/cmd/cli/command/app/run.go @@ -15,6 +15,7 @@ import ( "forge.cadoles.com/arcad/edge/pkg/module" "forge.cadoles.com/arcad/edge/pkg/module/auth" authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http" + "forge.cadoles.com/arcad/edge/pkg/module/blob" "forge.cadoles.com/arcad/edge/pkg/module/cast" "forge.cadoles.com/arcad/edge/pkg/module/net" "forge.cadoles.com/arcad/edge/pkg/storage" @@ -166,7 +167,7 @@ func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStor net.ModuleFactory(bus), module.RPCModuleFactory(bus), module.StoreModuleFactory(ds), - module.BlobModuleFactory(bus, bs), + blob.ModuleFactory(bus, bs), module.Extends( auth.ModuleFactory( auth.WithJWT(dummyKeySet), diff --git a/doc/apps/server-api/blob.md b/doc/apps/server-api/blob.md index 341d5f8..cb0eb7c 100644 --- a/doc/apps/server-api/blob.md +++ b/doc/apps/server-api/blob.md @@ -38,11 +38,15 @@ function onBlobDownload(ctx, bucketName, blobId) { > `TODO` -### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string)` +### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo` > `TODO` -### `blob.readBlob(ctx: Context, bucketName: string, blobId: string)` +### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string, data: any)` + +> `TODO` + +### `blob.readBlob(ctx: Context, bucketName: string, blobId: string): ArrayBuffer` > `TODO` @@ -58,7 +62,7 @@ function onBlobDownload(ctx, bucketName, blobId) { > `TODO` -### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo` +### `blob.getBucketSize(ctx: Context, bucketName: string): number` > `TODO` @@ -70,4 +74,16 @@ Voir la documentation de l'objet [`Context`](./context.md#Context). ### `BlobInfo` -### `Metadata` \ No newline at end of file +```typescript +interface BlobInfo { + id: string // Identifiant du blob + bucket: string // Nom du bucket contenant le blob + size: number // Taille du blob + modTime: number // Timestamp Unix de dernière modification du blob + contentType: string // Type MIME du contenu du blob +} +``` + +### `Metadata` + +L'objet `Metadata` est un objet clé/valeur arbitraire transmis avec la requête de téléversement. Voir la méthode [`Edge.upload(blob, metadata)`](../client-api/README.md#edge-upload-blob-blob-metadata-object-promise) du SDK client. \ No newline at end of file diff --git a/pkg/http/blob.go b/pkg/http/blob.go index 5e5bb7b..d58c5bd 100644 --- a/pkg/http/blob.go +++ b/pkg/http/blob.go @@ -11,6 +11,7 @@ import ( "forge.cadoles.com/arcad/edge/pkg/bus" "forge.cadoles.com/arcad/edge/pkg/module" + "forge.cadoles.com/arcad/edge/pkg/module/blob" "forge.cadoles.com/arcad/edge/pkg/storage" "github.com/go-chi/chi/v5" "github.com/pkg/errors" @@ -68,7 +69,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) { ContextKeyOriginRequest: r, }) - requestMsg := module.NewMessageUploadRequest(ctx, fileHeader, metadata) + requestMsg := blob.NewMessageUploadRequest(ctx, fileHeader, metadata) reply, err := h.bus.Request(ctx, requestMsg) if err != nil { @@ -80,7 +81,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) { logger.Debug(ctx, "upload reply", logger.F("reply", reply)) - responseMsg, ok := reply.(*module.MessageUploadResponse) + responseMsg, ok := reply.(*blob.MessageUploadResponse) if !ok { logger.Error( ctx, "unexpected upload response message", @@ -120,7 +121,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) { ContextKeyOriginRequest: r, }) - requestMsg := module.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID)) + requestMsg := blob.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID)) reply, err := h.bus.Request(ctx, requestMsg) if err != nil { @@ -130,7 +131,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) { return } - replyMsg, ok := reply.(*module.MessageDownloadResponse) + replyMsg, ok := reply.(*blob.MessageDownloadResponse) if !ok { logger.Error( ctx, "unexpected download response message", diff --git a/pkg/module/blob.go b/pkg/module/blob.go deleted file mode 100644 index b6264f6..0000000 --- a/pkg/module/blob.go +++ /dev/null @@ -1,282 +0,0 @@ -package module - -import ( - "context" - "io" - "mime/multipart" - - "forge.cadoles.com/arcad/edge/pkg/app" - "forge.cadoles.com/arcad/edge/pkg/bus" - "forge.cadoles.com/arcad/edge/pkg/storage" - "github.com/dop251/goja" - "github.com/pkg/errors" - "gitlab.com/wpetit/goweb/logger" -) - -const ( - DefaultBlobBucket string = "default" -) - -type BlobModule struct { - server *app.Server - bus bus.Bus - store storage.BlobStore -} - -func (m *BlobModule) Name() string { - return "blob" -} - -func (m *BlobModule) Export(export *goja.Object) { -} - -func (m *BlobModule) handleMessages() { - ctx := context.Background() - - go func() { - err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) { - uploadRequest, ok := msg.(*MessageUploadRequest) - if !ok { - return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg) - } - - res, err := m.handleUploadRequest(uploadRequest) - if err != nil { - logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err))) - - return nil, errors.WithStack(err) - } - - logger.Debug(ctx, "upload request response", logger.F("response", res)) - - return res, nil - }) - if err != nil { - panic(errors.WithStack(err)) - } - }() - - err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) { - downloadRequest, ok := msg.(*MessageDownloadRequest) - if !ok { - return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg) - } - - res, err := m.handleDownloadRequest(downloadRequest) - if err != nil { - logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err))) - - return nil, errors.WithStack(err) - } - - return res, nil - }) - if err != nil { - panic(errors.WithStack(err)) - } -} - -func (m *BlobModule) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) { - blobID := storage.NewBlobID() - res := NewMessageUploadResponse(req.RequestID) - - ctx := logger.With(req.Context, logger.F("blobID", blobID)) - - blobInfo := map[string]interface{}{ - "size": req.FileHeader.Size, - "filename": req.FileHeader.Filename, - "contentType": req.FileHeader.Header.Get("Content-Type"), - } - - rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata) - if err != nil { - if errors.Is(err, app.ErrFuncDoesNotExist) { - res.Allow = false - - return res, nil - } - - return nil, errors.WithStack(err) - } - - result, ok := rawResult.Export().(map[string]interface{}) - if !ok { - return nil, errors.Errorf( - "unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'", - rawResult.Export(), - ) - } - - var allow bool - - rawAllow, exists := result["allow"] - if !exists { - allow = false - } else { - allow, ok = rawAllow.(bool) - if !ok { - return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false) - } - } - - res.Allow = allow - - if res.Allow { - bucket := DefaultBlobBucket - - rawBucket, exists := result["bucket"] - if exists { - bucket, ok = rawBucket.(string) - if !ok { - return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "") - } - } - - if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil { - return nil, errors.WithStack(err) - } - - res.Bucket = bucket - res.BlobID = blobID - } - - return res, nil -} - -func (m *BlobModule) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error { - file, err := fileHeader.Open() - if err != nil { - return errors.WithStack(err) - } - - defer func() { - if err := file.Close(); err != nil { - logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err))) - } - }() - - bucket, err := m.store.OpenBucket(ctx, bucketName) - if err != nil { - return errors.WithStack(err) - } - - defer func() { - if err := bucket.Close(); err != nil { - logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err))) - } - }() - - writer, err := bucket.NewWriter(ctx, blobID) - if err != nil { - return errors.WithStack(err) - } - - defer func() { - if err := file.Close(); err != nil { - logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err))) - } - }() - - defer func() { - if err := writer.Close(); err != nil { - logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err))) - } - }() - - if _, err := io.Copy(writer, file); err != nil { - return errors.WithStack(err) - } - - return nil -} - -func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) { - res := NewMessageDownloadResponse(req.RequestID) - - rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID) - if err != nil { - if errors.Is(err, app.ErrFuncDoesNotExist) { - res.Allow = false - - return res, nil - } - - return nil, errors.WithStack(err) - } - - result, ok := rawResult.Export().(map[string]interface{}) - if !ok { - return nil, errors.Errorf( - "unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'", - rawResult.Export(), - ) - } - - var allow bool - - rawAllow, exists := result["allow"] - if !exists { - allow = false - } else { - allow, ok = rawAllow.(bool) - if !ok { - return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false) - } - } - - res.Allow = allow - - reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID) - if err != nil && !errors.Is(err, storage.ErrBlobNotFound) { - return nil, errors.WithStack(err) - } - - if reader != nil { - res.Blob = reader - } - - if info != nil { - res.BlobInfo = info - } - - return res, nil -} - -func (m *BlobModule) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) { - bucket, err := m.store.OpenBucket(ctx, bucketName) - if err != nil { - return nil, nil, errors.WithStack(err) - } - - defer func() { - if err := bucket.Close(); err != nil { - logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket)) - } - }() - - info, err := bucket.Get(ctx, blobID) - if err != nil { - return nil, nil, errors.WithStack(err) - } - - reader, err := bucket.NewReader(ctx, blobID) - if err != nil { - return nil, nil, errors.WithStack(err) - } - - return reader, info, nil -} - -func BlobModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory { - return func(server *app.Server) app.ServerModule { - mod := &BlobModule{ - store: store, - bus: bus, - server: server, - } - - go mod.handleMessages() - - return mod - } -} diff --git a/pkg/module/blob/blob_info.go b/pkg/module/blob/blob_info.go new file mode 100644 index 0000000..0974d1f --- /dev/null +++ b/pkg/module/blob/blob_info.go @@ -0,0 +1,21 @@ +package blob + +import "forge.cadoles.com/arcad/edge/pkg/storage" + +type blobInfo struct { + ID storage.BlobID `goja:"id"` + Bucket string `goja:"bucket"` + ModTime int64 `goja:"modTime"` + Size int64 `goja:"size"` + ContentType string `goja:"contentType"` +} + +func toGojaBlobInfo(blob storage.BlobInfo) blobInfo { + return blobInfo{ + ID: blob.ID(), + Bucket: blob.Bucket(), + ModTime: blob.ModTime().Unix(), + Size: blob.Size(), + ContentType: blob.ContentType(), + } +} diff --git a/pkg/module/blob_message.go b/pkg/module/blob/blob_message.go similarity index 99% rename from pkg/module/blob_message.go rename to pkg/module/blob/blob_message.go index d0db084..d355a90 100644 --- a/pkg/module/blob_message.go +++ b/pkg/module/blob/blob_message.go @@ -1,4 +1,4 @@ -package module +package blob import ( "context" diff --git a/pkg/module/blob/module.go b/pkg/module/blob/module.go new file mode 100644 index 0000000..a9c4b23 --- /dev/null +++ b/pkg/module/blob/module.go @@ -0,0 +1,499 @@ +package blob + +import ( + "context" + "fmt" + "io" + "mime/multipart" + "os" + "sort" + + "forge.cadoles.com/arcad/edge/pkg/app" + "forge.cadoles.com/arcad/edge/pkg/bus" + "forge.cadoles.com/arcad/edge/pkg/module/util" + "forge.cadoles.com/arcad/edge/pkg/storage" + "github.com/dop251/goja" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +const ( + DefaultBlobBucket string = "default" +) + +type Module struct { + server *app.Server + bus bus.Bus + store storage.BlobStore +} + +func (m *Module) Name() string { + return "blob" +} + +func (m *Module) Export(export *goja.Object) { + funcs := map[string]any{ + "listBuckets": m.listBuckets, + "deleteBucket": m.deleteBucket, + "getBucketSize": m.getBucketSize, + "listBlobs": m.listBlobs, + "getBlobInfo": m.getBlobInfo, + "readBlob": m.readBlob, + "writeBlob": m.writeBlob, + "deleteBlob": m.deleteBlob, + } + + for name, fn := range funcs { + if err := export.Set(name, fn); err != nil { + panic(errors.Wrapf(err, "could not set '%s' function", name)) + } + } + + if err := export.Set("DEFAULT_BUCKET", DefaultBlobBucket); err != nil { + panic(errors.Wrap(err, "could not set 'DEFAULT_BUCKET' property")) + } +} + +func (m *Module) listBuckets(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + + buckets, err := m.store.ListBuckets(ctx) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + defaultBucketIndex := sort.SearchStrings(buckets, DefaultBlobBucket) + if defaultBucketIndex == 0 { + buckets = append(buckets, DefaultBlobBucket) + } else { + buckets[defaultBucketIndex] = DefaultBlobBucket + } + + return rt.ToValue(buckets) +} + +func (m *Module) writeBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + blobID := assertBlobID(call.Argument(2), rt) + rawData := call.Argument(3).Export() + + var data []byte + switch typ := rawData.(type) { + case []byte: + data = typ + case string: + data = []byte(typ) + default: + data = []byte(fmt.Sprintf("%v", typ)) + } + + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + defer func() { + if err := bucket.Close(); err != nil { + logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err))) + } + }() + + writer, err := bucket.NewWriter(ctx, blobID) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + defer func() { + if err := writer.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + logger.Error(ctx, "could not close blob writer", logger.E(errors.WithStack(err))) + } + }() + + if _, err := writer.Write(data); err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + return nil +} + +func (m *Module) getBlobInfo(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + blobID := assertBlobID(call.Argument(2), rt) + + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + defer func() { + if err := bucket.Close(); err != nil { + logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err))) + } + }() + + blobInfo, err := bucket.Get(ctx, blobID) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + return rt.ToValue(toGojaBlobInfo(blobInfo)) +} + +func (m *Module) readBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + blobID := assertBlobID(call.Argument(2), rt) + + reader, _, err := m.openBlob(ctx, bucketName, blobID) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + defer func() { + if err := reader.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + logger.Error(ctx, "could not close blob reader", logger.E(errors.WithStack(err))) + } + }() + + data, err := io.ReadAll(reader) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + return rt.ToValue(rt.NewArrayBuffer(data)) +} + +func (m *Module) deleteBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + blobID := assertBlobID(call.Argument(2), rt) + + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + if err := bucket.Delete(ctx, blobID); err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + return nil +} + +func (m *Module) listBlobs(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + blobInfos, err := bucket.List(ctx) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + gojaBlobInfos := make([]blobInfo, len(blobInfos)) + + for i, b := range blobInfos { + gojaBlobInfos[i] = toGojaBlobInfo(b) + } + + return rt.ToValue(gojaBlobInfos) +} + +func (m *Module) deleteBucket(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + + if err := m.store.DeleteBucket(ctx, bucketName); err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + return nil +} + +func (m *Module) getBucketSize(call goja.FunctionCall, rt *goja.Runtime) goja.Value { + ctx := util.AssertContext(call.Argument(0), rt) + bucketName := util.AssertString(call.Argument(1), rt) + + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + size, err := bucket.Size(ctx) + if err != nil { + panic(rt.ToValue(errors.WithStack(err))) + } + + return rt.ToValue(size) +} + +func (m *Module) handleMessages() { + ctx := context.Background() + + go func() { + err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) { + uploadRequest, ok := msg.(*MessageUploadRequest) + if !ok { + return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg) + } + + res, err := m.handleUploadRequest(uploadRequest) + if err != nil { + logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err))) + + return nil, errors.WithStack(err) + } + + logger.Debug(ctx, "upload request response", logger.F("response", res)) + + return res, nil + }) + if err != nil { + panic(errors.WithStack(err)) + } + }() + + err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) { + downloadRequest, ok := msg.(*MessageDownloadRequest) + if !ok { + return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg) + } + + res, err := m.handleDownloadRequest(downloadRequest) + if err != nil { + logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err))) + + return nil, errors.WithStack(err) + } + + return res, nil + }) + if err != nil { + panic(errors.WithStack(err)) + } +} + +func (m *Module) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) { + blobID := storage.NewBlobID() + res := NewMessageUploadResponse(req.RequestID) + + ctx := logger.With(req.Context, logger.F("blobID", blobID)) + + blobInfo := map[string]interface{}{ + "size": req.FileHeader.Size, + "filename": req.FileHeader.Filename, + "contentType": req.FileHeader.Header.Get("Content-Type"), + } + + rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata) + if err != nil { + if errors.Is(err, app.ErrFuncDoesNotExist) { + res.Allow = false + + return res, nil + } + + return nil, errors.WithStack(err) + } + + result, ok := rawResult.Export().(map[string]interface{}) + if !ok { + return nil, errors.Errorf( + "unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'", + rawResult.Export(), + ) + } + + var allow bool + + rawAllow, exists := result["allow"] + if !exists { + allow = false + } else { + allow, ok = rawAllow.(bool) + if !ok { + return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false) + } + } + + res.Allow = allow + + if res.Allow { + bucket := DefaultBlobBucket + + rawBucket, exists := result["bucket"] + if exists { + bucket, ok = rawBucket.(string) + if !ok { + return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "") + } + } + + if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil { + return nil, errors.WithStack(err) + } + + res.Bucket = bucket + res.BlobID = blobID + } + + return res, nil +} + +func (m *Module) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error { + file, err := fileHeader.Open() + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := file.Close(); err != nil { + logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err))) + } + }() + + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := bucket.Close(); err != nil { + logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err))) + } + }() + + writer, err := bucket.NewWriter(ctx, blobID) + if err != nil { + return errors.WithStack(err) + } + + defer func() { + if err := file.Close(); err != nil { + logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err))) + } + }() + + defer func() { + if err := writer.Close(); err != nil { + logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err))) + } + }() + + if _, err := io.Copy(writer, file); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (m *Module) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) { + res := NewMessageDownloadResponse(req.RequestID) + + rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID) + if err != nil { + if errors.Is(err, app.ErrFuncDoesNotExist) { + res.Allow = false + + return res, nil + } + + return nil, errors.WithStack(err) + } + + result, ok := rawResult.Export().(map[string]interface{}) + if !ok { + return nil, errors.Errorf( + "unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'", + rawResult.Export(), + ) + } + + var allow bool + + rawAllow, exists := result["allow"] + if !exists { + allow = false + } else { + allow, ok = rawAllow.(bool) + if !ok { + return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false) + } + } + + res.Allow = allow + + reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID) + if err != nil && !errors.Is(err, storage.ErrBlobNotFound) { + return nil, errors.WithStack(err) + } + + if reader != nil { + res.Blob = reader + } + + if info != nil { + res.BlobInfo = info + } + + return res, nil +} + +func (m *Module) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) { + bucket, err := m.store.OpenBucket(ctx, bucketName) + if err != nil { + return nil, nil, errors.WithStack(err) + } + + defer func() { + if err := bucket.Close(); err != nil { + logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket)) + } + }() + + info, err := bucket.Get(ctx, blobID) + if err != nil { + return nil, nil, errors.WithStack(err) + } + + reader, err := bucket.NewReader(ctx, blobID) + if err != nil { + return nil, nil, errors.WithStack(err) + } + + return reader, info, nil +} + +func ModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory { + return func(server *app.Server) app.ServerModule { + mod := &Module{ + store: store, + bus: bus, + server: server, + } + + go mod.handleMessages() + + return mod + } +} + +func assertBlobID(value goja.Value, rt *goja.Runtime) storage.BlobID { + blobID, ok := value.Export().(storage.BlobID) + if !ok { + rawBlobID, ok := value.Export().(string) + if !ok { + panic(rt.NewTypeError(fmt.Sprintf("blob id must be a blob or a string, got '%T'", value.Export()))) + } + + blobID = storage.BlobID(rawBlobID) + } + + return blobID +} diff --git a/pkg/module/blob/module_test.go b/pkg/module/blob/module_test.go new file mode 100644 index 0000000..b979a60 --- /dev/null +++ b/pkg/module/blob/module_test.go @@ -0,0 +1,44 @@ +package blob + +import ( + "io/ioutil" + "testing" + + "cdr.dev/slog" + "forge.cadoles.com/arcad/edge/pkg/app" + "forge.cadoles.com/arcad/edge/pkg/bus/memory" + "forge.cadoles.com/arcad/edge/pkg/module" + "forge.cadoles.com/arcad/edge/pkg/storage/sqlite" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func TestBlobModule(t *testing.T) { + t.Parallel() + + logger.SetLevel(slog.LevelDebug) + + bus := memory.NewBus() + store := sqlite.NewBlobStore(":memory:") + + server := app.NewServer( + module.ContextModuleFactory(), + module.ConsoleModuleFactory(), + ModuleFactory(bus, store), + ) + + data, err := ioutil.ReadFile("testdata/blob.js") + if err != nil { + t.Fatal(err) + } + + if err := server.Load("testdata/blob.js", string(data)); err != nil { + t.Fatal(err) + } + + defer server.Stop() + + if err := server.Start(); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } +} diff --git a/pkg/module/blob/testdata/blob.js b/pkg/module/blob/testdata/blob.js new file mode 100644 index 0000000..84bfcf8 --- /dev/null +++ b/pkg/module/blob/testdata/blob.js @@ -0,0 +1,79 @@ + +var ctx = context.new(); +var buckets = blob.listBuckets(ctx); + +if (!buckets || buckets.length === 0) { + throw new Error("buckets should not be empty"); +} + +var size = blob.getBucketSize(ctx, blob.DEFAULT_BUCKET); + +if (size !== 0) { + throw new Error("bucket size: expected '0', got '"+size+"'"); +} + +var newBucket = "mybucket" +var blobId = "foo" +var data = (new Date()).toString(); + +blob.writeBlob(ctx, newBucket, blobId, data) + +buckets = blob.listBuckets(ctx); + +if (buckets.length !== 2) { + throw new Error("buckets.length: expected '2', got '"+buckets.length+"'"); +} + +size = blob.getBucketSize(ctx, newBucket); + +if (size !== data.length) { + throw new Error("bucket size: expected '"+data.length+"', got '"+size+"'"); +} + +var blobInfos = blob.listBlobs(ctx, newBucket); + +if (blobInfos.length !== 1) { + throw new Error("blobInfos.length: expected '1', got '"+blobInfos.length+"'"); +} + +if (blobInfos[0].id != blobId) { + throw new Error("blobInfos[0].id: expected '"+blobId+"', got '"+blobInfos[0].id+"'"); +} + +if (blobInfos[0].contentType != "text/plain; charset=utf-8") { + throw new Error("blobInfos[0].contentType: expected 'text/plain; charset=utf-8', got '"+blobInfos[0].contentType+"'"); +} + +if (blobInfos[0].size != data.length) { + throw new Error("blobInfos[0].size: expected '"+data.length+"', got '"+blobInfos[0].size+"'"); +} + +var readData = blob.readBlob(ctx, newBucket, blobId) + +if (!readData) { + throw new Error("readData should not be nil"); +} + +var buckets = blob.listBuckets(ctx); + +if (!buckets || buckets.length !== 2) { + throw new Error("buckets.length should be 2"); +} + +blob.deleteBlob(ctx, newBucket, blobId) + +blobInfos = blob.listBlobs(ctx, newBucket); + +console.log(blobInfos); + +if (blobInfos.length !== 0) { + throw new Error("blobInfos.length: expected '0', got '"+blobInfos.length+"'"); +} + +blob.deleteBucket(ctx, newBucket) + +buckets = blob.listBuckets(ctx); + +if (buckets.length !== 1) { + throw new Error("buckets.length: expected '1', got '"+buckets.length+"'"); +} \ No newline at end of file diff --git a/pkg/module/util/assert.go b/pkg/module/util/assert.go index a7f6fd9..7990534 100644 --- a/pkg/module/util/assert.go +++ b/pkg/module/util/assert.go @@ -12,7 +12,7 @@ func AssertType[T any](v goja.Value, rt *goja.Runtime) T { return c } - panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", *new(T), v.Export()))) + panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", new(T), v.Export()))) } func AssertContext(v goja.Value, r *goja.Runtime) context.Context { diff --git a/pkg/storage/sqlite/blob_bucket.go b/pkg/storage/sqlite/blob_bucket.go index 2bbbf7d..982325a 100644 --- a/pkg/storage/sqlite/blob_bucket.go +++ b/pkg/storage/sqlite/blob_bucket.go @@ -68,8 +68,11 @@ func (b *BlobBucket) Close() error { func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { err := b.withTx(ctx, func(tx *sql.Tx) error { query := `DELETE FROM blobs WHERE bucket = $1 AND id = $2` + args := []any{b.name, id} - if _, err := tx.ExecContext(ctx, query, b.name, id); err != nil { + logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args)) + + if _, err := tx.ExecContext(ctx, query, args...); err != nil { return errors.WithStack(err) } @@ -88,7 +91,11 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn err := b.withTx(ctx, func(tx *sql.Tx) error { query := `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2` - row := tx.QueryRowContext(ctx, query, b.name, id) + args := []any{b.name, id} + + logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args)) + + row := tx.QueryRowContext(ctx, query, args...) var ( contentType string @@ -127,8 +134,11 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { err := b.withTx(ctx, func(tx *sql.Tx) error { query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1` + args := []any{b.name} - rows, err := tx.QueryContext(ctx, query, b.name) + logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args)) + + rows, err := tx.QueryContext(ctx, query, args...) if err != nil { return errors.WithStack(err) } @@ -229,7 +239,12 @@ type blobWriterCloser struct { // Write implements io.WriteCloser func (wbc *blobWriterCloser) Write(p []byte) (int, error) { - logger.Debug(context.Background(), "writing data to blob", logger.F("data", p)) + logger.Debug( + context.Background(), "writing data to blob", + logger.F("size", len(p)), + logger.F("blobID", wbc.id), + logger.F("bucket", wbc.bucket), + ) n, err := wbc.buf.Write(p) if err != nil { @@ -266,14 +281,20 @@ func (wbc *blobWriterCloser) Close() error { mime := mimetype.Detect(data) modTime := time.Now().UTC() - _, err := tx.Exec( - query, + args := []any{ wbc.bucket, wbc.id, data, mime.String(), modTime, len(data), + } + + logger.Debug(ctx, "executing query", logger.F("query", query)) + + _, err := tx.Exec( + query, + args..., ) if err != nil { return errors.WithStack(err) diff --git a/pkg/storage/sqlite/blob_store.go b/pkg/storage/sqlite/blob_store.go index 65241e8..4cb2baf 100644 --- a/pkg/storage/sqlite/blob_store.go +++ b/pkg/storage/sqlite/blob_store.go @@ -36,7 +36,7 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { buckets := make([]string, 0) err := s.withTx(ctx, func(tx *sql.Tx) error { - query := `SELECT DISTINCT name FROM blobs` + query := `SELECT DISTINCT bucket FROM blobs` rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.WithStack(err)