Compare commits


10 Commits

Author SHA1 Message Date
f4a7366aad feat(storage): rpc driver client pooling and memory-constrained cache
driver

ref #20
2023-11-29 11:10:29 +01:00
02c74b6f8d feat(client): add loader for apps menu
2023-10-25 21:27:41 +02:00
8889694125 feat(cli): add basic bundle info command
2023-10-24 22:52:51 +02:00
6a99409a15 feat(blobstore): add cache driver 2023-10-24 22:52:33 +02:00
2fc590d708 feat(storage): retry sqlite failed transaction when database is busy
2023-10-22 23:18:02 +02:00
6e4bf2f025 feat(storage): remap rpc errors
2023-10-22 23:04:56 +02:00
22a3326be9 feat(lifecycle): execute onInit func asynchronously
2023-10-22 10:47:44 +02:00
0cfb132b65 feat(lifecycle-module): add debug message for onInit() execution
2023-10-21 21:46:51 +02:00
de4ab0d02c fix(bus): prevent double close in event dispatcher
2023-10-21 21:38:34 +02:00
d1458bab4a ci: use go 1.21.2
2023-10-20 11:01:32 +02:00
37 changed files with 1057 additions and 131 deletions

View File

@@ -1,4 +1,4 @@
 RUN_APP_ARGS=""
 #EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
-#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%"
+#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%"
 #EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"
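The updated example DSN above layers the new cache driver on top of the existing rpc blob store: the cache scheme selects the wrapper and the driver query parameter names the wrapped scheme. A minimal sketch of how such a DSN might be opened through the registered driver factories; the host, tenant and appId values are placeholders, not values taken from this change:

// Sketch only: opening a cache-wrapped blob store through the driver registry.
package main

import (
	"log"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
)

func main() {
	dsn := "cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=myapp"

	store, err := driver.NewBlobStore(dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	_ = store // use store.OpenBucket(...), etc.
}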

View File

@ -0,0 +1,56 @@
package app
import (
"os"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bundle"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
)
func InfoCommand() *cli.Command {
return &cli.Command{
Name: "info",
Usage: "Print app manifest informations",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "path",
Usage: "use `PATH` as app bundle (zip, zim or directory bundle)",
Aliases: []string{"p"},
Value: "",
Required: true,
},
},
Action: func(ctx *cli.Context) error {
appPath := ctx.String("path")
bundle, err := bundle.FromPath(appPath)
if err != nil {
return errors.Wrap(err, "could not load app bundle")
}
manifest, err := app.LoadManifest(bundle)
if err != nil {
return errors.Wrap(err, "could not load app manifest")
}
if valid, err := manifest.Validate(manifestMetadataValidators...); !valid {
return errors.Wrap(err, "invalid app manifest")
}
encoder := yaml.NewEncoder(os.Stdout)
if err := encoder.Encode(manifest); err != nil {
return errors.Wrap(err, "could not encode manifest")
}
if err := encoder.Close(); err != nil {
return errors.WithStack(err)
}
return nil
},
}
}

View File

@@ -12,6 +12,7 @@ func Root() *cli.Command {
 			RunCommand(),
 			PackageCommand(),
 			HashPasswordCommand(),
+			InfoCommand(),
 		},
 	}
 }

View File

@@ -44,10 +44,13 @@ import (
 	_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
 	_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"

-	// Register storage drivers
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
+
+	// Register storage drivers
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
 	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
 	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
+
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 )

View File

@@ -22,13 +22,15 @@ import (
 	"github.com/urfave/cli/v2"

 	// Register storage drivers
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
+
 	"forge.cadoles.com/arcad/edge/cmd/storage-server/command/flag"
 	"forge.cadoles.com/arcad/edge/pkg/jwtutil"
 	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
-	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
-	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 )
@@ -51,17 +53,17 @@ func Run() *cli.Command {
 		&cli.StringFlag{
 			Name: "blobstore-dsn-pattern",
 			EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"},
-			Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+			Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 		},
 		&cli.StringFlag{
 			Name: "documentstore-dsn-pattern",
 			EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
-			Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+			Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 		},
 		&cli.StringFlag{
 			Name: "sharestore-dsn-pattern",
 			EnvVars: []string{"STORAGE_SERVER_SHARESTORE_DSN_PATTERN"},
-			Value: fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+			Value: fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 		},
 		&cli.StringFlag{
 			Name: "sentry-dsn",

go.mod (3 changed lines)
View File

@@ -15,11 +15,13 @@ require (
 require (
 	cloud.google.com/go v0.75.0 // indirect
+	github.com/allegro/bigcache/v3 v3.1.0 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/goccy/go-json v0.9.11 // indirect
 	github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
+	github.com/jackc/puddle/v2 v2.2.1 // indirect
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/lestrrat-go/blackmagic v1.0.1 // indirect
 	github.com/lestrrat-go/httpcc v1.0.1 // indirect
@@ -27,6 +29,7 @@ require (
 	github.com/lestrrat-go/iter v1.0.2 // indirect
 	github.com/lestrrat-go/option v1.0.0 // indirect
 	github.com/miekg/dns v1.1.53 // indirect
+	golang.org/x/sync v0.1.0 // indirect
 	google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
 	google.golang.org/grpc v1.35.0 // indirect
 	gopkg.in/go-playground/validator.v9 v9.29.1 // indirect

go.sum (4 changed lines)
View File

@@ -53,6 +53,8 @@ github.com/alecthomas/kong v0.2.1-0.20190708041108-0548c6b1afae/go.mod h1:+inYUS
 github.com/alecthomas/kong-hcl v0.1.8-0.20190615233001-b21fea9723c8/go.mod h1:MRgZdU3vrFd05IQ89AxUZ0aYdF39BYoNFa324SodPCA=
 github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897 h1:p9Sln00KOTlrYkxI1zYWl1QLnEqAqEARBEYa8FQnQcY=
 github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ=
+github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=
+github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
 github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692 h1:JW4WZlqyaNWUUahfr7MigeDW6jmtam5cTzzo1lwsFhE=
 github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692/go.mod h1:Au0ipPuCBA7zsOC61SnyrYetm8VT3vo1UJtwHeYke44=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -206,6 +208,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE=
 github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
+github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
+github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
 github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=

View File

@@ -4,7 +4,7 @@ ARG HTTP_PROXY=
 ARG HTTPS_PROXY=
 ARG http_proxy=
 ARG https_proxy=
-ARG GO_VERSION=1.20.2
+ARG GO_VERSION=1.21.2

 # Install dev environment dependencies
 RUN export DEBIAN_FRONTEND=noninteractive &&\

View File

@@ -83,8 +83,12 @@ func (d *eventDispatcher) Close() {
 }

 func (d *eventDispatcher) close() {
-	d.closed = true
+	if d.closed {
+		return
+	}
 	close(d.in)
+
+	d.closed = true
 }

 func (d *eventDispatcher) In(msg bus.Message) (err error) {
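The guard makes close() idempotent, so a second call no longer panics on an already-closed channel. For comparison, a common alternative way to get the same guarantee is sync.Once; the sketch below uses a hypothetical dispatcher type, not the actual bus implementation from this change:

// Sketch only: an equivalent idempotent-close pattern using sync.Once.
package main

import "sync"

type dispatcher struct {
	in        chan interface{}
	closeOnce sync.Once
}

func (d *dispatcher) close() {
	// close(d.in) runs at most once, no matter how many times close() is called.
	d.closeOnce.Do(func() {
		close(d.in)
	})
}

func main() {
	d := &dispatcher{in: make(chan interface{})}
	d.close()
	d.close() // safe: the second call is a no-op
}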

View File

@@ -19,29 +19,16 @@ func (m *LifecycleModule) Export(export *goja.Object) {
 }

 func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) {
-	call, ok := goja.AssertFunction(rt.Get("onInit"))
+	_, ok := goja.AssertFunction(rt.Get("onInit"))
 	if !ok {
 		logger.Warn(ctx, "could not find onInit() function")

 		return nil
 	}

-	defer func() {
-		if recovered := recover(); recovered != nil {
-			revoveredErr, ok := recovered.(error)
-
-			if ok {
-				logger.Error(ctx, "recovered runtime error", logger.CapturedE(errors.WithStack(revoveredErr)))
-				err = errors.WithStack(app.ErrUnknownError)
-
-				return
-			}
-
-			panic(recovered)
-		}
-	}()
-
-	call(nil)
+	if _, err := rt.RunString("setTimeout(onInit, 0)"); err != nil {
+		return errors.WithStack(err)
+	}

 	return nil
 }

View File

@@ -115,7 +115,7 @@ func (m *Module) handleClientMessages() {
 		case msg := <-clientMessages:
 			clientMessage, ok := msg.(*module.ClientMessage)
 			if !ok {
-				logger.Error(
+				logger.Warn(
 					ctx,
 					"unexpected message type",
 					logger.F("message", msg),

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -5,5 +5,6 @@ import LoginIcon from './login.svg';
 import HomeIcon from './home.svg';
 import LinkIcon from './link.svg';
 import LogoutIcon from './logout.svg';
+import LoaderIcon from './loader.svg';

-export { UserCircleIcon, MenuIcon, CloudIcon, LoginIcon, HomeIcon, LinkIcon, LogoutIcon }
+export { LoaderIcon, UserCircleIcon, MenuIcon, CloudIcon, LoginIcon, HomeIcon, LinkIcon, LogoutIcon }

View File

@ -0,0 +1 @@
<svg viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg" fill="#000000"><g id="SVGRepo_bgCarrier" stroke-width="0"></g><g id="SVGRepo_tracerCarrier" stroke-linecap="round" stroke-linejoin="round"></g><g id="SVGRepo_iconCarrier"> <g> <path fill="none" d="M0 0h24v24H0z"></path> <path d="M12 2a1 1 0 0 1 1 1v3a1 1 0 0 1-2 0V3a1 1 0 0 1 1-1zm0 15a1 1 0 0 1 1 1v3a1 1 0 0 1-2 0v-3a1 1 0 0 1 1-1zm8.66-10a1 1 0 0 1-.366 1.366l-2.598 1.5a1 1 0 1 1-1-1.732l2.598-1.5A1 1 0 0 1 20.66 7zM7.67 14.5a1 1 0 0 1-.366 1.366l-2.598 1.5a1 1 0 1 1-1-1.732l2.598-1.5a1 1 0 0 1 1.366.366zM20.66 17a1 1 0 0 1-1.366.366l-2.598-1.5a1 1 0 0 1 1-1.732l2.598 1.5A1 1 0 0 1 20.66 17zM7.67 9.5a1 1 0 0 1-1.366.366l-2.598-1.5a1 1 0 1 1 1-1.732l2.598 1.5A1 1 0 0 1 7.67 9.5z"></path> </g> </g></svg>

New SVG icon (773 B)

View File

@ -0,0 +1,57 @@
import { LitElement, html, css } from 'lit';
import { LoaderIcon } from './icons';
export class Loader extends LitElement {
static styles = css`
:host {
display: inline-block;
height: 100%;
width: 100%;
border-bottom: 1px solid rgb(229,231,235);
border-top: 10px solid transparent;
background-color: #fff;
min-height: 50px;
padding: 10px 0;
}
.container {
display: flex;
align-items: center;
flex-direction: column;
justify-content: center;
font-family: Arial, Helvetica Neue, Helvetica, sans-serif;
font-size: 14px;
color: black;
}
.icon {
height: 35px;
animation-duration: 3s;
animation-name: spin;
animation-iteration-count: infinite;
}
@keyframes spin {
from {
transform: rotateZ(0deg);
}
to {
transform: rotateZ(360deg);
}
}
`;
constructor() {
super();
}
render() {
return html`
<div class="container">
<img class="icon" src="${LoaderIcon}" />
Chargement en cours
</div>
`
}
}

View File

@@ -49,6 +49,9 @@ export class Menu extends LitElement {
   @property()
   _profile: Profile

+  @property()
+  _loading: boolean = false
+
   static styles = css`
     :host {
       position: fixed;
@@ -95,6 +98,7 @@
   }

   _fetchApps() {
+    this._loading = true;
     return fetch(`${BASE_API_URL}/apps`)
       .then(res => res.json())
       .then(result => {
@@ -130,9 +134,14 @@
         return Promise.all(promises);
       })
       .then((manifests: Manifest[]) => {
+        this._loading = false
+
         this._apps = manifests;
       })
-      .catch(err => console.error(err))
+      .catch(err => {
+        console.error(err);
+        this._loading = false;
+      })
   }

   _fetchProfile() {
@@ -158,19 +167,24 @@
   }

   _renderApps() {
-    const apps = this._apps
-      .filter(manifest => this._canAccess(manifest))
-      .map(manifest => {
-        const iconUrl = ( ( manifest.url || '') + ( manifest.metadata?.paths?.icon || '' ) ) || LinkIcon;
-        return html`
-          <edge-menu-sub-item
-            name='${ manifest.id }'
-            label='${ manifest.title }'
-            icon-url='${ iconUrl }'
-            link-url='${ manifest.url || '#' }'>
-          </edge-menu-sub-item>
-        `
-      });
+    let apps;
+    if (this._loading) {
+      apps = [ html`<edge-loader></edge-loader>` ]
+    } else {
+      apps = this._apps
+        .filter(manifest => this._canAccess(manifest))
+        .map(manifest => {
+          const iconUrl = ( ( manifest.url || '') + ( manifest.metadata?.paths?.icon || '' ) ) || LinkIcon;
+          return html`
+            <edge-menu-sub-item
+              name='${ manifest.id }'
+              label='${ manifest.title }'
+              icon-url='${ iconUrl }'
+              link-url='${ manifest.url || '#' }'>
+            </edge-menu-sub-item>
+          `
+        });
+    }

     return html`
       <edge-menu-item name='apps' label='Apps' icon-url='${CloudIcon}'>

View File

@@ -6,10 +6,12 @@ import { MenuItem as MenuItemElement } from './components/menu-item.js';
 import { MenuSubItem as MenuSubItemElement } from './components/menu-sub-item.js';
 import { CrossFrameMessenger } from './crossframe-messenger.js';
 import { MenuManager } from './menu-manager.js';
+import { Loader } from './components/loader';

 customElements.define('edge-menu', MenuElement);
 customElements.define('edge-menu-item', MenuItemElement);
 customElements.define('edge-menu-sub-item', MenuSubItemElement);
+customElements.define('edge-loader', Loader);

 export const Client = new EdgeClient();
 export const Frame = new CrossFrameMessenger();

View File

@@ -23,7 +23,7 @@ func NewBlobStore(dsn string) (storage.BlobStore, error) {
 	factory, exists := blobStoreFactories[url.Scheme]
 	if !exists {
-		return nil, errors.WithStack(ErrSchemeNotRegistered)
+		return nil, errors.Wrapf(ErrSchemeNotRegistered, "no driver associated with scheme '%s'", url.Scheme)
 	}

 	store, err := factory(url)

pkg/storage/driver/cache/blob_bucket.go (new vendored file, 121 lines)
View File

@ -0,0 +1,121 @@
package cache
import (
"context"
"fmt"
"io"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobBucket struct {
bucket storage.BlobBucket
cache *bigcache.BigCache
}
// Close implements storage.BlobBucket.
func (b *BlobBucket) Close() error {
if err := b.bucket.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}
// Delete implements storage.BlobBucket.
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
if err := b.bucket.Delete(ctx, id); err != nil {
return errors.WithStack(err)
}
return nil
}
// Get implements storage.BlobBucket.
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
info, err := b.bucket.Get(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return info, nil
}
// List implements storage.BlobBucket.
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
infos, err := b.bucket.List(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return infos, nil
}
// Name implements storage.BlobBucket.
func (b *BlobBucket) Name() string {
return b.bucket.Name()
}
// NewReader implements storage.BlobBucket.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if cached, exist := b.inCache(id); exist {
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats()))
return cached, nil
}
reader, err := b.bucket.NewReader(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return &readCacher{
reader: reader,
cache: b.cache,
key: b.getCacheKey(id),
}, nil
}
func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
return fmt.Sprintf("%s-%s", b.Name(), id)
}
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
key := b.getCacheKey(id)
data, err := b.cache.Get(key)
if err != nil {
if errors.Is(err, bigcache.ErrEntryNotFound) {
return nil, false
}
logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err)))
return nil, false
}
return &cachedReader{data, 0}, true
}
// NewWriter implements storage.BlobBucket.
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
writer, err := b.bucket.NewWriter(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return writer, nil
}
// Size implements storage.BlobBucket.
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
size, err := b.bucket.Size(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
return size, nil
}
var _ storage.BlobBucket = &BlobBucket{}

pkg/storage/driver/cache/blob_store.go (new vendored file, 55 lines)
View File

@ -0,0 +1,55 @@
package cache
import (
"context"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
)
type BlobStore struct {
store storage.BlobStore
cache *bigcache.BigCache
}
// DeleteBucket implements storage.BlobStore.
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
if err := s.store.DeleteBucket(ctx, name); err != nil {
return errors.WithStack(err)
}
return nil
}
// ListBuckets implements storage.BlobStore.
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
buckets, err := s.store.ListBuckets(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return buckets, nil
}
// OpenBucket implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
bucket, err := s.store.OpenBucket(ctx, name)
if err != nil {
return nil, errors.WithStack(err)
}
return &BlobBucket{
bucket: bucket,
cache: s.cache,
}, nil
}
func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore {
return &BlobStore{
store: store,
cache: cache,
}
}
var _ storage.BlobStore = &BlobStore{}

View File

@ -0,0 +1,63 @@
package cache
import (
"context"
"fmt"
"os"
"testing"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestBlobStore(t *testing.T) {
t.Parallel()
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
store := NewBlobStore(backend, cache)
testsuite.TestBlobStore(context.Background(), t, store)
}
func BenchmarkBlobStore(t *testing.B) {
logger.SetLevel(logger.LevelError)
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
store := NewBlobStore(backend, cache)
testsuite.BenchmarkBlobStore(t, store)
}

pkg/storage/driver/cache/driver.go (new vendored file, 105 lines)
View File

@ -0,0 +1,105 @@
package cache
import (
"context"
"fmt"
"net/url"
"strconv"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func init() {
driver.RegisterBlobStoreFactory("cache", blobStoreFactory)
}
func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
query := dsn.Query()
rawDriver := query.Get("driver")
if rawDriver == "" {
return nil, errors.New("missing required url parameter 'driver'")
}
query.Del("driver")
cacheTTL := time.Minute * 60
rawCacheTTL := query.Get("cacheTTL")
if rawCacheTTL != "" {
query.Del("cacheTTL")
ttl, err := time.ParseDuration(rawCacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'")
}
cacheTTL = ttl
}
cacheConfig := bigcache.DefaultConfig(cacheTTL)
cacheConfig.Logger = &cacheLogger{}
rawCacheShards := query.Get("cacheShards")
if rawCacheShards != "" {
query.Del("cacheShards")
cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'")
}
cacheConfig.Shards = int(cacheShards)
}
rawMaxCacheSize := query.Get("maxCacheSize")
if rawMaxCacheSize != "" {
query.Del("maxCacheSize")
maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'")
}
// See cacheConfig.HardMaxCacheSize documentation
var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000
if maxCacheSize < minCacheSize {
return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize)
}
cacheConfig.HardMaxCacheSize = int(maxCacheSize)
}
url := &url.URL{
Scheme: rawDriver,
Host: dsn.Host,
Path: dsn.Path,
RawQuery: query.Encode(),
}
store, err := driver.NewBlobStore(url.String())
if err != nil {
return nil, errors.WithStack(err)
}
cache, err := bigcache.New(context.Background(), cacheConfig)
if err != nil {
return nil, errors.WithStack(err)
}
return NewBlobStore(store, cache), nil
}
type cacheLogger struct{}
func (l *cacheLogger) Printf(format string, v ...interface{}) {
logger.Debug(context.Background(), fmt.Sprintf(format, v...))
}
var _ bigcache.Logger = &cacheLogger{}
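The factory above derives its configuration entirely from the DSN query string: driver selects the wrapped blob store scheme, cacheTTL the entry lifetime (parsed with time.ParseDuration), cacheShards the number of bigcache shards, and maxCacheSize the hard memory limit passed to bigcache's HardMaxCacheSize. A hedged example of a tuned DSN; the host, app identifier and sizes below are illustrative placeholders, not defaults from this change:

// Sketch only: building a cache DSN with the parameters parsed by blobStoreFactory.
package main

import "fmt"

func main() {
	dsn := fmt.Sprintf(
		"cache://%s/blobstore?driver=rpc&tenant=local&appId=%s&cacheTTL=%s&cacheShards=%d&maxCacheSize=%d",
		"localhost:3001", // wrapped rpc server (placeholder)
		"myapp",          // application identifier (placeholder)
		"30m",            // cache entry TTL
		64,               // bigcache shard count
		256,              // hard cache size limit (see bigcache.Config.HardMaxCacheSize)
	)

	fmt.Println(dsn)
}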

pkg/storage/driver/cache/reader.go (new vendored file, 117 lines)
View File

@ -0,0 +1,117 @@
package cache
import (
"context"
"fmt"
"io"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type readCacher struct {
reader io.ReadSeekCloser
cache *bigcache.BigCache
key string
}
// Close implements io.ReadSeekCloser.
func (r *readCacher) Close() error {
if err := r.reader.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}
// Read implements io.ReadSeekCloser.
func (r *readCacher) Read(p []byte) (n int, err error) {
length, err := r.reader.Read(p)
if err != nil {
if err == io.EOF {
return length, io.EOF
}
return length, errors.WithStack(err)
}
if length > 0 {
if err := r.cache.Append(r.key, p[:length]); err != nil {
ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
if err := r.cache.Delete(r.key); err != nil {
logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err)))
}
}
}
return length, nil
}
// Seek implements io.ReadSeekCloser.
func (r *readCacher) Seek(offset int64, whence int) (int64, error) {
length, err := r.reader.Seek(offset, whence)
if err != nil {
return length, errors.WithStack(err)
}
return length, nil
}
var _ io.ReadSeekCloser = &readCacher{}
type cachedReader struct {
buffer []byte
offset int64
}
// Read implements io.ReadSeekCloser.
func (r *cachedReader) Read(p []byte) (n int, err error) {
available := len(r.buffer) - int(r.offset)
if available == 0 {
return 0, io.EOF
}
size := len(p)
if size > available {
size = available
}
copy(p, r.buffer[r.offset:r.offset+int64(size)])
r.offset += int64(size)
return size, nil
}
// Close implements io.ReadSeekCloser.
func (r *cachedReader) Close() error {
return nil
}
// Seek implements io.ReadSeekCloser.
func (r *cachedReader) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = r.offset + offset
case io.SeekEnd:
newOffset = int64(len(r.buffer)) + offset
default:
return 0, errors.Errorf("unknown seek whence '%d'", whence)
}
if newOffset > int64(len(r.buffer)) || newOffset < 0 {
return 0, fmt.Errorf("invalid offset %d", offset)
}
r.offset = newOffset
return newOffset, nil
}
var _ io.ReadSeekCloser = &cachedReader{}

View File

@ -0,0 +1,2 @@
*
!.gitignore

View File

@@ -5,7 +5,6 @@ import (
 	"net/url"

 	"github.com/keegancsmith/rpc"
-	"gitlab.com/wpetit/goweb/logger"

 	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob"
@@ -13,7 +12,7 @@ import (
 )

 type BlobStore struct {
-	serverURL *url.URL
+	withClient WithClientFunc
 }

 // DeleteBucket implements storage.BlobStore.
@@ -63,7 +62,7 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
 func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
 	err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
 		if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
-			return errors.WithStack(err)
+			return errors.WithStack(remapBlobError(err))
 		}

 		return nil
@@ -75,27 +74,11 @@
 	return nil
 }

-func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
-	client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
-	if err != nil {
-		return errors.WithStack(err)
-	}
-
-	defer func() {
-		if err := client.Close(); err != nil {
-			logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
-		}
-	}()
-
-	if err := fn(ctx, client); err != nil {
-		return errors.WithStack(err)
-	}
-
-	return nil
-}
-
 func NewBlobStore(serverURL *url.URL) *BlobStore {
-	return &BlobStore{serverURL}
+	withClient := WithPooledClient(serverURL)
+	return &BlobStore{
+		withClient: withClient,
+	}
 }

 var _ storage.BlobStore = &BlobStore{}

View File

@ -0,0 +1,94 @@
package client
import (
"context"
"net/url"
"strconv"
"sync"
"github.com/jackc/puddle/v2"
"github.com/keegancsmith/rpc"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func NewClientPool(serverURL *url.URL, poolSize int) (*puddle.Pool[*rpc.Client], error) {
constructor := func(context.Context) (*rpc.Client, error) {
client, err := rpc.DialHTTPPath("tcp", serverURL.Host, serverURL.Path+"?"+serverURL.RawQuery)
if err != nil {
return nil, errors.WithStack(err)
}
return client, nil
}
destructor := func(client *rpc.Client) {
if err := client.Close(); err != nil {
logger.Error(context.Background(), "could not close client", logger.CapturedE(errors.WithStack(err)))
}
}
maxPoolSize := int32(poolSize)
pool, err := puddle.NewPool(&puddle.Config[*rpc.Client]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize})
if err != nil {
return nil, errors.WithStack(err)
}
return pool, nil
}
type WithClientFunc func(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error
func WithPooledClient(serverURL *url.URL) WithClientFunc {
var (
pool *puddle.Pool[*rpc.Client]
createPool sync.Once
)
return func(ctx context.Context, fn func(context.Context, *rpc.Client) error) error {
var err error
createPool.Do(func() {
rawPoolSize := serverURL.Query().Get("clientPoolSize")
if rawPoolSize == "" {
rawPoolSize = "5"
}
var poolSize int64
poolSize, err = strconv.ParseInt(rawPoolSize, 10, 32)
if err != nil {
err = errors.Wrap(err, "could not parse clientPoolSize url query parameter")
return
}
pool, err = NewClientPool(serverURL, int(poolSize))
if err != nil {
err = errors.WithStack(err)
return
}
})
if err != nil {
return errors.WithStack(err)
}
clientResource, err := pool.Acquire(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := fn(ctx, clientResource.Value()); err != nil {
if errors.Is(err, rpc.ErrShutdown) {
clientResource.Destroy()
}
return errors.WithStack(err)
}
clientResource.Release()
return nil
}
}
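WithPooledClient creates the puddle pool lazily on the first call and sizes it from the clientPoolSize query parameter of the server URL, falling back to 5 connections when the parameter is absent. A small sketch of a server URL carrying that parameter; the host, path and values are placeholders:

// Sketch only: a storage server URL with an explicit client pool size.
package main

import (
	"fmt"
	"net/url"
)

func main() {
	serverURL, err := url.Parse("rpc://localhost:3001/blobstore?tenant=local&appId=myapp&clientPoolSize=10")
	if err != nil {
		panic(err)
	}

	// The rpc driver would hand this URL to WithPooledClient(serverURL),
	// which reads clientPoolSize before opening any connection.
	fmt.Println(serverURL.Query().Get("clientPoolSize")) // "10"
}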

View File

@@ -6,7 +6,6 @@ import (
 	"github.com/keegancsmith/rpc"
 	"github.com/pkg/errors"
-	"gitlab.com/wpetit/goweb/logger"

 	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document"
@@ -14,7 +13,7 @@ import (
 )

 type DocumentStore struct {
-	serverURL *url.URL
+	withClient WithClientFunc
 }

 // Delete implements storage.DocumentStore.
@@ -96,7 +95,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc stora
 func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
 	err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
 		if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
-			return errors.WithStack(err)
+			return errors.WithStack(remapDocumentError(err))
 		}

 		return nil
@@ -108,27 +107,12 @@
 	return nil
 }

-func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
-	client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
-	if err != nil {
-		return errors.WithStack(err)
-	}
-
-	defer func() {
-		if err := client.Close(); err != nil {
-			logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
-		}
-	}()
-
-	if err := fn(ctx, client); err != nil {
-		return errors.WithStack(err)
-	}
-
-	return nil
-}
-
-func NewDocumentStore(url *url.URL) *DocumentStore {
-	return &DocumentStore{url}
+func NewDocumentStore(serverURL *url.URL) *DocumentStore {
+	withClient := WithPooledClient(serverURL)
+
+	return &DocumentStore{
+		withClient: withClient,
+	}
 }

 var _ storage.DocumentStore = &DocumentStore{}

View File

@@ -1,10 +1,31 @@
 package client

 import (
+	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 	"github.com/pkg/errors"
 )

+func remapBlobError(err error) error {
+	switch errors.Cause(err).Error() {
+	case storage.ErrBlobNotFound.Error():
+		return storage.ErrBlobNotFound
+	case storage.ErrBucketClosed.Error():
+		return storage.ErrBucketClosed
+	default:
+		return err
+	}
+}
+
+func remapDocumentError(err error) error {
+	switch errors.Cause(err).Error() {
+	case storage.ErrDocumentNotFound.Error():
+		return storage.ErrDocumentNotFound
+	default:
+		return err
+	}
+}
+
 func remapShareError(err error) error {
 	switch errors.Cause(err).Error() {
 	case share.ErrAttributeRequired.Error():
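Errors returned over the RPC boundary arrive as plain strings, so the remap functions above compare the received message with the known sentinel errors and hand back the sentinel itself, which keeps client-side errors.Is checks working. A minimal sketch of the effect; the wire error is simulated locally with errors.New:

// Sketch only: why remapping by message matters for errors.Is on the client side.
package main

import (
	"errors"
	"fmt"

	"forge.cadoles.com/arcad/edge/pkg/storage"
)

func main() {
	// What the client receives: only the error message survives the RPC boundary.
	wireErr := errors.New(storage.ErrBlobNotFound.Error())

	fmt.Println(errors.Is(wireErr, storage.ErrBlobNotFound)) // false: distinct error values

	// After remapping the message back to the sentinel, errors.Is matches again.
	remapped := wireErr
	if wireErr.Error() == storage.ErrBlobNotFound.Error() {
		remapped = storage.ErrBlobNotFound
	}

	fmt.Println(errors.Is(remapped, storage.ErrBlobNotFound)) // true
}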

View File

@@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
 		return errors.WithStack(err)
 	}

-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}
@@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
 		return errors.WithStack(err)
 	}

-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}
@@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
 		return errors.WithStack(err)
 	}

-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}

View File

@@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error
 		return errors.WithStack(err)
 	}

-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}

View File

@ -0,0 +1,3 @@
package sqlite
const sqliteBusyMaxRetry = 5

View File

@@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
 		return errors.WithStack(err)
 	}

-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}

View File

@@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) erro
 		return errors.WithStack(err)
 	}

-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}

View File

@@ -3,7 +3,9 @@ package sqlite
 import (
 	"context"
 	"database/sql"
+	"strings"
 	"sync"
+	"time"

 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
@@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) {
return db, nil
}
func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error {
attempts := 0
ctx = logger.With(ctx, logger.F("max", max))
var err error
for {
ctx = logger.With(ctx)
if attempts >= max {
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
return errors.Wrapf(err, "transaction failed after %d attempts", max)
}
err = WithTx(ctx, db, fn)
if err != nil {
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
return errors.WithStack(err)
}
err = errors.WithStack(err)
logger.Warn(ctx, "database is busy", logger.E(err))
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
logger.Debug(
ctx, "database is busy, waiting before retrying transaction",
logger.F("wait", wait.String()),
logger.F("attempts", attempts),
)
timer := time.NewTimer(wait)
select {
case <-timer.C:
attempts++
continue
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return errors.WithStack(err)
}
return nil
}
}
return nil
}
}
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
var tx *sql.Tx
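WithRetry only retries transactions that fail with SQLITE_BUSY and sleeps 8<<(attempts+1) milliseconds between attempts. With sqliteBusyMaxRetry = 5, as used by the sqlite store call sites, the backoff schedule is short and geometric; a small sketch that prints it:

// Sketch only: the wait schedule produced by time.Duration(8<<(attempts+1)) * time.Millisecond
// for the sqliteBusyMaxRetry = 5 constant introduced in this change.
package main

import (
	"fmt"
	"time"
)

func main() {
	const sqliteBusyMaxRetry = 5

	for attempts := 0; attempts < sqliteBusyMaxRetry; attempts++ {
		wait := time.Duration(8<<(attempts+1)) * time.Millisecond
		fmt.Printf("attempt %d: wait %s\n", attempts, wait)
	}
	// Prints: 16ms, 32ms, 64ms, 128ms, 256ms
}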

View File

@@ -3,8 +3,10 @@ package testsuite
 import (
 	"bytes"
 	"context"
+	"crypto/rand"
 	"fmt"
 	"io"
+	mrand "math/rand"
 	"testing"

 	"forge.cadoles.com/arcad/edge/pkg/storage"
@@ -13,7 +15,6 @@ import (
 func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
 	t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
-
 		for i := 0; i < t.N; i++ {
 			bucketName := fmt.Sprintf("bucket-%d", i)
 			if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
@@ -21,6 +22,21 @@
 			}
 		}
 	})
+
+	t.Run("BlobRandomRead", func(t *testing.B) {
+		t.StopTimer()
+		if err := prepareBlobStoreRandomRead(store); err != nil {
+			t.Fatalf("%+v", errors.WithStack(err))
+		}
+
+		t.ResetTimer()
+		t.StartTimer()
+		for i := 0; i < t.N; i++ {
+			if err := doRandomRead(store); err != nil {
+				t.Fatalf("%+v", errors.WithStack(err))
+			}
+		}
+	})
 }

 func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
@@ -77,3 +93,115 @@
return nil
}
func prepareBlobStoreRandomRead(store storage.BlobStore) error {
ctx := context.Background()
totalBuckets := 128
totalBlobs := 64
for i := 0; i < totalBuckets; i++ {
bucketName := fmt.Sprintf("bucket-%d", i)
err := func(bucketName string) error {
bucket, err := store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
panic(errors.WithStack(err))
}
}()
for j := 0; j < totalBlobs; j++ {
blobID := storage.NewBlobID()
err = func(blobID storage.BlobID) error {
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := writer.Close(); err != nil {
panic(errors.WithStack(err))
}
}()
data := make([]byte, j)
if _, err := rand.Read(data); err != nil {
return errors.WithStack(err)
}
if _, err = writer.Write(data); err != nil {
return errors.WithStack(err)
}
if err := writer.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}(blobID)
if err != nil {
return errors.WithStack(err)
}
}
return nil
}(bucketName)
if err != nil {
return errors.WithStack(err)
}
}
return nil
}
func doRandomRead(store storage.BlobStore) error {
ctx := context.Background()
buckets, err := store.ListBuckets(ctx)
if err != nil {
return errors.WithStack(err)
}
randBucketIndex := mrand.Int31n(int32(len(buckets)))
bucketName := buckets[randBucketIndex]
bucket, err := store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
panic(errors.WithStack(err))
}
}()
blobs, err := bucket.List(ctx)
if err != nil {
return errors.WithStack(err)
}
randBlobIndex := mrand.Int31n(int32(len(blobs)))
blobInfo := blobs[randBlobIndex]
blobID := blobInfo.ID()
reader, err := bucket.NewReader(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
var buf bytes.Buffer
if _, err = io.Copy(&buf, reader); err != nil {
return errors.WithStack(err)
}
if err := reader.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}

View File

@@ -122,6 +122,24 @@ var blobStoreTestCases = []blobStoreTestCase{
 				panic(errors.WithStack(err))
 			}

+			reader, err = bucket.NewReader(ctx, blobID)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			written64, err = io.Copy(&buf, reader)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			if e, g := int64(len(data)), written64; e != g {
+				return errors.Errorf("length of written data: expected '%v', got '%v'", e, g)
+			}
+
+			if err := reader.Close(); err != nil {
+				panic(errors.WithStack(err))
+			}
+
 			if err := bucket.Close(); err != nil {
 				return errors.WithStack(err)
 			}