feat(module,blob): implements full api
This commit is contained in:
parent
cf8a3f8ac0
commit
0577762be9
|
@ -15,6 +15,7 @@ import (
|
|||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/auth"
|
||||
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/blob"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/cast"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/net"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
|
@ -166,7 +167,7 @@ func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStor
|
|||
net.ModuleFactory(bus),
|
||||
module.RPCModuleFactory(bus),
|
||||
module.StoreModuleFactory(ds),
|
||||
module.BlobModuleFactory(bus, bs),
|
||||
blob.ModuleFactory(bus, bs),
|
||||
module.Extends(
|
||||
auth.ModuleFactory(
|
||||
auth.WithJWT(dummyKeySet),
|
||||
|
|
|
@ -38,11 +38,15 @@ function onBlobDownload(ctx, bucketName, blobId) {
|
|||
|
||||
> `TODO`
|
||||
|
||||
### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string)`
|
||||
### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo`
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `blob.readBlob(ctx: Context, bucketName: string, blobId: string)`
|
||||
### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string, data: any)`
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `blob.readBlob(ctx: Context, bucketName: string, blobId: string): ArrayBuffer`
|
||||
|
||||
> `TODO`
|
||||
|
||||
|
@ -58,7 +62,7 @@ function onBlobDownload(ctx, bucketName, blobId) {
|
|||
|
||||
> `TODO`
|
||||
|
||||
### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo`
|
||||
### `blob.getBucketSize(ctx: Context, bucketName: string): number`
|
||||
|
||||
> `TODO`
|
||||
|
||||
|
@ -70,4 +74,16 @@ Voir la documentation de l'objet [`Context`](./context.md#Context).
|
|||
|
||||
### `BlobInfo`
|
||||
|
||||
```typescript
|
||||
interface BlobInfo {
|
||||
id: string // Identifiant du blob
|
||||
bucket: string // Nom du bucket contenant le blob
|
||||
size: number // Taille du blob
|
||||
modTime: number // Timestamp Unix de dernière modification du blob
|
||||
contentType: string // Type MIME du contenu du blob
|
||||
}
|
||||
```
|
||||
|
||||
### `Metadata`
|
||||
|
||||
L'objet `Metadata` est un objet clé/valeur arbitraire transmis avec la requête de téléversement. Voir la méthode [`Edge.upload(blob, metadata)`](../client-api/README.md#edge-upload-blob-blob-metadata-object-promise) du SDK client.
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/blob"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -68,7 +69,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
|
|||
ContextKeyOriginRequest: r,
|
||||
})
|
||||
|
||||
requestMsg := module.NewMessageUploadRequest(ctx, fileHeader, metadata)
|
||||
requestMsg := blob.NewMessageUploadRequest(ctx, fileHeader, metadata)
|
||||
|
||||
reply, err := h.bus.Request(ctx, requestMsg)
|
||||
if err != nil {
|
||||
|
@ -80,7 +81,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
logger.Debug(ctx, "upload reply", logger.F("reply", reply))
|
||||
|
||||
responseMsg, ok := reply.(*module.MessageUploadResponse)
|
||||
responseMsg, ok := reply.(*blob.MessageUploadResponse)
|
||||
if !ok {
|
||||
logger.Error(
|
||||
ctx, "unexpected upload response message",
|
||||
|
@ -120,7 +121,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
|
|||
ContextKeyOriginRequest: r,
|
||||
})
|
||||
|
||||
requestMsg := module.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
|
||||
requestMsg := blob.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
|
||||
|
||||
reply, err := h.bus.Request(ctx, requestMsg)
|
||||
if err != nil {
|
||||
|
@ -130,7 +131,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
replyMsg, ok := reply.(*module.MessageDownloadResponse)
|
||||
replyMsg, ok := reply.(*blob.MessageDownloadResponse)
|
||||
if !ok {
|
||||
logger.Error(
|
||||
ctx, "unexpected download response message",
|
||||
|
|
|
@ -1,282 +0,0 @@
|
|||
package module
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultBlobBucket string = "default"
|
||||
)
|
||||
|
||||
type BlobModule struct {
|
||||
server *app.Server
|
||||
bus bus.Bus
|
||||
store storage.BlobStore
|
||||
}
|
||||
|
||||
func (m *BlobModule) Name() string {
|
||||
return "blob"
|
||||
}
|
||||
|
||||
func (m *BlobModule) Export(export *goja.Object) {
|
||||
}
|
||||
|
||||
func (m *BlobModule) handleMessages() {
|
||||
ctx := context.Background()
|
||||
|
||||
go func() {
|
||||
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
uploadRequest, ok := msg.(*MessageUploadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleUploadRequest(uploadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "upload request response", logger.F("response", res))
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
downloadRequest, ok := msg.(*MessageDownloadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleDownloadRequest(downloadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BlobModule) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
|
||||
blobID := storage.NewBlobID()
|
||||
res := NewMessageUploadResponse(req.RequestID)
|
||||
|
||||
ctx := logger.With(req.Context, logger.F("blobID", blobID))
|
||||
|
||||
blobInfo := map[string]interface{}{
|
||||
"size": req.FileHeader.Size,
|
||||
"filename": req.FileHeader.Filename,
|
||||
"contentType": req.FileHeader.Header.Get("Content-Type"),
|
||||
}
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
if res.Allow {
|
||||
bucket := DefaultBlobBucket
|
||||
|
||||
rawBucket, exists := result["bucket"]
|
||||
if exists {
|
||||
bucket, ok = rawBucket.(string)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
res.Bucket = bucket
|
||||
res.BlobID = blobID
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *BlobModule) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
|
||||
file, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(writer, file); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
|
||||
res := NewMessageDownloadResponse(req.RequestID)
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if reader != nil {
|
||||
res.Blob = reader
|
||||
}
|
||||
|
||||
if info != nil {
|
||||
res.BlobInfo = info
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *BlobModule) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
|
||||
}
|
||||
}()
|
||||
|
||||
info, err := bucket.Get(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader, err := bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reader, info, nil
|
||||
}
|
||||
|
||||
func BlobModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
|
||||
return func(server *app.Server) app.ServerModule {
|
||||
mod := &BlobModule{
|
||||
store: store,
|
||||
bus: bus,
|
||||
server: server,
|
||||
}
|
||||
|
||||
go mod.handleMessages()
|
||||
|
||||
return mod
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package blob
|
||||
|
||||
import "forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
|
||||
type blobInfo struct {
|
||||
ID storage.BlobID `goja:"id"`
|
||||
Bucket string `goja:"bucket"`
|
||||
ModTime int64 `goja:"modTime"`
|
||||
Size int64 `goja:"size"`
|
||||
ContentType string `goja:"contentType"`
|
||||
}
|
||||
|
||||
func toGojaBlobInfo(blob storage.BlobInfo) blobInfo {
|
||||
return blobInfo{
|
||||
ID: blob.ID(),
|
||||
Bucket: blob.Bucket(),
|
||||
ModTime: blob.ModTime().Unix(),
|
||||
Size: blob.Size(),
|
||||
ContentType: blob.ContentType(),
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package module
|
||||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
|
@ -0,0 +1,499 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/util"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultBlobBucket string = "default"
|
||||
)
|
||||
|
||||
type Module struct {
|
||||
server *app.Server
|
||||
bus bus.Bus
|
||||
store storage.BlobStore
|
||||
}
|
||||
|
||||
func (m *Module) Name() string {
|
||||
return "blob"
|
||||
}
|
||||
|
||||
func (m *Module) Export(export *goja.Object) {
|
||||
funcs := map[string]any{
|
||||
"listBuckets": m.listBuckets,
|
||||
"deleteBucket": m.deleteBucket,
|
||||
"getBucketSize": m.getBucketSize,
|
||||
"listBlobs": m.listBlobs,
|
||||
"getBlobInfo": m.getBlobInfo,
|
||||
"readBlob": m.readBlob,
|
||||
"writeBlob": m.writeBlob,
|
||||
"deleteBlob": m.deleteBlob,
|
||||
}
|
||||
|
||||
for name, fn := range funcs {
|
||||
if err := export.Set(name, fn); err != nil {
|
||||
panic(errors.Wrapf(err, "could not set '%s' function", name))
|
||||
}
|
||||
}
|
||||
|
||||
if err := export.Set("DEFAULT_BUCKET", DefaultBlobBucket); err != nil {
|
||||
panic(errors.Wrap(err, "could not set 'DEFAULT_BUCKET' property"))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) listBuckets(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
|
||||
buckets, err := m.store.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defaultBucketIndex := sort.SearchStrings(buckets, DefaultBlobBucket)
|
||||
if defaultBucketIndex == 0 {
|
||||
buckets = append(buckets, DefaultBlobBucket)
|
||||
} else {
|
||||
buckets[defaultBucketIndex] = DefaultBlobBucket
|
||||
}
|
||||
|
||||
return rt.ToValue(buckets)
|
||||
}
|
||||
|
||||
func (m *Module) writeBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
rawData := call.Argument(3).Export()
|
||||
|
||||
var data []byte
|
||||
switch typ := rawData.(type) {
|
||||
case []byte:
|
||||
data = typ
|
||||
case string:
|
||||
data = []byte(typ)
|
||||
default:
|
||||
data = []byte(fmt.Sprintf("%v", typ))
|
||||
}
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
||||
logger.Error(ctx, "could not close blob writer", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := writer.Write(data); err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) getBlobInfo(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
blobInfo, err := bucket.Get(ctx, blobID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(toGojaBlobInfo(blobInfo))
|
||||
}
|
||||
|
||||
func (m *Module) readBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
|
||||
reader, _, err := m.openBlob(ctx, bucketName, blobID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := reader.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
||||
logger.Error(ctx, "could not close blob reader", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(rt.NewArrayBuffer(data))
|
||||
}
|
||||
|
||||
func (m *Module) deleteBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
if err := bucket.Delete(ctx, blobID); err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) listBlobs(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
blobInfos, err := bucket.List(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
gojaBlobInfos := make([]blobInfo, len(blobInfos))
|
||||
|
||||
for i, b := range blobInfos {
|
||||
gojaBlobInfos[i] = toGojaBlobInfo(b)
|
||||
}
|
||||
|
||||
return rt.ToValue(gojaBlobInfos)
|
||||
}
|
||||
|
||||
func (m *Module) deleteBucket(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
|
||||
if err := m.store.DeleteBucket(ctx, bucketName); err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) getBucketSize(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
size, err := bucket.Size(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(size)
|
||||
}
|
||||
|
||||
func (m *Module) handleMessages() {
|
||||
ctx := context.Background()
|
||||
|
||||
go func() {
|
||||
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
uploadRequest, ok := msg.(*MessageUploadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleUploadRequest(uploadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "upload request response", logger.F("response", res))
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
downloadRequest, ok := msg.(*MessageDownloadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleDownloadRequest(downloadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
|
||||
blobID := storage.NewBlobID()
|
||||
res := NewMessageUploadResponse(req.RequestID)
|
||||
|
||||
ctx := logger.With(req.Context, logger.F("blobID", blobID))
|
||||
|
||||
blobInfo := map[string]interface{}{
|
||||
"size": req.FileHeader.Size,
|
||||
"filename": req.FileHeader.Filename,
|
||||
"contentType": req.FileHeader.Header.Get("Content-Type"),
|
||||
}
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
if res.Allow {
|
||||
bucket := DefaultBlobBucket
|
||||
|
||||
rawBucket, exists := result["bucket"]
|
||||
if exists {
|
||||
bucket, ok = rawBucket.(string)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
res.Bucket = bucket
|
||||
res.BlobID = blobID
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *Module) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
|
||||
file, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(writer, file); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
|
||||
res := NewMessageDownloadResponse(req.RequestID)
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if reader != nil {
|
||||
res.Blob = reader
|
||||
}
|
||||
|
||||
if info != nil {
|
||||
res.BlobInfo = info
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *Module) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
|
||||
}
|
||||
}()
|
||||
|
||||
info, err := bucket.Get(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader, err := bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reader, info, nil
|
||||
}
|
||||
|
||||
func ModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
|
||||
return func(server *app.Server) app.ServerModule {
|
||||
mod := &Module{
|
||||
store: store,
|
||||
bus: bus,
|
||||
server: server,
|
||||
}
|
||||
|
||||
go mod.handleMessages()
|
||||
|
||||
return mod
|
||||
}
|
||||
}
|
||||
|
||||
func assertBlobID(value goja.Value, rt *goja.Runtime) storage.BlobID {
|
||||
blobID, ok := value.Export().(storage.BlobID)
|
||||
if !ok {
|
||||
rawBlobID, ok := value.Export().(string)
|
||||
if !ok {
|
||||
panic(rt.NewTypeError(fmt.Sprintf("blob id must be a blob or a string, got '%T'", value.Export())))
|
||||
}
|
||||
|
||||
blobID = storage.BlobID(rawBlobID)
|
||||
}
|
||||
|
||||
return blobID
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
package blob
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestBlobModule(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
logger.SetLevel(slog.LevelDebug)
|
||||
|
||||
bus := memory.NewBus()
|
||||
store := sqlite.NewBlobStore(":memory:")
|
||||
|
||||
server := app.NewServer(
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
ModuleFactory(bus, store),
|
||||
)
|
||||
|
||||
data, err := ioutil.ReadFile("testdata/blob.js")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := server.Load("testdata/blob.js", string(data)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer server.Stop()
|
||||
|
||||
if err := server.Start(); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
|
||||
var ctx = context.new();
|
||||
var buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (!buckets || buckets.length === 0) {
|
||||
throw new Error("buckets should not be empty");
|
||||
}
|
||||
|
||||
var size = blob.getBucketSize(ctx, blob.DEFAULT_BUCKET);
|
||||
|
||||
if (size !== 0) {
|
||||
throw new Error("bucket size: expected '0', got '"+size+"'");
|
||||
}
|
||||
|
||||
var newBucket = "mybucket"
|
||||
var blobId = "foo"
|
||||
var data = (new Date()).toString();
|
||||
|
||||
blob.writeBlob(ctx, newBucket, blobId, data)
|
||||
|
||||
buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (buckets.length !== 2) {
|
||||
throw new Error("buckets.length: expected '2', got '"+buckets.length+"'");
|
||||
}
|
||||
|
||||
size = blob.getBucketSize(ctx, newBucket);
|
||||
|
||||
if (size !== data.length) {
|
||||
throw new Error("bucket size: expected '"+data.length+"', got '"+size+"'");
|
||||
}
|
||||
|
||||
var blobInfos = blob.listBlobs(ctx, newBucket);
|
||||
|
||||
if (blobInfos.length !== 1) {
|
||||
throw new Error("blobInfos.length: expected '1', got '"+blobInfos.length+"'");
|
||||
}
|
||||
|
||||
if (blobInfos[0].id != blobId) {
|
||||
throw new Error("blobInfos[0].id: expected '"+blobId+"', got '"+blobInfos[0].id+"'");
|
||||
}
|
||||
|
||||
if (blobInfos[0].contentType != "text/plain; charset=utf-8") {
|
||||
throw new Error("blobInfos[0].contentType: expected 'text/plain; charset=utf-8', got '"+blobInfos[0].contentType+"'");
|
||||
}
|
||||
|
||||
if (blobInfos[0].size != data.length) {
|
||||
throw new Error("blobInfos[0].size: expected '"+data.length+"', got '"+blobInfos[0].size+"'");
|
||||
}
|
||||
|
||||
var readData = blob.readBlob(ctx, newBucket, blobId)
|
||||
|
||||
if (!readData) {
|
||||
throw new Error("readData should not be nil");
|
||||
}
|
||||
|
||||
var buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (!buckets || buckets.length !== 2) {
|
||||
throw new Error("buckets.length should be 2");
|
||||
}
|
||||
|
||||
blob.deleteBlob(ctx, newBucket, blobId)
|
||||
|
||||
blobInfos = blob.listBlobs(ctx, newBucket);
|
||||
|
||||
console.log(blobInfos);
|
||||
|
||||
if (blobInfos.length !== 0) {
|
||||
throw new Error("blobInfos.length: expected '0', got '"+blobInfos.length+"'");
|
||||
}
|
||||
|
||||
blob.deleteBucket(ctx, newBucket)
|
||||
|
||||
buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (buckets.length !== 1) {
|
||||
throw new Error("buckets.length: expected '1', got '"+buckets.length+"'");
|
||||
}
|
|
@ -12,7 +12,7 @@ func AssertType[T any](v goja.Value, rt *goja.Runtime) T {
|
|||
return c
|
||||
}
|
||||
|
||||
panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", *new(T), v.Export())))
|
||||
panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", new(T), v.Export())))
|
||||
}
|
||||
|
||||
func AssertContext(v goja.Value, r *goja.Runtime) context.Context {
|
||||
|
|
|
@ -68,8 +68,11 @@ func (b *BlobBucket) Close() error {
|
|||
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||
err := b.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `DELETE FROM blobs WHERE bucket = $1 AND id = $2`
|
||||
args := []any{b.name, id}
|
||||
|
||||
if _, err := tx.ExecContext(ctx, query, b.name, id); err != nil {
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -88,7 +91,11 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
|
|||
|
||||
err := b.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2`
|
||||
row := tx.QueryRowContext(ctx, query, b.name, id)
|
||||
args := []any{b.name, id}
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
var (
|
||||
contentType string
|
||||
|
@ -127,8 +134,11 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
|||
|
||||
err := b.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1`
|
||||
args := []any{b.name}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, b.name)
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -229,7 +239,12 @@ type blobWriterCloser struct {
|
|||
|
||||
// Write implements io.WriteCloser
|
||||
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
|
||||
logger.Debug(context.Background(), "writing data to blob", logger.F("data", p))
|
||||
logger.Debug(
|
||||
context.Background(), "writing data to blob",
|
||||
logger.F("size", len(p)),
|
||||
logger.F("blobID", wbc.id),
|
||||
logger.F("bucket", wbc.bucket),
|
||||
)
|
||||
|
||||
n, err := wbc.buf.Write(p)
|
||||
if err != nil {
|
||||
|
@ -266,14 +281,20 @@ func (wbc *blobWriterCloser) Close() error {
|
|||
mime := mimetype.Detect(data)
|
||||
modTime := time.Now().UTC()
|
||||
|
||||
_, err := tx.Exec(
|
||||
query,
|
||||
args := []any{
|
||||
wbc.bucket,
|
||||
wbc.id,
|
||||
data,
|
||||
mime.String(),
|
||||
modTime,
|
||||
len(data),
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query))
|
||||
|
||||
_, err := tx.Exec(
|
||||
query,
|
||||
args...,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
|
|
@ -36,7 +36,7 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
|||
buckets := make([]string, 0)
|
||||
|
||||
err := s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT DISTINCT name FROM blobs`
|
||||
query := `SELECT DISTINCT bucket FROM blobs`
|
||||
rows, err := tx.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
|
Loading…
Reference in New Issue