Compare commits

..

1 Commits

Author SHA1 Message Date
wpetit 05a9861e6f feat(storage): rpc based implementation
arcad/edge/pipeline/head There was a failure building this commit Details
arcad/edge/pipeline/pr-master There was a failure building this commit Details
2023-09-12 22:03:25 -06:00
37 changed files with 125 additions and 1129 deletions

View File

@ -22,7 +22,6 @@ import (
appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory" appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory"
authModule "forge.cadoles.com/arcad/edge/pkg/module/auth" authModule "forge.cadoles.com/arcad/edge/pkg/module/auth"
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http" authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
authModuleMiddleware "forge.cadoles.com/arcad/edge/pkg/module/auth/middleware"
"forge.cadoles.com/arcad/edge/pkg/module/blob" "forge.cadoles.com/arcad/edge/pkg/module/blob"
"forge.cadoles.com/arcad/edge/pkg/module/cast" "forge.cadoles.com/arcad/edge/pkg/module/cast"
"forge.cadoles.com/arcad/edge/pkg/module/fetch" "forge.cadoles.com/arcad/edge/pkg/module/fetch"
@ -216,11 +215,6 @@ func runApp(ctx context.Context, path string, address string, storageFile string
authModule.WithJWT(dummyKeySet), authModule.WithJWT(dummyKeySet),
), ),
), ),
appHTTP.WithHTTPMiddlewares(
authModuleMiddleware.AnonymousUser(
jwa.HS256, key,
),
),
) )
if err := handler.Load(bundle); err != nil { if err := handler.Load(bundle); err != nil {
return errors.Wrap(err, "could not load app bundle") return errors.Wrap(err, "could not load app bundle")

View File

@ -2,14 +2,6 @@
Ce module permet de récupérer des informations concernant l'utilisateur connecté et ses attributs. Ce module permet de récupérer des informations concernant l'utilisateur connecté et ses attributs.
### Utilisateurs anonymes
Edge génère automatiquement une session pour les utilisateurs anonymes. Ainsi, qu'un utilisateur soit identifié ou non les `claims` suivants seront toujours valués:
- `auth.CLAIM_SUBJECT`
- `auth.CLAIM_PREFERRED_USERNAME`
- `auth.CLAIM_ISSUER` (prendra la valeur `anon` dans le cas d'un utilisateur anonyme)
## Méthodes ## Méthodes
### `auth.getClaim(ctx: Context, name: string): string` ### `auth.getClaim(ctx: Context, name: string): string`

View File

@ -43,6 +43,12 @@ function onClientMessage(ctx, message) {
} }
``` ```
## Propriétés
### `context.SESSION_ID`
Clé permettant de récupérer la clé de session associé au client émetteur du message courant.
#### Usage #### Usage
```js ```js

View File

@ -97,10 +97,6 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
bus: opts.Bus, bus: opts.Bus,
} }
for _, middleware := range opts.HTTPMiddlewares {
router.Use(middleware)
}
router.Route("/edge", func(r chi.Router) { router.Route("/edge", func(r chi.Router) {
r.Route("/sdk", func(r chi.Router) { r.Route("/sdk", func(r chi.Router) {
r.Get("/client.js", handler.handleSDKClient) r.Get("/client.js", handler.handleSDKClient)

View File

@ -18,7 +18,6 @@ type HandlerOptions struct {
UploadMaxFileSize int64 UploadMaxFileSize int64
HTTPClient *http.Client HTTPClient *http.Client
HTTPMounts []func(r chi.Router) HTTPMounts []func(r chi.Router)
HTTPMiddlewares []func(next http.Handler) http.Handler
} }
func defaultHandlerOptions() *HandlerOptions { func defaultHandlerOptions() *HandlerOptions {
@ -36,7 +35,6 @@ func defaultHandlerOptions() *HandlerOptions {
Timeout: time.Second * 30, Timeout: time.Second * 30,
}, },
HTTPMounts: make([]func(r chi.Router), 0), HTTPMounts: make([]func(r chi.Router), 0),
HTTPMiddlewares: make([]func(http.Handler) http.Handler, 0),
} }
} }
@ -77,9 +75,3 @@ func WithHTTPMounts(mounts ...func(r chi.Router)) HandlerOptionFunc {
opts.HTTPMounts = mounts opts.HTTPMounts = mounts
} }
} }
func WithHTTPMiddlewares(middlewares ...func(http.Handler) http.Handler) HandlerOptionFunc {
return func(opts *HandlerOptions) {
opts.HTTPMiddlewares = middlewares
}
}

View File

@ -1,4 +1,4 @@
package jwt package http
import ( import (
"time" "time"
@ -9,7 +9,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
func GenerateSignedToken(algo jwa.KeyAlgorithm, key jwk.Key, claims map[string]any) ([]byte, error) { func generateSignedToken(algo jwa.KeyAlgorithm, key jwk.Key, claims map[string]any) ([]byte, error) {
token := jwt.New() token := jwt.New()
if err := token.Set(jwt.NotBeforeKey, time.Now()); err != nil { if err := token.Set(jwt.NotBeforeKey, time.Now()); err != nil {

View File

@ -9,7 +9,6 @@ import (
"forge.cadoles.com/arcad/edge/pkg/module/auth" "forge.cadoles.com/arcad/edge/pkg/module/auth"
"forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd" "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd"
"forge.cadoles.com/arcad/edge/pkg/module/auth/jwt"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/lestrrat-go/jwx/v2/jwa" "github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwk" "github.com/lestrrat-go/jwx/v2/jwk"
@ -113,7 +112,7 @@ func (h *LocalHandler) handleForm(w http.ResponseWriter, r *http.Request) {
account.Claims[auth.ClaimIssuer] = "local" account.Claims[auth.ClaimIssuer] = "local"
token, err := jwt.GenerateSignedToken(h.algo, h.key, account.Claims) token, err := generateSignedToken(h.algo, h.key, account.Claims)
if err != nil { if err != nil {
logger.Error(ctx, "could not generate signed token", logger.E(errors.WithStack(err))) logger.Error(ctx, "could not generate signed token", logger.E(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)

View File

@ -30,7 +30,7 @@ func WithJWT(getKeySet GetKeySetFunc) OptionFunc {
} }
} }
func FindRawToken(r *http.Request) (string, error) { func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
authorization := r.Header.Get("Authorization") authorization := r.Header.Get("Authorization")
// Retrieve token from Authorization header // Retrieve token from Authorization header
@ -44,7 +44,7 @@ func FindRawToken(r *http.Request) (string, error) {
if rawToken == "" { if rawToken == "" {
cookie, err := r.Cookie(CookieName) cookie, err := r.Cookie(CookieName)
if err != nil && !errors.Is(err, http.ErrNoCookie) { if err != nil && !errors.Is(err, http.ErrNoCookie) {
return "", errors.WithStack(err) return nil, errors.WithStack(err)
} }
if cookie != nil { if cookie != nil {
@ -53,16 +53,7 @@ func FindRawToken(r *http.Request) (string, error) {
} }
if rawToken == "" { if rawToken == "" {
return "", errors.WithStack(ErrUnauthenticated) return nil, errors.WithStack(ErrUnauthenticated)
}
return rawToken, nil
}
func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
rawToken, err := FindRawToken(r)
if err != nil {
return nil, errors.WithStack(err)
} }
keySet, err := getKeySet() keySet, err := getKeySet()

View File

@ -1,113 +0,0 @@
package middleware
import (
"crypto/rand"
"fmt"
"math/big"
"net/http"
"time"
"forge.cadoles.com/arcad/edge/pkg/module/auth"
"forge.cadoles.com/arcad/edge/pkg/module/auth/jwt"
"github.com/google/uuid"
"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
const AnonIssuer = "anon"
func AnonymousUser(algo jwa.KeyAlgorithm, key jwk.Key, funcs ...AnonymousUserOptionFunc) func(next http.Handler) http.Handler {
opts := defaultAnonymousUserOptions()
for _, fn := range funcs {
fn(opts)
}
return func(next http.Handler) http.Handler {
handler := func(w http.ResponseWriter, r *http.Request) {
rawToken, err := auth.FindRawToken(r)
// If request already has a raw token, we do nothing
if rawToken != "" && err == nil {
next.ServeHTTP(w, r)
return
}
ctx := r.Context()
uuid, err := uuid.NewUUID()
if err != nil {
logger.Error(ctx, "could not generate uuid for anonymous user", logger.E(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
subject := fmt.Sprintf("%s-%s", AnonIssuer, uuid.String())
preferredUsername, err := generateRandomPreferredUsername(8)
if err != nil {
logger.Error(ctx, "could not generate preferred username for anonymous user", logger.E(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
claims := map[string]any{
auth.ClaimSubject: subject,
auth.ClaimIssuer: AnonIssuer,
auth.ClaimPreferredUsername: preferredUsername,
auth.ClaimEdgeRole: opts.Role,
auth.ClaimEdgeEntrypoint: opts.Entrypoint,
auth.ClaimEdgeTenant: opts.Tenant,
}
token, err := jwt.GenerateSignedToken(algo, key, claims)
if err != nil {
logger.Error(ctx, "could not generate signed token", logger.E(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
cookieDomain, err := opts.GetCookieDomain(r)
if err != nil {
logger.Error(ctx, "could not retrieve cookie domain", logger.E(errors.WithStack(err)))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
cookie := http.Cookie{
Name: auth.CookieName,
Value: string(token),
Domain: cookieDomain,
HttpOnly: false,
Expires: time.Now().Add(opts.CookieDuration),
Path: "/",
}
http.SetCookie(w, &cookie)
next.ServeHTTP(w, r)
}
return http.HandlerFunc(handler)
}
}
func generateRandomPreferredUsername(size int) (string, error) {
var letters = []rune("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")
max := big.NewInt(int64(len(letters)))
b := make([]rune, size)
for i := range b {
idx, err := rand.Int(rand.Reader, max)
if err != nil {
return "", errors.WithStack(err)
}
b[i] = letters[idx.Int64()]
}
return fmt.Sprintf("Anon %s", string(b)), nil
}

View File

@ -1,57 +0,0 @@
package middleware
import (
"net/http"
"time"
)
type GetCookieDomainFunc func(r *http.Request) (string, error)
func defaultGetCookieDomain(r *http.Request) (string, error) {
return "", nil
}
type AnonymousUserOptions struct {
GetCookieDomain GetCookieDomainFunc
CookieDuration time.Duration
Tenant string
Entrypoint string
Role string
}
type AnonymousUserOptionFunc func(*AnonymousUserOptions)
func defaultAnonymousUserOptions() *AnonymousUserOptions {
return &AnonymousUserOptions{
GetCookieDomain: defaultGetCookieDomain,
CookieDuration: 24 * time.Hour,
Tenant: "",
Entrypoint: "",
Role: "",
}
}
func WithCookieOptions(getCookieDomain GetCookieDomainFunc, duration time.Duration) AnonymousUserOptionFunc {
return func(opts *AnonymousUserOptions) {
opts.GetCookieDomain = getCookieDomain
opts.CookieDuration = duration
}
}
func WithTenant(tenant string) AnonymousUserOptionFunc {
return func(opts *AnonymousUserOptions) {
opts.Tenant = tenant
}
}
func WithEntrypoint(entrypoint string) AnonymousUserOptionFunc {
return func(opts *AnonymousUserOptions) {
opts.Entrypoint = entrypoint
}
}
func WithRole(role string) AnonymousUserOptionFunc {
return func(opts *AnonymousUserOptions) {
opts.Role = role
}
}

View File

@ -92,7 +92,7 @@ var Edge=(()=>{var K3=Object.create;var Mi=Object.defineProperty,Y3=Object.defin
</edge-menu-item> </edge-menu-item>
`}_canAccess(t){var a,o;let i=((a=this._profile)==null?void 0:a.edge_role)||"visitor",n=((o=t.metadata)==null?void 0:o.minimumRole)||"visitor";return sb[i]>=sb[n]}_renderProfile(){let t=this._profile;return re` `}_canAccess(t){var a,o;let i=((a=this._profile)==null?void 0:a.edge_role)||"visitor",n=((o=t.metadata)==null?void 0:o.minimumRole)||"visitor";return sb[i]>=sb[n]}_renderProfile(){let t=this._profile;return re`
<edge-menu-item name='profile' label="${(t==null?void 0:t.preferred_username)||"Profile"}" icon-url='${Zm}'> <edge-menu-item name='profile' label="${(t==null?void 0:t.preferred_username)||"Profile"}" icon-url='${Zm}'>
${t&&t.iss!="anon"?re`<edge-menu-sub-item name='login' label='Logout' icon-url='${nb}' link-url='/edge/auth/logout'></edge-menu-sub-item>`:re`<edge-menu-sub-item name='login' label='Login' icon-url='${tb}' link-url='/edge/auth/login'></edge-menu-sub-item>`} ${t?re`<edge-menu-sub-item name='login' label='Logout' icon-url='${nb}' link-url='/edge/auth/logout'></edge-menu-sub-item>`:re`<edge-menu-sub-item name='login' label='Login' icon-url='${tb}' link-url='/edge/auth/login'></edge-menu-sub-item>`}
</edge-menu-item> </edge-menu-item>
`}_handleMenuItemSelected(t){let i=t.detail.element;i.classList.add("selected"),i.classList.remove("unselected");for(let n,a=0;n=this._menuItems[a];a++)n!==i&&(n.unselect(),n.classList.add("unselected"))}_handleMenuItemUnselected(t){if(t.detail.element.classList.remove("selected"),this.renderRoot.querySelectorAll("edge-menu-item.selected").length===0)for(let a,o=0;a=this._menuItems[o];o++)a.classList.remove("unselected")}};le.styles=Ti` `}_handleMenuItemSelected(t){let i=t.detail.element;i.classList.add("selected"),i.classList.remove("unselected");for(let n,a=0;n=this._menuItems[a];a++)n!==i&&(n.unselect(),n.classList.add("unselected"))}_handleMenuItemUnselected(t){if(t.detail.element.classList.remove("selected"),this.renderRoot.querySelectorAll("edge-menu-item.selected").length===0)for(let a,o=0;a=this._menuItems[o];o++)a.classList.remove("unselected")}};le.styles=Ti`
:host { :host {

File diff suppressed because one or more lines are too long

View File

@ -191,7 +191,7 @@ export class Menu extends LitElement {
return html` return html`
<edge-menu-item name='profile' label="${profile?.preferred_username || 'Profile'}" icon-url='${UserCircleIcon}'> <edge-menu-item name='profile' label="${profile?.preferred_username || 'Profile'}" icon-url='${UserCircleIcon}'>
${ ${
profile && profile.iss != "anon" ? profile ?
html`<edge-menu-sub-item name='login' label='Logout' icon-url='${LogoutIcon}' link-url='/edge/auth/logout'></edge-menu-sub-item>` : html`<edge-menu-sub-item name='login' label='Logout' icon-url='${LogoutIcon}' link-url='/edge/auth/logout'></edge-menu-sub-item>` :
html`<edge-menu-sub-item name='login' label='Login' icon-url='${LoginIcon}' link-url='/edge/auth/login'></edge-menu-sub-item>` html`<edge-menu-sub-item name='login' label='Login' icon-url='${LoginIcon}' link-url='/edge/auth/login'></edge-menu-sub-item>`
} }

View File

@ -1,35 +1,25 @@
package client package client
import ( import (
"bytes"
"context" "context"
"io" "io"
"github.com/keegancsmith/rpc" "github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type BlobBucket struct { type BlobBucket struct {
name string name string
id blob.BucketID closed bool
client *rpc.Client client *rpc.Client
} }
// Size implements storage.BlobBucket // Size implements storage.BlobBucket
func (b *BlobBucket) Size(ctx context.Context) (int64, error) { func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
args := blob.GetBucketSizeArgs{ return 0, nil
BucketID: b.id,
}
reply := blob.GetBucketSizeReply{}
if err := b.client.Call(ctx, "Service.GetBucketSize", args, &reply); err != nil {
return 0, errors.WithStack(err)
}
return reply.Size, nil
} }
// Name implements storage.BlobBucket // Name implements storage.BlobBucket
@ -39,197 +29,87 @@ func (b *BlobBucket) Name() string {
// Close implements storage.BlobBucket // Close implements storage.BlobBucket
func (b *BlobBucket) Close() error { func (b *BlobBucket) Close() error {
args := blob.CloseBucketArgs{
BucketID: b.id,
}
reply := blob.CloseBucketReply{}
if err := b.client.Call(context.Background(), "Service.CloseBucket", args, &reply); err != nil {
return errors.WithStack(err)
}
return nil return nil
} }
// Delete implements storage.BlobBucket // Delete implements storage.BlobBucket
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
args := blob.DeleteBucketArgs{
BucketName: b.name,
}
reply := blob.DeleteBucketReply{}
if err := b.client.Call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil {
return errors.WithStack(err)
}
return nil return nil
} }
// Get implements storage.BlobBucket // Get implements storage.BlobBucket
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) { func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
args := blob.GetBlobInfoArgs{ return nil, nil
Name: b.name,
BlobID: id,
}
reply := blob.GetBlobInfoReply{}
if err := b.client.Call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil {
return nil, errors.WithStack(err)
}
return reply.BlobInfo, nil
} }
// List implements storage.BlobBucket // List implements storage.BlobBucket
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
args := blob.ListBlobInfoArgs{ return nil, nil
BucketID: b.id,
}
reply := blob.ListBlobInfoReply{}
if err := b.client.Call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil {
return nil, errors.WithStack(err)
}
return reply.BlobInfos, nil
} }
// NewReader implements storage.BlobBucket // NewReader implements storage.BlobBucket
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) { func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
args := blob.NewBlobReaderArgs{ if b.closed {
BucketID: b.id, return nil, errors.WithStack(storage.ErrBucketClosed)
BlobID: id,
}
reply := blob.NewBlobReaderReply{}
if err := b.client.Call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil {
return nil, errors.WithStack(err)
} }
return &blobReaderCloser{ return &blobReaderCloser{
readerID: reply.ReaderID, id: id,
bucket: b.name,
client: b.client, client: b.client,
}, nil }, nil
} }
// NewWriter implements storage.BlobBucket // NewWriter implements storage.BlobBucket
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) { func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
args := blob.NewBlobWriterArgs{ if b.closed {
BucketID: b.id, return nil, errors.WithStack(storage.ErrBucketClosed)
BlobID: id,
}
reply := blob.NewBlobWriterReply{}
if err := b.client.Call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil {
return nil, errors.WithStack(err)
} }
return &blobWriterCloser{ return &blobWriterCloser{
blobID: id, id: id,
writerID: reply.WriterID, bucket: b.name,
buf: bytes.Buffer{},
client: b.client, client: b.client,
}, nil }, nil
} }
type blobWriterCloser struct { type blobWriterCloser struct {
blobID storage.BlobID id storage.BlobID
writerID blob.WriterID bucket string
client *rpc.Client client *rpc.Client
buf bytes.Buffer
closed bool
} }
// Write implements io.WriteCloser // Write implements io.WriteCloser
func (bwc *blobWriterCloser) Write(data []byte) (int, error) { func (wbc *blobWriterCloser) Write(p []byte) (int, error) {
args := blob.WriteBlobArgs{ return 0, nil
WriterID: bwc.writerID,
Data: data,
}
reply := blob.WriteBlobReply{}
if err := bwc.client.Call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
return 0, errors.WithStack(err)
}
return reply.Written, nil
} }
// Close implements io.WriteCloser // Close implements io.WriteCloser
func (bwc *blobWriterCloser) Close() error { func (wbc *blobWriterCloser) Close() error {
args := blob.CloseWriterArgs{
WriterID: bwc.writerID,
}
reply := blob.CloseBucketReply{}
if err := bwc.client.Call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
return errors.WithStack(err)
}
return nil return nil
} }
type blobReaderCloser struct { type blobReaderCloser struct {
readerID blob.ReaderID id storage.BlobID
bucket string
client *rpc.Client client *rpc.Client
} }
// Read implements io.ReadSeekCloser // Read implements io.ReadSeekCloser
func (brc *blobReaderCloser) Read(p []byte) (int, error) { func (brc *blobReaderCloser) Read(p []byte) (int, error) {
args := blob.ReadBlobArgs{ return 0, nil
ReaderID: brc.readerID,
Length: len(p),
}
reply := blob.ReadBlobReply{}
if err := brc.client.Call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
return 0, errors.WithStack(err)
}
copy(p, reply.Data)
if reply.EOF {
return reply.Read, io.EOF
}
return reply.Read, nil
} }
// Seek implements io.ReadSeekCloser // Seek implements io.ReadSeekCloser
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) { func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
args := blob.SeekBlobArgs{ return 0, nil
ReaderID: brc.readerID,
Offset: offset,
Whence: whence,
}
reply := blob.SeekBlobReply{}
if err := brc.client.Call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
return 0, errors.WithStack(err)
}
return reply.Read, nil
} }
// Close implements io.ReadSeekCloser // Close implements io.ReadSeekCloser
func (brc *blobReaderCloser) Close() error { func (brc *blobReaderCloser) Close() error {
args := blob.CloseReaderArgs{
ReaderID: brc.readerID,
}
reply := blob.CloseReaderReply{}
if err := brc.client.Call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
return errors.WithStack(err)
}
return nil return nil
} }

View File

@ -6,7 +6,7 @@ import (
"github.com/keegancsmith/rpc" "github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob" "forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -16,11 +16,11 @@ type BlobStore struct {
// DeleteBucket implements storage.BlobStore. // DeleteBucket implements storage.BlobStore.
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error { func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
args := &blob.DeleteBucketArgs{ args := &server.DeleteBucketArgs{
BucketName: name, Name: name,
} }
if err := s.client.Call(ctx, "Service.DeleteBucket", args, nil); err != nil { if err := s.client.Call(ctx, "BlobStore.DeleteBucket", args, nil); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -29,31 +29,23 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
// ListBuckets implements storage.BlobStore. // ListBuckets implements storage.BlobStore.
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
args := &blob.ListBucketsArgs{} panic("unimplemented")
reply := blob.ListBucketsReply{}
if err := s.client.Call(ctx, "Service.ListBuckets", args, &reply); err != nil {
return nil, errors.WithStack(err)
}
return reply.Buckets, nil
} }
// OpenBucket implements storage.BlobStore. // OpenBucket implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) { func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
args := &blob.OpenBucketArgs{ args := &server.OpenBucketArgs{
BucketName: name, Name: name,
} }
reply := &blob.OpenBucketReply{} reply := &server.OpenBucketReply{}
if err := s.client.Call(ctx, "Service.OpenBucket", args, reply); err != nil { if err := s.client.Call(ctx, "BlobStore.OpenBucket", args, reply); err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
return &BlobBucket{ return &BlobBucket{
name: name, name: reply.BucketName,
id: reply.BucketID, closed: false,
client: s.client, client: s.client,
}, nil }, nil
} }

View File

@ -20,11 +20,7 @@ func TestBlobStore(t *testing.T) {
t.Parallel() t.Parallel()
logger.SetLevel(logger.LevelDebug) logger.SetLevel(logger.LevelDebug)
httpServer, err := startNewServer() httpServer := startNewServer(t)
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer httpServer.Close() defer httpServer.Close()
serverAddr := httpServer.Listener.Addr() serverAddr := httpServer.Listener.Addr()
@ -44,56 +40,27 @@ func TestBlobStore(t *testing.T) {
testsuite.TestBlobStore(t, store) testsuite.TestBlobStore(t, store)
} }
func BenchmarkBlobStore(t *testing.B) { func getSQLiteBlobstore(t *testing.T) *sqlite.BlobStore {
logger.SetLevel(logger.LevelError)
httpServer, err := startNewServer()
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer httpServer.Close()
serverAddr := httpServer.Listener.Addr()
client, err := rpc.DialHTTPPath(
serverAddr.Network(),
serverAddr.String(),
"",
)
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
defer client.Close()
store := NewBlobStore(client)
testsuite.BenchmarkBlobStore(t, store)
}
func getSQLiteBlobstore() (*sqlite.BlobStore, error) {
file := "./testdata/blobstore_test.sqlite" file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) { if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, errors.WithStack(err) t.Fatalf("%+v", errors.WithStack(err))
} }
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := sqlite.NewBlobStore(dsn) store := sqlite.NewBlobStore(dsn)
return store, nil return store
} }
func startNewServer() (*httptest.Server, error) { func startNewServer(t *testing.T) *httptest.Server {
store, err := getSQLiteBlobstore() store := getSQLiteBlobstore(t)
if err != nil {
return nil, errors.WithStack(err)
}
server := server.NewBlobStoreServer(store) server := server.NewBlobStoreServer(store)
httpServer := httptest.NewServer(server) httpServer := httptest.NewServer(server)
httpServerAddr := httpServer.Listener.Addr().String()
return httpServer, nil t.Logf("Test HTTP RPC server listening on %s", httpServerAddr)
return httpServer
} }

View File

@ -1,31 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type CloseBucketArgs struct {
BucketID BucketID
}
type CloseBucketReply struct {
}
func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error {
bucket, err := s.getOpenedBucket(args.BucketID)
if err != nil {
return errors.WithStack(err)
}
if err := bucket.Close(); err != nil {
return errors.WithStack(err)
}
s.buckets.Delete(args.BucketID)
*reply = CloseBucketReply{}
return nil
}

View File

@ -1,31 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type CloseReaderArgs struct {
ReaderID ReaderID
}
type CloseReaderReply struct {
}
func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error {
reader, err := s.getOpenedReader(args.ReaderID)
if err != nil {
return errors.WithStack(err)
}
if err := reader.Close(); err != nil {
return errors.WithStack(err)
}
s.readers.Delete(args.ReaderID)
*reply = CloseReaderReply{}
return nil
}

View File

@ -1,31 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type CloseWriterArgs struct {
WriterID WriterID
}
type CloseWriterReply struct {
}
func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error {
writer, err := s.getOpenedWriter(args.WriterID)
if err != nil {
return errors.WithStack(err)
}
if err := writer.Close(); err != nil {
return errors.WithStack(err)
}
s.writers.Delete(args.WriterID)
*reply = CloseWriterReply{}
return nil
}

View File

@ -1,22 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type DeleteBucketArgs struct {
BucketName string
}
type DeleteBucketReply struct {
}
func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error {
if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil {
return errors.WithStack(err)
}
return nil
}

View File

@ -1,36 +0,0 @@
package blob
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type GetBlobInfoArgs struct {
Name string
BlobID storage.BlobID
BucketID BucketID
}
type GetBlobInfoReply struct {
BlobInfo storage.BlobInfo
}
func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error {
bucket, err := s.getOpenedBucket(args.BucketID)
if err != nil {
return errors.WithStack(err)
}
blobInfo, err := bucket.Get(ctx, args.BlobID)
if err != nil {
return errors.WithStack(err)
}
*reply = GetBlobInfoReply{
BlobInfo: blobInfo,
}
return nil
}

View File

@ -1,33 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type GetBucketSizeArgs struct {
BucketID BucketID
}
type GetBucketSizeReply struct {
Size int64
}
func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error {
bucket, err := s.getOpenedBucket(args.BucketID)
if err != nil {
return errors.WithStack(err)
}
size, err := bucket.Size(ctx)
if err != nil {
return errors.WithStack(err)
}
*reply = GetBucketSizeReply{
Size: size,
}
return nil
}

View File

@ -1,34 +0,0 @@
package blob
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type ListBlobInfoArgs struct {
BucketID BucketID
}
type ListBlobInfoReply struct {
BlobInfos []storage.BlobInfo
}
func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error {
bucket, err := s.getOpenedBucket(args.BucketID)
if err != nil {
return errors.WithStack(err)
}
blobInfos, err := bucket.List(ctx)
if err != nil {
return errors.WithStack(err)
}
*reply = ListBlobInfoReply{
BlobInfos: blobInfos,
}
return nil
}

View File

@ -1,27 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type ListBucketsArgs struct {
}
type ListBucketsReply struct {
Buckets []string
}
func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error {
buckets, err := s.store.ListBuckets(ctx)
if err != nil {
return errors.WithStack(err)
}
*reply = ListBucketsReply{
Buckets: buckets,
}
return nil
}

View File

@ -1,57 +0,0 @@
package blob
import (
"context"
"io"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type NewBlobReaderArgs struct {
BlobID storage.BlobID
BucketID BucketID
}
type NewBlobReaderReply struct {
ReaderID ReaderID
}
func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error {
bucket, err := s.getOpenedBucket(args.BucketID)
if err != nil {
return errors.WithStack(err)
}
readerID, err := NewReaderID()
if err != nil {
return errors.WithStack(err)
}
reader, err := bucket.NewReader(ctx, args.BlobID)
if err != nil {
return errors.WithStack(err)
}
s.readers.Store(readerID, reader)
*reply = NewBlobReaderReply{
ReaderID: readerID,
}
return nil
}
func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) {
raw, exists := s.readers.Load(id)
if !exists {
return nil, errors.Errorf("could not find writer '%s'", id)
}
reader, ok := raw.(io.ReadSeekCloser)
if !ok {
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
}
return reader, nil
}

View File

@ -1,57 +0,0 @@
package blob
import (
"context"
"io"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type NewBlobWriterArgs struct {
BlobID storage.BlobID
BucketID BucketID
}
type NewBlobWriterReply struct {
WriterID WriterID
}
func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error {
bucket, err := s.getOpenedBucket(args.BucketID)
if err != nil {
return errors.WithStack(err)
}
writerID, err := NewWriterID()
if err != nil {
return errors.WithStack(err)
}
writer, err := bucket.NewWriter(ctx, args.BlobID)
if err != nil {
return errors.WithStack(err)
}
s.writers.Store(writerID, writer)
*reply = NewBlobWriterReply{
WriterID: writerID,
}
return nil
}
func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) {
raw, exists := s.writers.Load(id)
if !exists {
return nil, errors.Errorf("could not find writer '%s'", id)
}
writer, ok := raw.(io.WriteCloser)
if !ok {
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
}
return writer, nil
}

View File

@ -1,50 +0,0 @@
package blob
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
type OpenBucketArgs struct {
BucketName string
}
type OpenBucketReply struct {
BucketID BucketID
}
func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error {
bucket, err := s.store.OpenBucket(ctx, args.BucketName)
if err != nil {
return errors.WithStack(err)
}
bucketID, err := NewBucketID()
if err != nil {
return errors.WithStack(err)
}
s.buckets.Store(bucketID, bucket)
*reply = OpenBucketReply{
BucketID: bucketID,
}
return nil
}
func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) {
raw, exists := s.buckets.Load(id)
if !exists {
return nil, errors.WithStack(storage.ErrBucketClosed)
}
bucket, ok := raw.(storage.BlobBucket)
if !ok {
return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw)
}
return bucket, nil
}

View File

@ -1,41 +0,0 @@
package blob
import (
"context"
"io"
"github.com/pkg/errors"
)
type ReadBlobArgs struct {
ReaderID ReaderID
Length int
}
type ReadBlobReply struct {
Data []byte
Read int
EOF bool
}
func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error {
reader, err := s.getOpenedReader(args.ReaderID)
if err != nil {
return errors.WithStack(err)
}
buff := make([]byte, args.Length)
read, err := reader.Read(buff)
if err != nil && !errors.Is(err, io.EOF) {
return errors.WithStack(err)
}
*reply = ReadBlobReply{
Read: read,
Data: buff,
EOF: errors.Is(err, io.EOF),
}
return nil
}

View File

@ -1,38 +0,0 @@
package blob
import (
"context"
"io"
"github.com/pkg/errors"
)
type SeekBlobArgs struct {
ReaderID ReaderID
Offset int64
Whence int
}
type SeekBlobReply struct {
Read int64
EOF bool
}
func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error {
reader, err := s.getOpenedReader(args.ReaderID)
if err != nil {
return errors.WithStack(err)
}
read, err := reader.Seek(args.Offset, args.Whence)
if err != nil && !errors.Is(err, io.EOF) {
return errors.WithStack(err)
}
*reply = SeekBlobReply{
Read: read,
EOF: errors.Is(err, io.EOF),
}
return nil
}

View File

@ -1,60 +0,0 @@
package blob
import (
"fmt"
"sync"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/google/uuid"
"github.com/pkg/errors"
)
type BucketID string
type WriterID string
type ReaderID string
type Service struct {
store storage.BlobStore
buckets sync.Map
writers sync.Map
readers sync.Map
}
func NewService(store storage.BlobStore) *Service {
return &Service{
store: store,
}
}
func NewBucketID() (BucketID, error) {
uuid, err := uuid.NewUUID()
if err != nil {
return "", errors.WithStack(err)
}
id := BucketID(fmt.Sprintf("bucket-%s", uuid.String()))
return id, nil
}
func NewWriterID() (WriterID, error) {
uuid, err := uuid.NewUUID()
if err != nil {
return "", errors.WithStack(err)
}
id := WriterID(fmt.Sprintf("writer-%s", uuid.String()))
return id, nil
}
func NewReaderID() (ReaderID, error) {
uuid, err := uuid.NewUUID()
if err != nil {
return "", errors.WithStack(err)
}
id := ReaderID(fmt.Sprintf("reader-%s", uuid.String()))
return id, nil
}

View File

@ -1,34 +0,0 @@
package blob
import (
"context"
"github.com/pkg/errors"
)
type WriteBlobArgs struct {
WriterID WriterID
Data []byte
}
type WriteBlobReply struct {
Written int
}
func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error {
writer, err := s.getOpenedWriter(args.WriterID)
if err != nil {
return errors.WithStack(err)
}
written, err := writer.Write(args.Data)
if err != nil {
return errors.WithStack(err)
}
*reply = WriteBlobReply{
Written: written,
}
return nil
}

View File

@ -0,0 +1,54 @@
package server
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobStore struct {
store storage.BlobStore
}
type DeleteBucketArgs struct {
Name string
}
type DeleteBucketReply struct {
}
func (s *BlobStore) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error {
return nil
}
type OpenBucketArgs struct {
Name string
}
type OpenBucketReply struct {
BucketName string
}
func (s *BlobStore) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error {
bucket, err := s.store.OpenBucket(ctx, args.Name)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}()
*reply = OpenBucketReply{
BucketName: bucket.Name(),
}
return nil
}
func NewBlobStore(store storage.BlobStore) *BlobStore {
return &BlobStore{store}
}

View File

@ -4,12 +4,11 @@ import (
"github.com/keegancsmith/rpc" "github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
) )
func NewBlobStoreServer(store storage.BlobStore) *rpc.Server { func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
server := rpc.NewServer() server := rpc.NewServer()
server.Register(blob.NewService(store)) server.Register(NewBlobStore(store))
return server return server
} }

View File

@ -26,18 +26,3 @@ func TestBlobStore(t *testing.T) {
testsuite.TestBlobStore(t, store) testsuite.TestBlobStore(t, store)
} }
func BenchmarkBlobStore(t *testing.B) {
logger.SetLevel(logger.LevelError)
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
store := NewBlobStore(dsn)
testsuite.BenchmarkBlobStore(t, store)
}

View File

@ -1,79 +0,0 @@
package testsuite
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/pkg/errors"
)
func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
for i := 0; i < t.N; i++ {
bucketName := fmt.Sprintf("bucket-%d", i)
if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
}
})
}
func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
ctx := context.Background()
bucket, err := store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
blobID := storage.NewBlobID()
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
data := []byte("foo")
if _, err = writer.Write(data); err != nil {
return errors.WithStack(err)
}
if err := writer.Close(); err != nil {
return errors.WithStack(err)
}
reader, err := bucket.NewReader(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
var buf bytes.Buffer
if _, err = io.Copy(&buf, reader); err != nil {
return errors.WithStack(err)
}
if err := reader.Close(); err != nil {
return errors.WithStack(err)
}
if err := bucket.Delete(ctx, blobID); err != nil {
return errors.WithStack(err)
}
if err := bucket.Close(); err != nil {
return errors.WithStack(err)
}
if err := store.DeleteBucket(ctx, bucketName); err != nil {
return errors.WithStack(err)
}
return nil
}

View File

@ -17,11 +17,9 @@ type blobStoreTestCase struct {
var blobStoreTestCases = []blobStoreTestCase{ var blobStoreTestCases = []blobStoreTestCase{
{ {
Name: "Open then delete bucket", Name: "Open new bucket",
Run: func(ctx context.Context, store storage.BlobStore) error { Run: func(ctx context.Context, store storage.BlobStore) error {
bucketName := "open-new-bucket" bucket, err := store.OpenBucket(ctx, "open-new-bucket")
bucket, err := store.OpenBucket(ctx, bucketName)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -52,23 +50,6 @@ var blobStoreTestCases = []blobStoreTestCase{
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := store.DeleteBucket(ctx, bucketName); err != nil {
return errors.WithStack(err)
}
buckets, err := store.ListBuckets(ctx)
if err != nil {
return errors.WithStack(err)
}
for _, b := range buckets {
if b != bucketName {
continue
}
return errors.Errorf("bucket '%s' should be deleted", bucketName)
}
return nil return nil
}, },
}, },

View File

@ -1,8 +1,7 @@
{ {
"compilerOptions": { "compilerOptions": {
"lib": ["ES2015", "DOM"], "lib": ["ES2015", "DOM"],
"experimentalDecorators": true, "experimentalDecorators": true
"esModuleInterop": true
}, },
"include": ["pkg/sdk/client/src/index.d.ts", "pkg/sdk/client/src/**/*.ts", "pkg/sdk/client/src/**/*.svg"] "include": ["pkg/sdk/client/src/index.d.ts", "pkg/sdk/client/src/**/*.ts", "pkg/sdk/client/src/**/*.svg"]
} }