Compare commits

...

4 Commits

SHA1 Message Date
0f0fdfb02b chore: use date based versioning 2023-03-23 19:04:29 +01:00
9eefce9b41 feat: remove arbitrary timeout 2023-03-23 19:01:48 +01:00
0577762be9 feat(module,blob): implements full api 2023-03-23 19:01:20 +01:00
cf8a3f8ac0 feat: add proxy package 2023-03-22 18:05:44 +01:00
20 changed files with 1036 additions and 313 deletions

View File

@ -7,6 +7,8 @@ GOTEST_ARGS ?= -short
ESBUILD_VERSION ?= v0.17.5
GIT_VERSION := $(shell git describe --always)
DATE_VERSION := $(shell date +%Y.%-m.%-d)
FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,)
build: build-edge-cli
@ -69,8 +71,8 @@ gitea-release: tools/gitea-release/bin/gitea-release.sh build
GITEA_RELEASE_PROJECT="edge" \
GITEA_RELEASE_ORG="arcad" \
GITEA_RELEASE_BASE_URL="https://forge.cadoles.com" \
GITEA_RELEASE_VERSION="$(GIT_VERSION)" \
GITEA_RELEASE_NAME="$(GIT_VERSION)" \
GITEA_RELEASE_VERSION="$(FULL_VERSION)" \
GITEA_RELEASE_NAME="$(FULL_VERSION)" \
GITEA_RELEASE_COMMITISH_TARGET="$(GIT_VERSION)" \
GITEA_RELEASE_IS_DRAFT="false" \
GITEA_RELEASE_IS_PRERELEASE="true" \

View File

@ -15,6 +15,7 @@ import (
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/auth"
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
"forge.cadoles.com/arcad/edge/pkg/module/blob"
"forge.cadoles.com/arcad/edge/pkg/module/cast"
"forge.cadoles.com/arcad/edge/pkg/module/net"
"forge.cadoles.com/arcad/edge/pkg/storage"
@ -166,7 +167,7 @@ func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStor
net.ModuleFactory(bus),
module.RPCModuleFactory(bus),
module.StoreModuleFactory(ds),
module.BlobModuleFactory(bus, bs),
blob.ModuleFactory(bus, bs),
module.Extends(
auth.ModuleFactory(
auth.WithJWT(dummyKeySet),

View File

@ -38,11 +38,15 @@ function onBlobDownload(ctx, bucketName, blobId) {
> `TODO`
### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string)`
### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo`
> `TODO`
### `blob.readBlob(ctx: Context, bucketName: string, blobId: string)`
### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string, data: any)`
> `TODO`
### `blob.readBlob(ctx: Context, bucketName: string, blobId: string): ArrayBuffer`
> `TODO`
@ -58,7 +62,7 @@ function onBlobDownload(ctx, bucketName, blobId) {
> `TODO`
### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo`
### `blob.getBucketSize(ctx: Context, bucketName: string): number`
> `TODO`
@ -70,4 +74,16 @@ See the documentation of the [`Context`](./context.md#Context) object.
### `BlobInfo`
### `Metadata`
```typescript
interface BlobInfo {
id: string // Blob identifier
bucket: string // Name of the bucket containing the blob
size: number // Size of the blob
modTime: number // Unix timestamp of the blob's last modification
contentType: string // MIME type of the blob content
}
```
### `Metadata`
The `Metadata` object is an arbitrary key/value object passed along with the upload request. See the [`Edge.upload(blob, metadata)`](../client-api/README.md#edge-upload-blob-blob-metadata-object-promise) method of the client SDK.

View File

@ -1,12 +1,9 @@
package memory
import (
"context"
"sync"
"time"
"forge.cadoles.com/arcad/edge/pkg/bus"
"gitlab.com/wpetit/goweb/logger"
)
type eventDispatcherSet struct {
@ -89,8 +86,6 @@ func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
}
func (d *eventDispatcher) Run() {
ctx := context.Background()
for {
msg, ok := <-d.in
if !ok {
@ -99,12 +94,7 @@ func (d *eventDispatcher) Run() {
return
}
timeout := time.After(2 * time.Second)
select {
case d.out <- msg:
case <-timeout:
logger.Error(ctx, "message out chan timed out", logger.F("message", msg))
}
d.out <- msg
}
}

View File

@ -11,6 +11,7 @@ import (
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/blob"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
@ -68,7 +69,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
ContextKeyOriginRequest: r,
})
requestMsg := module.NewMessageUploadRequest(ctx, fileHeader, metadata)
requestMsg := blob.NewMessageUploadRequest(ctx, fileHeader, metadata)
reply, err := h.bus.Request(ctx, requestMsg)
if err != nil {
@ -80,7 +81,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
logger.Debug(ctx, "upload reply", logger.F("reply", reply))
responseMsg, ok := reply.(*module.MessageUploadResponse)
responseMsg, ok := reply.(*blob.MessageUploadResponse)
if !ok {
logger.Error(
ctx, "unexpected upload response message",
@ -120,7 +121,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
ContextKeyOriginRequest: r,
})
requestMsg := module.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
requestMsg := blob.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
reply, err := h.bus.Request(ctx, requestMsg)
if err != nil {
@ -130,7 +131,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
return
}
replyMsg, ok := reply.(*module.MessageDownloadResponse)
replyMsg, ok := reply.(*blob.MessageDownloadResponse)
if !ok {
logger.Error(
ctx, "unexpected download response message",

View File

@ -1,282 +0,0 @@
package module
import (
"context"
"io"
"mime/multipart"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/dop251/goja"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
const (
DefaultBlobBucket string = "default"
)
type BlobModule struct {
server *app.Server
bus bus.Bus
store storage.BlobStore
}
func (m *BlobModule) Name() string {
return "blob"
}
func (m *BlobModule) Export(export *goja.Object) {
}
func (m *BlobModule) handleMessages() {
ctx := context.Background()
go func() {
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
uploadRequest, ok := msg.(*MessageUploadRequest)
if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
}
res, err := m.handleUploadRequest(uploadRequest)
if err != nil {
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
return nil, errors.WithStack(err)
}
logger.Debug(ctx, "upload request response", logger.F("response", res))
return res, nil
})
if err != nil {
panic(errors.WithStack(err))
}
}()
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
downloadRequest, ok := msg.(*MessageDownloadRequest)
if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
}
res, err := m.handleDownloadRequest(downloadRequest)
if err != nil {
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
return nil, errors.WithStack(err)
}
return res, nil
})
if err != nil {
panic(errors.WithStack(err))
}
}
func (m *BlobModule) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
blobID := storage.NewBlobID()
res := NewMessageUploadResponse(req.RequestID)
ctx := logger.With(req.Context, logger.F("blobID", blobID))
blobInfo := map[string]interface{}{
"size": req.FileHeader.Size,
"filename": req.FileHeader.Filename,
"contentType": req.FileHeader.Header.Get("Content-Type"),
}
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
if err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) {
res.Allow = false
return res, nil
}
return nil, errors.WithStack(err)
}
result, ok := rawResult.Export().(map[string]interface{})
if !ok {
return nil, errors.Errorf(
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
rawResult.Export(),
)
}
var allow bool
rawAllow, exists := result["allow"]
if !exists {
allow = false
} else {
allow, ok = rawAllow.(bool)
if !ok {
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
}
}
res.Allow = allow
if res.Allow {
bucket := DefaultBlobBucket
rawBucket, exists := result["bucket"]
if exists {
bucket, ok = rawBucket.(string)
if !ok {
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
}
}
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
return nil, errors.WithStack(err)
}
res.Bucket = bucket
res.BlobID = blobID
}
return res, nil
}
func (m *BlobModule) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
file, err := fileHeader.Open()
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := file.Close(); err != nil {
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
}
}()
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}()
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := file.Close(); err != nil {
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
}
}()
defer func() {
if err := writer.Close(); err != nil {
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
}
}()
if _, err := io.Copy(writer, file); err != nil {
return errors.WithStack(err)
}
return nil
}
func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
res := NewMessageDownloadResponse(req.RequestID)
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
if err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) {
res.Allow = false
return res, nil
}
return nil, errors.WithStack(err)
}
result, ok := rawResult.Export().(map[string]interface{})
if !ok {
return nil, errors.Errorf(
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
rawResult.Export(),
)
}
var allow bool
rawAllow, exists := result["allow"]
if !exists {
allow = false
} else {
allow, ok = rawAllow.(bool)
if !ok {
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
}
}
res.Allow = allow
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
return nil, errors.WithStack(err)
}
if reader != nil {
res.Blob = reader
}
if info != nil {
res.BlobInfo = info
}
return res, nil
}
func (m *BlobModule) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
return nil, nil, errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
}
}()
info, err := bucket.Get(ctx, blobID)
if err != nil {
return nil, nil, errors.WithStack(err)
}
reader, err := bucket.NewReader(ctx, blobID)
if err != nil {
return nil, nil, errors.WithStack(err)
}
return reader, info, nil
}
func BlobModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
return func(server *app.Server) app.ServerModule {
mod := &BlobModule{
store: store,
bus: bus,
server: server,
}
go mod.handleMessages()
return mod
}
}

View File

@ -0,0 +1,21 @@
package blob
import "forge.cadoles.com/arcad/edge/pkg/storage"
type blobInfo struct {
ID storage.BlobID `goja:"id"`
Bucket string `goja:"bucket"`
ModTime int64 `goja:"modTime"`
Size int64 `goja:"size"`
ContentType string `goja:"contentType"`
}
func toGojaBlobInfo(blob storage.BlobInfo) blobInfo {
return blobInfo{
ID: blob.ID(),
Bucket: blob.Bucket(),
ModTime: blob.ModTime().Unix(),
Size: blob.Size(),
ContentType: blob.ContentType(),
}
}

View File

@ -1,4 +1,4 @@
package module
package blob
import (
"context"

pkg/module/blob/module.go (new file, 499 lines)
View File

@ -0,0 +1,499 @@
package blob
import (
"context"
"fmt"
"io"
"mime/multipart"
"os"
"sort"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/dop251/goja"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
const (
DefaultBlobBucket string = "default"
)
type Module struct {
server *app.Server
bus bus.Bus
store storage.BlobStore
}
func (m *Module) Name() string {
return "blob"
}
func (m *Module) Export(export *goja.Object) {
funcs := map[string]any{
"listBuckets": m.listBuckets,
"deleteBucket": m.deleteBucket,
"getBucketSize": m.getBucketSize,
"listBlobs": m.listBlobs,
"getBlobInfo": m.getBlobInfo,
"readBlob": m.readBlob,
"writeBlob": m.writeBlob,
"deleteBlob": m.deleteBlob,
}
for name, fn := range funcs {
if err := export.Set(name, fn); err != nil {
panic(errors.Wrapf(err, "could not set '%s' function", name))
}
}
if err := export.Set("DEFAULT_BUCKET", DefaultBlobBucket); err != nil {
panic(errors.Wrap(err, "could not set 'DEFAULT_BUCKET' property"))
}
}
func (m *Module) listBuckets(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
buckets, err := m.store.ListBuckets(ctx)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
defaultBucketIndex := sort.SearchStrings(buckets, DefaultBlobBucket)
if defaultBucketIndex == 0 {
buckets = append(buckets, DefaultBlobBucket)
} else {
buckets[defaultBucketIndex] = DefaultBlobBucket
}
return rt.ToValue(buckets)
}
func (m *Module) writeBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
blobID := assertBlobID(call.Argument(2), rt)
rawData := call.Argument(3).Export()
var data []byte
switch typ := rawData.(type) {
case []byte:
data = typ
case string:
data = []byte(typ)
default:
data = []byte(fmt.Sprintf("%v", typ))
}
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}()
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
defer func() {
if err := writer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
logger.Error(ctx, "could not close blob writer", logger.E(errors.WithStack(err)))
}
}()
if _, err := writer.Write(data); err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
return nil
}
func (m *Module) getBlobInfo(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
blobID := assertBlobID(call.Argument(2), rt)
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}()
blobInfo, err := bucket.Get(ctx, blobID)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
return rt.ToValue(toGojaBlobInfo(blobInfo))
}
func (m *Module) readBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
blobID := assertBlobID(call.Argument(2), rt)
reader, _, err := m.openBlob(ctx, bucketName, blobID)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
defer func() {
if err := reader.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
logger.Error(ctx, "could not close blob reader", logger.E(errors.WithStack(err)))
}
}()
data, err := io.ReadAll(reader)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
return rt.ToValue(rt.NewArrayBuffer(data))
}
func (m *Module) deleteBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
blobID := assertBlobID(call.Argument(2), rt)
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
if err := bucket.Delete(ctx, blobID); err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
return nil
}
func (m *Module) listBlobs(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
blobInfos, err := bucket.List(ctx)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
gojaBlobInfos := make([]blobInfo, len(blobInfos))
for i, b := range blobInfos {
gojaBlobInfos[i] = toGojaBlobInfo(b)
}
return rt.ToValue(gojaBlobInfos)
}
func (m *Module) deleteBucket(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
if err := m.store.DeleteBucket(ctx, bucketName); err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
return nil
}
func (m *Module) getBucketSize(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
bucketName := util.AssertString(call.Argument(1), rt)
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
size, err := bucket.Size(ctx)
if err != nil {
panic(rt.ToValue(errors.WithStack(err)))
}
return rt.ToValue(size)
}
func (m *Module) handleMessages() {
ctx := context.Background()
go func() {
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
uploadRequest, ok := msg.(*MessageUploadRequest)
if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
}
res, err := m.handleUploadRequest(uploadRequest)
if err != nil {
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
return nil, errors.WithStack(err)
}
logger.Debug(ctx, "upload request response", logger.F("response", res))
return res, nil
})
if err != nil {
panic(errors.WithStack(err))
}
}()
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
downloadRequest, ok := msg.(*MessageDownloadRequest)
if !ok {
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
}
res, err := m.handleDownloadRequest(downloadRequest)
if err != nil {
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
return nil, errors.WithStack(err)
}
return res, nil
})
if err != nil {
panic(errors.WithStack(err))
}
}
func (m *Module) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
blobID := storage.NewBlobID()
res := NewMessageUploadResponse(req.RequestID)
ctx := logger.With(req.Context, logger.F("blobID", blobID))
blobInfo := map[string]interface{}{
"size": req.FileHeader.Size,
"filename": req.FileHeader.Filename,
"contentType": req.FileHeader.Header.Get("Content-Type"),
}
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
if err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) {
res.Allow = false
return res, nil
}
return nil, errors.WithStack(err)
}
result, ok := rawResult.Export().(map[string]interface{})
if !ok {
return nil, errors.Errorf(
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
rawResult.Export(),
)
}
var allow bool
rawAllow, exists := result["allow"]
if !exists {
allow = false
} else {
allow, ok = rawAllow.(bool)
if !ok {
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
}
}
res.Allow = allow
if res.Allow {
bucket := DefaultBlobBucket
rawBucket, exists := result["bucket"]
if exists {
bucket, ok = rawBucket.(string)
if !ok {
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
}
}
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
return nil, errors.WithStack(err)
}
res.Bucket = bucket
res.BlobID = blobID
}
return res, nil
}
func (m *Module) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
file, err := fileHeader.Open()
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := file.Close(); err != nil {
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
}
}()
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}()
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := file.Close(); err != nil {
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
}
}()
defer func() {
if err := writer.Close(); err != nil {
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
}
}()
if _, err := io.Copy(writer, file); err != nil {
return errors.WithStack(err)
}
return nil
}
func (m *Module) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
res := NewMessageDownloadResponse(req.RequestID)
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
if err != nil {
if errors.Is(err, app.ErrFuncDoesNotExist) {
res.Allow = false
return res, nil
}
return nil, errors.WithStack(err)
}
result, ok := rawResult.Export().(map[string]interface{})
if !ok {
return nil, errors.Errorf(
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
rawResult.Export(),
)
}
var allow bool
rawAllow, exists := result["allow"]
if !exists {
allow = false
} else {
allow, ok = rawAllow.(bool)
if !ok {
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
}
}
res.Allow = allow
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
return nil, errors.WithStack(err)
}
if reader != nil {
res.Blob = reader
}
if info != nil {
res.BlobInfo = info
}
return res, nil
}
func (m *Module) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
bucket, err := m.store.OpenBucket(ctx, bucketName)
if err != nil {
return nil, nil, errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
}
}()
info, err := bucket.Get(ctx, blobID)
if err != nil {
return nil, nil, errors.WithStack(err)
}
reader, err := bucket.NewReader(ctx, blobID)
if err != nil {
return nil, nil, errors.WithStack(err)
}
return reader, info, nil
}
func ModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
return func(server *app.Server) app.ServerModule {
mod := &Module{
store: store,
bus: bus,
server: server,
}
go mod.handleMessages()
return mod
}
}
func assertBlobID(value goja.Value, rt *goja.Runtime) storage.BlobID {
blobID, ok := value.Export().(storage.BlobID)
if !ok {
rawBlobID, ok := value.Export().(string)
if !ok {
panic(rt.NewTypeError(fmt.Sprintf("blob id must be a blob or a string, got '%T'", value.Export())))
}
blobID = storage.BlobID(rawBlobID)
}
return blobID
}

View File

@ -0,0 +1,44 @@
package blob
import (
"io/ioutil"
"testing"
"cdr.dev/slog"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestBlobModule(t *testing.T) {
t.Parallel()
logger.SetLevel(slog.LevelDebug)
bus := memory.NewBus()
store := sqlite.NewBlobStore(":memory:")
server := app.NewServer(
module.ContextModuleFactory(),
module.ConsoleModuleFactory(),
ModuleFactory(bus, store),
)
data, err := ioutil.ReadFile("testdata/blob.js")
if err != nil {
t.Fatal(err)
}
if err := server.Load("testdata/blob.js", string(data)); err != nil {
t.Fatal(err)
}
defer server.Stop()
if err := server.Start(); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
}

pkg/module/blob/testdata/blob.js (new file, vendored, 79 lines)
View File

@ -0,0 +1,79 @@
var ctx = context.new();
var buckets = blob.listBuckets(ctx);
if (!buckets || buckets.length === 0) {
throw new Error("buckets should not be empty");
}
var size = blob.getBucketSize(ctx, blob.DEFAULT_BUCKET);
if (size !== 0) {
throw new Error("bucket size: expected '0', got '"+size+"'");
}
var newBucket = "mybucket"
var blobId = "foo"
var data = (new Date()).toString();
blob.writeBlob(ctx, newBucket, blobId, data)
buckets = blob.listBuckets(ctx);
if (buckets.length !== 2) {
throw new Error("buckets.length: expected '2', got '"+buckets.length+"'");
}
size = blob.getBucketSize(ctx, newBucket);
if (size !== data.length) {
throw new Error("bucket size: expected '"+data.length+"', got '"+size+"'");
}
var blobInfos = blob.listBlobs(ctx, newBucket);
if (blobInfos.length !== 1) {
throw new Error("blobInfos.length: expected '1', got '"+blobInfos.length+"'");
}
if (blobInfos[0].id != blobId) {
throw new Error("blobInfos[0].id: expected '"+blobId+"', got '"+blobInfos[0].id+"'");
}
if (blobInfos[0].contentType != "text/plain; charset=utf-8") {
throw new Error("blobInfos[0].contentType: expected 'text/plain; charset=utf-8', got '"+blobInfos[0].contentType+"'");
}
if (blobInfos[0].size != data.length) {
throw new Error("blobInfos[0].size: expected '"+data.length+"', got '"+blobInfos[0].size+"'");
}
var readData = blob.readBlob(ctx, newBucket, blobId)
if (!readData) {
throw new Error("readData should not be nil");
}
var buckets = blob.listBuckets(ctx);
if (!buckets || buckets.length !== 2) {
throw new Error("buckets.length should be 2");
}
blob.deleteBlob(ctx, newBucket, blobId)
blobInfos = blob.listBlobs(ctx, newBucket);
console.log(blobInfos);
if (blobInfos.length !== 0) {
throw new Error("blobInfos.length: expected '0', got '"+blobInfos.length+"'");
}
blob.deleteBucket(ctx, newBucket)
buckets = blob.listBuckets(ctx);
if (buckets.length !== 1) {
throw new Error("buckets.length: expected '1', got '"+buckets.length+"'");
}

View File

@ -12,7 +12,7 @@ func AssertType[T any](v goja.Value, rt *goja.Runtime) T {
return c
}
panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", *new(T), v.Export())))
panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", new(T), v.Export())))
}
func AssertContext(v goja.Value, r *goja.Runtime) context.Context {

pkg/proxy/host_filter.go (new file, 29 lines)
View File

@ -0,0 +1,29 @@
package proxy
import (
"net/http"
"forge.cadoles.com/arcad/edge/pkg/proxy/wildcard"
)
func FilterHosts(allowedHostPatterns ...string) Middleware {
return func(h http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
if matches := wildcard.MatchAny(r.Host, allowedHostPatterns...); !matches {
http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
return
}
h.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
}
func WithAllowedHosts(allowedHostPatterns ...string) OptionFunc {
return func(o *Options) {
o.Middlewares = append(o.Middlewares, FilterHosts(allowedHostPatterns...))
}
}

pkg/proxy/host_rewrite.go (new file, 65 lines)
View File

@ -0,0 +1,65 @@
package proxy
import (
"net/http"
"net/url"
"sort"
"forge.cadoles.com/arcad/edge/pkg/proxy/wildcard"
"gitlab.com/wpetit/goweb/logger"
)
func RewriteHosts(mappings map[string]*url.URL) Middleware {
patterns := make([]string, len(mappings))
for p := range mappings {
patterns = append(patterns, p)
}
sort.Strings(patterns)
return func(h http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var match *url.URL
for _, p := range patterns {
logger.Debug(ctx, "matching host to pattern", logger.F("host", r.Host), logger.F("pattern", p))
if matches := wildcard.Match(r.Host, p); !matches {
continue
}
match = mappings[p]
break
}
if match == nil {
h.ServeHTTP(w, r)
return
}
ctx = logger.With(ctx, logger.F("originalHost", r.Host))
r = r.WithContext(ctx)
originalURL := r.URL.String()
r.URL.Host = match.Host
r.URL.Scheme = match.Scheme
logger.Debug(ctx, "rewriting url", logger.F("from", originalURL), logger.F("to", r.URL.String()))
h.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
}
func WithRewriteHosts(mappings map[string]*url.URL) OptionFunc {
return func(o *Options) {
o.Middlewares = append(o.Middlewares, RewriteHosts(mappings))
}
}

pkg/proxy/middleware.go (new file, 33 lines)
View File

@ -0,0 +1,33 @@
package proxy
import "net/http"
type Middleware func(h http.Handler) http.Handler
type ProxyResponseTransformer interface {
TransformResponse(*http.Response) error
}
type defaultProxyResponseTransformer struct{}
// TransformResponse implements ProxyResponseTransformer
func (*defaultProxyResponseTransformer) TransformResponse(*http.Response) error {
return nil
}
var _ ProxyResponseTransformer = &defaultProxyResponseTransformer{}
type ProxyResponseMiddleware func(ProxyResponseTransformer) ProxyResponseTransformer
type ProxyRequestTransformer interface {
TransformRequest(*http.Request)
}
type ProxyRequestMiddleware func(ProxyRequestTransformer) ProxyRequestTransformer
type defaultProxyRequestTransformer struct{}
// TransformRequest implements ProxyRequestTransformer
func (*defaultProxyRequestTransformer) TransformRequest(*http.Request) {}
var _ ProxyRequestTransformer = &defaultProxyRequestTransformer{}

pkg/proxy/options.go (new file, 29 lines)
View File

@ -0,0 +1,29 @@
package proxy
type Options struct {
Middlewares []Middleware
ProxyRequestMiddlewares []ProxyRequestMiddleware
ProxyResponseMiddlewares []ProxyResponseMiddleware
}
func defaultOptions() *Options {
return &Options{
Middlewares: make([]Middleware, 0),
ProxyRequestMiddlewares: make([]ProxyRequestMiddleware, 0),
ProxyResponseMiddlewares: make([]ProxyResponseMiddleware, 0),
}
}
type OptionFunc func(*Options)
func WithProxyRequestMiddlewares(middlewares ...ProxyRequestMiddleware) OptionFunc {
return func(o *Options) {
o.ProxyRequestMiddlewares = middlewares
}
}
func WithproxyResponseMiddlewares(middlewares ...ProxyResponseMiddleware) OptionFunc {
return func(o *Options) {
o.ProxyResponseMiddlewares = middlewares
}
}
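As a side note, here is a minimal sketch of how the middleware types from `pkg/proxy/middleware.go` compose with the option functions above; only the `ProxyRequestTransformer`, `ProxyRequestMiddleware` and `WithProxyRequestMiddlewares` names come from this changeset, while `headerInjector`, `withHeader` and the header itself are hypothetical.

```go
package proxyexample

import (
	"net/http"

	"forge.cadoles.com/arcad/edge/pkg/proxy"
)

// headerInjector is a hypothetical ProxyRequestTransformer that wraps another
// transformer and adds a static header to every proxied request.
type headerInjector struct {
	next  proxy.ProxyRequestTransformer
	name  string
	value string
}

// TransformRequest implements proxy.ProxyRequestTransformer.
func (h *headerInjector) TransformRequest(r *http.Request) {
	h.next.TransformRequest(r)
	r.Header.Set(h.name, h.value)
}

// withHeader returns a ProxyRequestMiddleware that chains the injector in
// front of the previous transformer in the chain.
func withHeader(name, value string) proxy.ProxyRequestMiddleware {
	return func(next proxy.ProxyRequestTransformer) proxy.ProxyRequestTransformer {
		return &headerInjector{next: next, name: name, value: value}
	}
}
```

It would be wired in through the constructor shown in `pkg/proxy/proxy.go` below, e.g. `proxy.New(proxy.WithProxyRequestMiddlewares(withHeader("X-Edge-Proxy", "1")))`.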

pkg/proxy/proxy.go (new file, 131 lines)
View File

@ -0,0 +1,131 @@
package proxy
import (
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type Proxy struct {
reversers sync.Map
handler http.Handler
proxyResponseTransformer ProxyResponseTransformer
proxyRequestTransformer ProxyRequestTransformer
}
// ServeHTTP implements http.Handler
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p.handler.ServeHTTP(w, r)
}
func (p *Proxy) proxyRequest(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var reverser *httputil.ReverseProxy
key := fmt.Sprintf("%s://%s", r.URL.Scheme, r.URL.Host)
createAndStore := func() {
target := &url.URL{
Scheme: r.URL.Scheme,
Host: r.URL.Host,
}
reverser = httputil.NewSingleHostReverseProxy(target)
originalDirector := reverser.Director
if p.proxyRequestTransformer != nil {
reverser.Director = func(r *http.Request) {
originalURL := r.URL.String()
originalDirector(r)
p.proxyRequestTransformer.TransformRequest(r)
logger.Debug(ctx, "proxying request", logger.F("targetURL", r.URL.String()), logger.F("originalURL", originalURL))
}
}
if p.proxyResponseTransformer != nil {
reverser.ModifyResponse = func(r *http.Response) error {
if err := p.proxyResponseTransformer.TransformResponse(r); err != nil {
return errors.WithStack(err)
}
return nil
}
}
p.reversers.Store(key, reverser)
}
raw, exists := p.reversers.Load(key)
if !exists {
createAndStore()
}
reverser, ok := raw.(*httputil.ReverseProxy)
if !ok {
createAndStore()
}
reverser.ServeHTTP(w, r)
}
func New(funcs ...OptionFunc) *Proxy {
opts := defaultOptions()
for _, fn := range funcs {
fn(opts)
}
proxy := &Proxy{}
handler := http.HandlerFunc(proxy.proxyRequest)
proxy.handler = createMiddlewareChain(handler, opts.Middlewares)
proxy.proxyRequestTransformer = createProxyRequestChain(&defaultProxyRequestTransformer{}, opts.ProxyRequestMiddlewares)
proxy.proxyResponseTransformer = createProxyResponseChain(&defaultProxyResponseTransformer{}, opts.ProxyResponseMiddlewares)
return proxy
}
var _ http.Handler = &Proxy{}
func createMiddlewareChain(handler http.Handler, middlewares []Middleware) http.Handler {
reverse(middlewares)
for _, m := range middlewares {
handler = m(handler)
}
return handler
}
func createProxyResponseChain(transformer ProxyResponseTransformer, middlewares []ProxyResponseMiddleware) ProxyResponseTransformer {
reverse(middlewares)
for _, m := range middlewares {
transformer = m(transformer)
}
return transformer
}
func createProxyRequestChain(transformer ProxyRequestTransformer, middlewares []ProxyRequestMiddleware) ProxyRequestTransformer {
reverse(middlewares)
for _, m := range middlewares {
transformer = m(transformer)
}
return transformer
}
func reverse[S ~[]E, E any](s S) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}
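For context, here is a minimal usage sketch of the new proxy package, assuming only the `New`, `WithAllowedHosts` and `WithRewriteHosts` functions shown above; the host pattern, backend URL and listen address are placeholders, not values taken from this changeset.

```go
package main

import (
	"log"
	"net/http"
	"net/url"

	"forge.cadoles.com/arcad/edge/pkg/proxy"
)

func main() {
	// Hypothetical upstream service.
	backend, err := url.Parse("http://127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}

	p := proxy.New(
		// Reject requests whose Host matches none of the allowed patterns.
		proxy.WithAllowedHosts("*.example.net"),
		// Rewrite matching hosts to the backend URL before proxying.
		proxy.WithRewriteHosts(map[string]*url.URL{
			"app.example.net": backend,
		}),
	)

	log.Fatal(http.ListenAndServe(":3000", p))
}
```

Note that `proxyRequest` keys its cached reverse proxies on `r.URL.Scheme` and `r.URL.Host`, which are empty on an incoming server request, so the host-rewrite middleware is what gives the request a concrete upstream before it reaches the reverser.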

View File

@ -0,0 +1,44 @@
package wildcard
const wildcard = '*'
func Match(str, pattern string) bool {
if pattern == "" {
return str == pattern
}
if pattern == string(wildcard) {
return true
}
return deepMatchRune([]rune(str), []rune(pattern))
}
func MatchAny(str string, patterns ...string) bool {
for _, p := range patterns {
if matches := Match(str, p); matches {
return matches
}
}
return false
}
func deepMatchRune(str, pattern []rune) bool {
for len(pattern) > 0 {
switch pattern[0] {
default:
if len(str) == 0 || str[0] != pattern[0] {
return false
}
case wildcard:
return deepMatchRune(str, pattern[1:]) ||
(len(str) > 0 && deepMatchRune(str[1:], pattern))
}
str = str[1:]
pattern = pattern[1:]
}
return len(str) == 0 && len(pattern) == 0
}
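A short sketch of how the matcher above behaves, using only `wildcard.Match` and `wildcard.MatchAny` from this file; the host names are illustrative.

```go
package main

import (
	"fmt"

	"forge.cadoles.com/arcad/edge/pkg/proxy/wildcard"
)

func main() {
	// '*' matches any run of characters, including an empty one.
	fmt.Println(wildcard.Match("app.example.net", "*.example.net")) // true
	// false: the string must still end with ".example.net".
	fmt.Println(wildcard.Match("example.net", "*.example.net"))
	// MatchAny returns true as soon as one pattern matches.
	fmt.Println(wildcard.MatchAny("forge.cadoles.com", "*.example.net", "*.cadoles.com")) // true
}
```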

View File

@ -68,8 +68,11 @@ func (b *BlobBucket) Close() error {
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `DELETE FROM blobs WHERE bucket = $1 AND id = $2`
args := []any{b.name, id}
if _, err := tx.ExecContext(ctx, query, b.name, id); err != nil {
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
return errors.WithStack(err)
}
@ -88,7 +91,11 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2`
row := tx.QueryRowContext(ctx, query, b.name, id)
args := []any{b.name, id}
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
row := tx.QueryRowContext(ctx, query, args...)
var (
contentType string
@ -127,8 +134,11 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
err := b.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1`
args := []any{b.name}
rows, err := tx.QueryContext(ctx, query, b.name)
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
rows, err := tx.QueryContext(ctx, query, args...)
if err != nil {
return errors.WithStack(err)
}
@ -229,7 +239,12 @@ type blobWriterCloser struct {
// Write implements io.WriteCloser
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
logger.Debug(context.Background(), "writing data to blob", logger.F("data", p))
logger.Debug(
context.Background(), "writing data to blob",
logger.F("size", len(p)),
logger.F("blobID", wbc.id),
logger.F("bucket", wbc.bucket),
)
n, err := wbc.buf.Write(p)
if err != nil {
@ -266,14 +281,20 @@ func (wbc *blobWriterCloser) Close() error {
mime := mimetype.Detect(data)
modTime := time.Now().UTC()
_, err := tx.Exec(
query,
args := []any{
wbc.bucket,
wbc.id,
data,
mime.String(),
modTime,
len(data),
}
logger.Debug(ctx, "executing query", logger.F("query", query))
_, err := tx.Exec(
query,
args...,
)
if err != nil {
return errors.WithStack(err)

View File

@ -36,7 +36,7 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
buckets := make([]string, 0)
err := s.withTx(ctx, func(tx *sql.Tx) error {
query := `SELECT DISTINCT name FROM blobs`
query := `SELECT DISTINCT bucket FROM blobs`
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.WithStack(err)