diff --git a/cmd/cli/command/app/run.go b/cmd/cli/command/app/run.go index d6169ec..c4b8719 100644 --- a/cmd/cli/command/app/run.go +++ b/cmd/cli/command/app/run.go @@ -107,10 +107,10 @@ func RunCommand() *cli.Command { Usage: "use `FILE` as local accounts", Value: ".edge/%APPID%/accounts.json", }, - &cli.IntFlag{ + &cli.Int64Flag{ Name: "max-upload-size", Usage: "use `MAX-UPLOAD-SIZE` as blob max upload size", - Value: 10 << (10 * 2), // 10Mb + Value: 128 << (10 * 2), // 128Mb }, }, Action: func(ctx *cli.Context) error { @@ -123,7 +123,7 @@ func RunCommand() *cli.Command { documentstoreDSN := ctx.String("documentstore-dsn") shareStoreDSN := ctx.String("sharestore-dsn") accountsFile := ctx.String("accounts-file") - maxUploadSize := ctx.Int("max-upload-size") + maxUploadSize := ctx.Int64("max-upload-size") logger.SetFormat(logger.Format(logFormat)) logger.SetLevel(logger.Level(logLevel)) @@ -182,7 +182,7 @@ func RunCommand() *cli.Command { } } -func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int) error { +func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int64) error { absPath, err := filepath.Abs(path) if err != nil { return errors.Wrapf(err, "could not resolve path '%s'", path) diff --git a/go.mod b/go.mod index 4549a4e..9f7ff55 100644 --- a/go.mod +++ b/go.mod @@ -3,25 +3,26 @@ module forge.cadoles.com/arcad/edge go 1.21 require ( + github.com/allegro/bigcache/v3 v3.1.0 github.com/getsentry/sentry-go v0.25.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/mdns v1.0.5 + github.com/jackc/puddle/v2 v2.2.1 github.com/keegancsmith/rpc v1.3.0 github.com/klauspost/compress v1.16.6 github.com/lestrrat-go/jwx/v2 v2.0.8 github.com/ulikunitz/xz v0.5.11 + go.uber.org/goleak v1.3.0 modernc.org/sqlite v1.20.4 ) require ( cloud.google.com/go v0.75.0 // indirect - github.com/allegro/bigcache/v3 v3.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect - github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -29,7 +30,6 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.0 // indirect github.com/miekg/dns v1.1.53 // indirect - go.uber.org/goleak v1.3.0 // indirect golang.org/x/sync v0.1.0 // indirect google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect google.golang.org/grpc v1.35.0 // indirect diff --git a/pkg/http/context.go b/pkg/http/context.go index e492dd3..786838d 100644 --- a/pkg/http/context.go +++ b/pkg/http/context.go @@ -6,7 +6,6 @@ import ( "net/http" "forge.cadoles.com/arcad/edge/pkg/bus" - "github.com/pkg/errors" ) type contextKey string @@ -15,15 +14,16 @@ var ( contextKeyBus contextKey = "bus" contextKeyHTTPRequest contextKey = "httpRequest" contextKeyHTTPClient contextKey = "httpClient" + contextKeySessionID contextKey = "sessionId" ) func (h *Handler) contextMiddleware(next http.Handler) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - ctx = 
context.WithValue(ctx, contextKeyBus, h.bus) - ctx = context.WithValue(ctx, contextKeyHTTPRequest, r) - ctx = context.WithValue(ctx, contextKeyHTTPClient, h.httpClient) + ctx = WithContextBus(ctx, h.bus) + ctx = WithContextHTTPRequest(ctx, r) + ctx = WithContextHTTPClient(ctx, h.httpClient) r = r.WithContext(ctx) @@ -33,23 +33,43 @@ func (h *Handler) contextMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(fn) } -func ContextBus(ctx context.Context) bus.Bus { +func ContextBus(ctx context.Context) (bus.Bus, bool) { return contextValue[bus.Bus](ctx, contextKeyBus) } -func ContextHTTPRequest(ctx context.Context) *http.Request { +func WithContextBus(parent context.Context, bus bus.Bus) context.Context { + return context.WithValue(parent, contextKeyBus, bus) +} + +func ContextHTTPRequest(ctx context.Context) (*http.Request, bool) { return contextValue[*http.Request](ctx, contextKeyHTTPRequest) } -func ContextHTTPClient(ctx context.Context) *http.Client { +func WithContextHTTPRequest(parent context.Context, request *http.Request) context.Context { + return context.WithValue(parent, contextKeyHTTPRequest, request) +} + +func ContextHTTPClient(ctx context.Context) (*http.Client, bool) { return contextValue[*http.Client](ctx, contextKeyHTTPClient) } -func contextValue[T any](ctx context.Context, key any) T { +func WithContextHTTPClient(parent context.Context, client *http.Client) context.Context { + return context.WithValue(parent, contextKeyHTTPClient, client) +} + +func ContextSessionID(ctx context.Context) (string, bool) { + return contextValue[string](ctx, contextKeySessionID) +} + +func WithContextSessionID(parent context.Context, sessionID string) context.Context { + return context.WithValue(parent, contextKeySessionID, sessionID) +} + +func contextValue[T any](ctx context.Context, key any) (T, bool) { value, ok := ctx.Value(key).(T) if !ok { - panic(errors.Errorf("could not find key '%v' on context", key)) + return *new(T), false } - return value + return value, true } diff --git a/pkg/http/handler.go b/pkg/http/handler.go index c9cf99e..f0061a8 100644 --- a/pkg/http/handler.go +++ b/pkg/http/handler.go @@ -102,14 +102,12 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler { r.Get("/client.js.map", handler.handleSDKClientMap) }) - r.Group(func(r chi.Router) { - r.Use(handler.contextMiddleware) - for _, fn := range opts.HTTPMounts { - r.Group(func(r chi.Router) { - fn(r) - }) - } - }) + for _, fn := range opts.HTTPMounts { + r.Group(func(r chi.Router) { + r.Use(handler.contextMiddleware) + fn(r) + }) + } r.HandleFunc("/sock/*", handler.handleSockJS) }) diff --git a/pkg/http/sockjs.go b/pkg/http/sockjs.go index 9b7b3c6..551e6b3 100644 --- a/pkg/http/sockjs.go +++ b/pkg/http/sockjs.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" - "forge.cadoles.com/arcad/edge/pkg/module" "github.com/igm/sockjs-go/v3/sockjs" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" @@ -15,11 +14,6 @@ const ( statusChannelClosed = iota ) -const ( - ContextKeySessionID module.ContextKey = "sessionId" - ContextKeyOriginRequest module.ContextKey = "originRequest" -) - func (h *Handler) handleSockJS(w http.ResponseWriter, r *http.Request) { h.mutex.RLock() defer h.mutex.RUnlock() @@ -181,10 +175,8 @@ func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Sessio } ctx := logger.With(ctx, logger.F("payload", payload)) - ctx = module.WithContext(ctx, map[module.ContextKey]any{ - ContextKeySessionID: sess.ID(), - ContextKeyOriginRequest: sess.Request(), - }) + ctx = 
WithContextHTTPRequest(ctx, sess.Request()) + ctx = WithContextSessionID(ctx, sess.ID()) incomingMessage := NewIncomingMessageEnvelope(ctx, payload) diff --git a/pkg/module/auth/module.go b/pkg/module/auth/module.go index 7732d01..96dbfb6 100644 --- a/pkg/module/auth/module.go +++ b/pkg/module/auth/module.go @@ -1,10 +1,8 @@ package auth import ( - "net/http" - "forge.cadoles.com/arcad/edge/pkg/app" - edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http" + edgehttp "forge.cadoles.com/arcad/edge/pkg/http" "forge.cadoles.com/arcad/edge/pkg/jwtutil" "forge.cadoles.com/arcad/edge/pkg/module/util" "github.com/dop251/goja" @@ -68,7 +66,7 @@ func (m *Module) getClaim(call goja.FunctionCall, rt *goja.Runtime) goja.Value { ctx := util.AssertContext(call.Argument(0), rt) claimName := util.AssertString(call.Argument(1), rt) - req, ok := ctx.Value(edgeHTTP.ContextKeyOriginRequest).(*http.Request) + req, ok := edgehttp.ContextHTTPRequest(ctx) if !ok { panic(rt.ToValue(errors.New("could not find http request in context"))) } diff --git a/pkg/module/auth/module_test.go b/pkg/module/auth/module_test.go index de14312..edddbc9 100644 --- a/pkg/module/auth/module_test.go +++ b/pkg/module/auth/module_test.go @@ -9,7 +9,7 @@ import ( "cdr.dev/slog" "forge.cadoles.com/arcad/edge/pkg/app" - edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http" + edgehttp "forge.cadoles.com/arcad/edge/pkg/http" "forge.cadoles.com/arcad/edge/pkg/jwtutil" "forge.cadoles.com/arcad/edge/pkg/module" "github.com/lestrrat-go/jwx/v2/jwa" @@ -71,7 +71,7 @@ func TestAuthModule(t *testing.T) { req.Header.Add("Authorization", "Bearer "+string(rawToken)) - ctx = context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req) + ctx = edgehttp.WithContextHTTPRequest(context.Background(), req) if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil { t.Fatalf("%+v", errors.WithStack(err)) @@ -111,7 +111,7 @@ func TestAuthAnonymousModule(t *testing.T) { t.Fatalf("%+v", errors.WithStack(err)) } - ctx = context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req) + ctx = edgehttp.WithContextHTTPRequest(context.Background(), req) if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil { t.Fatalf("%+v", errors.WithStack(err)) diff --git a/pkg/module/blob/http.go b/pkg/module/blob/http.go index fc4edcb..0c61a4f 100644 --- a/pkg/module/blob/http.go +++ b/pkg/module/blob/http.go @@ -21,19 +21,17 @@ type uploadResponse struct { BlobID storage.BlobID `json:"blobId"` } -func Mount(uploadMaxFileSize int) func(r chi.Router) { +func Mount(uploadMaxFileSize int64) func(r chi.Router) { return func(r chi.Router) { r.Post("/api/v1/upload", getAppUploadHandler(uploadMaxFileSize)) r.Get("/api/v1/download/{bucket}/{blobID}", handleAppDownload) } } -func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http.Request) { +func getAppUploadHandler(uploadMaxFileSize int64) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - uploadMaxFileSize := int64(8000) - r.Body = http.MaxBytesReader(w, r.Body, uploadMaxFileSize) if err := r.ParseMultipartForm(uploadMaxFileSize); err != nil { @@ -65,7 +63,13 @@ func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http. 
	requestEnv := NewUploadRequestEnvelope(ctx, fileHeader, metadata)

-	bus := edgehttp.ContextBus(ctx)
+	bus, ok := edgehttp.ContextBus(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find bus on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

	reply, err := bus.Request(ctx, requestEnv)
	if err != nil {
@@ -114,7 +118,13 @@ func handleAppDownload(w http.ResponseWriter, r *http.Request) {

	requestMsg := NewDownloadRequestEnvelope(ctx, bucket, storage.BlobID(blobID))

-	bs := edgehttp.ContextBus(ctx)
+	bs, ok := edgehttp.ContextBus(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find bus on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

	reply, err := bs.Request(ctx, requestMsg)
	if err != nil {
diff --git a/pkg/module/fetch/http.go b/pkg/module/fetch/http.go
index e30b975..df38679 100644
--- a/pkg/module/fetch/http.go
+++ b/pkg/module/fetch/http.go
@@ -31,7 +31,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {

	requestMsg := NewFetchRequestEnvelope(ctx, r.RemoteAddr, url)

-	bus := edgehttp.ContextBus(ctx)
+	bus, ok := edgehttp.ContextBus(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find bus on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

	reply, err := bus.Request(ctx, requestMsg)
	if err != nil {
@@ -79,7 +85,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {

	proxyReq.Header.Add("X-Forwarded-From", r.RemoteAddr)

-	httpClient := edgehttp.ContextHTTPClient(ctx)
+	httpClient, ok := edgehttp.ContextHTTPClient(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find http client on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

	res, err := httpClient.Do(proxyReq)
	if err != nil {
diff --git a/pkg/module/net/module.go b/pkg/module/net/module.go
index 9e4c44e..edc43b8 100644
--- a/pkg/module/net/module.go
+++ b/pkg/module/net/module.go
@@ -6,7 +6,6 @@ import (
	"forge.cadoles.com/arcad/edge/pkg/app"
	"forge.cadoles.com/arcad/edge/pkg/bus"
	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
-	"forge.cadoles.com/arcad/edge/pkg/module"
	"forge.cadoles.com/arcad/edge/pkg/module/util"
	"github.com/dop251/goja"
	"github.com/pkg/errors"
@@ -57,7 +56,10 @@ func (m *Module) send(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
	sessionID, ok := firstArg.Export().(string)
	if !ok {
		ctx := util.AssertContext(firstArg, rt)
-		sessionID = module.ContextValue[string](ctx, edgehttp.ContextKeySessionID)
+		sessionID, ok = edgehttp.ContextSessionID(ctx)
+		if !ok {
+			panic(rt.ToValue(errors.New("could not find session id in context")))
+		}
	}

	data := call.Argument(1).Export()
diff --git a/pkg/module/rpc/module.go b/pkg/module/rpc/module.go
index f134677..1716c87 100644
--- a/pkg/module/rpc/module.go
+++ b/pkg/module/rpc/module.go
@@ -7,7 +7,6 @@ import (
	"forge.cadoles.com/arcad/edge/pkg/app"
	"forge.cadoles.com/arcad/edge/pkg/bus"
	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
-	"forge.cadoles.com/arcad/edge/pkg/module"
	"forge.cadoles.com/arcad/edge/pkg/module/util"
	"github.com/dop251/goja"
	"github.com/pkg/errors"
@@ -143,10 +142,15 @@ func (m *Module) handleIncomingHTTPMessages(ctx context.Context, incoming <-chan
			continue
		}

-		requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))
+		sessionID, ok := edgehttp.ContextSessionID(msg.Context)
+		if !ok {
+			logger.Error(ctx, "could not find session id 
in context") + continue + } request := NewRequestEnvelope(msg.Context, jsonReq.Method, jsonReq.Params) - sessionID := module.ContextValue[string](msg.Context, edgehttp.ContextKeySessionID) + + requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID)) reply, err := m.bus.Request(requestCtx, request) if err != nil { diff --git a/pkg/storage/driver/cache/blob_bucket.go b/pkg/storage/driver/cache/blob_bucket.go index 14a3acf..34827b1 100644 --- a/pkg/storage/driver/cache/blob_bucket.go +++ b/pkg/storage/driver/cache/blob_bucket.go @@ -7,26 +7,28 @@ import ( "forge.cadoles.com/arcad/edge/pkg/storage" "github.com/allegro/bigcache/v3" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type BlobBucket struct { - bucket storage.BlobBucket - cache *bigcache.BigCache + bucket storage.BlobBucket + contentCache *bigcache.BigCache + blobInfoCache *expirable.LRU[string, storage.BlobInfo] + bucketCache *expirable.LRU[string, storage.BlobBucket] } // Close implements storage.BlobBucket. func (b *BlobBucket) Close() error { - if err := b.bucket.Close(); err != nil { - return errors.WithStack(err) - } - + // Close only when bucket is evicted from cache return nil } // Delete implements storage.BlobBucket. func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { + defer b.clearCache(ctx, id) + if err := b.bucket.Delete(ctx, id); err != nil { return errors.WithStack(err) } @@ -36,11 +38,28 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { // Get implements storage.BlobBucket. func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) { + key := b.getCacheKey(id) + if blobInfo, ok := b.blobInfoCache.Get(key); ok { + logger.Debug( + ctx, "found blob info in cache", + logger.F("cacheKey", key), + ) + + return blobInfo, nil + } + info, err := b.bucket.Get(ctx, id) if err != nil { + if errors.Is(err, storage.ErrBucketClosed) { + b.clearCache(ctx, id) + b.bucketCache.Remove(b.Name()) + } + return nil, errors.WithStack(err) } + b.blobInfoCache.Add(key, info) + return info, nil } @@ -48,9 +67,18 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { infos, err := b.bucket.List(ctx) if err != nil { + if errors.Is(err, storage.ErrBucketClosed) { + b.bucketCache.Remove(b.Name()) + } + return nil, errors.WithStack(err) } + for _, ifo := range infos { + key := b.getCacheKey(ifo.ID()) + b.blobInfoCache.Add(key, ifo) + } + return infos, nil } @@ -61,19 +89,28 @@ func (b *BlobBucket) Name() string { // NewReader implements storage.BlobBucket. 
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) { - if cached, exist := b.inCache(id); exist { - logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats())) + if cached, exist := b.inContentCache(id); exist { + logger.Debug( + ctx, "found blob content in cache", + logger.F("cacheKey", b.getCacheKey(id)), + logger.F("cacheStats", b.contentCache.Stats()), + ) return cached, nil } reader, err := b.bucket.NewReader(ctx, id) if err != nil { + if errors.Is(err, storage.ErrBucketClosed) { + b.clearCache(ctx, id) + b.bucketCache.Remove(b.Name()) + } + return nil, errors.WithStack(err) } return &readCacher{ reader: reader, - cache: b.cache, + cache: b.contentCache, key: b.getCacheKey(id), }, nil } @@ -82,9 +119,9 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string { return fmt.Sprintf("%s-%s", b.Name(), id) } -func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) { +func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) { key := b.getCacheKey(id) - data, err := b.cache.Get(key) + data, err := b.contentCache.Get(key) if err != nil { if errors.Is(err, bigcache.ErrEntryNotFound) { return nil, false @@ -98,10 +135,28 @@ func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) { return &cachedReader{data, 0}, true } +func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) { + key := b.getCacheKey(id) + + logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key)) + + if err := b.contentCache.Delete(key); err != nil { + logger.Error(ctx, "could not clear cache", logger.CapturedE(errors.WithStack(err))) + } + + b.blobInfoCache.Remove(key) +} + // NewWriter implements storage.BlobBucket. func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) { + defer b.clearCache(ctx, id) + writer, err := b.bucket.NewWriter(ctx, id) if err != nil { + if errors.Is(err, storage.ErrBucketClosed) { + b.bucketCache.Remove(b.Name()) + } + return nil, errors.WithStack(err) } @@ -112,6 +167,10 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write func (b *BlobBucket) Size(ctx context.Context) (int64, error) { size, err := b.bucket.Size(ctx) if err != nil { + if errors.Is(err, storage.ErrBucketClosed) { + b.bucketCache.Remove(b.Name()) + } + return 0, errors.WithStack(err) } diff --git a/pkg/storage/driver/cache/blob_store.go b/pkg/storage/driver/cache/blob_store.go index c296a7d..fa6a331 100644 --- a/pkg/storage/driver/cache/blob_store.go +++ b/pkg/storage/driver/cache/blob_store.go @@ -5,12 +5,16 @@ import ( "forge.cadoles.com/arcad/edge/pkg/storage" "github.com/allegro/bigcache/v3" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" ) type BlobStore struct { - store storage.BlobStore - cache *bigcache.BigCache + store storage.BlobStore + contentCache *bigcache.BigCache + bucketCache *expirable.LRU[string, storage.BlobBucket] + blobInfoCache *expirable.LRU[string, storage.BlobInfo] } // DeleteBucket implements storage.BlobStore. @@ -19,6 +23,8 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error { return errors.WithStack(err) } + s.bucketCache.Remove(name) + return nil } @@ -34,22 +40,62 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { // OpenBucket implements storage.BlobStore. 
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) { + bucket, ok := s.bucketCache.Get(name) + if ok { + logger.Debug(ctx, "found bucket in cache", logger.F("name", name)) + + return &BlobBucket{ + bucket: bucket, + contentCache: s.contentCache, + blobInfoCache: s.blobInfoCache, + bucketCache: s.bucketCache, + }, nil + } + bucket, err := s.store.OpenBucket(ctx, name) if err != nil { return nil, errors.WithStack(err) } + s.bucketCache.Add(name, bucket) + return &BlobBucket{ - bucket: bucket, - cache: s.cache, + bucket: bucket, + contentCache: s.contentCache, + blobInfoCache: s.blobInfoCache, + bucketCache: s.bucketCache, }, nil } -func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore { - return &BlobStore{ - store: store, - cache: cache, +func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) { + options := NewOptions(funcs...) + + cacheConfig := bigcache.DefaultConfig(options.CacheTTL) + cacheConfig.Logger = &cacheLogger{} + + contentCache, err := bigcache.New(context.Background(), cacheConfig) + if err != nil { + return nil, errors.WithStack(err) } + + onBlobBucketEvict := func(key string, bucket storage.BlobBucket) { + ctx := context.Background() + logger.Debug(ctx, "evicting blob bucket from cache", logger.F("cacheKey", key)) + + if err := bucket.Close(); err != nil { + logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err))) + } + } + + bucketCache := expirable.NewLRU[string, storage.BlobBucket](options.BucketCacheSize, onBlobBucketEvict, options.CacheTTL) + blobInfoCache := expirable.NewLRU[string, storage.BlobInfo](options.BlobInfoCacheSize, nil, options.CacheTTL) + + return &BlobStore{ + store: store, + contentCache: contentCache, + bucketCache: bucketCache, + blobInfoCache: blobInfoCache, + }, nil } var _ storage.BlobStore = &BlobStore{} diff --git a/pkg/storage/driver/cache/blob_store_test.go b/pkg/storage/driver/cache/blob_store_test.go index 8bb2f2c..6715a00 100644 --- a/pkg/storage/driver/cache/blob_store_test.go +++ b/pkg/storage/driver/cache/blob_store_test.go @@ -9,7 +9,6 @@ import ( "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" - "github.com/allegro/bigcache/v3" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) @@ -30,13 +29,11 @@ func TestBlobStore(t *testing.T) { backend := sqlite.NewBlobStore(dsn) - cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute)) + store, err := NewBlobStore(backend) if err != nil { t.Fatalf("%+v", errors.WithStack(err)) } - store := NewBlobStore(backend, cache) - testsuite.TestBlobStore(context.Background(), t, store) } @@ -52,12 +49,10 @@ func BenchmarkBlobStore(t *testing.B) { dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) backend := sqlite.NewBlobStore(dsn) - cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute)) + store, err := NewBlobStore(backend) if err != nil { t.Fatalf("%+v", errors.WithStack(err)) } - store := NewBlobStore(backend, cache) - testsuite.BenchmarkBlobStore(t, store) } diff --git a/pkg/storage/driver/cache/driver.go b/pkg/storage/driver/cache/driver.go index 9c60d7a..b02e8d8 100644 --- a/pkg/storage/driver/cache/driver.go +++ b/pkg/storage/driver/cache/driver.go @@ -28,7 +28,7 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { query.Del("driver") - cacheTTL := time.Minute * 60 + 
blobStoreOptionFuncs := make([]OptionFunc, 0) rawCacheTTL := query.Get("cacheTTL") if rawCacheTTL != "" { @@ -39,41 +39,55 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'") } - cacheTTL = ttl + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(ttl)) } - cacheConfig := bigcache.DefaultConfig(cacheTTL) - cacheConfig.Logger = &cacheLogger{} - - rawCacheShards := query.Get("cacheShards") + rawCacheShards := query.Get("blobCacheShards") if rawCacheShards != "" { - query.Del("cacheShards") + query.Del("blobCacheShards") cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32) if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'") + return nil, errors.Wrap(err, "could not parse url parameter 'blobCacheShards'") } - cacheConfig.Shards = int(cacheShards) + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheShards(int(cacheShards))) } - rawMaxCacheSize := query.Get("maxCacheSize") - if rawMaxCacheSize != "" { - query.Del("maxCacheSize") + rawBlobCacheMaxMemorySize := query.Get("blobCacheMaxMemorySize") + if rawBlobCacheMaxMemorySize != "" { + query.Del("blobCacheMaxMemorySize") - maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32) + blobCacheMaxMemorySize, err := strconv.ParseInt(rawBlobCacheMaxMemorySize, 10, 32) if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'") + return nil, errors.Wrap(err, "could not parse url parameter 'blobCacheMaxMemorySize'") } - // See cacheConfig.HardMaxCacheSize documentation - var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000 + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheMaxMemorySize(int(blobCacheMaxMemorySize))) + } - if maxCacheSize < minCacheSize { - return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize) + rawBlobBucketCacheSize := query.Get("blobBucketCacheSize") + if rawBlobBucketCacheSize != "" { + query.Del("blobBucketCacheSize") + + blobBucketCacheSize, err := strconv.ParseInt(rawBlobBucketCacheSize, 10, 32) + if err != nil { + return nil, errors.Wrap(err, "could not parse url parameter 'blobBucketCacheSize'") } - cacheConfig.HardMaxCacheSize = int(maxCacheSize) + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize))) + } + + rawBlobInfoCacheSize := query.Get("blobInfoCacheSize") + if rawBlobInfoCacheSize != "" { + query.Del("blobInfoCacheSize") + + blobInfoCacheSize, err := strconv.ParseInt(rawBlobInfoCacheSize, 10, 32) + if err != nil { + return nil, errors.Wrap(err, "could not parse url parameter 'blobInfoCacheSize'") + } + + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(blobInfoCacheSize))) } url := &url.URL{ @@ -83,17 +97,17 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { RawQuery: query.Encode(), } - store, err := driver.NewBlobStore(url.String()) + backend, err := driver.NewBlobStore(url.String()) if err != nil { return nil, errors.WithStack(err) } - cache, err := bigcache.New(context.Background(), cacheConfig) + store, err := NewBlobStore(backend, blobStoreOptionFuncs...) 
if err != nil { return nil, errors.WithStack(err) } - return NewBlobStore(store, cache), nil + return store, nil } type cacheLogger struct{} diff --git a/pkg/storage/driver/cache/options.go b/pkg/storage/driver/cache/options.go new file mode 100644 index 0000000..3424f90 --- /dev/null +++ b/pkg/storage/driver/cache/options.go @@ -0,0 +1,59 @@ +package cache + +import "time" + +type Options struct { + CacheTTL time.Duration + BlobCacheMaxMemorySize int + BlobCacheShards int + BucketCacheSize int + BlobInfoCacheSize int +} + +type OptionFunc func(opts *Options) + +func NewOptions(funcs ...OptionFunc) *Options { + opts := &Options{ + CacheTTL: 60 * time.Minute, + BlobCacheMaxMemorySize: 256, + BlobCacheShards: 1024, + BucketCacheSize: 16, + BlobInfoCacheSize: 512, + } + + for _, fn := range funcs { + fn(opts) + } + + return opts +} + +func WithCacheTTL(ttl time.Duration) OptionFunc { + return func(opts *Options) { + opts.CacheTTL = ttl + } +} + +func WithBlobCacheMaxMemorySize(size int) OptionFunc { + return func(opts *Options) { + opts.BlobCacheMaxMemorySize = size + } +} + +func WithBlobCacheShards(shards int) OptionFunc { + return func(opts *Options) { + opts.BlobCacheShards = shards + } +} + +func WithBucketCacheSize(size int) OptionFunc { + return func(opts *Options) { + opts.BucketCacheSize = size + } +} + +func WithBlobInfoCacheSize(size int) OptionFunc { + return func(opts *Options) { + opts.BlobInfoCacheSize = size + } +}
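
Note on the new context helpers (pkg/http/context.go): the accessors now return a (value, bool) pair instead of panicking, and the context middleware is applied per HTTP mount group, so every mounted handler must handle the missing-value case itself. A minimal sketch of the expected pattern, mirroring pkg/module/blob/http.go; the package and handler names are illustrative only:

package example

import (
	"net/http"

	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
	"gitlab.com/wpetit/goweb/logger"
)

// handleExample resolves the shared bus from the request context and answers
// with a JSON 500 when the context middleware did not populate it.
func handleExample(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	bus, ok := edgehttp.ContextBus(ctx)
	if !ok {
		logger.Error(ctx, "could not find bus on context")
		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)

		return
	}

	_ = bus // publish or request messages as before
}

The same two-value pattern applies to ContextHTTPRequest, ContextHTTPClient and the new ContextSessionID.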
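Construction of the caching blob store changes as well: NewBlobStore no longer takes a pre-built bigcache instance but functional options (pkg/storage/driver/cache/options.go) and now returns an error. A sketch of direct construction over the sqlite backend, as in blob_store_test.go; the DSN and option values are illustrative, and omitted options fall back to the defaults set in NewOptions:

package example

import (
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
	"github.com/pkg/errors"
)

func newCachedBlobStore(dsn string) (*cache.BlobStore, error) {
	backend := sqlite.NewBlobStore(dsn)

	// The store now owns its bigcache content cache plus the expirable LRUs
	// for buckets and blob infos; options only tune their sizes and TTL.
	store, err := cache.NewBlobStore(backend,
		cache.WithCacheTTL(30*time.Minute),
		cache.WithBucketCacheSize(32),
		cache.WithBlobInfoCacheSize(1024),
	)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return store, nil
}

Note that cached buckets are closed on LRU eviction rather than by BlobBucket.Close (now a no-op), which is why the bucket cache size is part of the options.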
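When the store is opened through the cache driver DSN instead (pkg/storage/driver/cache/driver.go), the same options are exposed as query parameters: cacheTTL as before, cacheShards renamed to blobCacheShards, maxCacheSize renamed to blobCacheMaxMemorySize, plus the new blobBucketCacheSize and blobInfoCacheSize. A hedged sketch of the query-string fragment only, since the base DSN and the pre-existing driver parameter depend on the wrapped backend; all values are examples:

package example

// Query parameters consumed by blobStoreFactory before the remaining DSN is
// handed to the wrapped backend driver. cacheTTL is assumed to accept a Go
// duration string; the other values are plain integers.
const cacheQueryParams = "cacheTTL=30m" +
	"&blobCacheShards=512" +
	"&blobCacheMaxMemorySize=128" +
	"&blobBucketCacheSize=32" +
	"&blobInfoCacheSize=1024"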