2023-02-09 12:16:36 +01:00
|
|
|
package module
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"mime/multipart"
|
|
|
|
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/app"
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/bus"
|
|
|
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
|
|
|
"github.com/dop251/goja"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
|
|
)
|
|
|
|
|
|
|
|
// DefaultBlobBucket is the name of the bucket used to store an uploaded blob
// when the "onBlobUpload" hook result does not carry a "bucket" property.
const DefaultBlobBucket string = "default"
|
|
|
|
|
|
|
|
type BlobModule struct {
|
|
|
|
server *app.Server
|
|
|
|
bus bus.Bus
|
|
|
|
store storage.BlobStore
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) Name() string {
|
|
|
|
return "blob"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) Export(export *goja.Object) {
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) handleMessages() {
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
|
|
|
|
uploadRequest, ok := msg.(*MessageUploadRequest)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
res, err := m.handleUploadRequest(uploadRequest)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
|
|
|
|
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.Debug(ctx, "upload request response", logger.F("response", res))
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
panic(errors.WithStack(err))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
|
|
|
|
downloadRequest, ok := msg.(*MessageDownloadRequest)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
res, err := m.handleDownloadRequest(downloadRequest)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
|
|
|
|
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
panic(errors.WithStack(err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
|
|
|
|
blobID := storage.NewBlobID()
|
|
|
|
res := NewMessageUploadResponse(req.RequestID)
|
|
|
|
|
|
|
|
ctx := logger.With(req.Context, logger.F("blobID", blobID))
|
|
|
|
|
|
|
|
blobInfo := map[string]interface{}{
|
|
|
|
"size": req.FileHeader.Size,
|
|
|
|
"filename": req.FileHeader.Filename,
|
|
|
|
"contentType": req.FileHeader.Header.Get("Content-Type"),
|
|
|
|
}
|
|
|
|
|
2023-03-01 13:04:40 +01:00
|
|
|
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
|
2023-02-09 12:16:36 +01:00
|
|
|
if err != nil {
|
|
|
|
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
|
|
|
res.Allow = false
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
result, ok := rawResult.Export().(map[string]interface{})
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Errorf(
|
|
|
|
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
|
|
|
|
rawResult.Export(),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
var allow bool
|
|
|
|
|
|
|
|
rawAllow, exists := result["allow"]
|
|
|
|
if !exists {
|
|
|
|
allow = false
|
|
|
|
} else {
|
|
|
|
allow, ok = rawAllow.(bool)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res.Allow = allow
|
|
|
|
|
|
|
|
if res.Allow {
|
|
|
|
bucket := DefaultBlobBucket
|
|
|
|
|
|
|
|
rawBucket, exists := result["bucket"]
|
|
|
|
if exists {
|
|
|
|
bucket, ok = rawBucket.(string)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
res.Bucket = bucket
|
|
|
|
res.BlobID = blobID
|
|
|
|
}
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
|
|
|
|
file, err := fileHeader.Open()
|
|
|
|
if err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err := file.Close(); err != nil {
|
|
|
|
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
|
|
|
if err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err := bucket.Close(); err != nil {
|
|
|
|
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
writer, err := bucket.NewWriter(ctx, blobID)
|
|
|
|
if err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err := file.Close(); err != nil {
|
|
|
|
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err := writer.Close(); err != nil {
|
|
|
|
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if _, err := io.Copy(writer, file); err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
|
|
|
|
res := NewMessageDownloadResponse(req.RequestID)
|
|
|
|
|
2023-03-01 13:04:40 +01:00
|
|
|
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
|
2023-02-09 12:16:36 +01:00
|
|
|
if err != nil {
|
|
|
|
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
|
|
|
res.Allow = false
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
result, ok := rawResult.Export().(map[string]interface{})
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Errorf(
|
|
|
|
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
|
|
|
|
rawResult.Export(),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
var allow bool
|
|
|
|
|
|
|
|
rawAllow, exists := result["allow"]
|
|
|
|
if !exists {
|
|
|
|
allow = false
|
|
|
|
} else {
|
|
|
|
allow, ok = rawAllow.(bool)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res.Allow = allow
|
|
|
|
|
|
|
|
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
|
|
|
|
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
|
|
|
|
return nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if reader != nil {
|
|
|
|
res.Blob = reader
|
|
|
|
}
|
|
|
|
|
|
|
|
if info != nil {
|
|
|
|
res.BlobInfo = info
|
|
|
|
}
|
|
|
|
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlobModule) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
|
|
|
|
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if err := bucket.Close(); err != nil {
|
|
|
|
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
info, err := bucket.Get(ctx, blobID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
reader, err := bucket.NewReader(ctx, blobID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return reader, info, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func BlobModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
|
|
|
|
return func(server *app.Server) app.ServerModule {
|
|
|
|
mod := &BlobModule{
|
|
|
|
store: store,
|
|
|
|
bus: bus,
|
|
|
|
server: server,
|
|
|
|
}
|
|
|
|
|
|
|
|
go mod.handleMessages()
|
|
|
|
|
|
|
|
return mod
|
|
|
|
}
|
|
|
|
}
|