Compare commits
5 Commits
05a9861e6f
...
c74c6c9548
Author | SHA1 | Date | |
---|---|---|---|
c74c6c9548 | |||
c3535a4a9b | |||
7e58551f6a | |||
41d5db6321 | |||
8eb441daee |
@ -22,6 +22,7 @@ import (
|
|||||||
appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory"
|
appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory"
|
||||||
authModule "forge.cadoles.com/arcad/edge/pkg/module/auth"
|
authModule "forge.cadoles.com/arcad/edge/pkg/module/auth"
|
||||||
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
|
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
|
||||||
|
authModuleMiddleware "forge.cadoles.com/arcad/edge/pkg/module/auth/middleware"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/module/blob"
|
"forge.cadoles.com/arcad/edge/pkg/module/blob"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/module/cast"
|
"forge.cadoles.com/arcad/edge/pkg/module/cast"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/module/fetch"
|
"forge.cadoles.com/arcad/edge/pkg/module/fetch"
|
||||||
@ -215,6 +216,11 @@ func runApp(ctx context.Context, path string, address string, storageFile string
|
|||||||
authModule.WithJWT(dummyKeySet),
|
authModule.WithJWT(dummyKeySet),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
appHTTP.WithHTTPMiddlewares(
|
||||||
|
authModuleMiddleware.AnonymousUser(
|
||||||
|
jwa.HS256, key,
|
||||||
|
),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
if err := handler.Load(bundle); err != nil {
|
if err := handler.Load(bundle); err != nil {
|
||||||
return errors.Wrap(err, "could not load app bundle")
|
return errors.Wrap(err, "could not load app bundle")
|
||||||
|
@ -2,6 +2,14 @@
|
|||||||
|
|
||||||
Ce module permet de récupérer des informations concernant l'utilisateur connecté et ses attributs.
|
Ce module permet de récupérer des informations concernant l'utilisateur connecté et ses attributs.
|
||||||
|
|
||||||
|
### Utilisateurs anonymes
|
||||||
|
|
||||||
|
Edge génère automatiquement une session pour les utilisateurs anonymes. Ainsi, qu'un utilisateur soit identifié ou non les `claims` suivants seront toujours valués:
|
||||||
|
|
||||||
|
- `auth.CLAIM_SUBJECT`
|
||||||
|
- `auth.CLAIM_PREFERRED_USERNAME`
|
||||||
|
- `auth.CLAIM_ISSUER` (prendra la valeur `anon` dans le cas d'un utilisateur anonyme)
|
||||||
|
|
||||||
## Méthodes
|
## Méthodes
|
||||||
|
|
||||||
### `auth.getClaim(ctx: Context, name: string): string`
|
### `auth.getClaim(ctx: Context, name: string): string`
|
||||||
|
@ -43,12 +43,6 @@ function onClientMessage(ctx, message) {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## Propriétés
|
|
||||||
|
|
||||||
### `context.SESSION_ID`
|
|
||||||
|
|
||||||
Clé permettant de récupérer la clé de session associé au client émetteur du message courant.
|
|
||||||
|
|
||||||
#### Usage
|
#### Usage
|
||||||
|
|
||||||
```js
|
```js
|
||||||
|
1
go.mod
1
go.mod
@ -15,6 +15,7 @@ require (
|
|||||||
github.com/go-playground/universal-translator v0.16.0 // indirect
|
github.com/go-playground/universal-translator v0.16.0 // indirect
|
||||||
github.com/goccy/go-json v0.9.11 // indirect
|
github.com/goccy/go-json v0.9.11 // indirect
|
||||||
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
||||||
|
github.com/keegancsmith/rpc v1.3.0 // indirect
|
||||||
github.com/leodido/go-urn v1.1.0 // indirect
|
github.com/leodido/go-urn v1.1.0 // indirect
|
||||||
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
||||||
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -201,6 +201,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
|
|||||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||||
|
github.com/keegancsmith/rpc v1.3.0 h1:wGWOpjcNrZaY8GDYZJfvyxmlLljm3YQWF+p918DXtDk=
|
||||||
|
github.com/keegancsmith/rpc v1.3.0/go.mod h1:6O2xnOGjPyvIPbvp0MdrOe5r6cu1GZ4JoTzpzDhWeo0=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
|
@ -97,6 +97,10 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
|
|||||||
bus: opts.Bus,
|
bus: opts.Bus,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, middleware := range opts.HTTPMiddlewares {
|
||||||
|
router.Use(middleware)
|
||||||
|
}
|
||||||
|
|
||||||
router.Route("/edge", func(r chi.Router) {
|
router.Route("/edge", func(r chi.Router) {
|
||||||
r.Route("/sdk", func(r chi.Router) {
|
r.Route("/sdk", func(r chi.Router) {
|
||||||
r.Get("/client.js", handler.handleSDKClient)
|
r.Get("/client.js", handler.handleSDKClient)
|
||||||
|
@ -18,6 +18,7 @@ type HandlerOptions struct {
|
|||||||
UploadMaxFileSize int64
|
UploadMaxFileSize int64
|
||||||
HTTPClient *http.Client
|
HTTPClient *http.Client
|
||||||
HTTPMounts []func(r chi.Router)
|
HTTPMounts []func(r chi.Router)
|
||||||
|
HTTPMiddlewares []func(next http.Handler) http.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultHandlerOptions() *HandlerOptions {
|
func defaultHandlerOptions() *HandlerOptions {
|
||||||
@ -34,7 +35,8 @@ func defaultHandlerOptions() *HandlerOptions {
|
|||||||
HTTPClient: &http.Client{
|
HTTPClient: &http.Client{
|
||||||
Timeout: time.Second * 30,
|
Timeout: time.Second * 30,
|
||||||
},
|
},
|
||||||
HTTPMounts: make([]func(r chi.Router), 0),
|
HTTPMounts: make([]func(r chi.Router), 0),
|
||||||
|
HTTPMiddlewares: make([]func(http.Handler) http.Handler, 0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,3 +77,9 @@ func WithHTTPMounts(mounts ...func(r chi.Router)) HandlerOptionFunc {
|
|||||||
opts.HTTPMounts = mounts
|
opts.HTTPMounts = mounts
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithHTTPMiddlewares(middlewares ...func(http.Handler) http.Handler) HandlerOptionFunc {
|
||||||
|
return func(opts *HandlerOptions) {
|
||||||
|
opts.HTTPMiddlewares = middlewares
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/module/auth"
|
"forge.cadoles.com/arcad/edge/pkg/module/auth"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd"
|
"forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/module/auth/jwt"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/lestrrat-go/jwx/v2/jwa"
|
"github.com/lestrrat-go/jwx/v2/jwa"
|
||||||
"github.com/lestrrat-go/jwx/v2/jwk"
|
"github.com/lestrrat-go/jwx/v2/jwk"
|
||||||
@ -112,7 +113,7 @@ func (h *LocalHandler) handleForm(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
account.Claims[auth.ClaimIssuer] = "local"
|
account.Claims[auth.ClaimIssuer] = "local"
|
||||||
|
|
||||||
token, err := generateSignedToken(h.algo, h.key, account.Claims)
|
token, err := jwt.GenerateSignedToken(h.algo, h.key, account.Claims)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "could not generate signed token", logger.E(errors.WithStack(err)))
|
logger.Error(ctx, "could not generate signed token", logger.E(errors.WithStack(err)))
|
||||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
@ -30,7 +30,7 @@ func WithJWT(getKeySet GetKeySetFunc) OptionFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
|
func FindRawToken(r *http.Request) (string, error) {
|
||||||
authorization := r.Header.Get("Authorization")
|
authorization := r.Header.Get("Authorization")
|
||||||
|
|
||||||
// Retrieve token from Authorization header
|
// Retrieve token from Authorization header
|
||||||
@ -44,7 +44,7 @@ func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
|
|||||||
if rawToken == "" {
|
if rawToken == "" {
|
||||||
cookie, err := r.Cookie(CookieName)
|
cookie, err := r.Cookie(CookieName)
|
||||||
if err != nil && !errors.Is(err, http.ErrNoCookie) {
|
if err != nil && !errors.Is(err, http.ErrNoCookie) {
|
||||||
return nil, errors.WithStack(err)
|
return "", errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cookie != nil {
|
if cookie != nil {
|
||||||
@ -53,7 +53,16 @@ func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if rawToken == "" {
|
if rawToken == "" {
|
||||||
return nil, errors.WithStack(ErrUnauthenticated)
|
return "", errors.WithStack(ErrUnauthenticated)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rawToken, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func FindToken(r *http.Request, getKeySet GetKeySetFunc) (jwt.Token, error) {
|
||||||
|
rawToken, err := FindRawToken(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
keySet, err := getKeySet()
|
keySet, err := getKeySet()
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package http
|
package jwt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
@ -9,7 +9,7 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func generateSignedToken(algo jwa.KeyAlgorithm, key jwk.Key, claims map[string]any) ([]byte, error) {
|
func GenerateSignedToken(algo jwa.KeyAlgorithm, key jwk.Key, claims map[string]any) ([]byte, error) {
|
||||||
token := jwt.New()
|
token := jwt.New()
|
||||||
|
|
||||||
if err := token.Set(jwt.NotBeforeKey, time.Now()); err != nil {
|
if err := token.Set(jwt.NotBeforeKey, time.Now()); err != nil {
|
113
pkg/module/auth/middleware/anonymous_user.go
Normal file
113
pkg/module/auth/middleware/anonymous_user.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/module/auth"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/module/auth/jwt"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/lestrrat-go/jwx/v2/jwa"
|
||||||
|
"github.com/lestrrat-go/jwx/v2/jwk"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
const AnonIssuer = "anon"
|
||||||
|
|
||||||
|
func AnonymousUser(algo jwa.KeyAlgorithm, key jwk.Key, funcs ...AnonymousUserOptionFunc) func(next http.Handler) http.Handler {
|
||||||
|
opts := defaultAnonymousUserOptions()
|
||||||
|
for _, fn := range funcs {
|
||||||
|
fn(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(next http.Handler) http.Handler {
|
||||||
|
handler := func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
rawToken, err := auth.FindRawToken(r)
|
||||||
|
|
||||||
|
// If request already has a raw token, we do nothing
|
||||||
|
if rawToken != "" && err == nil {
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "could not generate uuid for anonymous user", logger.E(errors.WithStack(err)))
|
||||||
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
subject := fmt.Sprintf("%s-%s", AnonIssuer, uuid.String())
|
||||||
|
preferredUsername, err := generateRandomPreferredUsername(8)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "could not generate preferred username for anonymous user", logger.E(errors.WithStack(err)))
|
||||||
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
claims := map[string]any{
|
||||||
|
auth.ClaimSubject: subject,
|
||||||
|
auth.ClaimIssuer: AnonIssuer,
|
||||||
|
auth.ClaimPreferredUsername: preferredUsername,
|
||||||
|
auth.ClaimEdgeRole: opts.Role,
|
||||||
|
auth.ClaimEdgeEntrypoint: opts.Entrypoint,
|
||||||
|
auth.ClaimEdgeTenant: opts.Tenant,
|
||||||
|
}
|
||||||
|
|
||||||
|
token, err := jwt.GenerateSignedToken(algo, key, claims)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "could not generate signed token", logger.E(errors.WithStack(err)))
|
||||||
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cookieDomain, err := opts.GetCookieDomain(r)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "could not retrieve cookie domain", logger.E(errors.WithStack(err)))
|
||||||
|
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cookie := http.Cookie{
|
||||||
|
Name: auth.CookieName,
|
||||||
|
Value: string(token),
|
||||||
|
Domain: cookieDomain,
|
||||||
|
HttpOnly: false,
|
||||||
|
Expires: time.Now().Add(opts.CookieDuration),
|
||||||
|
Path: "/",
|
||||||
|
}
|
||||||
|
|
||||||
|
http.SetCookie(w, &cookie)
|
||||||
|
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
return http.HandlerFunc(handler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateRandomPreferredUsername(size int) (string, error) {
|
||||||
|
var letters = []rune("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||||
|
max := big.NewInt(int64(len(letters)))
|
||||||
|
|
||||||
|
b := make([]rune, size)
|
||||||
|
for i := range b {
|
||||||
|
idx, err := rand.Int(rand.Reader, max)
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
b[i] = letters[idx.Int64()]
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("Anon %s", string(b)), nil
|
||||||
|
}
|
57
pkg/module/auth/middleware/options.go
Normal file
57
pkg/module/auth/middleware/options.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetCookieDomainFunc func(r *http.Request) (string, error)
|
||||||
|
|
||||||
|
func defaultGetCookieDomain(r *http.Request) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AnonymousUserOptions struct {
|
||||||
|
GetCookieDomain GetCookieDomainFunc
|
||||||
|
CookieDuration time.Duration
|
||||||
|
Tenant string
|
||||||
|
Entrypoint string
|
||||||
|
Role string
|
||||||
|
}
|
||||||
|
|
||||||
|
type AnonymousUserOptionFunc func(*AnonymousUserOptions)
|
||||||
|
|
||||||
|
func defaultAnonymousUserOptions() *AnonymousUserOptions {
|
||||||
|
return &AnonymousUserOptions{
|
||||||
|
GetCookieDomain: defaultGetCookieDomain,
|
||||||
|
CookieDuration: 24 * time.Hour,
|
||||||
|
Tenant: "",
|
||||||
|
Entrypoint: "",
|
||||||
|
Role: "",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCookieOptions(getCookieDomain GetCookieDomainFunc, duration time.Duration) AnonymousUserOptionFunc {
|
||||||
|
return func(opts *AnonymousUserOptions) {
|
||||||
|
opts.GetCookieDomain = getCookieDomain
|
||||||
|
opts.CookieDuration = duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithTenant(tenant string) AnonymousUserOptionFunc {
|
||||||
|
return func(opts *AnonymousUserOptions) {
|
||||||
|
opts.Tenant = tenant
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithEntrypoint(entrypoint string) AnonymousUserOptionFunc {
|
||||||
|
return func(opts *AnonymousUserOptions) {
|
||||||
|
opts.Entrypoint = entrypoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRole(role string) AnonymousUserOptionFunc {
|
||||||
|
return func(opts *AnonymousUserOptions) {
|
||||||
|
opts.Role = role
|
||||||
|
}
|
||||||
|
}
|
2
pkg/sdk/client/dist/client.js
vendored
2
pkg/sdk/client/dist/client.js
vendored
@ -92,7 +92,7 @@ var Edge=(()=>{var K3=Object.create;var Mi=Object.defineProperty,Y3=Object.defin
|
|||||||
</edge-menu-item>
|
</edge-menu-item>
|
||||||
`}_canAccess(t){var a,o;let i=((a=this._profile)==null?void 0:a.edge_role)||"visitor",n=((o=t.metadata)==null?void 0:o.minimumRole)||"visitor";return sb[i]>=sb[n]}_renderProfile(){let t=this._profile;return re`
|
`}_canAccess(t){var a,o;let i=((a=this._profile)==null?void 0:a.edge_role)||"visitor",n=((o=t.metadata)==null?void 0:o.minimumRole)||"visitor";return sb[i]>=sb[n]}_renderProfile(){let t=this._profile;return re`
|
||||||
<edge-menu-item name='profile' label="${(t==null?void 0:t.preferred_username)||"Profile"}" icon-url='${Zm}'>
|
<edge-menu-item name='profile' label="${(t==null?void 0:t.preferred_username)||"Profile"}" icon-url='${Zm}'>
|
||||||
${t?re`<edge-menu-sub-item name='login' label='Logout' icon-url='${nb}' link-url='/edge/auth/logout'></edge-menu-sub-item>`:re`<edge-menu-sub-item name='login' label='Login' icon-url='${tb}' link-url='/edge/auth/login'></edge-menu-sub-item>`}
|
${t&&t.iss!="anon"?re`<edge-menu-sub-item name='login' label='Logout' icon-url='${nb}' link-url='/edge/auth/logout'></edge-menu-sub-item>`:re`<edge-menu-sub-item name='login' label='Login' icon-url='${tb}' link-url='/edge/auth/login'></edge-menu-sub-item>`}
|
||||||
</edge-menu-item>
|
</edge-menu-item>
|
||||||
`}_handleMenuItemSelected(t){let i=t.detail.element;i.classList.add("selected"),i.classList.remove("unselected");for(let n,a=0;n=this._menuItems[a];a++)n!==i&&(n.unselect(),n.classList.add("unselected"))}_handleMenuItemUnselected(t){if(t.detail.element.classList.remove("selected"),this.renderRoot.querySelectorAll("edge-menu-item.selected").length===0)for(let a,o=0;a=this._menuItems[o];o++)a.classList.remove("unselected")}};le.styles=Ti`
|
`}_handleMenuItemSelected(t){let i=t.detail.element;i.classList.add("selected"),i.classList.remove("unselected");for(let n,a=0;n=this._menuItems[a];a++)n!==i&&(n.unselect(),n.classList.add("unselected"))}_handleMenuItemUnselected(t){if(t.detail.element.classList.remove("selected"),this.renderRoot.querySelectorAll("edge-menu-item.selected").length===0)for(let a,o=0;a=this._menuItems[o];o++)a.classList.remove("unselected")}};le.styles=Ti`
|
||||||
:host {
|
:host {
|
||||||
|
4
pkg/sdk/client/dist/client.js.map
vendored
4
pkg/sdk/client/dist/client.js.map
vendored
File diff suppressed because one or more lines are too long
@ -191,7 +191,7 @@ export class Menu extends LitElement {
|
|||||||
return html`
|
return html`
|
||||||
<edge-menu-item name='profile' label="${profile?.preferred_username || 'Profile'}" icon-url='${UserCircleIcon}'>
|
<edge-menu-item name='profile' label="${profile?.preferred_username || 'Profile'}" icon-url='${UserCircleIcon}'>
|
||||||
${
|
${
|
||||||
profile ?
|
profile && profile.iss != "anon" ?
|
||||||
html`<edge-menu-sub-item name='login' label='Logout' icon-url='${LogoutIcon}' link-url='/edge/auth/logout'></edge-menu-sub-item>` :
|
html`<edge-menu-sub-item name='login' label='Logout' icon-url='${LogoutIcon}' link-url='/edge/auth/logout'></edge-menu-sub-item>` :
|
||||||
html`<edge-menu-sub-item name='login' label='Login' icon-url='${LoginIcon}' link-url='/edge/auth/login'></edge-menu-sub-item>`
|
html`<edge-menu-sub-item name='login' label='Login' icon-url='${LoginIcon}' link-url='/edge/auth/login'></edge-menu-sub-item>`
|
||||||
}
|
}
|
||||||
|
241
pkg/storage/rpc/client/blob_bucket.go
Normal file
241
pkg/storage/rpc/client/blob_bucket.go
Normal file
@ -0,0 +1,241 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobBucket struct {
|
||||||
|
name string
|
||||||
|
id blob.BucketID
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
||||||
|
args := blob.GetBucketSizeArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.GetBucketSizeReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(ctx, "Service.GetBucketSize", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Name() string {
|
||||||
|
return b.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Close() error {
|
||||||
|
args := blob.CloseBucketArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.CloseBucketReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.CloseBucket", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||||
|
args := blob.DeleteBucketArgs{
|
||||||
|
BucketName: b.name,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.DeleteBucketReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.DeleteBucket", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
||||||
|
args := blob.GetBlobInfoArgs{
|
||||||
|
Name: b.name,
|
||||||
|
BlobID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.GetBlobInfoReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.GetBlobInfo", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.BlobInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||||
|
args := blob.ListBlobInfoArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.ListBlobInfoReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.ListBlobInfo", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.BlobInfos, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReader implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
|
||||||
|
args := blob.NewBlobReaderArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
BlobID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.NewBlobReaderReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.NewBlobReader", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &blobReaderCloser{
|
||||||
|
readerID: reply.ReaderID,
|
||||||
|
client: b.client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWriter implements storage.BlobBucket
|
||||||
|
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
|
||||||
|
args := blob.NewBlobWriterArgs{
|
||||||
|
BucketID: b.id,
|
||||||
|
BlobID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.NewBlobWriterReply{}
|
||||||
|
|
||||||
|
if err := b.client.Call(context.Background(), "Service.NewBlobWriter", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &blobWriterCloser{
|
||||||
|
blobID: id,
|
||||||
|
writerID: reply.WriterID,
|
||||||
|
client: b.client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blobWriterCloser struct {
|
||||||
|
blobID storage.BlobID
|
||||||
|
writerID blob.WriterID
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write implements io.WriteCloser
|
||||||
|
func (bwc *blobWriterCloser) Write(data []byte) (int, error) {
|
||||||
|
args := blob.WriteBlobArgs{
|
||||||
|
WriterID: bwc.writerID,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.WriteBlobReply{}
|
||||||
|
|
||||||
|
if err := bwc.client.Call(context.Background(), "Service.WriteBlob", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Written, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.WriteCloser
|
||||||
|
func (bwc *blobWriterCloser) Close() error {
|
||||||
|
args := blob.CloseWriterArgs{
|
||||||
|
WriterID: bwc.writerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.CloseBucketReply{}
|
||||||
|
|
||||||
|
if err := bwc.client.Call(context.Background(), "Service.CloseWriter", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blobReaderCloser struct {
|
||||||
|
readerID blob.ReaderID
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read implements io.ReadSeekCloser
|
||||||
|
func (brc *blobReaderCloser) Read(p []byte) (int, error) {
|
||||||
|
args := blob.ReadBlobArgs{
|
||||||
|
ReaderID: brc.readerID,
|
||||||
|
Length: len(p),
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.ReadBlobReply{}
|
||||||
|
|
||||||
|
if err := brc.client.Call(context.Background(), "Service.ReadBlob", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(p, reply.Data)
|
||||||
|
|
||||||
|
if reply.EOF {
|
||||||
|
return reply.Read, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Read, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek implements io.ReadSeekCloser
|
||||||
|
func (brc *blobReaderCloser) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
args := blob.SeekBlobArgs{
|
||||||
|
ReaderID: brc.readerID,
|
||||||
|
Offset: offset,
|
||||||
|
Whence: whence,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.SeekBlobReply{}
|
||||||
|
|
||||||
|
if err := brc.client.Call(context.Background(), "Service.SeekBlob", args, &reply); err != nil {
|
||||||
|
return 0, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Read, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.ReadSeekCloser
|
||||||
|
func (brc *blobReaderCloser) Close() error {
|
||||||
|
args := blob.CloseReaderArgs{
|
||||||
|
ReaderID: brc.readerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
reply := blob.CloseReaderReply{}
|
||||||
|
|
||||||
|
if err := brc.client.Call(context.Background(), "Service.CloseReader", args, &reply); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ storage.BlobBucket = &BlobBucket{}
|
||||||
|
_ storage.BlobInfo = &BlobInfo{}
|
||||||
|
_ io.WriteCloser = &blobWriterCloser{}
|
||||||
|
_ io.ReadSeekCloser = &blobReaderCloser{}
|
||||||
|
)
|
40
pkg/storage/rpc/client/blob_info.go
Normal file
40
pkg/storage/rpc/client/blob_info.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobInfo struct {
|
||||||
|
id storage.BlobID
|
||||||
|
bucket string
|
||||||
|
contentType string
|
||||||
|
modTime time.Time
|
||||||
|
size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bucket implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) Bucket() string {
|
||||||
|
return i.bucket
|
||||||
|
}
|
||||||
|
|
||||||
|
// ID implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) ID() storage.BlobID {
|
||||||
|
return i.id
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContentType implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) ContentType() string {
|
||||||
|
return i.contentType
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) ModTime() time.Time {
|
||||||
|
return i.modTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size implements storage.BlobInfo
|
||||||
|
func (i *BlobInfo) Size() int64 {
|
||||||
|
return i.size
|
||||||
|
}
|
65
pkg/storage/rpc/client/blob_store.go
Normal file
65
pkg/storage/rpc/client/blob_store.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobStore struct {
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBucket implements storage.BlobStore.
|
||||||
|
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||||||
|
args := &blob.DeleteBucketArgs{
|
||||||
|
BucketName: name,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.DeleteBucket", args, nil); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListBuckets implements storage.BlobStore.
|
||||||
|
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||||
|
args := &blob.ListBucketsArgs{}
|
||||||
|
|
||||||
|
reply := blob.ListBucketsReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.ListBuckets", args, &reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.Buckets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenBucket implements storage.BlobStore.
|
||||||
|
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||||||
|
args := &blob.OpenBucketArgs{
|
||||||
|
BucketName: name,
|
||||||
|
}
|
||||||
|
reply := &blob.OpenBucketReply{}
|
||||||
|
|
||||||
|
if err := s.client.Call(ctx, "Service.OpenBucket", args, reply); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &BlobBucket{
|
||||||
|
name: name,
|
||||||
|
id: reply.BucketID,
|
||||||
|
client: s.client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBlobStore(client *rpc.Client) *BlobStore {
|
||||||
|
return &BlobStore{client}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ storage.BlobStore = &BlobStore{}
|
99
pkg/storage/rpc/client/blob_store_test.go
Normal file
99
pkg/storage/rpc/client/blob_store_test.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlobStore(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
logger.SetLevel(logger.LevelDebug)
|
||||||
|
|
||||||
|
httpServer, err := startNewServer()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
serverAddr := httpServer.Listener.Addr()
|
||||||
|
|
||||||
|
client, err := rpc.DialHTTPPath(
|
||||||
|
serverAddr.Network(),
|
||||||
|
serverAddr.String(),
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
store := NewBlobStore(client)
|
||||||
|
|
||||||
|
testsuite.TestBlobStore(t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkBlobStore(t *testing.B) {
|
||||||
|
logger.SetLevel(logger.LevelError)
|
||||||
|
|
||||||
|
httpServer, err := startNewServer()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
serverAddr := httpServer.Listener.Addr()
|
||||||
|
|
||||||
|
client, err := rpc.DialHTTPPath(
|
||||||
|
serverAddr.Network(),
|
||||||
|
serverAddr.String(),
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
store := NewBlobStore(client)
|
||||||
|
|
||||||
|
testsuite.BenchmarkBlobStore(t, store)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSQLiteBlobstore() (*sqlite.BlobStore, error) {
|
||||||
|
file := "./testdata/blobstore_test.sqlite"
|
||||||
|
|
||||||
|
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
|
store := sqlite.NewBlobStore(dsn)
|
||||||
|
|
||||||
|
return store, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startNewServer() (*httptest.Server, error) {
|
||||||
|
store, err := getSQLiteBlobstore()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
server := server.NewBlobStoreServer(store)
|
||||||
|
|
||||||
|
httpServer := httptest.NewServer(server)
|
||||||
|
|
||||||
|
return httpServer, nil
|
||||||
|
}
|
40
pkg/storage/rpc/client/document_store.go
Normal file
40
pkg/storage/rpc/client/document_store.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/filter"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DocumentStore struct {
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete implements storage.DocumentStore.
|
||||||
|
func (*DocumentStore) Delete(ctx context.Context, collection string, id storage.DocumentID) error {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get implements storage.DocumentStore.
|
||||||
|
func (*DocumentStore) Get(ctx context.Context, collection string, id storage.DocumentID) (storage.Document, error) {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query implements storage.DocumentStore.
|
||||||
|
func (*DocumentStore) Query(ctx context.Context, collection string, filter *filter.Filter, funcs ...storage.QueryOptionFunc) ([]storage.Document, error) {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upsert implements storage.DocumentStore.
|
||||||
|
func (*DocumentStore) Upsert(ctx context.Context, collection string, document storage.Document) (storage.Document, error) {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDocumentStore(client *rpc.Client) *DocumentStore {
|
||||||
|
return &DocumentStore{client}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ storage.DocumentStore = &DocumentStore{}
|
1
pkg/storage/rpc/client/testdata/.gitignore
vendored
Normal file
1
pkg/storage/rpc/client/testdata/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/*.sqlite*
|
31
pkg/storage/rpc/server/blob/close_bucket.go
Normal file
31
pkg/storage/rpc/server/blob/close_bucket.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseBucketArgs struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseBucketReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CloseBucket(ctx context.Context, args *CloseBucketArgs, reply *CloseBucketReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.buckets.Delete(args.BucketID)
|
||||||
|
|
||||||
|
*reply = CloseBucketReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
31
pkg/storage/rpc/server/blob/close_reader.go
Normal file
31
pkg/storage/rpc/server/blob/close_reader.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseReaderArgs struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseReaderReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CloseReader(ctx context.Context, args *CloseReaderArgs, reply *CloseReaderReply) error {
|
||||||
|
reader, err := s.getOpenedReader(args.ReaderID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.readers.Delete(args.ReaderID)
|
||||||
|
|
||||||
|
*reply = CloseReaderReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
31
pkg/storage/rpc/server/blob/close_writer.go
Normal file
31
pkg/storage/rpc/server/blob/close_writer.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseWriterArgs struct {
|
||||||
|
WriterID WriterID
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseWriterReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) CloseWriter(ctx context.Context, args *CloseWriterArgs, reply *CloseWriterReply) error {
|
||||||
|
writer, err := s.getOpenedWriter(args.WriterID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writer.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.writers.Delete(args.WriterID)
|
||||||
|
|
||||||
|
*reply = CloseWriterReply{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
22
pkg/storage/rpc/server/blob/delete_bucket.go
Normal file
22
pkg/storage/rpc/server/blob/delete_bucket.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DeleteBucketArgs struct {
|
||||||
|
BucketName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteBucketReply struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) DeleteBucket(ctx context.Context, args *DeleteBucketArgs, reply *DeleteBucketReply) error {
|
||||||
|
if err := s.store.DeleteBucket(ctx, args.BucketName); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
36
pkg/storage/rpc/server/blob/get_blob_info.go
Normal file
36
pkg/storage/rpc/server/blob/get_blob_info.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetBlobInfoArgs struct {
|
||||||
|
Name string
|
||||||
|
BlobID storage.BlobID
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetBlobInfoReply struct {
|
||||||
|
BlobInfo storage.BlobInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) GetBlobInfo(ctx context.Context, args *GetBlobInfoArgs, reply *GetBlobInfoReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobInfo, err := bucket.Get(ctx, args.BlobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = GetBlobInfoReply{
|
||||||
|
BlobInfo: blobInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
33
pkg/storage/rpc/server/blob/get_bucket_size.go
Normal file
33
pkg/storage/rpc/server/blob/get_bucket_size.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetBucketSizeArgs struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetBucketSizeReply struct {
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) GetBucketSize(ctx context.Context, args *GetBucketSizeArgs, reply *GetBucketSizeReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := bucket.Size(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = GetBucketSizeReply{
|
||||||
|
Size: size,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
34
pkg/storage/rpc/server/blob/list_blob_info.go
Normal file
34
pkg/storage/rpc/server/blob/list_blob_info.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ListBlobInfoArgs struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListBlobInfoReply struct {
|
||||||
|
BlobInfos []storage.BlobInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) ListBlobInfo(ctx context.Context, args *ListBlobInfoArgs, reply *ListBlobInfoReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobInfos, err := bucket.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = ListBlobInfoReply{
|
||||||
|
BlobInfos: blobInfos,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
27
pkg/storage/rpc/server/blob/list_buckets.go
Normal file
27
pkg/storage/rpc/server/blob/list_buckets.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ListBucketsArgs struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListBucketsReply struct {
|
||||||
|
Buckets []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) ListBuckets(ctx context.Context, args *ListBucketsArgs, reply *ListBucketsReply) error {
|
||||||
|
buckets, err := s.store.ListBuckets(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = ListBucketsReply{
|
||||||
|
Buckets: buckets,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
57
pkg/storage/rpc/server/blob/new_blob_reader.go
Normal file
57
pkg/storage/rpc/server/blob/new_blob_reader.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NewBlobReaderArgs struct {
|
||||||
|
BlobID storage.BlobID
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type NewBlobReaderReply struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) NewBlobReader(ctx context.Context, args *NewBlobReaderArgs, reply *NewBlobReaderReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
readerID, err := NewReaderID()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := bucket.NewReader(ctx, args.BlobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.readers.Store(readerID, reader)
|
||||||
|
|
||||||
|
*reply = NewBlobReaderReply{
|
||||||
|
ReaderID: readerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getOpenedReader(id ReaderID) (io.ReadSeekCloser, error) {
|
||||||
|
raw, exists := s.readers.Load(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, errors.Errorf("could not find writer '%s'", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, ok := raw.(io.ReadSeekCloser)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reader, nil
|
||||||
|
}
|
57
pkg/storage/rpc/server/blob/new_blob_writer.go
Normal file
57
pkg/storage/rpc/server/blob/new_blob_writer.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NewBlobWriterArgs struct {
|
||||||
|
BlobID storage.BlobID
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
type NewBlobWriterReply struct {
|
||||||
|
WriterID WriterID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) NewBlobWriter(ctx context.Context, args *NewBlobWriterArgs, reply *NewBlobWriterReply) error {
|
||||||
|
bucket, err := s.getOpenedBucket(args.BucketID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writerID, err := NewWriterID()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, err := bucket.NewWriter(ctx, args.BlobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.writers.Store(writerID, writer)
|
||||||
|
|
||||||
|
*reply = NewBlobWriterReply{
|
||||||
|
WriterID: writerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getOpenedWriter(id WriterID) (io.WriteCloser, error) {
|
||||||
|
raw, exists := s.writers.Load(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, errors.Errorf("could not find writer '%s'", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, ok := raw.(io.WriteCloser)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected type '%T' for writer", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return writer, nil
|
||||||
|
}
|
50
pkg/storage/rpc/server/blob/open_bucket.go
Normal file
50
pkg/storage/rpc/server/blob/open_bucket.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type OpenBucketArgs struct {
|
||||||
|
BucketName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type OpenBucketReply struct {
|
||||||
|
BucketID BucketID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) OpenBucket(ctx context.Context, args *OpenBucketArgs, reply *OpenBucketReply) error {
|
||||||
|
bucket, err := s.store.OpenBucket(ctx, args.BucketName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketID, err := NewBucketID()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.buckets.Store(bucketID, bucket)
|
||||||
|
|
||||||
|
*reply = OpenBucketReply{
|
||||||
|
BucketID: bucketID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getOpenedBucket(id BucketID) (storage.BlobBucket, error) {
|
||||||
|
raw, exists := s.buckets.Load(id)
|
||||||
|
if !exists {
|
||||||
|
return nil, errors.WithStack(storage.ErrBucketClosed)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, ok := raw.(storage.BlobBucket)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("unexpected type '%T' for blob bucket", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return bucket, nil
|
||||||
|
}
|
41
pkg/storage/rpc/server/blob/read_blob.go
Normal file
41
pkg/storage/rpc/server/blob/read_blob.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReadBlobArgs struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
Length int
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReadBlobReply struct {
|
||||||
|
Data []byte
|
||||||
|
Read int
|
||||||
|
EOF bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) ReadBlob(ctx context.Context, args *ReadBlobArgs, reply *ReadBlobReply) error {
|
||||||
|
reader, err := s.getOpenedReader(args.ReaderID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buff := make([]byte, args.Length)
|
||||||
|
|
||||||
|
read, err := reader.Read(buff)
|
||||||
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = ReadBlobReply{
|
||||||
|
Read: read,
|
||||||
|
Data: buff,
|
||||||
|
EOF: errors.Is(err, io.EOF),
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
38
pkg/storage/rpc/server/blob/seek_blob.go
Normal file
38
pkg/storage/rpc/server/blob/seek_blob.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SeekBlobArgs struct {
|
||||||
|
ReaderID ReaderID
|
||||||
|
Offset int64
|
||||||
|
Whence int
|
||||||
|
}
|
||||||
|
|
||||||
|
type SeekBlobReply struct {
|
||||||
|
Read int64
|
||||||
|
EOF bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) SeekBlob(ctx context.Context, args *SeekBlobArgs, reply *SeekBlobReply) error {
|
||||||
|
reader, err := s.getOpenedReader(args.ReaderID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
read, err := reader.Seek(args.Offset, args.Whence)
|
||||||
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = SeekBlobReply{
|
||||||
|
Read: read,
|
||||||
|
EOF: errors.Is(err, io.EOF),
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
60
pkg/storage/rpc/server/blob/service.go
Normal file
60
pkg/storage/rpc/server/blob/service.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BucketID string
|
||||||
|
type WriterID string
|
||||||
|
type ReaderID string
|
||||||
|
|
||||||
|
type Service struct {
|
||||||
|
store storage.BlobStore
|
||||||
|
buckets sync.Map
|
||||||
|
writers sync.Map
|
||||||
|
readers sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(store storage.BlobStore) *Service {
|
||||||
|
return &Service{
|
||||||
|
store: store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBucketID() (BucketID, error) {
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := BucketID(fmt.Sprintf("bucket-%s", uuid.String()))
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWriterID() (WriterID, error) {
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := WriterID(fmt.Sprintf("writer-%s", uuid.String()))
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReaderID() (ReaderID, error) {
|
||||||
|
uuid, err := uuid.NewUUID()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
id := ReaderID(fmt.Sprintf("reader-%s", uuid.String()))
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
34
pkg/storage/rpc/server/blob/write_blob.go
Normal file
34
pkg/storage/rpc/server/blob/write_blob.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package blob
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WriteBlobArgs struct {
|
||||||
|
WriterID WriterID
|
||||||
|
Data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteBlobReply struct {
|
||||||
|
Written int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) WriteBlob(ctx context.Context, args *WriteBlobArgs, reply *WriteBlobReply) error {
|
||||||
|
writer, err := s.getOpenedWriter(args.WriterID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
written, err := writer.Write(args.Data)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = WriteBlobReply{
|
||||||
|
Written: written,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
11
pkg/storage/rpc/server/document_store.go
Normal file
11
pkg/storage/rpc/server/document_store.go
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import "forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
|
||||||
|
type DocumentStore struct {
|
||||||
|
store storage.DocumentStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDocumentStore(store storage.DocumentStore) *DocumentStore {
|
||||||
|
return &DocumentStore{store}
|
||||||
|
}
|
20
pkg/storage/rpc/server/server.go
Normal file
20
pkg/storage/rpc/server/server.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/keegancsmith/rpc"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server/blob"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewBlobStoreServer(store storage.BlobStore) *rpc.Server {
|
||||||
|
server := rpc.NewServer()
|
||||||
|
server.Register(blob.NewService(store))
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDocumentStoreServer(store storage.DocumentStore) *rpc.Server {
|
||||||
|
server := rpc.NewServer()
|
||||||
|
server.Register(NewDocumentStore(store))
|
||||||
|
return server
|
||||||
|
}
|
@ -26,3 +26,18 @@ func TestBlobStore(t *testing.T) {
|
|||||||
|
|
||||||
testsuite.TestBlobStore(t, store)
|
testsuite.TestBlobStore(t, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkBlobStore(t *testing.B) {
|
||||||
|
logger.SetLevel(logger.LevelError)
|
||||||
|
|
||||||
|
file := "./testdata/blobstore_test.sqlite"
|
||||||
|
|
||||||
|
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||||
|
store := NewBlobStore(dsn)
|
||||||
|
|
||||||
|
testsuite.BenchmarkBlobStore(t, store)
|
||||||
|
}
|
||||||
|
@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
func TestBlobStore(t *testing.T, store storage.BlobStore) {
|
func TestBlobStore(t *testing.T, store storage.BlobStore) {
|
||||||
t.Run("Ops", func(t *testing.T) {
|
t.Run("Ops", func(t *testing.T) {
|
||||||
t.Parallel()
|
|
||||||
testBlobStoreOps(t, store)
|
testBlobStoreOps(t, store)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
79
pkg/storage/testsuite/blob_store_benchmark.go
Normal file
79
pkg/storage/testsuite/blob_store_benchmark.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package testsuite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
|
||||||
|
t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
|
||||||
|
|
||||||
|
for i := 0; i < t.N; i++ {
|
||||||
|
bucketName := fmt.Sprintf("bucket-%d", i)
|
||||||
|
if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobID := storage.NewBlobID()
|
||||||
|
|
||||||
|
writer, err := bucket.NewWriter(ctx, blobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data := []byte("foo")
|
||||||
|
|
||||||
|
if _, err = writer.Write(data); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writer.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := bucket.NewReader(ctx, blobID)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
|
||||||
|
if _, err = io.Copy(&buf, reader); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Delete(ctx, blobID); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Close(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.DeleteBucket(ctx, bucketName); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -17,9 +17,11 @@ type blobStoreTestCase struct {
|
|||||||
|
|
||||||
var blobStoreTestCases = []blobStoreTestCase{
|
var blobStoreTestCases = []blobStoreTestCase{
|
||||||
{
|
{
|
||||||
Name: "Open new bucket",
|
Name: "Open then delete bucket",
|
||||||
Run: func(ctx context.Context, store storage.BlobStore) error {
|
Run: func(ctx context.Context, store storage.BlobStore) error {
|
||||||
bucket, err := store.OpenBucket(ctx, "open-new-bucket")
|
bucketName := "open-new-bucket"
|
||||||
|
|
||||||
|
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
@ -50,6 +52,23 @@ var blobStoreTestCases = []blobStoreTestCase{
|
|||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := store.DeleteBucket(ctx, bucketName); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buckets, err := store.ListBuckets(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, b := range buckets {
|
||||||
|
if b != bucketName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Errorf("bucket '%s' should be deleted", bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
{
|
{
|
||||||
"compilerOptions": {
|
"compilerOptions": {
|
||||||
"lib": ["ES2015", "DOM"],
|
"lib": ["ES2015", "DOM"],
|
||||||
"experimentalDecorators": true
|
"experimentalDecorators": true,
|
||||||
|
"esModuleInterop": true
|
||||||
},
|
},
|
||||||
"include": ["pkg/sdk/client/src/index.d.ts", "pkg/sdk/client/src/**/*.ts", "pkg/sdk/client/src/**/*.svg"]
|
"include": ["pkg/sdk/client/src/index.d.ts", "pkg/sdk/client/src/**/*.ts", "pkg/sdk/client/src/**/*.svg"]
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user