feat(storage): improve caching in cache driver
All checks were successful
arcad/edge/pipeline/head This commit looks good
All checks were successful
arcad/edge/pipeline/head This commit looks good
ref #20
This commit is contained in:
81
pkg/storage/driver/cache/blob_bucket.go
vendored
81
pkg/storage/driver/cache/blob_bucket.go
vendored
@ -7,26 +7,28 @@ import (
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type BlobBucket struct {
|
||||
bucket storage.BlobBucket
|
||||
cache *bigcache.BigCache
|
||||
bucket storage.BlobBucket
|
||||
contentCache *bigcache.BigCache
|
||||
blobInfoCache *expirable.LRU[string, storage.BlobInfo]
|
||||
bucketCache *expirable.LRU[string, storage.BlobBucket]
|
||||
}
|
||||
|
||||
// Close implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Close() error {
|
||||
if err := b.bucket.Close(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Close only when bucket is evicted from cache
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||
defer b.clearCache(ctx, id)
|
||||
|
||||
if err := b.bucket.Delete(ctx, id); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
@ -36,11 +38,28 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||
|
||||
// Get implements storage.BlobBucket.
|
||||
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
||||
key := b.getCacheKey(id)
|
||||
if blobInfo, ok := b.blobInfoCache.Get(key); ok {
|
||||
logger.Debug(
|
||||
ctx, "found blob info in cache",
|
||||
logger.F("cacheKey", key),
|
||||
)
|
||||
|
||||
return blobInfo, nil
|
||||
}
|
||||
|
||||
info, err := b.bucket.Get(ctx, id)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrBucketClosed) {
|
||||
b.clearCache(ctx, id)
|
||||
b.bucketCache.Remove(b.Name())
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
b.blobInfoCache.Add(key, info)
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
@ -48,9 +67,18 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
|
||||
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||
infos, err := b.bucket.List(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrBucketClosed) {
|
||||
b.bucketCache.Remove(b.Name())
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
for _, ifo := range infos {
|
||||
key := b.getCacheKey(ifo.ID())
|
||||
b.blobInfoCache.Add(key, ifo)
|
||||
}
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
@ -61,19 +89,28 @@ func (b *BlobBucket) Name() string {
|
||||
|
||||
// NewReader implements storage.BlobBucket.
|
||||
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
|
||||
if cached, exist := b.inCache(id); exist {
|
||||
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats()))
|
||||
if cached, exist := b.inContentCache(id); exist {
|
||||
logger.Debug(
|
||||
ctx, "found blob content in cache",
|
||||
logger.F("cacheKey", b.getCacheKey(id)),
|
||||
logger.F("cacheStats", b.contentCache.Stats()),
|
||||
)
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
reader, err := b.bucket.NewReader(ctx, id)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrBucketClosed) {
|
||||
b.clearCache(ctx, id)
|
||||
b.bucketCache.Remove(b.Name())
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &readCacher{
|
||||
reader: reader,
|
||||
cache: b.cache,
|
||||
cache: b.contentCache,
|
||||
key: b.getCacheKey(id),
|
||||
}, nil
|
||||
}
|
||||
@ -82,9 +119,9 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
|
||||
return fmt.Sprintf("%s-%s", b.Name(), id)
|
||||
}
|
||||
|
||||
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
|
||||
func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
|
||||
key := b.getCacheKey(id)
|
||||
data, err := b.cache.Get(key)
|
||||
data, err := b.contentCache.Get(key)
|
||||
if err != nil {
|
||||
if errors.Is(err, bigcache.ErrEntryNotFound) {
|
||||
return nil, false
|
||||
@ -98,10 +135,28 @@ func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
|
||||
return &cachedReader{data, 0}, true
|
||||
}
|
||||
|
||||
func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) {
|
||||
key := b.getCacheKey(id)
|
||||
|
||||
logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key))
|
||||
|
||||
if err := b.contentCache.Delete(key); err != nil {
|
||||
logger.Error(ctx, "could not clear cache", logger.CapturedE(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
b.blobInfoCache.Remove(key)
|
||||
}
|
||||
|
||||
// NewWriter implements storage.BlobBucket.
|
||||
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
|
||||
defer b.clearCache(ctx, id)
|
||||
|
||||
writer, err := b.bucket.NewWriter(ctx, id)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrBucketClosed) {
|
||||
b.bucketCache.Remove(b.Name())
|
||||
}
|
||||
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
@ -112,6 +167,10 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write
|
||||
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
||||
size, err := b.bucket.Size(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrBucketClosed) {
|
||||
b.bucketCache.Remove(b.Name())
|
||||
}
|
||||
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
62
pkg/storage/driver/cache/blob_store.go
vendored
62
pkg/storage/driver/cache/blob_store.go
vendored
@ -5,12 +5,16 @@ import (
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type BlobStore struct {
|
||||
store storage.BlobStore
|
||||
cache *bigcache.BigCache
|
||||
store storage.BlobStore
|
||||
contentCache *bigcache.BigCache
|
||||
bucketCache *expirable.LRU[string, storage.BlobBucket]
|
||||
blobInfoCache *expirable.LRU[string, storage.BlobInfo]
|
||||
}
|
||||
|
||||
// DeleteBucket implements storage.BlobStore.
|
||||
@ -19,6 +23,8 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.bucketCache.Remove(name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -34,22 +40,62 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||
|
||||
// OpenBucket implements storage.BlobStore.
|
||||
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||||
bucket, ok := s.bucketCache.Get(name)
|
||||
if ok {
|
||||
logger.Debug(ctx, "found bucket in cache", logger.F("name", name))
|
||||
|
||||
return &BlobBucket{
|
||||
bucket: bucket,
|
||||
contentCache: s.contentCache,
|
||||
blobInfoCache: s.blobInfoCache,
|
||||
bucketCache: s.bucketCache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
bucket, err := s.store.OpenBucket(ctx, name)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
s.bucketCache.Add(name, bucket)
|
||||
|
||||
return &BlobBucket{
|
||||
bucket: bucket,
|
||||
cache: s.cache,
|
||||
bucket: bucket,
|
||||
contentCache: s.contentCache,
|
||||
blobInfoCache: s.blobInfoCache,
|
||||
bucketCache: s.bucketCache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore {
|
||||
return &BlobStore{
|
||||
store: store,
|
||||
cache: cache,
|
||||
func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) {
|
||||
options := NewOptions(funcs...)
|
||||
|
||||
cacheConfig := bigcache.DefaultConfig(options.CacheTTL)
|
||||
cacheConfig.Logger = &cacheLogger{}
|
||||
|
||||
contentCache, err := bigcache.New(context.Background(), cacheConfig)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
onBlobBucketEvict := func(key string, bucket storage.BlobBucket) {
|
||||
ctx := context.Background()
|
||||
logger.Debug(ctx, "evicting blob bucket from cache", logger.F("cacheKey", key))
|
||||
|
||||
if err := bucket.Close(); err != nil {
|
||||
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}
|
||||
|
||||
bucketCache := expirable.NewLRU[string, storage.BlobBucket](options.BucketCacheSize, onBlobBucketEvict, options.CacheTTL)
|
||||
blobInfoCache := expirable.NewLRU[string, storage.BlobInfo](options.BlobInfoCacheSize, nil, options.CacheTTL)
|
||||
|
||||
return &BlobStore{
|
||||
store: store,
|
||||
contentCache: contentCache,
|
||||
bucketCache: bucketCache,
|
||||
blobInfoCache: blobInfoCache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ storage.BlobStore = &BlobStore{}
|
||||
|
9
pkg/storage/driver/cache/blob_store_test.go
vendored
9
pkg/storage/driver/cache/blob_store_test.go
vendored
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
|
||||
"github.com/allegro/bigcache/v3"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
@ -30,13 +29,11 @@ func TestBlobStore(t *testing.T) {
|
||||
|
||||
backend := sqlite.NewBlobStore(dsn)
|
||||
|
||||
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
|
||||
store, err := NewBlobStore(backend)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
store := NewBlobStore(backend, cache)
|
||||
|
||||
testsuite.TestBlobStore(context.Background(), t, store)
|
||||
}
|
||||
|
||||
@ -52,12 +49,10 @@ func BenchmarkBlobStore(t *testing.B) {
|
||||
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
|
||||
backend := sqlite.NewBlobStore(dsn)
|
||||
|
||||
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
|
||||
store, err := NewBlobStore(backend)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", errors.WithStack(err))
|
||||
}
|
||||
|
||||
store := NewBlobStore(backend, cache)
|
||||
|
||||
testsuite.BenchmarkBlobStore(t, store)
|
||||
}
|
||||
|
58
pkg/storage/driver/cache/driver.go
vendored
58
pkg/storage/driver/cache/driver.go
vendored
@ -28,7 +28,7 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||
|
||||
query.Del("driver")
|
||||
|
||||
cacheTTL := time.Minute * 60
|
||||
blobStoreOptionFuncs := make([]OptionFunc, 0)
|
||||
|
||||
rawCacheTTL := query.Get("cacheTTL")
|
||||
if rawCacheTTL != "" {
|
||||
@ -39,41 +39,55 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'")
|
||||
}
|
||||
|
||||
cacheTTL = ttl
|
||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(ttl))
|
||||
}
|
||||
|
||||
cacheConfig := bigcache.DefaultConfig(cacheTTL)
|
||||
cacheConfig.Logger = &cacheLogger{}
|
||||
|
||||
rawCacheShards := query.Get("cacheShards")
|
||||
rawCacheShards := query.Get("blobCacheShards")
|
||||
if rawCacheShards != "" {
|
||||
query.Del("cacheShards")
|
||||
query.Del("blobCacheShards")
|
||||
|
||||
cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'")
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'blobCacheShards'")
|
||||
}
|
||||
|
||||
cacheConfig.Shards = int(cacheShards)
|
||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheShards(int(cacheShards)))
|
||||
}
|
||||
|
||||
rawMaxCacheSize := query.Get("maxCacheSize")
|
||||
if rawMaxCacheSize != "" {
|
||||
query.Del("maxCacheSize")
|
||||
rawBlobCacheMaxMemorySize := query.Get("blobCacheMaxMemorySize")
|
||||
if rawBlobCacheMaxMemorySize != "" {
|
||||
query.Del("blobCacheMaxMemorySize")
|
||||
|
||||
maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32)
|
||||
blobCacheMaxMemorySize, err := strconv.ParseInt(rawBlobCacheMaxMemorySize, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'")
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'blobCacheMaxMemorySize'")
|
||||
}
|
||||
|
||||
// See cacheConfig.HardMaxCacheSize documentation
|
||||
var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000
|
||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheMaxMemorySize(int(blobCacheMaxMemorySize)))
|
||||
}
|
||||
|
||||
if maxCacheSize < minCacheSize {
|
||||
return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize)
|
||||
rawBlobBucketCacheSize := query.Get("blobBucketCacheSize")
|
||||
if rawBlobBucketCacheSize != "" {
|
||||
query.Del("blobBucketCacheSize")
|
||||
|
||||
blobBucketCacheSize, err := strconv.ParseInt(rawBlobBucketCacheSize, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'blobBucketCacheSize'")
|
||||
}
|
||||
|
||||
cacheConfig.HardMaxCacheSize = int(maxCacheSize)
|
||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize)))
|
||||
}
|
||||
|
||||
rawBlobInfoCacheSize := query.Get("blobInfoCacheSize")
|
||||
if rawBlobInfoCacheSize != "" {
|
||||
query.Del("blobInfoCacheSize")
|
||||
|
||||
blobInfoCacheSize, err := strconv.ParseInt(rawBlobInfoCacheSize, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse url parameter 'blobInfoCacheSize'")
|
||||
}
|
||||
|
||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(blobInfoCacheSize)))
|
||||
}
|
||||
|
||||
url := &url.URL{
|
||||
@ -83,17 +97,17 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||
RawQuery: query.Encode(),
|
||||
}
|
||||
|
||||
store, err := driver.NewBlobStore(url.String())
|
||||
backend, err := driver.NewBlobStore(url.String())
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
cache, err := bigcache.New(context.Background(), cacheConfig)
|
||||
store, err := NewBlobStore(backend, blobStoreOptionFuncs...)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return NewBlobStore(store, cache), nil
|
||||
return store, nil
|
||||
}
|
||||
|
||||
type cacheLogger struct{}
|
||||
|
59
pkg/storage/driver/cache/options.go
vendored
Normal file
59
pkg/storage/driver/cache/options.go
vendored
Normal file
@ -0,0 +1,59 @@
|
||||
package cache
|
||||
|
||||
import "time"
|
||||
|
||||
type Options struct {
|
||||
CacheTTL time.Duration
|
||||
BlobCacheMaxMemorySize int
|
||||
BlobCacheShards int
|
||||
BucketCacheSize int
|
||||
BlobInfoCacheSize int
|
||||
}
|
||||
|
||||
type OptionFunc func(opts *Options)
|
||||
|
||||
func NewOptions(funcs ...OptionFunc) *Options {
|
||||
opts := &Options{
|
||||
CacheTTL: 60 * time.Minute,
|
||||
BlobCacheMaxMemorySize: 256,
|
||||
BlobCacheShards: 1024,
|
||||
BucketCacheSize: 16,
|
||||
BlobInfoCacheSize: 512,
|
||||
}
|
||||
|
||||
for _, fn := range funcs {
|
||||
fn(opts)
|
||||
}
|
||||
|
||||
return opts
|
||||
}
|
||||
|
||||
func WithCacheTTL(ttl time.Duration) OptionFunc {
|
||||
return func(opts *Options) {
|
||||
opts.CacheTTL = ttl
|
||||
}
|
||||
}
|
||||
|
||||
func WithBlobCacheMaxMemorySize(size int) OptionFunc {
|
||||
return func(opts *Options) {
|
||||
opts.BlobCacheMaxMemorySize = size
|
||||
}
|
||||
}
|
||||
|
||||
func WithBlobCacheShards(shards int) OptionFunc {
|
||||
return func(opts *Options) {
|
||||
opts.BlobCacheShards = shards
|
||||
}
|
||||
}
|
||||
|
||||
func WithBucketCacheSize(size int) OptionFunc {
|
||||
return func(opts *Options) {
|
||||
opts.BucketCacheSize = size
|
||||
}
|
||||
}
|
||||
|
||||
func WithBlobInfoCacheSize(size int) OptionFunc {
|
||||
return func(opts *Options) {
|
||||
opts.BlobInfoCacheSize = size
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user