Compare commits
6 Commits
2023.10.22
...
2023.11.29
Author | SHA1 | Date | |
---|---|---|---|
f4a7366aad | |||
02c74b6f8d | |||
8889694125 | |||
6a99409a15 | |||
2fc590d708 | |||
6e4bf2f025 |
@ -1,4 +1,4 @@
|
||||
RUN_APP_ARGS=""
|
||||
#EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
|
||||
#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%"
|
||||
#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%"
|
||||
#EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"
|
56
cmd/cli/command/app/info.go
Normal file
56
cmd/cli/command/app/info.go
Normal file
@ -0,0 +1,56 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bundle"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/urfave/cli/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func InfoCommand() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "info",
|
||||
Usage: "Print app manifest informations",
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "path",
|
||||
Usage: "use `PATH` as app bundle (zip, zim or directory bundle)",
|
||||
Aliases: []string{"p"},
|
||||
Value: "",
|
||||
Required: true,
|
||||
},
|
||||
},
|
||||
Action: func(ctx *cli.Context) error {
|
||||
appPath := ctx.String("path")
|
||||
|
||||
bundle, err := bundle.FromPath(appPath)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not load app bundle")
|
||||
}
|
||||
|
||||
manifest, err := app.LoadManifest(bundle)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not load app manifest")
|
||||
}
|
||||
|
||||
if valid, err := manifest.Validate(manifestMetadataValidators...); !valid {
|
||||
return errors.Wrap(err, "invalid app manifest")
|
||||
}
|
||||
|
||||
encoder := yaml.NewEncoder(os.Stdout)
|
||||
|
||||
if err := encoder.Encode(manifest); err != nil {
|
||||
return errors.Wrap(err, "could not encode manifest")
|
||||
}
|
||||
|
||||
if err := encoder.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
@ -12,6 +12,7 @@ func Root() *cli.Command {
|
||||
RunCommand(),
|
||||
PackageCommand(),
|
||||
HashPasswordCommand(),
|
||||
InfoCommand(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -44,10 +44,13 @@ import (
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"
|
||||
|
||||
// Register storage drivers
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
||||
|
||||
// Register storage drivers
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
)
|
||||
|
||||
|
@ -22,13 +22,15 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
// Register storage drivers
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/cmd/storage-server/command/flag"
|
||||
"forge.cadoles.com/arcad/edge/pkg/jwtutil"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
|
||||
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
)
|
||||
|
||||
@ -51,17 +53,17 @@ func Run() *cli.Command {
|
||||
&cli.StringFlag{
|
||||
Name: "blobstore-dsn-pattern",
|
||||
EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "documentstore-dsn-pattern",
|
||||
EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "sharestore-dsn-pattern",
|
||||
EnvVars: []string{"STORAGE_SERVER_SHARESTORE_DSN_PATTERN"},
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
|
||||
Value: fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "sentry-dsn",
|
||||
|
3
go.mod
3
go.mod
@ -15,11 +15,13 @@ require (
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.75.0 // indirect
|
||||
github.com/allegro/bigcache/v3 v3.1.0 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
|
||||
github.com/go-playground/locales v0.14.0 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.0 // indirect
|
||||
github.com/goccy/go-json v0.9.11 // indirect
|
||||
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||
github.com/leodido/go-urn v1.2.1 // indirect
|
||||
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
|
||||
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
||||
@ -27,6 +29,7 @@ require (
|
||||
github.com/lestrrat-go/iter v1.0.2 // indirect
|
||||
github.com/lestrrat-go/option v1.0.0 // indirect
|
||||
github.com/miekg/dns v1.1.53 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
|
||||
google.golang.org/grpc v1.35.0 // indirect
|
||||
gopkg.in/go-playground/validator.v9 v9.29.1 // indirect
|
||||
|
4
go.sum
4
go.sum
@ -53,6 +53,8 @@ github.com/alecthomas/kong v0.2.1-0.20190708041108-0548c6b1afae/go.mod h1:+inYUS
|
||||
github.com/alecthomas/kong-hcl v0.1.8-0.20190615233001-b21fea9723c8/go.mod h1:MRgZdU3vrFd05IQ89AxUZ0aYdF39BYoNFa324SodPCA=
|
||||
github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897 h1:p9Sln00KOTlrYkxI1zYWl1QLnEqAqEARBEYa8FQnQcY=
|
||||
github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ=
|
||||
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=
|
||||
github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
|
||||
github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692 h1:JW4WZlqyaNWUUahfr7MigeDW6jmtam5cTzzo1lwsFhE=
|
||||
github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692/go.mod h1:Au0ipPuCBA7zsOC61SnyrYetm8VT3vo1UJtwHeYke44=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
@ -206,6 +208,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE=
|
||||
github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
|
||||
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||
|
@ -115,7 +115,7 @@ func (m *Module) handleClientMessages() {
|
||||
case msg := <-clientMessages:
|
||||
clientMessage, ok := msg.(*module.ClientMessage)
|
||||
if !ok {
|
||||
logger.Error(
|
||||
logger.Warn(
|
||||
ctx,
|
||||
"unexpected message type",
|
||||
logger.F("message", msg),
|
||||
|
105
pkg/sdk/client/dist/client.js
vendored
105
pkg/sdk/client/dist/client.js
vendored
File diff suppressed because one or more lines are too long
8
pkg/sdk/client/dist/client.js.map
vendored
8
pkg/sdk/client/dist/client.js.map
vendored
File diff suppressed because one or more lines are too long
@ -5,5 +5,6 @@ import LoginIcon from './login.svg';
|
||||
import HomeIcon from './home.svg';
|
||||
import LinkIcon from './link.svg';
|
||||
import LogoutIcon from './logout.svg';
|
||||
import LoaderIcon from './loader.svg';
|
||||
|
||||
export { UserCircleIcon, MenuIcon, CloudIcon, LoginIcon, HomeIcon, LinkIcon, LogoutIcon }
|
||||
export { LoaderIcon, UserCircleIcon, MenuIcon, CloudIcon, LoginIcon, HomeIcon, LinkIcon, LogoutIcon }
|
1
pkg/sdk/client/src/components/icons/loader.svg
Normal file
1
pkg/sdk/client/src/components/icons/loader.svg
Normal file
@ -0,0 +1 @@
|
||||
<svg viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg" fill="#000000"><g id="SVGRepo_bgCarrier" stroke-width="0"></g><g id="SVGRepo_tracerCarrier" stroke-linecap="round" stroke-linejoin="round"></g><g id="SVGRepo_iconCarrier"> <g> <path fill="none" d="M0 0h24v24H0z"></path> <path d="M12 2a1 1 0 0 1 1 1v3a1 1 0 0 1-2 0V3a1 1 0 0 1 1-1zm0 15a1 1 0 0 1 1 1v3a1 1 0 0 1-2 0v-3a1 1 0 0 1 1-1zm8.66-10a1 1 0 0 1-.366 1.366l-2.598 1.5a1 1 0 1 1-1-1.732l2.598-1.5A1 1 0 0 1 20.66 7zM7.67 14.5a1 1 0 0 1-.366 1.366l-2.598 1.5a1 1 0 1 1-1-1.732l2.598-1.5a1 1 0 0 1 1.366.366zM20.66 17a1 1 0 0 1-1.366.366l-2.598-1.5a1 1 0 0 1 1-1.732l2.598 1.5A1 1 0 0 1 20.66 17zM7.67 9.5a1 1 0 0 1-1.366.366l-2.598-1.5a1 1 0 1 1 1-1.732l2.598 1.5A1 1 0 0 1 7.67 9.5z"></path> </g> </g></svg>
|
After Width: | Height: | Size: 773 B |
57
pkg/sdk/client/src/components/loader.ts
Normal file
57
pkg/sdk/client/src/components/loader.ts
Normal file
@ -0,0 +1,57 @@
|
||||
import { LitElement, html, css } from 'lit';
|
||||
import { LoaderIcon } from './icons';
|
||||
|
||||
export class Loader extends LitElement {
|
||||
static styles = css`
|
||||
:host {
|
||||
display: inline-block;
|
||||
height: 100%;
|
||||
width: 100%;
|
||||
border-bottom: 1px solid rgb(229,231,235);
|
||||
border-top: 10px solid transparent;
|
||||
background-color: #fff;
|
||||
min-height: 50px;
|
||||
padding: 10px 0;
|
||||
}
|
||||
|
||||
.container {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
flex-direction: column;
|
||||
justify-content: center;
|
||||
font-family: Arial, Helvetica Neue, Helvetica, sans-serif;
|
||||
font-size: 14px;
|
||||
color: black;
|
||||
}
|
||||
|
||||
.icon {
|
||||
height: 35px;
|
||||
animation-duration: 3s;
|
||||
animation-name: spin;
|
||||
animation-iteration-count: infinite;
|
||||
}
|
||||
|
||||
@keyframes spin {
|
||||
from {
|
||||
transform: rotateZ(0deg);
|
||||
}
|
||||
|
||||
to {
|
||||
transform: rotateZ(360deg);
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
}
|
||||
|
||||
render() {
|
||||
return html`
|
||||
<div class="container">
|
||||
<img class="icon" src="${LoaderIcon}" />
|
||||
Chargement en cours
|
||||
</div>
|
||||
`
|
||||
}
|
||||
}
|
@ -49,6 +49,9 @@ export class Menu extends LitElement {
|
||||
@property()
|
||||
_profile: Profile
|
||||
|
||||
@property()
|
||||
_loading: boolean = false
|
||||
|
||||
static styles = css`
|
||||
:host {
|
||||
position: fixed;
|
||||
@ -95,6 +98,7 @@ export class Menu extends LitElement {
|
||||
}
|
||||
|
||||
_fetchApps() {
|
||||
this._loading = true;
|
||||
return fetch(`${BASE_API_URL}/apps`)
|
||||
.then(res => res.json())
|
||||
.then(result => {
|
||||
@ -130,9 +134,14 @@ export class Menu extends LitElement {
|
||||
return Promise.all(promises);
|
||||
})
|
||||
.then((manifests: Manifest[]) => {
|
||||
this._loading = false
|
||||
this._apps = manifests;
|
||||
})
|
||||
.catch(err => console.error(err))
|
||||
.catch(err => {
|
||||
console.error(err);
|
||||
this._loading = false;
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
_fetchProfile() {
|
||||
@ -158,19 +167,24 @@ export class Menu extends LitElement {
|
||||
}
|
||||
|
||||
_renderApps() {
|
||||
const apps = this._apps
|
||||
.filter(manifest => this._canAccess(manifest))
|
||||
.map(manifest => {
|
||||
const iconUrl = ( ( manifest.url || '') + ( manifest.metadata?.paths?.icon || '' ) ) || LinkIcon;
|
||||
return html`
|
||||
<edge-menu-sub-item
|
||||
name='${ manifest.id }'
|
||||
label='${ manifest.title }'
|
||||
icon-url='${ iconUrl }'
|
||||
link-url='${ manifest.url || '#' }'>
|
||||
</edge-menu-sub-item>
|
||||
`
|
||||
});
|
||||
let apps;
|
||||
if (this._loading) {
|
||||
apps = [ html`<edge-loader></edge-loader>` ]
|
||||
} else {
|
||||
apps = this._apps
|
||||
.filter(manifest => this._canAccess(manifest))
|
||||
.map(manifest => {
|
||||
const iconUrl = ( ( manifest.url || '') + ( manifest.metadata?.paths?.icon || '' ) ) || LinkIcon;
|
||||
return html`
|
||||
<edge-menu-sub-item
|
||||
name='${ manifest.id }'
|
||||
label='${ manifest.title }'
|
||||
icon-url='${ iconUrl }'
|
||||
link-url='${ manifest.url || '#' }'>
|
||||
</edge-menu-sub-item>
|
||||
`
|
||||
});
|
||||
}
|
||||
|
||||
return html`
|
||||
<edge-menu-item name='apps' label='Apps' icon-url='${CloudIcon}'>
|
||||
|
@ -6,10 +6,12 @@ import { MenuItem as MenuItemElement } from './components/menu-item.js';
|
||||
import { MenuSubItem as MenuSubItemElement } from './components/menu-sub-item.js';
|
||||
import { CrossFrameMessenger } from './crossframe-messenger.js';
|
||||
import { MenuManager } from './menu-manager.js';
|
||||
import { Loader } from './components/loader';
|
||||
|
||||
customElements.define('edge-menu', MenuElement);
|
||||
customElements.define('edge-menu-item', MenuItemElement);
|
||||
customElements.define('edge-menu-sub-item', MenuSubItemElement);
|
||||
customElements.define('edge-loader', Loader);
|
||||
|
||||
export const Client = new EdgeClient();
|
||||
export const Frame = new CrossFrameMessenger();
|
||||
|
@ -23,7 +23,7 @@ func NewBlobStore(dsn string) (storage.BlobStore, error) {
|
||||
|
||||
factory, exists := blobStoreFactories[url.Scheme]
|
||||
if !exists {
|
||||
return nil, errors.WithStack(ErrSchemeNotRegistered)
|
||||
return nil, errors.Wrapf(ErrSchemeNotRegistered, "no driver associated with scheme '%s'", url.Scheme)
|
||||
}
|
||||
|
||||
store, err := factory(url)
|
||||
|
121
pkg/storage/driver/cache/blob_bucket.go
vendored
Normal file
121
pkg/storage/driver/cache/blob_bucket.go
vendored
Normal file
@ -0,0 +1,121 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type BlobBucket struct {
|
||||
bucket storage.BlobBucket
|
||||
cache *bigcache.BigCache
|
||||
}
|
||||
|
||||
// Close implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Close() error {
|
||||
if err := b.bucket.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||
if err := b.bucket.Delete(ctx, id); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
||||
info, err := b.bucket.Get(ctx, id)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// List implements storage.BlobBucket.
|
||||
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||
infos, err := b.bucket.List(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// Name implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Name() string {
|
||||
return b.bucket.Name()
|
||||
}
|
||||
|
||||
// NewReader implements storage.BlobBucket.
|
||||
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
|
||||
if cached, exist := b.inCache(id); exist {
|
||||
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats()))
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
reader, err := b.bucket.NewReader(ctx, id)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &readCacher{
|
||||
reader: reader,
|
||||
cache: b.cache,
|
||||
key: b.getCacheKey(id),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
|
||||
return fmt.Sprintf("%s-%s", b.Name(), id)
|
||||
}
|
||||
|
||||
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
|
||||
key := b.getCacheKey(id)
|
||||
data, err := b.cache.Get(key)
|
||||
if err != nil {
|
||||
if errors.Is(err, bigcache.ErrEntryNotFound) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err)))
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return &cachedReader{data, 0}, true
|
||||
}
|
||||
|
||||
// NewWriter implements storage.BlobBucket.
|
||||
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
|
||||
writer, err := b.bucket.NewWriter(ctx, id)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
// Size implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
||||
size, err := b.bucket.Size(ctx)
|
||||
if err != nil {
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
var _ storage.BlobBucket = &BlobBucket{}
|
55
pkg/storage/driver/cache/blob_store.go
vendored
Normal file
55
pkg/storage/driver/cache/blob_store.go
vendored
Normal file
@ -0,0 +1,55 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type BlobStore struct {
|
||||
store storage.BlobStore
|
||||
cache *bigcache.BigCache
|
||||
}
|
||||
|
||||
// DeleteBucket implements storage.BlobStore.
|
||||
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||||
if err := s.store.DeleteBucket(ctx, name); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListBuckets implements storage.BlobStore.
|
||||
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||
buckets, err := s.store.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
// OpenBucket implements storage.BlobStore.
|
||||
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||||
bucket, err := s.store.OpenBucket(ctx, name)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &BlobBucket{
|
||||
bucket: bucket,
|
||||
cache: s.cache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore {
|
||||
return &BlobStore{
|
||||
store: store,
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
var _ storage.BlobStore = &BlobStore{}
|
63
pkg/storage/driver/cache/blob_store_test.go
vendored
Normal file
63
pkg/storage/driver/cache/blob_store_test.go
vendored
Normal file
@ -0,0 +1,63 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func TestBlobStore(t *testing.T) {
|
||||
t.Parallel()
|
||||
if testing.Verbose() {
|
||||
logger.SetLevel(logger.LevelDebug)
|
||||
}
|
||||
|
||||
file := "./testdata/blobstore_test.sqlite"
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
|
||||
backend := sqlite.NewBlobStore(dsn)
|
||||
|
||||
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
store := NewBlobStore(backend, cache)
|
||||
|
||||
testsuite.TestBlobStore(context.Background(), t, store)
|
||||
}
|
||||
|
||||
func BenchmarkBlobStore(t *testing.B) {
|
||||
logger.SetLevel(logger.LevelError)
|
||||
|
||||
file := "./testdata/blobstore_test.sqlite"
|
||||
|
||||
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
backend := sqlite.NewBlobStore(dsn)
|
||||
|
||||
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
store := NewBlobStore(backend, cache)
|
||||
|
||||
testsuite.BenchmarkBlobStore(t, store)
|
||||
}
|
105
pkg/storage/driver/cache/driver.go
vendored
Normal file
105
pkg/storage/driver/cache/driver.go
vendored
Normal file
@ -0,0 +1,105 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func init() {
|
||||
driver.RegisterBlobStoreFactory("cache", blobStoreFactory)
|
||||
}
|
||||
|
||||
func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||
query := dsn.Query()
|
||||
|
||||
rawDriver := query.Get("driver")
|
||||
if rawDriver == "" {
|
||||
return nil, errors.New("missing required url parameter 'driver'")
|
||||
}
|
||||
|
||||
query.Del("driver")
|
||||
|
||||
cacheTTL := time.Minute * 60
|
||||
|
||||
rawCacheTTL := query.Get("cacheTTL")
|
||||
if rawCacheTTL != "" {
|
||||
query.Del("cacheTTL")
|
||||
|
||||
ttl, err := time.ParseDuration(rawCacheTTL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'")
|
||||
}
|
||||
|
||||
cacheTTL = ttl
|
||||
}
|
||||
|
||||
cacheConfig := bigcache.DefaultConfig(cacheTTL)
|
||||
cacheConfig.Logger = &cacheLogger{}
|
||||
|
||||
rawCacheShards := query.Get("cacheShards")
|
||||
if rawCacheShards != "" {
|
||||
query.Del("cacheShards")
|
||||
|
||||
cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'")
|
||||
}
|
||||
|
||||
cacheConfig.Shards = int(cacheShards)
|
||||
}
|
||||
|
||||
rawMaxCacheSize := query.Get("maxCacheSize")
|
||||
if rawMaxCacheSize != "" {
|
||||
query.Del("maxCacheSize")
|
||||
|
||||
maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'")
|
||||
}
|
||||
|
||||
// See cacheConfig.HardMaxCacheSize documentation
|
||||
var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000
|
||||
|
||||
if maxCacheSize < minCacheSize {
|
||||
return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize)
|
||||
}
|
||||
|
||||
cacheConfig.HardMaxCacheSize = int(maxCacheSize)
|
||||
}
|
||||
|
||||
url := &url.URL{
|
||||
Scheme: rawDriver,
|
||||
Host: dsn.Host,
|
||||
Path: dsn.Path,
|
||||
RawQuery: query.Encode(),
|
||||
}
|
||||
|
||||
store, err := driver.NewBlobStore(url.String())
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
cache, err := bigcache.New(context.Background(), cacheConfig)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return NewBlobStore(store, cache), nil
|
||||
}
|
||||
|
||||
type cacheLogger struct{}
|
||||
|
||||
func (l *cacheLogger) Printf(format string, v ...interface{}) {
|
||||
logger.Debug(context.Background(), fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
var _ bigcache.Logger = &cacheLogger{}
|
117
pkg/storage/driver/cache/reader.go
vendored
Normal file
117
pkg/storage/driver/cache/reader.go
vendored
Normal file
@ -0,0 +1,117 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type readCacher struct {
|
||||
reader io.ReadSeekCloser
|
||||
cache *bigcache.BigCache
|
||||
key string
|
||||
}
|
||||
|
||||
// Close implements io.ReadSeekCloser.
|
||||
func (r *readCacher) Close() error {
|
||||
if err := r.reader.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read implements io.ReadSeekCloser.
|
||||
func (r *readCacher) Read(p []byte) (n int, err error) {
|
||||
length, err := r.reader.Read(p)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return length, io.EOF
|
||||
}
|
||||
|
||||
return length, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if length > 0 {
|
||||
if err := r.cache.Append(r.key, p[:length]); err != nil {
|
||||
ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
|
||||
logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
|
||||
|
||||
if err := r.cache.Delete(r.key); err != nil {
|
||||
logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return length, nil
|
||||
}
|
||||
|
||||
// Seek implements io.ReadSeekCloser.
|
||||
func (r *readCacher) Seek(offset int64, whence int) (int64, error) {
|
||||
length, err := r.reader.Seek(offset, whence)
|
||||
if err != nil {
|
||||
return length, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return length, nil
|
||||
}
|
||||
|
||||
var _ io.ReadSeekCloser = &readCacher{}
|
||||
|
||||
type cachedReader struct {
|
||||
buffer []byte
|
||||
offset int64
|
||||
}
|
||||
|
||||
// Read implements io.ReadSeekCloser.
|
||||
func (r *cachedReader) Read(p []byte) (n int, err error) {
|
||||
available := len(r.buffer) - int(r.offset)
|
||||
if available == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
size := len(p)
|
||||
if size > available {
|
||||
size = available
|
||||
}
|
||||
|
||||
copy(p, r.buffer[r.offset:r.offset+int64(size)])
|
||||
r.offset += int64(size)
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
// Close implements io.ReadSeekCloser.
|
||||
func (r *cachedReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Seek implements io.ReadSeekCloser.
|
||||
func (r *cachedReader) Seek(offset int64, whence int) (int64, error) {
|
||||
var newOffset int64
|
||||
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
newOffset = offset
|
||||
case io.SeekCurrent:
|
||||
newOffset = r.offset + offset
|
||||
case io.SeekEnd:
|
||||
newOffset = int64(len(r.buffer)) + offset
|
||||
default:
|
||||
return 0, errors.Errorf("unknown seek whence '%d'", whence)
|
||||
}
|
||||
|
||||
if newOffset > int64(len(r.buffer)) || newOffset < 0 {
|
||||
return 0, fmt.Errorf("invalid offset %d", offset)
|
||||
}
|
||||
|
||||
r.offset = newOffset
|
||||
|
||||
return newOffset, nil
|
||||
}
|
||||
|
||||
var _ io.ReadSeekCloser = &cachedReader{}
|
2
pkg/storage/driver/cache/testdata/.gitignore
vendored
Normal file
2
pkg/storage/driver/cache/testdata/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
*
|
||||
!.gitignore
|
@ -5,7 +5,6 @@ import (
|
||||
"net/url"
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob"
|
||||
@ -13,7 +12,7 @@ import (
|
||||
)
|
||||
|
||||
type BlobStore struct {
|
||||
serverURL *url.URL
|
||||
withClient WithClientFunc
|
||||
}
|
||||
|
||||
// DeleteBucket implements storage.BlobStore.
|
||||
@ -63,7 +62,7 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
|
||||
func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
return errors.WithStack(remapBlobError(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -75,27 +74,11 @@ func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, re
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
||||
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := client.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(ctx, client); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBlobStore(serverURL *url.URL) *BlobStore {
|
||||
return &BlobStore{serverURL}
|
||||
withClient := WithPooledClient(serverURL)
|
||||
return &BlobStore{
|
||||
withClient: withClient,
|
||||
}
|
||||
}
|
||||
|
||||
var _ storage.BlobStore = &BlobStore{}
|
||||
|
94
pkg/storage/driver/rpc/client/client_pool.go
Normal file
94
pkg/storage/driver/rpc/client/client_pool.go
Normal file
@ -0,0 +1,94 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/jackc/puddle/v2"
|
||||
"github.com/keegancsmith/rpc"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
func NewClientPool(serverURL *url.URL, poolSize int) (*puddle.Pool[*rpc.Client], error) {
|
||||
constructor := func(context.Context) (*rpc.Client, error) {
|
||||
client, err := rpc.DialHTTPPath("tcp", serverURL.Host, serverURL.Path+"?"+serverURL.RawQuery)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
destructor := func(client *rpc.Client) {
|
||||
if err := client.Close(); err != nil {
|
||||
logger.Error(context.Background(), "could not close client", logger.CapturedE(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
|
||||
maxPoolSize := int32(poolSize)
|
||||
|
||||
pool, err := puddle.NewPool(&puddle.Config[*rpc.Client]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
type WithClientFunc func(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error
|
||||
|
||||
func WithPooledClient(serverURL *url.URL) WithClientFunc {
|
||||
var (
|
||||
pool *puddle.Pool[*rpc.Client]
|
||||
createPool sync.Once
|
||||
)
|
||||
|
||||
return func(ctx context.Context, fn func(context.Context, *rpc.Client) error) error {
|
||||
var err error
|
||||
createPool.Do(func() {
|
||||
rawPoolSize := serverURL.Query().Get("clientPoolSize")
|
||||
if rawPoolSize == "" {
|
||||
rawPoolSize = "5"
|
||||
}
|
||||
|
||||
var poolSize int64
|
||||
|
||||
poolSize, err = strconv.ParseInt(rawPoolSize, 10, 32)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "could not parse clientPoolSize url query parameter")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
pool, err = NewClientPool(serverURL, int(poolSize))
|
||||
if err != nil {
|
||||
err = errors.WithStack(err)
|
||||
|
||||
return
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
clientResource, err := pool.Acquire(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := fn(ctx, clientResource.Value()); err != nil {
|
||||
if errors.Is(err, rpc.ErrShutdown) {
|
||||
clientResource.Destroy()
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
clientResource.Release()
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
@ -6,7 +6,6 @@ import (
|
||||
|
||||
"github.com/keegancsmith/rpc"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document"
|
||||
@ -14,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
type DocumentStore struct {
|
||||
serverURL *url.URL
|
||||
withClient WithClientFunc
|
||||
}
|
||||
|
||||
// Delete implements storage.DocumentStore.
|
||||
@ -96,7 +95,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc stora
|
||||
func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
return errors.WithStack(remapDocumentError(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -108,27 +107,12 @@ func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
|
||||
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
func NewDocumentStore(serverURL *url.URL) *DocumentStore {
|
||||
withClient := WithPooledClient(serverURL)
|
||||
|
||||
return &DocumentStore{
|
||||
withClient: withClient,
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := client.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := fn(ctx, client); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewDocumentStore(url *url.URL) *DocumentStore {
|
||||
return &DocumentStore{url}
|
||||
}
|
||||
|
||||
var _ storage.DocumentStore = &DocumentStore{}
|
||||
|
@ -1,10 +1,31 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func remapBlobError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case storage.ErrBlobNotFound.Error():
|
||||
return storage.ErrBlobNotFound
|
||||
case storage.ErrBucketClosed.Error():
|
||||
return storage.ErrBucketClosed
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func remapDocumentError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case storage.ErrDocumentNotFound.Error():
|
||||
return storage.ErrDocumentNotFound
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func remapShareError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case share.ErrAttributeRequired.Error():
|
||||
|
@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
3
pkg/storage/driver/sqlite/const.go
Normal file
3
pkg/storage/driver/sqlite/const.go
Normal file
@ -0,0 +1,3 @@
|
||||
package sqlite
|
||||
|
||||
const sqliteBusyMaxRetry = 5
|
@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,9 @@ package sqlite
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) {
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error {
|
||||
attempts := 0
|
||||
|
||||
ctx = logger.With(ctx, logger.F("max", max))
|
||||
|
||||
var err error
|
||||
for {
|
||||
ctx = logger.With(ctx)
|
||||
|
||||
if attempts >= max {
|
||||
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||
|
||||
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||
}
|
||||
|
||||
err = WithTx(ctx, db, fn)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
|
||||
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||
|
||||
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||
|
||||
logger.Debug(
|
||||
ctx, "database is busy, waiting before retrying transaction",
|
||||
logger.F("wait", wait.String()),
|
||||
logger.F("attempts", attempts),
|
||||
)
|
||||
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-timer.C:
|
||||
attempts++
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
|
||||
var tx *sql.Tx
|
||||
|
||||
|
@ -3,8 +3,10 @@ package testsuite
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
mrand "math/rand"
|
||||
"testing"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
@ -13,7 +15,6 @@ import (
|
||||
|
||||
func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
|
||||
t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
|
||||
|
||||
for i := 0; i < t.N; i++ {
|
||||
bucketName := fmt.Sprintf("bucket-%d", i)
|
||||
if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
|
||||
@ -21,6 +22,21 @@ func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("BlobRandomRead", func(t *testing.B) {
|
||||
t.StopTimer()
|
||||
if err := prepareBlobStoreRandomRead(store); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
t.ResetTimer()
|
||||
|
||||
t.StartTimer()
|
||||
for i := 0; i < t.N; i++ {
|
||||
if err := doRandomRead(store); err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
|
||||
@ -77,3 +93,115 @@ func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) e
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func prepareBlobStoreRandomRead(store storage.BlobStore) error {
|
||||
ctx := context.Background()
|
||||
totalBuckets := 128
|
||||
totalBlobs := 64
|
||||
|
||||
for i := 0; i < totalBuckets; i++ {
|
||||
bucketName := fmt.Sprintf("bucket-%d", i)
|
||||
err := func(bucketName string) error {
|
||||
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
for j := 0; j < totalBlobs; j++ {
|
||||
blobID := storage.NewBlobID()
|
||||
err = func(blobID storage.BlobID) error {
|
||||
writer, err := bucket.NewWriter(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
data := make([]byte, j)
|
||||
if _, err := rand.Read(data); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if _, err = writer.Write(data); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}(blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}(bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func doRandomRead(store storage.BlobStore) error {
|
||||
ctx := context.Background()
|
||||
|
||||
buckets, err := store.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
randBucketIndex := mrand.Int31n(int32(len(buckets)))
|
||||
bucketName := buckets[randBucketIndex]
|
||||
|
||||
bucket, err := store.OpenBucket(ctx, bucketName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := bucket.Close(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
blobs, err := bucket.List(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
randBlobIndex := mrand.Int31n(int32(len(blobs)))
|
||||
blobInfo := blobs[randBlobIndex]
|
||||
blobID := blobInfo.ID()
|
||||
|
||||
reader, err := bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
if _, err = io.Copy(&buf, reader); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := reader.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -122,6 +122,24 @@ var blobStoreTestCases = []blobStoreTestCase{
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
|
||||
reader, err = bucket.NewReader(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
written64, err = io.Copy(&buf, reader)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if e, g := int64(len(data)), written64; e != g {
|
||||
return errors.Errorf("length of written data: expected '%v', got '%v'", e, g)
|
||||
}
|
||||
|
||||
if err := reader.Close(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
|
||||
if err := bucket.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user