Compare commits

...

5 Commits

Author SHA1 Message Date
2fc590d708 feat(storage): retry sqlite failed transaction when database is busy
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-10-22 23:18:02 +02:00
6e4bf2f025 feat(storage): remap rpc errors
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-10-22 23:04:56 +02:00
22a3326be9 feat(lifecycle): execute onInit func asynchronously
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-10-22 10:47:44 +02:00
0cfb132b65 feat(lifecycle-module): add debug message for onInit() execution
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-10-21 21:46:51 +02:00
de4ab0d02c fix(bus): prevent double close in event dispatcher
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-10-21 21:38:34 +02:00
11 changed files with 95 additions and 26 deletions

View File

@ -83,8 +83,12 @@ func (d *eventDispatcher) Close() {
} }
func (d *eventDispatcher) close() { func (d *eventDispatcher) close() {
d.closed = true if d.closed {
return
}
close(d.in) close(d.in)
d.closed = true
} }
func (d *eventDispatcher) In(msg bus.Message) (err error) { func (d *eventDispatcher) In(msg bus.Message) (err error) {

View File

@ -19,29 +19,16 @@ func (m *LifecycleModule) Export(export *goja.Object) {
} }
func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) { func (m *LifecycleModule) OnInit(ctx context.Context, rt *goja.Runtime) (err error) {
call, ok := goja.AssertFunction(rt.Get("onInit")) _, ok := goja.AssertFunction(rt.Get("onInit"))
if !ok { if !ok {
logger.Warn(ctx, "could not find onInit() function") logger.Warn(ctx, "could not find onInit() function")
return nil return nil
} }
defer func() { if _, err := rt.RunString("setTimeout(onInit, 0)"); err != nil {
if recovered := recover(); recovered != nil { return errors.WithStack(err)
revoveredErr, ok := recovered.(error) }
if ok {
logger.Error(ctx, "recovered runtime error", logger.CapturedE(errors.WithStack(revoveredErr)))
err = errors.WithStack(app.ErrUnknownError)
return
}
panic(recovered)
}
}()
call(nil)
return nil return nil
} }

View File

@ -63,7 +63,7 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
if err := client.Call(ctx, serviceMethod, args, reply); err != nil { if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
return errors.WithStack(err) return errors.WithStack(remapBlobError(err))
} }
return nil return nil

View File

@ -96,7 +96,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, doc stora
func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error { func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any, reply any) error {
err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error { err := s.withClient(ctx, func(ctx context.Context, client *rpc.Client) error {
if err := client.Call(ctx, serviceMethod, args, reply); err != nil { if err := client.Call(ctx, serviceMethod, args, reply); err != nil {
return errors.WithStack(err) return errors.WithStack(remapDocumentError(err))
} }
return nil return nil

View File

@ -1,10 +1,31 @@
package client package client
import ( import (
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/share" "forge.cadoles.com/arcad/edge/pkg/storage/share"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
func remapBlobError(err error) error {
switch errors.Cause(err).Error() {
case storage.ErrBlobNotFound.Error():
return storage.ErrBlobNotFound
case storage.ErrBucketClosed.Error():
return storage.ErrBucketClosed
default:
return err
}
}
func remapDocumentError(err error) error {
switch errors.Cause(err).Error() {
case storage.ErrDocumentNotFound.Error():
return storage.ErrDocumentNotFound
default:
return err
}
}
func remapShareError(err error) error { func remapShareError(err error) error {
switch errors.Cause(err).Error() { switch errors.Cause(err).Error() {
case share.ErrAttributeRequired.Error(): case share.ErrAttributeRequired.Error():

View File

@ -236,7 +236,7 @@ func (b *BlobBucket) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := WithTx(ctx, db, fn); err != nil { if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -335,7 +335,7 @@ func (wbc *blobWriterCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := WithTx(ctx, db, fn); err != nil { if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -444,7 +444,7 @@ func (brc *blobReaderCloser) withTx(ctx context.Context, fn func(tx *sql.Tx) err
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := WithTx(ctx, db, fn); err != nil { if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }

View File

@ -114,7 +114,7 @@ func (s *BlobStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) error
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := WithTx(ctx, db, fn); err != nil { if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }

View File

@ -0,0 +1,3 @@
package sqlite
const sqliteBusyMaxRetry = 5

View File

@ -269,7 +269,7 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := WithTx(ctx, db, fn); err != nil { if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }

View File

@ -368,7 +368,7 @@ func (s *ShareStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) erro
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := WithTx(ctx, db, fn); err != nil { if err := WithRetry(ctx, db, sqliteBusyMaxRetry, fn); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }

View File

@ -3,7 +3,9 @@ package sqlite
import ( import (
"context" "context"
"database/sql" "database/sql"
"strings"
"sync" "sync"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger" "gitlab.com/wpetit/goweb/logger"
@ -22,6 +24,58 @@ func Open(path string) (*sql.DB, error) {
return db, nil return db, nil
} }
func WithRetry(ctx context.Context, db *sql.DB, max int, fn func(*sql.Tx) error) error {
attempts := 0
ctx = logger.With(ctx, logger.F("max", max))
var err error
for {
ctx = logger.With(ctx)
if attempts >= max {
logger.Debug(ctx, "transaction retrying failed", logger.F("attempts", attempts))
return errors.Wrapf(err, "transaction failed after %d attempts", max)
}
err = WithTx(ctx, db, fn)
if err != nil {
if !strings.Contains(err.Error(), "(5) (SQLITE_BUSY)") {
return errors.WithStack(err)
}
err = errors.WithStack(err)
logger.Warn(ctx, "database is busy", logger.E(err))
wait := time.Duration(8<<(attempts+1)) * time.Millisecond
logger.Debug(
ctx, "database is busy, waiting before retrying transaction",
logger.F("wait", wait.String()),
logger.F("attempts", attempts),
)
timer := time.NewTimer(wait)
select {
case <-timer.C:
attempts++
continue
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return errors.WithStack(err)
}
return nil
}
}
return nil
}
}
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
var tx *sql.Tx var tx *sql.Tx