Compare commits
3 Commits
2023.10.21
...
2023.10.22
Author | SHA1 | Date | |
---|---|---|---|
2fc590d708 | |||
6e4bf2f025 | |||
22a3326be9 |
@ -2,7 +2,6 @@ package module
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"github.com/dop251/goja"
|
||||
@ -20,34 +19,17 @@ func (m *LifecycleModule) Export(export *goja.Object) {
|
||||
}
|
||||
|
||||
func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) {
|
||||
call, ok := goja.AssertFunction(rt.Get("onInit"))
|
||||
_, ok := goja.AssertFunction(rt.Get("onInit"))
|
||||
if !ok {
|
||||
logger.Warn(ctx, "could not find onInit() function")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if recovered := recover(); recovered != nil {
|
||||
revoveredErr, ok := recovered.(error)
|
||||
if ok {
|
||||
logger.Error(ctx, "recovered runtime error", logger.CapturedE(errors.WithStack(revoveredErr)))
|
||||
|
||||
err = errors.WithStack(app.ErrUnknownError)
|
||||
|
||||
return
|
||||
if _, err := rt.RunString("setTimeout(onInit, 0)"); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
panic(recovered)
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Debug(ctx, "executing app onInit() function")
|
||||
start := time.Now()
|
||||
call(nil)
|
||||
duration := time.Since(start)
|
||||
logger.Debug(ctx, "executed app onInit() function", logger.F("duration", duration.String()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
|
||||
func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
return errors.WithStack(remapBlobError(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -96,7 +96,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc stora
|
||||
func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
|
||||
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
|
||||
if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
|
||||
return errors.WithStack(err)
|
||||
return errors.WithStack(remapDocumentError(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -1,10 +1,31 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/share"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func remapBlobError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case storage.ErrBlobNotFound.Error():
|
||||
return storage.ErrBlobNotFound
|
||||
case storage.ErrBucketClosed.Error():
|
||||
return storage.ErrBucketClosed
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func remapDocumentError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case storage.ErrDocumentNotFound.Error():
|
||||
return storage.ErrDocumentNotFound
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func remapShareError(err error) error {
|
||||
switch errors.Cause(err).Error() {
|
||||
case share.ErrAttributeRequired.Error():
|
||||
|
@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
3
pkg/storage/driver/sqlite/const.go
Normal file
3
pkg/storage/driver/sqlite/const.go
Normal file
@ -0,0 +1,3 @@
|
||||
package sqlite
|
||||
|
||||
const sqliteBusyMaxRetry = 5
|
@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := WithTx(ctx, db, fn); err != nil {
|
||||
if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,9 @@ package sqlite
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) {
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error {
|
||||
attempts := 0
|
||||
|
||||
ctx = logger.With(ctx, logger.F("max", max))
|
||||
|
||||
var err error
|
||||
for {
|
||||
ctx = logger.With(ctx)
|
||||
|
||||
if attempts >= max {
|
||||
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
|
||||
|
||||
return errors.Wrapf(err, "transaction failed after %d attempts", max)
|
||||
}
|
||||
|
||||
err = WithTx(ctx, db, fn)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = errors.WithStack(err)
|
||||
|
||||
logger.Warn(ctx, "database is busy", logger.E(err))
|
||||
|
||||
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
|
||||
|
||||
logger.Debug(
|
||||
ctx, "database is busy, waiting before retrying transaction",
|
||||
logger.F("wait", wait.String()),
|
||||
logger.F("attempts", attempts),
|
||||
)
|
||||
|
||||
timer := time.NewTimer(wait)
|
||||
select {
|
||||
case <-timer.C:
|
||||
attempts++
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
|
||||
var tx *sql.Tx
|
||||
|
||||
|
Reference in New Issue
Block a user