Compare commits
12 Commits
1f4f795
...
v2023.4.2-
Author | SHA1 | Date | |
---|---|---|---|
f08f645432 | |||
fbb27d6ea4 | |||
d8ce2901d2 | |||
1996f4dc56 | |||
e09de0b0a4 | |||
72765de20b | |||
ed535b6f5d | |||
07452ad8ab | |||
0f0fdfb02b | |||
9eefce9b41 | |||
0577762be9 | |||
cf8a3f8ac0 |
18
Makefile
18
Makefile
@ -7,6 +7,8 @@ GOTEST_ARGS ?= -short
|
||||
ESBUILD_VERSION ?= v0.17.5
|
||||
|
||||
GIT_VERSION := $(shell git describe --always)
|
||||
DATE_VERSION := $(shell date +%Y.%-m.%-d)
|
||||
FULL_VERSION := v$(DATE_VERSION)-$(GIT_VERSION)$(if $(shell git diff --stat),-dirty,)
|
||||
|
||||
build: build-edge-cli
|
||||
|
||||
@ -57,28 +59,34 @@ pkg/sdk/client/dist/client.js: tools/esbuild/bin/esbuild node_modules
|
||||
node_modules:
|
||||
npm ci
|
||||
|
||||
gitea-release: tools/gitea-release/bin/gitea-release.sh build
|
||||
gitea-release: tools/yq/bin/yq tools/gitea-release/bin/gitea-release.sh build
|
||||
mkdir -p .gitea-release
|
||||
rm -rf .gitea-release/*
|
||||
|
||||
cp bin/cli .gitea-release/edge_cli_amd64
|
||||
|
||||
# Create client-sdk-testsuite package
|
||||
tools/yq/bin/yq -i '.version = "$(FULL_VERSION)"' ./misc/client-sdk-testsuite/dist/manifest.yml
|
||||
.gitea-release/edge_cli_amd64 app package -d ./misc/client-sdk-testsuite/dist -o .gitea-release
|
||||
|
||||
GITEA_RELEASE_PROJECT="edge" \
|
||||
GITEA_RELEASE_ORG="arcad" \
|
||||
GITEA_RELEASE_BASE_URL="https://forge.cadoles.com" \
|
||||
GITEA_RELEASE_VERSION="$(GIT_VERSION)" \
|
||||
GITEA_RELEASE_NAME="$(GIT_VERSION)" \
|
||||
GITEA_RELEASE_VERSION="$(FULL_VERSION)" \
|
||||
GITEA_RELEASE_NAME="$(FULL_VERSION)" \
|
||||
GITEA_RELEASE_COMMITISH_TARGET="$(GIT_VERSION)" \
|
||||
GITEA_RELEASE_IS_DRAFT="false" \
|
||||
GITEA_RELEASE_IS_PRERELEASE="true" \
|
||||
GITEA_RELEASE_BODY="" \
|
||||
GITEA_RELEASE_ATTACHMENTS="$(shell find .gitea-release/* -type f)" \
|
||||
GITEA_RELEASE_ATTACHMENTS="$$(find .gitea-release/* -type f)" \
|
||||
tools/gitea-release/bin/gitea-release.sh
|
||||
|
||||
tools/gitea-release/bin/gitea-release.sh:
|
||||
mkdir -p tools/gitea-release/bin
|
||||
curl --output tools/gitea-release/bin/gitea-release.sh https://forge.cadoles.com/Cadoles/Jenkins/raw/branch/master/resources/com/cadoles/gitea/gitea-release.sh
|
||||
chmod +x tools/gitea-release/bin/gitea-release.sh
|
||||
chmod +x tools/gitea-release/bin/gitea-release.sh
|
||||
|
||||
tools/yq/bin/yq:
|
||||
mkdir -p tools/yq/bin
|
||||
curl -L --output tools/yq/bin/yq https://github.com/mikefarah/yq/releases/download/v4.31.1/yq_linux_amd64
|
||||
chmod +x tools/yq/bin/yq
|
@ -1,7 +1,9 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -13,9 +15,13 @@ import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||
appHTTP "forge.cadoles.com/arcad/edge/pkg/http"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
appModule "forge.cadoles.com/arcad/edge/pkg/module/app"
|
||||
appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/auth"
|
||||
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/blob"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/cast"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/fetch"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/net"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
@ -105,6 +111,10 @@ func RunCommand() *cli.Command {
|
||||
|
||||
storageFile := injectAppID(ctx.String("storage-file"), manifest.ID)
|
||||
|
||||
if err := ensureDir(storageFile); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
db, err := sqlite.Open(storageFile)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -116,7 +126,7 @@ func RunCommand() *cli.Command {
|
||||
|
||||
handler := appHTTP.NewHandler(
|
||||
appHTTP.WithBus(bus),
|
||||
appHTTP.WithServerModules(getServerModules(bus, ds, bs)...),
|
||||
appHTTP.WithServerModules(getServerModules(bus, ds, bs, manifest, address)...),
|
||||
)
|
||||
if err := handler.Load(bundle); err != nil {
|
||||
return errors.Wrap(err, "could not load app bundle")
|
||||
@ -157,7 +167,7 @@ func RunCommand() *cli.Command {
|
||||
}
|
||||
}
|
||||
|
||||
func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStore) []app.ServerModuleFactory {
|
||||
func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStore, manifest *app.Manifest, address string) []app.ServerModuleFactory {
|
||||
return []app.ServerModuleFactory{
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
@ -166,7 +176,7 @@ func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStor
|
||||
net.ModuleFactory(bus),
|
||||
module.RPCModuleFactory(bus),
|
||||
module.StoreModuleFactory(ds),
|
||||
module.BlobModuleFactory(bus, bs),
|
||||
blob.ModuleFactory(bus, bs),
|
||||
module.Extends(
|
||||
auth.ModuleFactory(
|
||||
auth.WithJWT(dummyKeySet),
|
||||
@ -189,6 +199,17 @@ func getServerModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStor
|
||||
}
|
||||
},
|
||||
),
|
||||
appModule.ModuleFactory(appModuleMemory.NewRepository(
|
||||
func(ctx context.Context, i app.ID) (string, error) {
|
||||
if strings.HasPrefix(address, ":") {
|
||||
address = "0.0.0.0" + address
|
||||
}
|
||||
|
||||
return fmt.Sprintf("http://%s", address), nil
|
||||
},
|
||||
manifest,
|
||||
)),
|
||||
fetch.ModuleFactory(bus),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,6 +51,10 @@ Edge.connect().then(() => {
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `Edge.externalUrl(url: string): string`
|
||||
|
||||
Retourne une URL "locale" permettant d'accéder à une ressource externe, en fonction de règles propres à l'application. Voir module [`fetch`](../server-api/fetch.md).
|
||||
|
||||
## Événements
|
||||
|
||||
### `"message"`
|
||||
|
@ -20,11 +20,13 @@ function onInit() {
|
||||
|
||||
Listes des modules disponibles côté serveur.
|
||||
|
||||
- [`app`](./app.md)
|
||||
- [`auth`](./auth.md)
|
||||
- [`blob`](./blob.md)
|
||||
- [`cast`](./cast.md)
|
||||
- [`console`](./console.md)
|
||||
- [`context`](./context.md)
|
||||
- [`fetch`](./fetch.md)
|
||||
- [`net`](./net.md)
|
||||
- [`rpc`](./rpc.md)
|
||||
- [`store`](./store.md)
|
||||
|
57
doc/apps/server-api/app.md
Normal file
57
doc/apps/server-api/app.md
Normal file
@ -0,0 +1,57 @@
|
||||
# Module `app`
|
||||
|
||||
Ce module permet de récupérer des informations sur les applications actives dans l'environnement Edge courant.
|
||||
|
||||
## Méthodes
|
||||
|
||||
### `app.list(ctx: Context): []Manifest`
|
||||
|
||||
Récupère la liste des applications actives.
|
||||
|
||||
#### Arguments
|
||||
|
||||
- `ctx` **Context** Le contexte d'exécution. Voir la documentation du module [`context`](./context.md)
|
||||
|
||||
#### Valeur de retour
|
||||
|
||||
Liste des objets `Manifest` décrivant chaque application active.
|
||||
|
||||
### `app.get(ctx: Context, appId: string): Manifest`
|
||||
|
||||
Récupère les informations de l'application identifiée par `appId`.
|
||||
|
||||
#### Arguments
|
||||
|
||||
- `ctx` **Context** Le contexte d'exécution. Voir la documentation du module [`context`](./context.md)
|
||||
- `appId` **string** Identifiant de l'application
|
||||
|
||||
#### Valeur de retour
|
||||
|
||||
Objet `Manifest` associé à l'application, ou `null` si aucune application n'a été trouvée correspondant à l'identifiant.
|
||||
|
||||
### `app.getUrl(ctx: Context, appId: string): Manifest`
|
||||
|
||||
Retourne l'URL permettant d'accéder à l'application identifiée par `appId`.
|
||||
|
||||
#### Arguments
|
||||
|
||||
- `ctx` **Context** Le contexte d'exécution. Voir la documentation du module [`context`](./context.md)
|
||||
- `appId` **string** Identifiant de l'application
|
||||
|
||||
#### Valeur de retour
|
||||
|
||||
URL associée à l'application, ou `null` si aucune application n'a été trouvée correspondant à l'identifiant.
|
||||
|
||||
## Objets
|
||||
|
||||
### `Manifest`
|
||||
|
||||
```typescript
|
||||
interface Manifest {
|
||||
id: string // Identifiant de l'application
|
||||
version: string // Version de l'application
|
||||
title: string // Titre associé à l'application
|
||||
description: string // Description associée à l'application
|
||||
tags: string[] // Mots clés associés à l'application
|
||||
}
|
||||
```
|
@ -38,11 +38,15 @@ function onBlobDownload(ctx, bucketName, blobId) {
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string)`
|
||||
### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo`
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `blob.readBlob(ctx: Context, bucketName: string, blobId: string)`
|
||||
### `blob.writeBlob(ctx: Context, bucketName: string, blobId: string, data: any)`
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `blob.readBlob(ctx: Context, bucketName: string, blobId: string): ArrayBuffer`
|
||||
|
||||
> `TODO`
|
||||
|
||||
@ -58,7 +62,7 @@ function onBlobDownload(ctx, bucketName, blobId) {
|
||||
|
||||
> `TODO`
|
||||
|
||||
### `blob.getBlobInfo(ctx: Context, bucketName: string, blobId: string): BlobInfo`
|
||||
### `blob.getBucketSize(ctx: Context, bucketName: string): number`
|
||||
|
||||
> `TODO`
|
||||
|
||||
@ -70,4 +74,16 @@ Voir la documentation de l'objet [`Context`](./context.md#Context).
|
||||
|
||||
### `BlobInfo`
|
||||
|
||||
### `Metadata`
|
||||
```typescript
|
||||
interface BlobInfo {
|
||||
id: string // Identifiant du blob
|
||||
bucket: string // Nom du bucket contenant le blob
|
||||
size: number // Taille du blob
|
||||
modTime: number // Timestamp Unix de dernière modification du blob
|
||||
contentType: string // Type MIME du contenu du blob
|
||||
}
|
||||
```
|
||||
|
||||
### `Metadata`
|
||||
|
||||
L'objet `Metadata` est un objet clé/valeur arbitraire transmis avec la requête de téléversement. Voir la méthode [`Edge.upload(blob, metadata)`](../client-api/README.md#edge-upload-blob-blob-metadata-object-promise) du SDK client.
|
33
doc/apps/server-api/fetch.md
Normal file
33
doc/apps/server-api/fetch.md
Normal file
@ -0,0 +1,33 @@
|
||||
# Module `fetch`
|
||||
|
||||
Ce module permet l'accès à des ressources distantes (sur Internet) depuis votre application.
|
||||
|
||||
## Fonctions de rappel
|
||||
|
||||
Pour permettre aux utilisateurs d'accéder à des ressources distantes, vous devez déclarer la fonction `onClientFetch(ctx: Context, url: string, remoteAddr: string)` dans le fichier `server/main.js` de votre application.
|
||||
|
||||
### `onClientFetch(ctx: Context, url: string, remoteAddr: string)`
|
||||
|
||||
#### Usage
|
||||
|
||||
**Côté client**
|
||||
```js
|
||||
// Création d'une URL "locale" permettant d'accéder à la ressource distante
|
||||
var url = Edge.externalUrl("http://example.com")
|
||||
|
||||
// Vous pouvez utiliser l'URL comme attribut `src` d'une balise <img> par exemple
|
||||
// ou effectuer une requête fetch() avec celle ci.
|
||||
fetch(url).then(res => res.text()).then(content => console.log(content));
|
||||
```
|
||||
|
||||
**Côté serveur**
|
||||
```js
|
||||
function onClientFetch(ctx, url, remoteAddr) {
|
||||
// Autoriser la récupération de l'URL demandée ou non
|
||||
// Dans cet exemple, seule l'URL externe 'http://example.com' est autorisée
|
||||
// Les autres URLs recevront une erreur HTTP 403 - Forbidden
|
||||
var authorized = url === "http://example.com"
|
||||
|
||||
return { allow: authorized };
|
||||
}
|
||||
```
|
@ -25,6 +25,8 @@
|
||||
<script src="/test/net-module.js"></script>
|
||||
<script src="/test/rpc-module.js"></script>
|
||||
<script src="/test/file-module.js"></script>
|
||||
<script src="/test/app-module.js"></script>
|
||||
<script src="/test/fetch-module.js"></script>
|
||||
<script class="mocha-exec">
|
||||
mocha.run();
|
||||
</script>
|
||||
|
37
misc/client-sdk-testsuite/src/public/test/app-module.js
Normal file
37
misc/client-sdk-testsuite/src/public/test/app-module.js
Normal file
@ -0,0 +1,37 @@
|
||||
describe('App Module', function() {
|
||||
|
||||
before(() => {
|
||||
return Edge.connect();
|
||||
});
|
||||
|
||||
after(() => {
|
||||
Edge.disconnect();
|
||||
});
|
||||
|
||||
it('should list apps', function() {
|
||||
return Edge.rpc("listApps")
|
||||
.then(apps => {
|
||||
console.log("listApps result:", apps);
|
||||
chai.assert.isNotNull(apps);
|
||||
chai.assert.isAtLeast(apps.length, 1);
|
||||
})
|
||||
});
|
||||
|
||||
it('should retrieve requested app', function() {
|
||||
return Edge.rpc("getApp", { appId: "edge.sdk.client.test" })
|
||||
.then(app => {
|
||||
console.log("getApp result:", app);
|
||||
chai.assert.isNotNull(app);
|
||||
chai.assert.equal(app.id, "edge.sdk.client.test");
|
||||
})
|
||||
});
|
||||
|
||||
it('should retrieve requested app url', function() {
|
||||
return Edge.rpc("getAppUrl", { appId: "edge.sdk.client.test" })
|
||||
.then(url => {
|
||||
console.log("getAppUrl result:", url);
|
||||
chai.assert.isNotEmpty(url);
|
||||
})
|
||||
});
|
||||
|
||||
});
|
33
misc/client-sdk-testsuite/src/public/test/fetch-module.js
Normal file
33
misc/client-sdk-testsuite/src/public/test/fetch-module.js
Normal file
@ -0,0 +1,33 @@
|
||||
describe('Fetch Module', function () {
|
||||
|
||||
before(() => {
|
||||
return Edge.connect();
|
||||
});
|
||||
|
||||
after(() => {
|
||||
Edge.disconnect();
|
||||
});
|
||||
|
||||
it('should fetch an authorized external url', function () {
|
||||
var externalUrl = Edge.externalUrl("http://example.com");
|
||||
|
||||
return fetch(externalUrl)
|
||||
.then(res => {
|
||||
chai.assert.equal(res.status, 200)
|
||||
return res.text()
|
||||
})
|
||||
.then(content => {
|
||||
chai.assert.include(content, '<h1>Example Domain</h1>')
|
||||
})
|
||||
});
|
||||
|
||||
it('should not fetch an unauthorized external url', function () {
|
||||
var externalUrl = Edge.externalUrl("https://google.com");
|
||||
|
||||
return fetch(externalUrl)
|
||||
.then(res => {
|
||||
chai.assert.equal(res.status, 403)
|
||||
})
|
||||
});
|
||||
|
||||
});
|
@ -11,6 +11,10 @@ function onInit() {
|
||||
rpc.register("reset", reset);
|
||||
rpc.register("total", total);
|
||||
rpc.register("getUserInfo", getUserInfo);
|
||||
|
||||
rpc.register("listApps");
|
||||
rpc.register("getApp");
|
||||
rpc.register("getAppUrl");
|
||||
}
|
||||
|
||||
// Called for each client message
|
||||
@ -79,4 +83,22 @@ function getUserInfo(ctx, params) {
|
||||
role: role,
|
||||
preferredUsername: preferredUsername,
|
||||
};
|
||||
}
|
||||
|
||||
function listApps(ctx) {
|
||||
return app.list(ctx);
|
||||
}
|
||||
|
||||
function getApp(ctx, params) {
|
||||
var appId = params.appId;
|
||||
return app.get(ctx, appId);
|
||||
}
|
||||
|
||||
function getAppUrl(ctx, params) {
|
||||
var appId = params.appId;
|
||||
return app.getUrl(ctx, appId);
|
||||
}
|
||||
|
||||
function onClientFetch(ctx, url, remoteAddr) {
|
||||
return { allow: url === 'http://example.com' };
|
||||
}
|
@ -9,11 +9,11 @@ import (
|
||||
type ID string
|
||||
|
||||
type Manifest struct {
|
||||
ID ID `yaml:"id"`
|
||||
Version string `yaml:"version"`
|
||||
Title string `yaml:"title"`
|
||||
Description string `yaml:"description"`
|
||||
Tags []string `yaml:"tags"`
|
||||
ID ID `yaml:"id" json:"id"`
|
||||
Version string `yaml:"version" json:"version"`
|
||||
Title string `yaml:"title" json:"title"`
|
||||
Description string `yaml:"description" json:"description"`
|
||||
Tags []string `yaml:"tags" json:"tags"`
|
||||
}
|
||||
|
||||
func LoadManifest(b bundle.Bundle) (*Manifest, error) {
|
||||
|
@ -1,12 +1,9 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type eventDispatcherSet struct {
|
||||
@ -89,8 +86,6 @@ func (d *eventDispatcher) IsOut(out <-chan bus.Message) bool {
|
||||
}
|
||||
|
||||
func (d *eventDispatcher) Run() {
|
||||
ctx := context.Background()
|
||||
|
||||
for {
|
||||
msg, ok := <-d.in
|
||||
if !ok {
|
||||
@ -99,12 +94,7 @@ func (d *eventDispatcher) Run() {
|
||||
return
|
||||
}
|
||||
|
||||
timeout := time.After(2 * time.Second)
|
||||
select {
|
||||
case d.out <- msg:
|
||||
case <-timeout:
|
||||
logger.Error(ctx, "message out chan timed out", logger.F("message", msg))
|
||||
}
|
||||
d.out <- msg
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/blob"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/pkg/errors"
|
||||
@ -68,7 +69,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
|
||||
ContextKeyOriginRequest: r,
|
||||
})
|
||||
|
||||
requestMsg := module.NewMessageUploadRequest(ctx, fileHeader, metadata)
|
||||
requestMsg := blob.NewMessageUploadRequest(ctx, fileHeader, metadata)
|
||||
|
||||
reply, err := h.bus.Request(ctx, requestMsg)
|
||||
if err != nil {
|
||||
@ -80,7 +81,7 @@ func (h *Handler) handleAppUpload(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
logger.Debug(ctx, "upload reply", logger.F("reply", reply))
|
||||
|
||||
responseMsg, ok := reply.(*module.MessageUploadResponse)
|
||||
responseMsg, ok := reply.(*blob.MessageUploadResponse)
|
||||
if !ok {
|
||||
logger.Error(
|
||||
ctx, "unexpected upload response message",
|
||||
@ -120,7 +121,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
|
||||
ContextKeyOriginRequest: r,
|
||||
})
|
||||
|
||||
requestMsg := module.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
|
||||
requestMsg := blob.NewMessageDownloadRequest(ctx, bucket, storage.BlobID(blobID))
|
||||
|
||||
reply, err := h.bus.Request(ctx, requestMsg)
|
||||
if err != nil {
|
||||
@ -130,7 +131,7 @@ func (h *Handler) handleAppDownload(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
replyMsg, ok := reply.(*module.MessageDownloadResponse)
|
||||
replyMsg, ok := reply.(*blob.MessageDownloadResponse)
|
||||
if !ok {
|
||||
logger.Error(
|
||||
ctx, "unexpected download response message",
|
||||
|
112
pkg/http/fetch.go
Normal file
112
pkg/http/fetch.go
Normal file
@ -0,0 +1,112 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/fetch"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func (h *Handler) handleAppFetch(w http.ResponseWriter, r *http.Request) {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
ctx = module.WithContext(ctx, map[module.ContextKey]any{
|
||||
ContextKeyOriginRequest: r,
|
||||
})
|
||||
|
||||
rawURL := r.URL.Query().Get("url")
|
||||
|
||||
url, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
jsonError(w, http.StatusBadRequest, errorCodeBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
requestMsg := fetch.NewMessageFetchRequest(ctx, r.RemoteAddr, url)
|
||||
|
||||
reply, err := h.bus.Request(ctx, requestMsg)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not retrieve fetch request reply", logger.E(errors.WithStack(err)))
|
||||
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "fetch reply", logger.F("reply", reply))
|
||||
|
||||
responseMsg, ok := reply.(*fetch.MessageFetchResponse)
|
||||
if !ok {
|
||||
logger.Error(
|
||||
ctx, "unexpected fetch response message",
|
||||
logger.F("message", reply),
|
||||
)
|
||||
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if !responseMsg.Allow {
|
||||
jsonError(w, http.StatusForbidden, errorCodeForbidden)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
proxyReq, err := http.NewRequest(http.MethodGet, url.String(), nil)
|
||||
if err != nil {
|
||||
logger.Error(
|
||||
ctx, "could not create proxy request",
|
||||
logger.E(errors.WithStack(err)),
|
||||
)
|
||||
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for header, values := range r.Header {
|
||||
for _, value := range values {
|
||||
proxyReq.Header.Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
proxyReq.Header.Add("X-Forwarded-From", r.RemoteAddr)
|
||||
|
||||
res, err := h.httpClient.Do(proxyReq)
|
||||
if err != nil {
|
||||
logger.Error(
|
||||
ctx, "could not execute proxy request",
|
||||
logger.E(errors.WithStack(err)),
|
||||
)
|
||||
jsonError(w, http.StatusInternalServerError, errorCodeInternalError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
logger.Error(
|
||||
ctx, "could not close response body",
|
||||
logger.E(errors.WithStack(err)),
|
||||
)
|
||||
}
|
||||
}()
|
||||
|
||||
for header, values := range res.Header {
|
||||
for _, value := range values {
|
||||
w.Header().Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
w.WriteHeader(res.StatusCode)
|
||||
|
||||
if _, err := io.Copy(w, res.Body); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
@ -31,6 +31,8 @@ type Handler struct {
|
||||
server *app.Server
|
||||
serverModuleFactories []app.ServerModuleFactory
|
||||
|
||||
httpClient *http.Client
|
||||
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
@ -91,6 +93,7 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
|
||||
sockjsOpts: opts.SockJS,
|
||||
router: router,
|
||||
serverModuleFactories: opts.ServerModuleFactories,
|
||||
httpClient: opts.HTTPClient,
|
||||
bus: opts.Bus,
|
||||
}
|
||||
|
||||
@ -103,6 +106,8 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
|
||||
r.Route("/api/v1", func(r chi.Router) {
|
||||
r.Post("/upload", handler.handleAppUpload)
|
||||
r.Get("/download/{bucket}/{blobID}", handler.handleAppDownload)
|
||||
|
||||
r.Get("/fetch", handler.handleAppFetch)
|
||||
})
|
||||
|
||||
r.HandleFunc("/sock/*", handler.handleSockJS)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
@ -14,6 +15,7 @@ type HandlerOptions struct {
|
||||
SockJS sockjs.Options
|
||||
ServerModuleFactories []app.ServerModuleFactory
|
||||
UploadMaxFileSize int64
|
||||
HTTPClient *http.Client
|
||||
}
|
||||
|
||||
func defaultHandlerOptions() *HandlerOptions {
|
||||
@ -27,6 +29,9 @@ func defaultHandlerOptions() *HandlerOptions {
|
||||
SockJS: sockjsOptions,
|
||||
ServerModuleFactories: make([]app.ServerModuleFactory, 0),
|
||||
UploadMaxFileSize: 10 << (10 * 2), // 10Mb
|
||||
HTTPClient: &http.Client{
|
||||
Timeout: time.Second * 30,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,3 +60,9 @@ func WithUploadMaxFileSize(size int64) HandlerOptionFunc {
|
||||
opts.UploadMaxFileSize = size
|
||||
}
|
||||
}
|
||||
|
||||
func WithHTTPClient(client *http.Client) HandlerOptionFunc {
|
||||
return func(opts *HandlerOptions) {
|
||||
opts.HTTPClient = client
|
||||
}
|
||||
}
|
||||
|
5
pkg/module/app/error.go
Normal file
5
pkg/module/app/error.go
Normal file
@ -0,0 +1,5 @@
|
||||
package app
|
||||
|
||||
import "errors"
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
58
pkg/module/app/memory/module_test.go
Normal file
58
pkg/module/app/memory/module_test.go
Normal file
@ -0,0 +1,58 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
appModule "forge.cadoles.com/arcad/edge/pkg/module/app"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func TestAppModuleWithMemoryRepository(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
server := app.NewServer(
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
appModule.ModuleFactory(NewRepository(
|
||||
func(ctx context.Context, id app.ID) (string, error) {
|
||||
return fmt.Sprintf("http//%s.example.com", id), nil
|
||||
},
|
||||
&app.Manifest{
|
||||
ID: "dummy1.arcad.app",
|
||||
Version: "0.0.0",
|
||||
Title: "Dummy 1",
|
||||
Description: "Dummy App 1",
|
||||
Tags: []string{"dummy", "first"},
|
||||
},
|
||||
&app.Manifest{
|
||||
ID: "dummy2.arcad.app",
|
||||
Version: "0.0.0",
|
||||
Title: "Dummy 2",
|
||||
Description: "Dummy App 2",
|
||||
Tags: []string{"dummy", "second"},
|
||||
},
|
||||
)),
|
||||
)
|
||||
|
||||
file := "testdata/app.js"
|
||||
|
||||
data, err := ioutil.ReadFile(file)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := server.Load(file, string(data)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer server.Stop()
|
||||
|
||||
if err := server.Start(); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
}
|
50
pkg/module/app/memory/repository.go
Normal file
50
pkg/module/app/memory/repository.go
Normal file
@ -0,0 +1,50 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
module "forge.cadoles.com/arcad/edge/pkg/module/app"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type GetURLFunc func(context.Context, app.ID) (string, error)
|
||||
|
||||
type Repository struct {
|
||||
getURL GetURLFunc
|
||||
apps []*app.Manifest
|
||||
}
|
||||
|
||||
// GetURL implements app.Repository
|
||||
func (r *Repository) GetURL(ctx context.Context, id app.ID) (string, error) {
|
||||
url, err := r.getURL(ctx, id)
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
return url, nil
|
||||
}
|
||||
|
||||
// Get implements app.Repository
|
||||
func (r *Repository) Get(ctx context.Context, id app.ID) (*app.Manifest, error) {
|
||||
for _, app := range r.apps {
|
||||
if app.ID != id {
|
||||
continue
|
||||
}
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
||||
return nil, module.ErrNotFound
|
||||
}
|
||||
|
||||
// List implements app.Repository
|
||||
func (r *Repository) List(ctx context.Context) ([]*app.Manifest, error) {
|
||||
return r.apps, nil
|
||||
}
|
||||
|
||||
func NewRepository(getURL GetURLFunc, manifests ...*app.Manifest) *Repository {
|
||||
return &Repository{getURL, manifests}
|
||||
}
|
||||
|
||||
var _ module.Repository = &Repository{}
|
17
pkg/module/app/memory/testdata/app.js
vendored
Normal file
17
pkg/module/app/memory/testdata/app.js
vendored
Normal file
@ -0,0 +1,17 @@
|
||||
var ctx = context.new();
|
||||
|
||||
var manifests = app.list(ctx);
|
||||
|
||||
if (manifests.length !== 2) {
|
||||
throw new Error("apps.length: expected '2', got '"+manifests.length+"'");
|
||||
}
|
||||
|
||||
var manifest = app.get(ctx, 'dummy2.arcad.app');
|
||||
|
||||
if (!manifest) {
|
||||
throw new Error("manifest should not be null");
|
||||
}
|
||||
|
||||
if (manifest.id !== "dummy2.arcad.app") {
|
||||
throw new Error("manifest.id: expected 'dummy2.arcad.app', got '"+manifest.id+"'");
|
||||
}
|
117
pkg/module/app/module.go
Normal file
117
pkg/module/app/module.go
Normal file
@ -0,0 +1,117 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/util"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Module struct {
|
||||
repository Repository
|
||||
}
|
||||
|
||||
type gojaManifest struct {
|
||||
ID string `goja:"id" json:"id"`
|
||||
Version string `goja:"version" json:"version"`
|
||||
Title string `goja:"title" json:"title"`
|
||||
Description string `goja:"description" json:"description"`
|
||||
Tags []string `goja:"tags" json:"tags"`
|
||||
}
|
||||
|
||||
func toGojaManifest(manifest *app.Manifest) *gojaManifest {
|
||||
return &gojaManifest{
|
||||
ID: string(manifest.ID),
|
||||
Version: manifest.Version,
|
||||
Title: manifest.Title,
|
||||
Description: manifest.Description,
|
||||
Tags: manifest.Tags,
|
||||
}
|
||||
}
|
||||
|
||||
func toGojaManifests(manifests []*app.Manifest) []*gojaManifest {
|
||||
gojaManifests := make([]*gojaManifest, len(manifests))
|
||||
|
||||
for i, m := range manifests {
|
||||
gojaManifests[i] = toGojaManifest(m)
|
||||
}
|
||||
|
||||
return gojaManifests
|
||||
}
|
||||
|
||||
func (m *Module) Name() string {
|
||||
return "app"
|
||||
}
|
||||
|
||||
func (m *Module) Export(export *goja.Object) {
|
||||
if err := export.Set("list", m.list); err != nil {
|
||||
panic(errors.Wrap(err, "could not set 'list' function"))
|
||||
}
|
||||
|
||||
if err := export.Set("get", m.get); err != nil {
|
||||
panic(errors.Wrap(err, "could not set 'get' function"))
|
||||
}
|
||||
|
||||
if err := export.Set("getUrl", m.getURL); err != nil {
|
||||
panic(errors.Wrap(err, "could not set 'list' function"))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) list(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
|
||||
manifests, err := m.repository.List(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(toGojaManifests(manifests))
|
||||
}
|
||||
|
||||
func (m *Module) get(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
appID := assertAppID(call.Argument(1), rt)
|
||||
|
||||
manifest, err := m.repository.Get(ctx, appID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(toGojaManifest(manifest))
|
||||
}
|
||||
|
||||
func (m *Module) getURL(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
appID := assertAppID(call.Argument(1), rt)
|
||||
|
||||
url, err := m.repository.GetURL(ctx, appID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(url)
|
||||
}
|
||||
|
||||
func ModuleFactory(repository Repository) app.ServerModuleFactory {
|
||||
return func(server *app.Server) app.ServerModule {
|
||||
return &Module{
|
||||
repository: repository,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func assertAppID(value goja.Value, rt *goja.Runtime) app.ID {
|
||||
appID, ok := value.Export().(app.ID)
|
||||
if !ok {
|
||||
rawAppID, ok := value.Export().(string)
|
||||
if !ok {
|
||||
panic(rt.NewTypeError(fmt.Sprintf("app id must be an appid or a string, got '%T'", value.Export())))
|
||||
}
|
||||
|
||||
appID = app.ID(rawAppID)
|
||||
}
|
||||
|
||||
return appID
|
||||
}
|
13
pkg/module/app/repository.go
Normal file
13
pkg/module/app/repository.go
Normal file
@ -0,0 +1,13 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
)
|
||||
|
||||
type Repository interface {
|
||||
List(context.Context) ([]*app.Manifest, error)
|
||||
Get(context.Context, app.ID) (*app.Manifest, error)
|
||||
GetURL(context.Context, app.ID) (string, error)
|
||||
}
|
@ -30,10 +30,12 @@ func init() {
|
||||
}
|
||||
|
||||
type LocalHandler struct {
|
||||
router chi.Router
|
||||
algo jwa.KeyAlgorithm
|
||||
key jwk.Key
|
||||
accounts map[string]LocalAccount
|
||||
router chi.Router
|
||||
algo jwa.KeyAlgorithm
|
||||
key jwk.Key
|
||||
cookieDomain string
|
||||
cookieDuration time.Duration
|
||||
accounts map[string]LocalAccount
|
||||
}
|
||||
|
||||
func (h *LocalHandler) initRouter(prefix string) {
|
||||
@ -119,7 +121,9 @@ func (h *LocalHandler) handleForm(w http.ResponseWriter, r *http.Request) {
|
||||
cookie := http.Cookie{
|
||||
Name: auth.CookieName,
|
||||
Value: string(token),
|
||||
Domain: h.cookieDomain,
|
||||
HttpOnly: false,
|
||||
Expires: time.Now().Add(h.cookieDuration),
|
||||
Path: "/",
|
||||
}
|
||||
|
||||
@ -134,6 +138,7 @@ func (h *LocalHandler) handleLogout(w http.ResponseWriter, r *http.Request) {
|
||||
Value: "",
|
||||
HttpOnly: false,
|
||||
Expires: time.Unix(0, 0),
|
||||
Domain: h.cookieDomain,
|
||||
Path: "/",
|
||||
})
|
||||
|
||||
@ -165,9 +170,11 @@ func NewLocalHandler(algo jwa.KeyAlgorithm, key jwk.Key, funcs ...LocalHandlerOp
|
||||
}
|
||||
|
||||
handler := &LocalHandler{
|
||||
algo: algo,
|
||||
key: key,
|
||||
accounts: toAccountsMap(opts.Accounts),
|
||||
algo: algo,
|
||||
key: key,
|
||||
accounts: toAccountsMap(opts.Accounts),
|
||||
cookieDomain: opts.CookieDomain,
|
||||
cookieDuration: opts.CookieDuration,
|
||||
}
|
||||
|
||||
handler.initRouter(opts.RoutePrefix)
|
||||
|
@ -1,16 +1,22 @@
|
||||
package http
|
||||
|
||||
import "time"
|
||||
|
||||
type LocalHandlerOptions struct {
|
||||
RoutePrefix string
|
||||
Accounts []LocalAccount
|
||||
RoutePrefix string
|
||||
Accounts []LocalAccount
|
||||
CookieDomain string
|
||||
CookieDuration time.Duration
|
||||
}
|
||||
|
||||
type LocalHandlerOptionFunc func(*LocalHandlerOptions)
|
||||
|
||||
func defaultLocalHandlerOptions() *LocalHandlerOptions {
|
||||
return &LocalHandlerOptions{
|
||||
RoutePrefix: "",
|
||||
Accounts: make([]LocalAccount, 0),
|
||||
RoutePrefix: "",
|
||||
Accounts: make([]LocalAccount, 0),
|
||||
CookieDomain: "",
|
||||
CookieDuration: 24 * time.Hour,
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,3 +31,10 @@ func WithRoutePrefix(prefix string) LocalHandlerOptionFunc {
|
||||
opts.RoutePrefix = prefix
|
||||
}
|
||||
}
|
||||
|
||||
func WithCookieOptions(domain string, duration time.Duration) LocalHandlerOptionFunc {
|
||||
return func(opts *LocalHandlerOptions) {
|
||||
opts.CookieDomain = domain
|
||||
opts.CookieDuration = duration
|
||||
}
|
||||
}
|
||||
|
@ -61,6 +61,10 @@ func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if keySet == nil {
|
||||
return nil, errors.New("no keyset")
|
||||
}
|
||||
|
||||
token, err := jwt.Parse([]byte(rawToken),
|
||||
jwt.WithKeySet(keySet, jws.WithRequireKid(false)),
|
||||
jwt.WithValidate(true),
|
||||
|
@ -1,282 +0,0 @@
|
||||
package module
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultBlobBucket string = "default"
|
||||
)
|
||||
|
||||
type BlobModule struct {
|
||||
server *app.Server
|
||||
bus bus.Bus
|
||||
store storage.BlobStore
|
||||
}
|
||||
|
||||
func (m *BlobModule) Name() string {
|
||||
return "blob"
|
||||
}
|
||||
|
||||
func (m *BlobModule) Export(export *goja.Object) {
|
||||
}
|
||||
|
||||
func (m *BlobModule) handleMessages() {
|
||||
ctx := context.Background()
|
||||
|
||||
go func() {
|
||||
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
uploadRequest, ok := msg.(*MessageUploadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleUploadRequest(uploadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "upload request response", logger.F("response", res))
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
downloadRequest, ok := msg.(*MessageDownloadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleDownloadRequest(downloadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BlobModule) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
|
||||
blobID := storage.NewBlobID()
|
||||
res := NewMessageUploadResponse(req.RequestID)
|
||||
|
||||
ctx := logger.With(req.Context, logger.F("blobID", blobID))
|
||||
|
||||
blobInfo := map[string]interface{}{
|
||||
"size": req.FileHeader.Size,
|
||||
"filename": req.FileHeader.Filename,
|
||||
"contentType": req.FileHeader.Header.Get("Content-Type"),
|
||||
}
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
if res.Allow {
|
||||
bucket := DefaultBlobBucket
|
||||
|
||||
rawBucket, exists := result["bucket"]
|
||||
if exists {
|
||||
bucket, ok = rawBucket.(string)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
res.Bucket = bucket
|
||||
res.BlobID = blobID
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *BlobModule) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
|
||||
file, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(writer, file); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *BlobModule) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
|
||||
res := NewMessageDownloadResponse(req.RequestID)
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if reader != nil {
|
||||
res.Blob = reader
|
||||
}
|
||||
|
||||
if info != nil {
|
||||
res.BlobInfo = info
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *BlobModule) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
|
||||
}
|
||||
}()
|
||||
|
||||
info, err := bucket.Get(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader, err := bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reader, info, nil
|
||||
}
|
||||
|
||||
func BlobModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
|
||||
return func(server *app.Server) app.ServerModule {
|
||||
mod := &BlobModule{
|
||||
store: store,
|
||||
bus: bus,
|
||||
server: server,
|
||||
}
|
||||
|
||||
go mod.handleMessages()
|
||||
|
||||
return mod
|
||||
}
|
||||
}
|
21
pkg/module/blob/blob_info.go
Normal file
21
pkg/module/blob/blob_info.go
Normal file
@ -0,0 +1,21 @@
|
||||
package blob
|
||||
|
||||
import "forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
|
||||
type blobInfo struct {
|
||||
ID storage.BlobID `goja:"id"`
|
||||
Bucket string `goja:"bucket"`
|
||||
ModTime int64 `goja:"modTime"`
|
||||
Size int64 `goja:"size"`
|
||||
ContentType string `goja:"contentType"`
|
||||
}
|
||||
|
||||
func toGojaBlobInfo(blob storage.BlobInfo) blobInfo {
|
||||
return blobInfo{
|
||||
ID: blob.ID(),
|
||||
Bucket: blob.Bucket(),
|
||||
ModTime: blob.ModTime().Unix(),
|
||||
Size: blob.Size(),
|
||||
ContentType: blob.ContentType(),
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package module
|
||||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
499
pkg/module/blob/module.go
Normal file
499
pkg/module/blob/module.go
Normal file
@ -0,0 +1,499 @@
|
||||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/util"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultBlobBucket string = "default"
|
||||
)
|
||||
|
||||
type Module struct {
|
||||
server *app.Server
|
||||
bus bus.Bus
|
||||
store storage.BlobStore
|
||||
}
|
||||
|
||||
func (m *Module) Name() string {
|
||||
return "blob"
|
||||
}
|
||||
|
||||
func (m *Module) Export(export *goja.Object) {
|
||||
funcs := map[string]any{
|
||||
"listBuckets": m.listBuckets,
|
||||
"deleteBucket": m.deleteBucket,
|
||||
"getBucketSize": m.getBucketSize,
|
||||
"listBlobs": m.listBlobs,
|
||||
"getBlobInfo": m.getBlobInfo,
|
||||
"readBlob": m.readBlob,
|
||||
"writeBlob": m.writeBlob,
|
||||
"deleteBlob": m.deleteBlob,
|
||||
}
|
||||
|
||||
for name, fn := range funcs {
|
||||
if err := export.Set(name, fn); err != nil {
|
||||
panic(errors.Wrapf(err, "could not set '%s' function", name))
|
||||
}
|
||||
}
|
||||
|
||||
if err := export.Set("DEFAULT_BUCKET", DefaultBlobBucket); err != nil {
|
||||
panic(errors.Wrap(err, "could not set 'DEFAULT_BUCKET' property"))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) listBuckets(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
|
||||
buckets, err := m.store.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defaultBucketIndex := sort.SearchStrings(buckets, DefaultBlobBucket)
|
||||
if defaultBucketIndex == 0 {
|
||||
buckets = append(buckets, DefaultBlobBucket)
|
||||
} else {
|
||||
buckets[defaultBucketIndex] = DefaultBlobBucket
|
||||
}
|
||||
|
||||
return rt.ToValue(buckets)
|
||||
}
|
||||
|
||||
func (m *Module) writeBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
rawData := call.Argument(3).Export()
|
||||
|
||||
var data []byte
|
||||
switch typ := rawData.(type) {
|
||||
case []byte:
|
||||
data = typ
|
||||
case string:
|
||||
data = []byte(typ)
|
||||
default:
|
||||
data = []byte(fmt.Sprintf("%v", typ))
|
||||
}
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
||||
logger.Error(ctx, "could not close blob writer", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := writer.Write(data); err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) getBlobInfo(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
blobInfo, err := bucket.Get(ctx, blobID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(toGojaBlobInfo(blobInfo))
|
||||
}
|
||||
|
||||
func (m *Module) readBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
|
||||
reader, _, err := m.openBlob(ctx, bucketName, blobID)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := reader.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
||||
logger.Error(ctx, "could not close blob reader", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(rt.NewArrayBuffer(data))
|
||||
}
|
||||
|
||||
func (m *Module) deleteBlob(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
blobID := assertBlobID(call.Argument(2), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
if err := bucket.Delete(ctx, blobID); err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) listBlobs(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
blobInfos, err := bucket.List(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
gojaBlobInfos := make([]blobInfo, len(blobInfos))
|
||||
|
||||
for i, b := range blobInfos {
|
||||
gojaBlobInfos[i] = toGojaBlobInfo(b)
|
||||
}
|
||||
|
||||
return rt.ToValue(gojaBlobInfos)
|
||||
}
|
||||
|
||||
func (m *Module) deleteBucket(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
|
||||
if err := m.store.DeleteBucket(ctx, bucketName); err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) getBucketSize(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
ctx := util.AssertContext(call.Argument(0), rt)
|
||||
bucketName := util.AssertString(call.Argument(1), rt)
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
size, err := bucket.Size(ctx)
|
||||
if err != nil {
|
||||
panic(rt.ToValue(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
return rt.ToValue(size)
|
||||
}
|
||||
|
||||
func (m *Module) handleMessages() {
|
||||
ctx := context.Background()
|
||||
|
||||
go func() {
|
||||
err := m.bus.Reply(ctx, MessageNamespaceUploadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
uploadRequest, ok := msg.(*MessageUploadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message upload request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleUploadRequest(uploadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle upload request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "upload request response", logger.F("response", res))
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
err := m.bus.Reply(ctx, MessageNamespaceDownloadRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
downloadRequest, ok := msg.(*MessageDownloadRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message download request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleDownloadRequest(downloadRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle download request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) handleUploadRequest(req *MessageUploadRequest) (*MessageUploadResponse, error) {
|
||||
blobID := storage.NewBlobID()
|
||||
res := NewMessageUploadResponse(req.RequestID)
|
||||
|
||||
ctx := logger.With(req.Context, logger.F("blobID", blobID))
|
||||
|
||||
blobInfo := map[string]interface{}{
|
||||
"size": req.FileHeader.Size,
|
||||
"filename": req.FileHeader.Filename,
|
||||
"contentType": req.FileHeader.Header.Get("Content-Type"),
|
||||
}
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(ctx, "onBlobUpload", ctx, blobID, blobInfo, req.Metadata)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobUpload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
if res.Allow {
|
||||
bucket := DefaultBlobBucket
|
||||
|
||||
rawBucket, exists := result["bucket"]
|
||||
if exists {
|
||||
bucket, ok = rawBucket.(string)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'bucket' result property: got type '%T', expected type '%T'", bucket, "")
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.saveBlob(ctx, bucket, blobID, *req.FileHeader); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
res.Bucket = bucket
|
||||
res.BlobID = blobID
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *Module) saveBlob(ctx context.Context, bucketName string, blobID storage.BlobID, fileHeader multipart.FileHeader) error {
|
||||
file, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close file", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close writer", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(writer, file); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) handleDownloadRequest(req *MessageDownloadRequest) (*MessageDownloadResponse, error) {
|
||||
res := NewMessageDownloadResponse(req.RequestID)
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(req.Context, "onBlobDownload", req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onBlobDownload result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
reader, info, err := m.openBlob(req.Context, req.Bucket, req.BlobID)
|
||||
if err != nil && !errors.Is(err, storage.ErrBlobNotFound) {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if reader != nil {
|
||||
res.Blob = reader
|
||||
}
|
||||
|
||||
if info != nil {
|
||||
res.BlobInfo = info
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *Module) openBlob(ctx context.Context, bucketName string, blobID storage.BlobID) (io.ReadSeekCloser, storage.BlobInfo, error) {
|
||||
bucket, err := m.store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)), logger.F("bucket", bucket))
|
||||
}
|
||||
}()
|
||||
|
||||
info, err := bucket.Get(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader, err := bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return reader, info, nil
|
||||
}
|
||||
|
||||
func ModuleFactory(bus bus.Bus, store storage.BlobStore) app.ServerModuleFactory {
|
||||
return func(server *app.Server) app.ServerModule {
|
||||
mod := &Module{
|
||||
store: store,
|
||||
bus: bus,
|
||||
server: server,
|
||||
}
|
||||
|
||||
go mod.handleMessages()
|
||||
|
||||
return mod
|
||||
}
|
||||
}
|
||||
|
||||
func assertBlobID(value goja.Value, rt *goja.Runtime) storage.BlobID {
|
||||
blobID, ok := value.Export().(storage.BlobID)
|
||||
if !ok {
|
||||
rawBlobID, ok := value.Export().(string)
|
||||
if !ok {
|
||||
panic(rt.NewTypeError(fmt.Sprintf("blob id must be a blob or a string, got '%T'", value.Export())))
|
||||
}
|
||||
|
||||
blobID = storage.BlobID(rawBlobID)
|
||||
}
|
||||
|
||||
return blobID
|
||||
}
|
44
pkg/module/blob/module_test.go
Normal file
44
pkg/module/blob/module_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
package blob
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestBlobModule(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
logger.SetLevel(slog.LevelDebug)
|
||||
|
||||
bus := memory.NewBus()
|
||||
store := sqlite.NewBlobStore(":memory:")
|
||||
|
||||
server := app.NewServer(
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
ModuleFactory(bus, store),
|
||||
)
|
||||
|
||||
data, err := ioutil.ReadFile("testdata/blob.js")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := server.Load("testdata/blob.js", string(data)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer server.Stop()
|
||||
|
||||
if err := server.Start(); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
}
|
79
pkg/module/blob/testdata/blob.js
vendored
Normal file
79
pkg/module/blob/testdata/blob.js
vendored
Normal file
@ -0,0 +1,79 @@
|
||||
|
||||
var ctx = context.new();
|
||||
var buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (!buckets || buckets.length === 0) {
|
||||
throw new Error("buckets should not be empty");
|
||||
}
|
||||
|
||||
var size = blob.getBucketSize(ctx, blob.DEFAULT_BUCKET);
|
||||
|
||||
if (size !== 0) {
|
||||
throw new Error("bucket size: expected '0', got '"+size+"'");
|
||||
}
|
||||
|
||||
var newBucket = "mybucket"
|
||||
var blobId = "foo"
|
||||
var data = (new Date()).toString();
|
||||
|
||||
blob.writeBlob(ctx, newBucket, blobId, data)
|
||||
|
||||
buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (buckets.length !== 2) {
|
||||
throw new Error("buckets.length: expected '2', got '"+buckets.length+"'");
|
||||
}
|
||||
|
||||
size = blob.getBucketSize(ctx, newBucket);
|
||||
|
||||
if (size !== data.length) {
|
||||
throw new Error("bucket size: expected '"+data.length+"', got '"+size+"'");
|
||||
}
|
||||
|
||||
var blobInfos = blob.listBlobs(ctx, newBucket);
|
||||
|
||||
if (blobInfos.length !== 1) {
|
||||
throw new Error("blobInfos.length: expected '1', got '"+blobInfos.length+"'");
|
||||
}
|
||||
|
||||
if (blobInfos[0].id != blobId) {
|
||||
throw new Error("blobInfos[0].id: expected '"+blobId+"', got '"+blobInfos[0].id+"'");
|
||||
}
|
||||
|
||||
if (blobInfos[0].contentType != "text/plain; charset=utf-8") {
|
||||
throw new Error("blobInfos[0].contentType: expected 'text/plain; charset=utf-8', got '"+blobInfos[0].contentType+"'");
|
||||
}
|
||||
|
||||
if (blobInfos[0].size != data.length) {
|
||||
throw new Error("blobInfos[0].size: expected '"+data.length+"', got '"+blobInfos[0].size+"'");
|
||||
}
|
||||
|
||||
var readData = blob.readBlob(ctx, newBucket, blobId)
|
||||
|
||||
if (!readData) {
|
||||
throw new Error("readData should not be nil");
|
||||
}
|
||||
|
||||
var buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (!buckets || buckets.length !== 2) {
|
||||
throw new Error("buckets.length should be 2");
|
||||
}
|
||||
|
||||
blob.deleteBlob(ctx, newBucket, blobId)
|
||||
|
||||
blobInfos = blob.listBlobs(ctx, newBucket);
|
||||
|
||||
console.log(blobInfos);
|
||||
|
||||
if (blobInfos.length !== 0) {
|
||||
throw new Error("blobInfos.length: expected '0', got '"+blobInfos.length+"'");
|
||||
}
|
||||
|
||||
blob.deleteBucket(ctx, newBucket)
|
||||
|
||||
buckets = blob.listBuckets(ctx);
|
||||
|
||||
if (buckets.length !== 1) {
|
||||
throw new Error("buckets.length: expected '1', got '"+buckets.length+"'");
|
||||
}
|
49
pkg/module/fetch/fetch_message.go
Normal file
49
pkg/module/fetch/fetch_message.go
Normal file
@ -0,0 +1,49 @@
|
||||
package fetch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"github.com/oklog/ulid/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
MessageNamespaceFetchRequest bus.MessageNamespace = "fetchRequest"
|
||||
MessageNamespaceFetchResponse bus.MessageNamespace = "fetchResponse"
|
||||
)
|
||||
|
||||
type MessageFetchRequest struct {
|
||||
Context context.Context
|
||||
RequestID string
|
||||
URL *url.URL
|
||||
RemoteAddr string
|
||||
}
|
||||
|
||||
func (m *MessageFetchRequest) MessageNamespace() bus.MessageNamespace {
|
||||
return MessageNamespaceFetchRequest
|
||||
}
|
||||
|
||||
func NewMessageFetchRequest(ctx context.Context, remoteAddr string, url *url.URL) *MessageFetchRequest {
|
||||
return &MessageFetchRequest{
|
||||
Context: ctx,
|
||||
RequestID: ulid.Make().String(),
|
||||
RemoteAddr: remoteAddr,
|
||||
URL: url,
|
||||
}
|
||||
}
|
||||
|
||||
type MessageFetchResponse struct {
|
||||
RequestID string
|
||||
Allow bool
|
||||
}
|
||||
|
||||
func (m *MessageFetchResponse) MessageNamespace() bus.MessageNamespace {
|
||||
return MessageNamespaceFetchResponse
|
||||
}
|
||||
|
||||
func NewMessageFetchResponse(requestID string) *MessageFetchResponse {
|
||||
return &MessageFetchResponse{
|
||||
RequestID: requestID,
|
||||
}
|
||||
}
|
122
pkg/module/fetch/module.go
Normal file
122
pkg/module/fetch/module.go
Normal file
@ -0,0 +1,122 @@
|
||||
package fetch
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type Module struct {
|
||||
server *app.Server
|
||||
bus bus.Bus
|
||||
}
|
||||
|
||||
func (m *Module) Name() string {
|
||||
return "fetch"
|
||||
}
|
||||
|
||||
func (m *Module) Export(export *goja.Object) {
|
||||
funcs := map[string]any{
|
||||
"get": m.get,
|
||||
}
|
||||
|
||||
for name, fn := range funcs {
|
||||
if err := export.Set(name, fn); err != nil {
|
||||
panic(errors.Wrapf(err, "could not set '%s' function", name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) get(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
|
||||
// ctx := util.AssertContext(call.Argument(0), rt)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) handleMessages() {
|
||||
ctx := context.Background()
|
||||
|
||||
err := m.bus.Reply(ctx, MessageNamespaceFetchRequest, func(msg bus.Message) (bus.Message, error) {
|
||||
fetchRequest, ok := msg.(*MessageFetchRequest)
|
||||
if !ok {
|
||||
return nil, errors.Wrapf(bus.ErrUnexpectedMessage, "expected message fetch request, got '%T'", msg)
|
||||
}
|
||||
|
||||
res, err := m.handleFetchRequest(fetchRequest)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "could not handle fetch request", logger.E(errors.WithStack(err)))
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "fetch request response", logger.F("response", res))
|
||||
|
||||
return res, nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Module) handleFetchRequest(req *MessageFetchRequest) (*MessageFetchResponse, error) {
|
||||
res := NewMessageFetchResponse(req.RequestID)
|
||||
|
||||
ctx := logger.With(
|
||||
req.Context,
|
||||
logger.F("url", req.URL.String()),
|
||||
logger.F("remoteAddr", req.RemoteAddr),
|
||||
logger.F("requestID", req.RequestID),
|
||||
)
|
||||
|
||||
rawResult, err := m.server.ExecFuncByName(ctx, "onClientFetch", ctx, req.URL.String(), req.RemoteAddr)
|
||||
if err != nil {
|
||||
if errors.Is(err, app.ErrFuncDoesNotExist) {
|
||||
res.Allow = false
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, ok := rawResult.Export().(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.Errorf(
|
||||
"unexpected onClientFetch result: expected 'map[string]interface{}', got '%T'",
|
||||
rawResult.Export(),
|
||||
)
|
||||
}
|
||||
|
||||
var allow bool
|
||||
|
||||
rawAllow, exists := result["allow"]
|
||||
if !exists {
|
||||
allow = false
|
||||
} else {
|
||||
allow, ok = rawAllow.(bool)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("invalid 'allow' result property: got type '%T', expected type '%T'", rawAllow, false)
|
||||
}
|
||||
}
|
||||
|
||||
res.Allow = allow
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func ModuleFactory(bus bus.Bus) app.ServerModuleFactory {
|
||||
return func(server *app.Server) app.ServerModule {
|
||||
mod := &Module{
|
||||
bus: bus,
|
||||
server: server,
|
||||
}
|
||||
|
||||
go mod.handleMessages()
|
||||
|
||||
return mod
|
||||
}
|
||||
}
|
78
pkg/module/fetch/module_test.go
Normal file
78
pkg/module/fetch/module_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
package fetch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestFetchModule(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
logger.SetLevel(slog.LevelDebug)
|
||||
|
||||
bus := memory.NewBus()
|
||||
|
||||
server := app.NewServer(
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
ModuleFactory(bus),
|
||||
)
|
||||
|
||||
data, err := ioutil.ReadFile("testdata/fetch.js")
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
if err := server.Load("testdata/fetch.js", string(data)); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
defer server.Stop()
|
||||
|
||||
if err := server.Start(); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
remoteAddr := "127.0.0.1"
|
||||
url, _ := url.Parse("http://example.com")
|
||||
|
||||
rawReply, err := bus.Request(ctx, NewMessageFetchRequest(ctx, remoteAddr, url))
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
reply, ok := rawReply.(*MessageFetchResponse)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected reply type '%T'", rawReply)
|
||||
}
|
||||
|
||||
if e, g := true, reply.Allow; e != g {
|
||||
t.Errorf("reply.Allow: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
url, _ = url.Parse("https://google.com")
|
||||
|
||||
rawReply, err = bus.Request(ctx, NewMessageFetchRequest(ctx, remoteAddr, url))
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
reply, ok = rawReply.(*MessageFetchResponse)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected reply type '%T'", rawReply)
|
||||
}
|
||||
|
||||
if e, g := false, reply.Allow; e != g {
|
||||
t.Errorf("reply.Allow: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
}
|
7
pkg/module/fetch/testdata/fetch.js
vendored
Normal file
7
pkg/module/fetch/testdata/fetch.js
vendored
Normal file
@ -0,0 +1,7 @@
|
||||
|
||||
var ctx = context.new();
|
||||
|
||||
function onClientFetch(ctx, url, remoteAddr) {
|
||||
if (url === 'http://example.com') return { allow: true };
|
||||
return { allow: false };
|
||||
}
|
@ -12,7 +12,7 @@ func AssertType[T any](v goja.Value, rt *goja.Runtime) T {
|
||||
return c
|
||||
}
|
||||
|
||||
panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", *new(T), v.Export())))
|
||||
panic(rt.ToValue(errors.Errorf("expected value to be a '%T', got '%T'", new(T), v.Export())))
|
||||
}
|
||||
|
||||
func AssertContext(v goja.Value, r *goja.Runtime) context.Context {
|
||||
|
3
pkg/sdk/client/dist/client.js
vendored
3
pkg/sdk/client/dist/client.js
vendored
@ -4084,6 +4084,9 @@ var Edge = (() => {
|
||||
blobUrl(bucket, blobId) {
|
||||
return `/edge/api/v1/download/${bucket}/${blobId}`;
|
||||
}
|
||||
externalUrl(url) {
|
||||
return `/edge/api/v1/fetch?url=${encodeURIComponent(url)}`;
|
||||
}
|
||||
};
|
||||
|
||||
// pkg/sdk/client/src/index.ts
|
||||
|
4
pkg/sdk/client/dist/client.js.map
vendored
4
pkg/sdk/client/dist/client.js.map
vendored
File diff suppressed because one or more lines are too long
@ -264,7 +264,11 @@ export class Client extends EventTarget {
|
||||
});
|
||||
}
|
||||
|
||||
blobUrl(bucket: string, blobId: string) {
|
||||
blobUrl(bucket: string, blobId: string): string {
|
||||
return `/edge/api/v1/download/${bucket}/${blobId}`;
|
||||
}
|
||||
|
||||
externalUrl(url: string): string {
|
||||
return `/edge/api/v1/fetch?url=${encodeURIComponent(url)}`
|
||||
}
|
||||
}
|
@ -68,8 +68,11 @@ func (b *BlobBucket) Close() error {
|
||||
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||
err := b.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `DELETE FROM blobs WHERE bucket = $1 AND id = $2`
|
||||
args := []any{b.name, id}
|
||||
|
||||
if _, err := tx.ExecContext(ctx, query, b.name, id); err != nil {
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -88,7 +91,11 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
|
||||
|
||||
err := b.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT content_type, mod_time, size FROM blobs WHERE bucket = $1 AND id = $2`
|
||||
row := tx.QueryRowContext(ctx, query, b.name, id)
|
||||
args := []any{b.name, id}
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
row := tx.QueryRowContext(ctx, query, args...)
|
||||
|
||||
var (
|
||||
contentType string
|
||||
@ -127,8 +134,11 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||
|
||||
err := b.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT id, content_type, mod_time, size FROM blobs WHERE bucket = $1`
|
||||
args := []any{b.name}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, b.name)
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query), logger.F("args", args))
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
@ -229,7 +239,12 @@ type blobWriterCloser struct {
|
||||
|
||||
// Write implements io.WriteCloser
|
||||
func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
|
||||
logger.Debug(context.Background(), "writing data to blob", logger.F("data", p))
|
||||
logger.Debug(
|
||||
context.Background(), "writing data to blob",
|
||||
logger.F("size", len(p)),
|
||||
logger.F("blobID", wbc.id),
|
||||
logger.F("bucket", wbc.bucket),
|
||||
)
|
||||
|
||||
n, err := wbc.buf.Write(p)
|
||||
if err != nil {
|
||||
@ -266,14 +281,20 @@ func (wbc *blobWriterCloser) Close() error {
|
||||
mime := mimetype.Detect(data)
|
||||
modTime := time.Now().UTC()
|
||||
|
||||
_, err := tx.Exec(
|
||||
query,
|
||||
args := []any{
|
||||
wbc.bucket,
|
||||
wbc.id,
|
||||
data,
|
||||
mime.String(),
|
||||
modTime,
|
||||
len(data),
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "executing query", logger.F("query", query))
|
||||
|
||||
_, err := tx.Exec(
|
||||
query,
|
||||
args...,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -36,7 +36,7 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||
buckets := make([]string, 0)
|
||||
|
||||
err := s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
query := `SELECT DISTINCT name FROM blobs`
|
||||
query := `SELECT DISTINCT bucket FROM blobs`
|
||||
rows, err := tx.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
Reference in New Issue
Block a user