Compare commits


8 Commits

SHA1        Message                                                                 Date                        CI
8889694125  feat(cli): add basic bundle info command                                2023-10-24 22:52:51 +02:00  passed
6a99409a15  feat(blobstore): add cache driver                                       2023-10-24 22:52:33 +02:00  -
2fc590d708  feat(storage): retry sqlite failed transaction when database is busy    2023-10-22 23:18:02 +02:00  passed
6e4bf2f025  feat(storage): remap rpc errors                                         2023-10-22 23:04:56 +02:00  passed
22a3326be9  feat(lifecycle): execute onInit func asynchronously                     2023-10-22 10:47:44 +02:00  passed
0cfb132b65  feat(lifecycle-module): add debug message for onInit() execution        2023-10-21 21:46:51 +02:00  passed
de4ab0d02c  fix(bus): prevent double close in event dispatcher                      2023-10-21 21:38:34 +02:00  passed
d1458bab4a  ci: use go 1.21.2                                                       2023-10-20 11:01:32 +02:00  passed

passed = all checks were successful (arcad/edge/pipeline/head: "This commit looks good"); no CI status was recorded for 6a99409a15.
27 changed files with 725 additions and 37 deletions


@@ -1,4 +1,4 @@
 RUN_APP_ARGS=""
 #EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
-#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%"
+#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%"
 #EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"


@@ -0,0 +1,56 @@
package app
import (
"os"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bundle"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
)
func InfoCommand() *cli.Command {
return &cli.Command{
Name: "info",
Usage: "Print app manifest information",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "path",
Usage: "use `PATH` as app bundle (zip, zim or directory bundle)",
Aliases: []string{"p"},
Value: "",
Required: true,
},
},
Action: func(ctx *cli.Context) error {
appPath := ctx.String("path")
bundle, err := bundle.FromPath(appPath)
if err != nil {
return errors.Wrap(err, "could not load app bundle")
}
manifest, err := app.LoadManifest(bundle)
if err != nil {
return errors.Wrap(err, "could not load app manifest")
}
if valid, err := manifest.Validate(manifestMetadataValidators...); !valid {
return errors.Wrap(err, "invalid app manifest")
}
encoder := yaml.NewEncoder(os.Stdout)
if err := encoder.Encode(manifest); err != nil {
return errors.Wrap(err, "could not encode manifest")
}
if err := encoder.Close(); err != nil {
return errors.WithStack(err)
}
return nil
},
}
}


@@ -12,6 +12,7 @@ func Root() *cli.Command {
 			RunCommand(),
 			PackageCommand(),
 			HashPasswordCommand(),
+			InfoCommand(),
 		},
 	}
 }


@@ -44,10 +44,13 @@ import (
 	_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
 	_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"
 
-	// Register storage drivers
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
+
+	// Register storage drivers
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
 	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
 	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
 
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 )


@@ -22,13 +22,15 @@ import (
 	"github.com/urfave/cli/v2"
 
 	// Register storage drivers
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
+
 	"forge.cadoles.com/arcad/edge/cmd/storage-server/command/flag"
 	"forge.cadoles.com/arcad/edge/pkg/jwtutil"
 	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
-	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
-	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 )
@@ -51,17 +53,17 @@ func Run() *cli.Command {
 			&cli.StringFlag{
 				Name:    "blobstore-dsn-pattern",
 				EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"},
-				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 			},
 			&cli.StringFlag{
 				Name:    "documentstore-dsn-pattern",
 				EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
-				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 			},
 			&cli.StringFlag{
 				Name:    "sharestore-dsn-pattern",
 				EnvVars: []string{"STORAGE_SERVER_SHARESTORE_DSN_PATTERN"},
-				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 			},
 			&cli.StringFlag{
 				Name: "sentry-dsn",


@@ -4,7 +4,7 @@ ARG HTTP_PROXY=
 ARG HTTPS_PROXY=
 ARG http_proxy=
 ARG https_proxy=
-ARG GO_VERSION=1.20.2
+ARG GO_VERSION=1.21.2
 
 # Install dev environment dependencies
 RUN export DEBIAN_FRONTEND=noninteractive &&\


@@ -83,8 +83,12 @@ func (d *eventDispatcher) Close() {
 }
 
 func (d *eventDispatcher) close() {
-	d.closed = true
+	if d.closed {
+		return
+	}
+
 	close(d.in)
+	d.closed = true
 }
 
 func (d *eventDispatcher) In(msg bus.Message) (err error) {
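This fix matters because closing an already-closed Go channel panics at runtime. A minimal standalone sketch of the guarded-close pattern (type and field names are illustrative, not the actual bus package):

package main

import "fmt"

type dispatcher struct {
	in     chan string
	closed bool
}

// close is idempotent: the guard returns early instead of reaching
// close(d.in) a second time, which would panic with
// "close of closed channel".
func (d *dispatcher) close() {
	if d.closed {
		return
	}

	close(d.in)
	d.closed = true
}

func main() {
	d := &dispatcher{in: make(chan string)}
	d.close()
	d.close() // safe no-op thanks to the guard
	fmt.Println("closed once, no panic")
}

Note that the flag alone is not goroutine-safe; this assumes calls to close() are already serialized (for example under the dispatcher's own locking), otherwise a sync.Once would be the sturdier choice.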


@@ -19,29 +19,16 @@ func (m *LifecycleModule) Export(export *goja.Object) {
 }
 
 func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) {
-	call, ok := goja.AssertFunction(rt.Get("onInit"))
+	_, ok := goja.AssertFunction(rt.Get("onInit"))
 	if !ok {
 		logger.Warn(ctx, "could not find onInit() function")
 		return nil
 	}
 
-	defer func() {
-		if recovered := recover(); recovered != nil {
-			revoveredErr, ok := recovered.(error)
-			if ok {
-				logger.Error(ctx, "recovered runtime error", logger.CapturedE(errors.WithStack(revoveredErr)))
-				err = errors.WithStack(app.ErrUnknownError)
-
-				return
-			}
-
-			panic(recovered)
-		}
-	}()
-
-	call(nil)
+	if _, err := rt.RunString("setTimeout(onInit, 0)"); err != nil {
+		return errors.WithStack(err)
+	}
 
 	return nil
 }


@@ -115,7 +115,7 @@ func (m *Module) handleClientMessages() {
 		case msg := <-clientMessages:
 			clientMessage, ok := msg.(*module.ClientMessage)
 			if !ok {
-				logger.Error(
+				logger.Warn(
 					ctx,
 					"unexpected message type",
 					logger.F("message", msg),


@@ -23,7 +23,7 @@ func NewBlobStore(dsn string) (storage.BlobStore, error) {
 	factory, exists := blobStoreFactories[url.Scheme]
 	if !exists {
-		return nil, errors.WithStack(ErrSchemeNotRegistered)
+		return nil, errors.Wrapf(ErrSchemeNotRegistered, "no driver associated with scheme '%s'", url.Scheme)
 	}
 
 	store, err := factory(url)
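Switching from errors.WithStack to errors.Wrapf annotates the sentinel with the offending scheme while keeping it as the cause, so callers can still match on it. A hedged fragment (the actual message of ErrSchemeNotRegistered may differ):

// Assuming something like: var ErrSchemeNotRegistered = errors.New("scheme not registered")
err := errors.Wrapf(ErrSchemeNotRegistered, "no driver associated with scheme '%s'", "foo")

fmt.Println(err)
// no driver associated with scheme 'foo': scheme not registered

fmt.Println(errors.Is(err, ErrSchemeNotRegistered))
// true: Wrapf preserves the cause, so sentinel checks keep working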

pkg/storage/driver/cache/blob_bucket.go (vendored, new file, +115)

@@ -0,0 +1,115 @@
package cache
import (
"context"
"fmt"
"io"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobBucket struct {
bucket storage.BlobBucket
cache *expirable.LRU[string, []byte]
}
// Close implements storage.BlobBucket.
func (b *BlobBucket) Close() error {
if err := b.bucket.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}
// Delete implements storage.BlobBucket.
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
if err := b.bucket.Delete(ctx, id); err != nil {
return errors.WithStack(err)
}
return nil
}
// Get implements storage.BlobBucket.
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
info, err := b.bucket.Get(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return info, nil
}
// List implements storage.BlobBucket.
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
infos, err := b.bucket.List(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return infos, nil
}
// Name implements storage.BlobBucket.
func (b *BlobBucket) Name() string {
return b.bucket.Name()
}
// NewReader implements storage.BlobBucket.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if cached, exist := b.inCache(id); exist {
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)))
return cached, nil
}
reader, err := b.bucket.NewReader(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return &readCacher{
reader: reader,
cache: b.cache,
key: b.getCacheKey(id),
}, nil
}
func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
return fmt.Sprintf("%s-%s", b.Name(), id)
}
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
key := b.getCacheKey(id)
data, exist := b.cache.Get(key)
if !exist {
return nil, false
}
return &cachedReader{data, 0}, true
}
// NewWriter implements storage.BlobBucket.
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
writer, err := b.bucket.NewWriter(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return writer, nil
}
// Size implements storage.BlobBucket.
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
size, err := b.bucket.Size(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
return size, nil
}
var _ storage.BlobBucket = &BlobBucket{}

pkg/storage/driver/cache/blob_store.go (vendored, new file, +56)

@@ -0,0 +1,56 @@
package cache
import (
"context"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
)
type BlobStore struct {
store storage.BlobStore
cache *expirable.LRU[string, []byte]
}
// DeleteBucket implements storage.BlobStore.
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
if err := s.store.DeleteBucket(ctx, name); err != nil {
return errors.WithStack(err)
}
return nil
}
// ListBuckets implements storage.BlobStore.
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
buckets, err := s.store.ListBuckets(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return buckets, nil
}
// OpenBucket implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
bucket, err := s.store.OpenBucket(ctx, name)
if err != nil {
return nil, errors.WithStack(err)
}
return &BlobBucket{
bucket: bucket,
cache: s.cache,
}, nil
}
func NewBlobStore(store storage.BlobStore, cacheSize int, cacheTTL time.Duration) *BlobStore {
return &BlobStore{
store: store,
cache: expirable.NewLRU[string, []byte](cacheSize, nil, cacheTTL),
}
}
var _ storage.BlobStore = &BlobStore{}
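Together with blob_bucket.go above, this forms a read-through cache: NewBlobStore wraps any storage.BlobStore with a shared expirable LRU, the first read of a blob streams from the backend while readCacher copies the bytes, and once EOF is reached subsequent reads are served from memory by cachedReader. A rough usage sketch, assuming the context, io, time, storage, cache, sqlite and pkg/errors imports from the surrounding files (path, sizes and bucket name are illustrative):

func readThroughCache(ctx context.Context, id storage.BlobID) error {
	backend := sqlite.NewBlobStore("data/blobstore.sqlite?_pragma=busy_timeout=60000")
	store := cache.NewBlobStore(backend, 128, 10*time.Minute)

	bucket, err := store.OpenBucket(ctx, "media")
	if err != nil {
		return errors.WithStack(err)
	}
	defer bucket.Close()

	for i := 0; i < 2; i++ {
		// Pass 1 streams from sqlite and fills the LRU at EOF;
		// pass 2 is served from memory via cachedReader.
		reader, err := bucket.NewReader(ctx, id)
		if err != nil {
			return errors.WithStack(err)
		}

		if _, err := io.Copy(io.Discard, reader); err != nil {
			return errors.WithStack(err)
		}

		if err := reader.Close(); err != nil {
			return errors.WithStack(err)
		}
	}

	return nil
}

Two caveats follow directly from the code: a blob is only cached once its reader has been drained to EOF, and neither NewWriter nor Delete invalidates an existing entry, so a rewritten blob may be served stale until the TTL expires.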


@@ -0,0 +1,50 @@
package cache
import (
"context"
"fmt"
"os"
"testing"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestBlobStore(t *testing.T) {
t.Parallel()
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
store := NewBlobStore(backend, 32, time.Second*1)
testsuite.TestBlobStore(context.Background(), t, store)
}
func BenchmarkBlobStore(t *testing.B) {
logger.SetLevel(logger.LevelError)
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
store := NewBlobStore(backend, 32, time.Minute)
testsuite.BenchmarkBlobStore(t, store)
}

pkg/storage/driver/cache/driver.go (vendored, new file, +64)

@@ -0,0 +1,64 @@
package cache
import (
"net/url"
"strconv"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
"github.com/pkg/errors"
)
func init() {
driver.RegisterBlobStoreFactory("cache", blobStoreFactory)
}
func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
query := dsn.Query()
rawCacheSize := query.Get("cacheSize")
if rawCacheSize == "" {
rawCacheSize = "128"
}
cacheSize, err := strconv.ParseInt(rawCacheSize, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse cacheSize url parameter")
}
query.Del("cacheSize")
rawCacheTTL := query.Get("cacheTTL")
if rawCacheTTL == "" {
rawCacheTTL = "10m"
}
cacheTTL, err := time.ParseDuration(rawCacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse cacheTTL url parameter")
}
query.Del("cacheTTL")
rawDriver := query.Get("driver")
if rawDriver == "" {
return nil, errors.New("missing required url parameter 'driver'")
}
query.Del("driver")
url := &url.URL{
Scheme: rawDriver,
Host: dsn.Host,
Path: dsn.Path,
RawQuery: query.Encode(),
}
store, err := driver.NewBlobStore(url.String())
if err != nil {
return nil, errors.WithStack(err)
}
return NewBlobStore(store, int(cacheSize), cacheTTL), nil
}
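In short, the factory peels off its own query parameters (driver, cacheSize and cacheTTL, defaulting to 128 entries and 10m), rebuilds the DSN with the driver value as the new scheme, and delegates to the factory registered for that scheme. A hedged example mirroring the .env change above (host and parameter values are illustrative):

// The DSN below is rewritten by blobStoreFactory into
// rpc://localhost:3001/blobstore?tenant=local
// and the resulting store is wrapped in a 256-entry LRU with a 5-minute TTL.
store, err := driver.NewBlobStore("cache://localhost:3001/blobstore?driver=rpc&tenant=local&cacheSize=256&cacheTTL=5m")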

pkg/storage/driver/cache/reader.go (vendored, new file, +124)

@@ -0,0 +1,124 @@
package cache
import (
"bytes"
"context"
"fmt"
"io"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type readCacher struct {
reader io.ReadSeekCloser
cache *expirable.LRU[string, []byte]
key string
buffer bytes.Buffer
}
// Close implements io.ReadSeekCloser.
func (r *readCacher) Close() error {
if err := r.reader.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}
// Read implements io.ReadSeekCloser.
func (r *readCacher) Read(p []byte) (n int, err error) {
length, err := r.reader.Read(p)
if err != nil {
if err == io.EOF {
if length > 0 {
if _, err := r.buffer.Write(p[:length]); err != nil {
logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
return length, io.EOF
}
}
logger.Debug(context.Background(), "caching blob", logger.F("cacheKey", r.key))
r.cache.Add(r.key, r.buffer.Bytes())
return length, io.EOF
}
return length, errors.WithStack(err)
}
if length > 0 {
if _, err := r.buffer.Write(p[:length]); err != nil {
logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
}
}
return length, nil
}
// Seek implements io.ReadSeekCloser.
func (r *readCacher) Seek(offset int64, whence int) (int64, error) {
length, err := r.reader.Seek(offset, whence)
if err != nil {
return length, errors.WithStack(err)
}
return length, nil
}
var _ io.ReadSeekCloser = &readCacher{}
type cachedReader struct {
buffer []byte
offset int64
}
// Read implements io.ReadSeekCloser.
func (r *cachedReader) Read(p []byte) (n int, err error) {
available := len(r.buffer) - int(r.offset)
if available == 0 {
return 0, io.EOF
}
size := len(p)
if size > available {
size = available
}
copy(p, r.buffer[r.offset:r.offset+int64(size)])
r.offset += int64(size)
return size, nil
}
// Close implements io.ReadSeekCloser.
func (r *cachedReader) Close() error {
return nil
}
// Seek implements io.ReadSeekCloser.
func (r *cachedReader) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = r.offset + offset
case io.SeekEnd:
newOffset = int64(len(r.buffer)) + offset
default:
return 0, errors.Errorf("unknown seek whence '%d'", whence)
}
if newOffset > int64(len(r.buffer)) || newOffset < 0 {
return 0, fmt.Errorf("invalid offset %d", offset)
}
r.offset = newOffset
return newOffset, nil
}
var _ io.ReadSeekCloser = &cachedReader{}


@@ -0,0 +1,2 @@
*
!.gitignore


@@ -63,7 +63,7 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
 func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
 	err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
 		if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
-			return errors.WithStack(err)
+			return errors.WithStack(remapBlobError(err))
 		}
 
 		return nil


@@ -96,7 +96,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc storage.Document) (storage.Document, error) {
 func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
 	err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
 		if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
-			return errors.WithStack(err)
+			return errors.WithStack(remapDocumentError(err))
 		}
 
 		return nil


@@ -1,10 +1,31 @@
 package client
 
 import (
+	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 	"github.com/pkg/errors"
 )
 
+func remapBlobError(err error) error {
+	switch errors.Cause(err).Error() {
+	case storage.ErrBlobNotFound.Error():
+		return storage.ErrBlobNotFound
+	case storage.ErrBucketClosed.Error():
+		return storage.ErrBucketClosed
+	default:
+		return err
+	}
+}
+
+func remapDocumentError(err error) error {
+	switch errors.Cause(err).Error() {
+	case storage.ErrDocumentNotFound.Error():
+		return storage.ErrDocumentNotFound
+	default:
+		return err
+	}
+}
+
 func remapShareError(err error) error {
 	switch errors.Cause(err).Error() {
 	case share.ErrAttributeRequired.Error():
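These helpers exist because errors crossing the net/rpc boundary arrive flattened to plain strings, which breaks sentinel identity (and with it errors.Is and direct comparisons). Matching on the cause's message and substituting the canonical sentinel restores it. A hedged fragment, as if written inside this client package:

// Simulate what the RPC client yields: the server's sentinel reduced to its message.
rpcErr := errors.New(storage.ErrBlobNotFound.Error())

fmt.Println(errors.Is(rpcErr, storage.ErrBlobNotFound))
// false: equal strings, but not the same error value

fmt.Println(errors.Is(remapBlobError(rpcErr), storage.ErrBlobNotFound))
// true: the canonical sentinel is restored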


@@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
 		return errors.WithStack(err)
 	}
 
-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}
@@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
 		return errors.WithStack(err)
 	}
 
-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}
@@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
 		return errors.WithStack(err)
 	}
 
-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}


@@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
 		return errors.WithStack(err)
 	}
 
-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}


@@ -0,0 +1,3 @@
package sqlite
const sqliteBusyMaxRetry = 5


@@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
 		return errors.WithStack(err)
 	}
 
-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}


@@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error {
 		return errors.WithStack(err)
 	}
 
-	if err := WithTx(ctx, db, fn); err != nil {
+	if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
 		return errors.WithStack(err)
 	}


@@ -3,7 +3,9 @@ package sqlite
 import (
 	"context"
 	"database/sql"
+	"strings"
 	"sync"
+	"time"
 
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
@@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) {
 	return db, nil
 }
 
+func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error {
+	attempts := 0
+
+	ctx = logger.With(ctx, logger.F("max", max))
+
+	var err error
+
+	for {
+		ctx = logger.With(ctx)
+
+		if attempts >= max {
+			logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
+
+			return errors.Wrapf(err, "transaction failed after %d attempts", max)
+		}
+
+		err = WithTx(ctx, db, fn)
+		if err != nil {
+			if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
+				return errors.WithStack(err)
+			}
+
+			err = errors.WithStack(err)
+
+			logger.Warn(ctx, "database is busy", logger.E(err))
+
+			wait := time.Duration(8<<(attempts+1)) * time.Millisecond
+
+			logger.Debug(
+				ctx, "database is busy, waiting before retrying transaction",
+				logger.F("wait", wait.String()),
+				logger.F("attempts", attempts),
+			)
+
+			timer := time.NewTimer(wait)
+
+			select {
+			case <-timer.C:
+				attempts++
+
+				continue
+			case <-ctx.Done():
+				if err := ctx.Err(); err != nil {
+					return errors.WithStack(err)
+				}
+
+				return nil
+			}
+		}
+
+		return nil
+	}
+}
+
 func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
 	var tx *sql.Tx
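WithRetry only retries when the error message contains "(5) (SQLITE_BUSY)" and backs off exponentially with wait := time.Duration(8<<(attempts+1)) * time.Millisecond. With sqliteBusyMaxRetry = 5 that gives five attempts and roughly half a second of cumulative waiting before giving up; a small sketch to make the arithmetic concrete:

package main

import (
	"fmt"
	"time"
)

func main() {
	const max = 5 // sqliteBusyMaxRetry in the diff above

	var total time.Duration

	for attempts := 0; attempts < max; attempts++ {
		wait := time.Duration(8<<(attempts+1)) * time.Millisecond
		total += wait
		fmt.Printf("attempt %d: wait %s\n", attempts, wait) // 16ms, 32ms, 64ms, 128ms, 256ms
	}

	fmt.Printf("cumulative wait before giving up: %s\n", total) // 496ms
}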


@@ -3,8 +3,10 @@ package testsuite
 import (
 	"bytes"
 	"context"
+	"crypto/rand"
 	"fmt"
 	"io"
+	mrand "math/rand"
 	"testing"
 
 	"forge.cadoles.com/arcad/edge/pkg/storage"
@@ -13,7 +15,6 @@ import (
 
 func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
 	t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
 		for i := 0; i < t.N; i++ {
-
 			bucketName := fmt.Sprintf("bucket-%d", i)
 			if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
@@ -21,6 +22,21 @@ func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
 			}
 		}
 	})
+
+	t.Run("BlobRandomRead", func(t *testing.B) {
+		t.StopTimer()
+		if err := prepareBlobStoreRandomRead(store); err != nil {
+			t.Fatalf("%+v", errors.WithStack(err))
+		}
+
+		t.ResetTimer()
+		t.StartTimer()
+		for i := 0; i < t.N; i++ {
+			if err := doRandomRead(store); err != nil {
+				t.Fatalf("%+v", errors.WithStack(err))
+			}
+		}
+	})
 }
 
 func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
@@ -77,3 +93,115 @@ func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
 	return nil
 }
func prepareBlobStoreRandomRead(store storage.BlobStore) error {
ctx := context.Background()
totalBuckets := 128
totalBlobs := 64
for i := 0; i < totalBuckets; i++ {
bucketName := fmt.Sprintf("bucket-%d", i)
err := func(bucketName string) error {
bucket, err := store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
panic(errors.WithStack(err))
}
}()
for j := 0; j < totalBlobs; j++ {
blobID := storage.NewBlobID()
err = func(blobID storage.BlobID) error {
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := writer.Close(); err != nil {
panic(errors.WithStack(err))
}
}()
data := make([]byte, j)
if _, err := rand.Read(data); err != nil {
return errors.WithStack(err)
}
if _, err = writer.Write(data); err != nil {
return errors.WithStack(err)
}
if err := writer.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}(blobID)
if err != nil {
return errors.WithStack(err)
}
}
return nil
}(bucketName)
if err != nil {
return errors.WithStack(err)
}
}
return nil
}
func doRandomRead(store storage.BlobStore) error {
ctx := context.Background()
buckets, err := store.ListBuckets(ctx)
if err != nil {
return errors.WithStack(err)
}
randBucketIndex := mrand.Int31n(int32(len(buckets)))
bucketName := buckets[randBucketIndex]
bucket, err := store.OpenBucket(ctx, bucketName)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := bucket.Close(); err != nil {
panic(errors.WithStack(err))
}
}()
blobs, err := bucket.List(ctx)
if err != nil {
return errors.WithStack(err)
}
randBlobIndex := mrand.Int31n(int32(len(blobs)))
blobInfo := blobs[randBlobIndex]
blobID := blobInfo.ID()
reader, err := bucket.NewReader(ctx, blobID)
if err != nil {
return errors.WithStack(err)
}
var buf bytes.Buffer
if _, err = io.Copy(&buf, reader); err != nil {
return errors.WithStack(err)
}
if err := reader.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}


@@ -122,6 +122,24 @@ var blobStoreTestCases = []blobStoreTestCase{
 				panic(errors.WithStack(err))
 			}
 
+			reader, err = bucket.NewReader(ctx, blobID)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			written64, err = io.Copy(&buf, reader)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			if e, g := int64(len(data)), written64; e != g {
+				return errors.Errorf("length of written data: expected '%v', got '%v'", e, g)
+			}
+
+			if err := reader.Close(); err != nil {
+				panic(errors.WithStack(err))
+			}
+
 			if err := bucket.Close(); err != nil {
 				return errors.WithStack(err)
 			}