diff --git a/.env.dist b/.env.dist index 0d432c4..cdbfaed 100644 --- a/.env.dist +++ b/.env.dist @@ -1,4 +1,4 @@ RUN_APP_ARGS="" #EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%" -#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%" +#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%&blobCacheStoreType=fs&blobCacheStoreBaseDir=data/cache/%APPID%&blobCacheSize=64MB" #EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local" \ No newline at end of file diff --git a/go.mod b/go.mod index 1203320..0b6238a 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,15 @@ module forge.cadoles.com/arcad/edge go 1.21 require ( - github.com/allegro/bigcache/v3 v3.1.0 github.com/getsentry/sentry-go v0.25.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/mdns v1.0.5 + github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf github.com/jackc/puddle/v2 v2.2.1 github.com/keegancsmith/rpc v1.3.0 github.com/klauspost/compress v1.16.6 github.com/lestrrat-go/jwx/v2 v2.0.8 + github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/ulikunitz/xz v0.5.11 go.uber.org/goleak v1.3.0 modernc.org/sqlite v1.20.4 diff --git a/go.sum b/go.sum index 371bd48..38182ff 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad h1:PTOf0L/YjiVis5LYzJmi7WqttJ/h/DU6h06aJ24Kpbg= -github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad/go.mod h1:+q+mA6jGsjfsZ2HzhVSk38qDbX2/ZBJ7Yyciv75Ruo0= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0= @@ -208,6 +206,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE= github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE= +github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s= +github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -258,6 +258,8 @@ github.com/miekg/dns v0.0.0-20161006100029-fc4e1e2843d8/go.mod h1:W1PPwlIAgtquWB github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw= github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= diff --git a/pkg/storage/driver/cache/blob_bucket.go b/pkg/storage/driver/cache/blob_bucket.go index 3e3981d..6f6b4fc 100644 --- a/pkg/storage/driver/cache/blob_bucket.go +++ b/pkg/storage/driver/cache/blob_bucket.go @@ -6,17 +6,16 @@ import ( "io" "forge.cadoles.com/arcad/edge/pkg/storage" - "github.com/allegro/bigcache/v3" - "github.com/hashicorp/golang-lru/v2/expirable" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type BlobBucket struct { bucket storage.BlobBucket - contentCache *bigcache.BigCache - blobInfoCache *expirable.LRU[string, storage.BlobInfo] - bucketCache *expirable.LRU[string, storage.BlobBucket] + blobCache *lfu.Cache[string, []byte] + bucketCache *lfu.Cache[string, storage.BlobBucket] + blobInfoCache *lfu.Cache[string, storage.BlobInfo] } // Close implements storage.BlobBucket. @@ -39,7 +38,16 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error { // Get implements storage.BlobBucket. func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) { key := b.getCacheKey(id) - if blobInfo, ok := b.blobInfoCache.Get(key); ok { + blobInfo, err := b.blobInfoCache.Get(key) + if err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error( + ctx, "could not retrieve blob info from cache", + logger.F("cacheKey", key), + logger.CapturedE(errors.WithStack(err)), + ) + } + + if blobInfo != nil { logger.Debug( ctx, "found blob info in cache", logger.F("cacheKey", key), @@ -52,13 +60,25 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn if err != nil { if errors.Is(err, storage.ErrBucketClosed) { b.clearCache(ctx, id) - b.bucketCache.Remove(b.Name()) + if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error( + ctx, "could not delete bucket from cache", + logger.F("cacheKey", b.Name()), + logger.CapturedE(errors.WithStack(err)), + ) + } } return nil, errors.WithStack(err) } - b.blobInfoCache.Add(key, info) + if err := b.blobInfoCache.Set(key, info); err != nil { + logger.Error( + ctx, "could not set blob info in cache", + logger.F("cacheKey", key), + logger.CapturedE(errors.WithStack(err)), + ) + } return info, nil } @@ -68,7 +88,13 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { infos, err := b.bucket.List(ctx) if err != nil { if errors.Is(err, storage.ErrBucketClosed) { - b.bucketCache.Remove(b.Name()) + if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error( + ctx, "could not delete bucket from cache", + logger.F("cacheKey", b.Name()), + logger.CapturedE(errors.WithStack(err)), + ) + } } return nil, errors.WithStack(err) @@ -76,7 +102,13 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) { for _, ifo := range infos { key := b.getCacheKey(ifo.ID()) - b.blobInfoCache.Add(key, ifo) + if err := b.blobInfoCache.Set(key, ifo); err != nil { + logger.Error( + ctx, "could not set blob info in cache", + logger.F("cacheKey", key), + logger.CapturedE(errors.WithStack(err)), + ) + } } return infos, nil @@ -93,7 +125,6 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS logger.Debug( ctx, "found blob content in cache", logger.F("cacheKey", b.getCacheKey(id)), - logger.F("cacheStats", b.contentCache.Stats()), ) return cached, nil } @@ -102,7 +133,13 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS if err != nil { if errors.Is(err, storage.ErrBucketClosed) { b.clearCache(ctx, id) - b.bucketCache.Remove(b.Name()) + if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error( + ctx, "could not delete bucket from cache", + logger.F("cacheKey", b.Name()), + logger.CapturedE(errors.WithStack(err)), + ) + } } return nil, errors.WithStack(err) @@ -110,7 +147,7 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS return &readCacher{ reader: reader, - cache: b.contentCache, + cache: b.blobCache, key: b.getCacheKey(id), }, nil } @@ -121,13 +158,13 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string { func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) { key := b.getCacheKey(id) - data, err := b.contentCache.Get(key) + data, err := b.blobCache.Get(key) if err != nil { - if errors.Is(err, bigcache.ErrEntryNotFound) { + if errors.Is(err, lfu.ErrNotFound) { return nil, false } - logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err))) + logger.Error(context.Background(), "could not retrieve cached value", logger.CapturedE(errors.WithStack(err))) return nil, false } @@ -140,11 +177,17 @@ func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) { logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key)) - if err := b.contentCache.Delete(key); err != nil && !errors.Is(err, bigcache.ErrEntryNotFound) { - logger.Error(ctx, "could not clear cache", logger.CapturedE(errors.WithStack(err))) + if err := b.blobCache.Delete(key); err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error(ctx, "could not clear cache", logger.F("cacheKey", key), logger.CapturedE(errors.WithStack(err))) } - b.blobInfoCache.Remove(key) + if err := b.blobInfoCache.Delete(key); err != nil { + logger.Error( + ctx, "could not delete blob info from cache", + logger.F("cacheKey", key), + logger.CapturedE(errors.WithStack(err)), + ) + } } // NewWriter implements storage.BlobBucket. @@ -154,7 +197,13 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write writer, err := b.bucket.NewWriter(ctx, id) if err != nil { if errors.Is(err, storage.ErrBucketClosed) { - b.bucketCache.Remove(b.Name()) + if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error( + ctx, "could not delete bucket from cache", + logger.F("cacheKey", b.Name()), + logger.CapturedE(errors.WithStack(err)), + ) + } } return nil, errors.WithStack(err) @@ -168,7 +217,13 @@ func (b *BlobBucket) Size(ctx context.Context) (int64, error) { size, err := b.bucket.Size(ctx) if err != nil { if errors.Is(err, storage.ErrBucketClosed) { - b.bucketCache.Remove(b.Name()) + if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error( + ctx, "could not delete bucket from cache", + logger.F("cacheKey", b.Name()), + logger.CapturedE(errors.WithStack(err)), + ) + } } return 0, errors.WithStack(err) diff --git a/pkg/storage/driver/cache/blob_store.go b/pkg/storage/driver/cache/blob_store.go index a6bdf9f..31b150b 100644 --- a/pkg/storage/driver/cache/blob_store.go +++ b/pkg/storage/driver/cache/blob_store.go @@ -4,17 +4,16 @@ import ( "context" "forge.cadoles.com/arcad/edge/pkg/storage" - "github.com/allegro/bigcache/v3" - "github.com/hashicorp/golang-lru/v2/expirable" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type BlobStore struct { store storage.BlobStore - contentCache *bigcache.BigCache - bucketCache *expirable.LRU[string, storage.BlobBucket] - blobInfoCache *expirable.LRU[string, storage.BlobInfo] + blobCache *lfu.Cache[string, []byte] + bucketCache *lfu.Cache[string, storage.BlobBucket] + blobInfoCache *lfu.Cache[string, storage.BlobInfo] } // DeleteBucket implements storage.BlobStore. @@ -23,7 +22,7 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error { return errors.WithStack(err) } - s.bucketCache.Remove(name) + s.bucketCache.Delete(name) return nil } @@ -40,28 +39,40 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) { // OpenBucket implements storage.BlobStore. func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) { - bucket, ok := s.bucketCache.Get(name) - if ok { + bucket, err := s.bucketCache.Get(name) + if err == nil { logger.Debug(ctx, "found bucket in cache", logger.F("name", name)) return &BlobBucket{ bucket: bucket, - contentCache: s.contentCache, + blobCache: s.blobCache, blobInfoCache: s.blobInfoCache, bucketCache: s.bucketCache, }, nil } - bucket, err := s.store.OpenBucket(ctx, name) + if err != nil && !errors.Is(err, lfu.ErrNotFound) { + logger.Error(ctx, "could not retrieve bucket from cache", + logger.F("cacheKey", name), + logger.CapturedE(errors.WithStack(err)), + ) + } + + bucket, err = s.store.OpenBucket(ctx, name) if err != nil { return nil, errors.WithStack(err) } - s.bucketCache.Add(name, bucket) + if err := s.bucketCache.Set(name, bucket); err != nil { + logger.Error(ctx, "could not set bucket in cache", + logger.F("cacheKey", name), + logger.CapturedE(errors.WithStack(err)), + ) + } return &BlobBucket{ bucket: bucket, - contentCache: s.contentCache, + blobCache: s.blobCache, blobInfoCache: s.blobInfoCache, bucketCache: s.bucketCache, }, nil @@ -70,27 +81,36 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) { options := NewOptions(funcs...) - contentCache, err := bigcache.New(context.Background(), options.BigCache) - if err != nil { - return nil, errors.WithStack(err) - } + blobCache := lfu.NewCache[string, []byte]( + options.BlobCacheStore, + lfu.WithTTL[string, []byte](options.CacheTTL), + lfu.WithCapacity[string, []byte](options.BlobCacheSize), + lfu.WithGetValueSize[string, []byte](func(value []byte) (int, error) { + return len(value), nil + }), + ) - onBlobBucketEvict := func(key string, bucket storage.BlobBucket) { - ctx := context.Background() - logger.Debug(ctx, "evicting blob bucket from cache", logger.F("cacheKey", key)) + blobBucketCache := lfu.NewCache[string, storage.BlobBucket]( + options.BlobBucketCacheStore, + lfu.WithCapacity[string, storage.BlobBucket](options.BlobBucketCacheSize), + lfu.WithGetValueSize[string, storage.BlobBucket](func(value storage.BlobBucket) (int, error) { + return 1, nil + }), + ) - if err := bucket.Close(); err != nil { - logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err))) - } - } - - bucketCache := expirable.NewLRU[string, storage.BlobBucket](options.BucketCacheSize, onBlobBucketEvict, options.CacheTTL) - blobInfoCache := expirable.NewLRU[string, storage.BlobInfo](options.BlobInfoCacheSize, nil, options.CacheTTL) + blobInfoCache := lfu.NewCache[string, storage.BlobInfo]( + options.BlobInfoCacheStore, + lfu.WithTTL[string, storage.BlobInfo](options.CacheTTL), + lfu.WithCapacity[string, storage.BlobInfo](options.BlobInfoCacheSize), + lfu.WithGetValueSize[string, storage.BlobInfo](func(value storage.BlobInfo) (int, error) { + return 1, nil + }), + ) return &BlobStore{ store: store, - contentCache: contentCache, - bucketCache: bucketCache, + blobCache: blobCache, + bucketCache: blobBucketCache, blobInfoCache: blobInfoCache, }, nil } diff --git a/pkg/storage/driver/cache/driver.go b/pkg/storage/driver/cache/driver.go index a09b69c..5b2d2c1 100644 --- a/pkg/storage/driver/cache/driver.go +++ b/pkg/storage/driver/cache/driver.go @@ -1,17 +1,19 @@ package cache import ( - "context" - "fmt" + "bytes" + "io" "net/url" "strconv" "time" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/driver" - "github.com/allegro/bigcache/v3" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/fs" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory" + "github.com/inhies/go-bytesize" "github.com/pkg/errors" - "gitlab.com/wpetit/goweb/logger" ) func init() { @@ -41,13 +43,6 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(cacheTTL)) - cacheConfig, err := parseBigCacheConfig(&query, cacheTTL) - if err != nil { - return nil, errors.Wrap(err, "could not parse big cache config") - } - - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBigCacheConfig(*cacheConfig)) - blobBucketCacheSize, err := parseInt(&query, "blobBucketCacheSize") if err != nil { if !errors.Is(err, errNotFound) { @@ -57,7 +52,15 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { blobBucketCacheSize = 16 } - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize))) + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobBucketCacheSize(int(blobBucketCacheSize))) + + blobBucketCacheStorePrefix := "blobBucketCacheStore" + blobBucketCacheStore, err := parseCacheStore[string, storage.BlobBucket](&query, blobBucketCacheStorePrefix) + if err != nil { + return nil, errors.WithStack(err) + } + + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobBucketCacheStore(blobBucketCacheStore)) bloInfoCacheSize, err := parseInt(&query, "bloInfoCacheSize") if err != nil { @@ -70,6 +73,46 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(bloInfoCacheSize))) + blobInfoCacheStorePrefix := "blobInfoCacheStore" + blobInfoCacheStore, err := parseCacheStore[string, storage.BlobInfo](&query, blobInfoCacheStorePrefix) + if err != nil { + return nil, errors.WithStack(err) + } + + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheStore(blobInfoCacheStore)) + + blobCacheSize, err := parseByteSize(&query, "blobCacheSize") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + blobCacheSize = 256e+6 + } + + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheSize(int(blobCacheSize))) + + blobCacheStorePrefix := "blobCacheStore" + blobCacheStore, err := parseCacheStore[string, []byte]( + &query, blobCacheStorePrefix, + fs.WithMarshalValue[string, []byte](func(value []byte) (io.Reader, error) { + return bytes.NewBuffer(value), nil + }), + fs.WithUnmarshalValue[string, []byte](func(r io.Reader) ([]byte, error) { + data, err := io.ReadAll(r) + if err != nil { + return nil, errors.WithStack(err) + } + + return data, nil + }), + ) + if err != nil { + return nil, errors.WithStack(err) + } + + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheStore(blobCacheStore)) + url := &url.URL{ Scheme: rawDriver, Host: dsn.Host, @@ -90,89 +133,34 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { return store, nil } -type cacheLogger struct{} - -func (l *cacheLogger) Printf(format string, v ...interface{}) { - logger.Debug(context.Background(), fmt.Sprintf(format, v...)) -} - -var _ bigcache.Logger = &cacheLogger{} - -func parseBigCacheConfig(query *url.Values, cacheTTL time.Duration) (*bigcache.Config, error) { - config := bigcache.DefaultConfig(cacheTTL) - config.Logger = &cacheLogger{} - - hardMaxCacheSize, err := parseInt(query, "bigCacheHardMaxCacheSize") - if err != nil { - if !errors.Is(err, errNotFound) { - return nil, errors.WithStack(err) - } - - hardMaxCacheSize = int64(config.HardMaxCacheSize) - } - - config.HardMaxCacheSize = int(hardMaxCacheSize) - - maxEntriesInWindow, err := parseInt(query, "bigCacheMaxEntriesInWindow") - if err != nil { - if !errors.Is(err, errNotFound) { - return nil, errors.WithStack(err) - } - - maxEntriesInWindow = int64(config.MaxEntriesInWindow) - } - - config.MaxEntriesInWindow = int(maxEntriesInWindow) - - shards, err := parseInt(query, "bigCacheShards") - if err != nil { - if !errors.Is(err, errNotFound) { - return nil, errors.WithStack(err) - } - - shards = int64(config.Shards) - } - - config.Shards = int(shards) - - maxEntrySize, err := parseInt(query, "bigCacheMaxEntrySize") - if err != nil { - if !errors.Is(err, errNotFound) { - return nil, errors.WithStack(err) - } - - maxEntrySize = int64(config.MaxEntrySize) - } - - config.MaxEntrySize = int(maxEntrySize) - - cleanWindow, err := parseDuration(query, "bigCacheCleanWindow") - if err != nil { - if !errors.Is(err, errNotFound) { - return nil, errors.WithStack(err) - } - - cleanWindow = config.CleanWindow - } - - config.CleanWindow = cleanWindow - - lifeWindow, err := parseDuration(query, "bigCacheLifeWindow") - if err != nil { - if !errors.Is(err, errNotFound) { - return nil, errors.WithStack(err) - } - - lifeWindow = config.LifeWindow - } - - config.LifeWindow = lifeWindow - - return &config, nil -} - var errNotFound = errors.New("not found") +func parseString(query *url.Values, name string) (string, error) { + value := query.Get(name) + if value != "" { + query.Del(name) + return value, nil + } + + return "", errors.WithStack(errNotFound) +} + +func parseByteSize(query *url.Values, name string) (bytesize.ByteSize, error) { + rawValue := query.Get(name) + if rawValue != "" { + query.Del(name) + + value, err := bytesize.Parse(rawValue) + if err != nil { + return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name) + } + + return value, nil + } + + return 0, errors.WithStack(errNotFound) +} + func parseInt(query *url.Values, name string) (int64, error) { rawValue := query.Get(name) if rawValue != "" { @@ -204,3 +192,73 @@ func parseDuration(query *url.Values, name string) (time.Duration, error) { return 0, errors.WithStack(errNotFound) } + +const ( + storeTypeFS string = "fs" + storeTypeMemory string = "memory" +) + +func parseCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (lfu.Store[K, V], error) { + storeTypeParam := prefix + "Type" + storeType, err := parseString(query, storeTypeParam) + if err != nil { + if errors.Is(err, errNotFound) { + storeType = storeTypeMemory + } + } + + switch storeType { + case storeTypeFS: + store, err := parseFSCacheStore[K, V](query, prefix, optionFuncs...) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil + + case storeTypeMemory: + store, err := parseMemoryCacheStore[K, V](query, prefix, optionFuncs...) + if err != nil { + return nil, errors.WithStack(err) + } + + return store, nil + } + + return nil, errors.Errorf("unexpected store type value '%s' for parameter '%s'", storeType, storeTypeParam) +} + +func parseFSCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (*fs.Store[K, V], error) { + baseDirParam := prefix + "BaseDir" + baseDir, err := parseString(query, baseDirParam) + if err != nil { + if errors.Is(err, errNotFound) { + return nil, errors.Wrapf(err, "missing required url parameter '%s'", baseDirParam) + } + + return nil, errors.WithStack(err) + } + + funcs := make([]fs.OptionsFunc[K, V], 0) + + for _, anyFn := range optionFuncs { + fn, ok := anyFn.(fs.OptionsFunc[K, V]) + if !ok { + continue + } + + funcs = append(funcs, fn) + } + + store := fs.NewStore[K, V](baseDir, funcs...) + + if err := store.Clear(); err != nil { + return nil, errors.WithStack(err) + } + + return store, nil +} + +func parseMemoryCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (*memory.Store[K, V], error) { + return memory.NewStore[K, V](), nil +} diff --git a/pkg/storage/driver/cache/lfu/cache.go b/pkg/storage/driver/cache/lfu/cache.go new file mode 100644 index 0000000..d5ad705 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/cache.go @@ -0,0 +1,349 @@ +package lfu + +import ( + "slices" + "sync/atomic" + "time" + + "github.com/pkg/errors" +) + +var ( + ErrNotFound = errors.New("not found") + ErrSizeExceedCapacity = errors.New("size exceed capacity") + errExpired = errors.New("expired") +) + +type Cache[K comparable, V any] struct { + index *Map[K, *cacheItem[K, V]] + freqs *List[*frequencyItem[K, V]] + + size atomic.Int32 + + capacity int + store Store[K, V] + getValueSize GetValueSizeFunc[V] + sync *Synchronizer[K] + + log LogFunc + ttl time.Duration +} + +type cacheItem[K any, V any] struct { + key K + size int + time atomic.Int64 + frequencyParent *Element[*frequencyItem[K, V]] +} + +func (i *cacheItem[K, V]) Expired(ttl time.Duration) bool { + if ttl == 0 { + return false + } + + itemTime := time.Unix(i.time.Load(), 0) + + // If item has expired, mark it as not found + return itemTime.Add(ttl).Before(time.Now()) +} + +func (i *cacheItem[K, V]) Refresh() { + i.time.Store(time.Now().Unix()) +} + +func newCacheItem[K any, V any](key K, size int) *cacheItem[K, V] { + item := &cacheItem[K, V]{ + key: key, + size: size, + } + item.time.Store(time.Now().Unix()) + return item +} + +type frequencyItem[K any, V any] struct { + entries *Map[*cacheItem[K, V], struct{}] + freq int +} + +func newFrequencyItem[K any, V any]() *frequencyItem[K, V] { + frequencyItem := &frequencyItem[K, V]{} + frequencyItem.entries = NewMap[*cacheItem[K, V], struct{}]() + return frequencyItem +} + +func (c *Cache[K, V]) Set(key K, value V) error { + newItemSize, err := c.getValueSize(value) + if err != nil { + return errors.WithStack(err) + } + + c.log("setting '%v' (size: %d)", key, newItemSize) + + if newItemSize > int(c.capacity) { + return errors.Wrapf(ErrSizeExceedCapacity, "item size '%d' exceed cache total capacity of '%v'", newItemSize, c.capacity) + } + + var sizeDelta int + + err = c.sync.WriteTx(key, func() error { + if err := c.store.Set(key, value); err != nil { + return errors.WithStack(err) + } + + item, ok := c.index.Get(key) + if ok { + oldItemSize := item.size + sizeDelta = -int(oldItemSize) + newItemSize + item.Refresh() + } else { + item = newCacheItem[K, V](key, newItemSize) + c.index.Set(key, item) + sizeDelta = newItemSize + } + + c.size.Add(int32(sizeDelta)) + c.increment(item) + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + // Eviction, if needed + if err := c.Evict(key); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (c *Cache[K, V]) Get(key K) (V, error) { + var value V + err := c.sync.ReadTx(key, func(upgrade func(func())) error { + c.log("getting '%v'", key) + + e, ok := c.index.Get(key) + if !ok { + return errors.WithStack(ErrNotFound) + } + + if e.Expired(c.ttl) { + return errors.WithStack(errExpired) + } + + v, err := c.store.Get(key) + if err != nil { + return errors.WithStack(err) + } + + upgrade(func() { + c.increment(e) + }) + + value = v + + return nil + }) + if err != nil { + if errors.Is(err, errExpired) { + if err := c.Delete(key); err != nil { + return *new(V), errors.WithStack(err) + } + + return *new(V), errors.WithStack(ErrNotFound) + } + + return *new(V), errors.WithStack(err) + } + + return value, nil +} + +func (c *Cache[K, V]) Delete(key K) error { + err := c.sync.WriteTx(key, func() error { + c.log("deleting '%v'", key) + + item, exists := c.index.Get(key) + if !exists { + return errors.WithStack(ErrNotFound) + } + + if err := c.store.Delete(key); err != nil { + return errors.WithStack(err) + } + + c.size.Add(-int32(item.size)) + + c.remove(item.frequencyParent, item) + c.index.Delete(key) + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (c *Cache[K, V]) Evict(skipped ...K) error { + exceed, delta := c.atCapacity() + if exceed && delta > 0 { + if err := c.evict(delta, skipped...); err != nil { + return errors.WithStack(err) + } + } + + return nil +} + +func (c *Cache[K, V]) Len() int { + return c.index.Len() +} + +func (c *Cache[K, V]) Size() int { + return int(c.size.Load()) +} + +func (c *Cache[K, V]) Capacity() int { + return c.capacity +} + +func (c *Cache[K, V]) increment(item *cacheItem[K, V]) { + currentFrequencyElement := item.frequencyParent + var nextFrequencyAmount int + var nextFrequencyElement *Element[*frequencyItem[K, V]] + + if currentFrequencyElement == nil { + nextFrequencyAmount = 1 + nextFrequencyElement = c.freqs.First() + } else { + atomicFrequencyItem := c.freqs.Value(currentFrequencyElement) + nextFrequencyAmount = atomicFrequencyItem.freq + 1 + nextFrequencyElement = c.freqs.Next(currentFrequencyElement) + } + + var nextFrequency *frequencyItem[K, V] + if nextFrequencyElement != nil { + nextFrequency = c.freqs.Value(nextFrequencyElement) + } + + if nextFrequencyElement == nil || nextFrequency == nil || nextFrequency.freq != nextFrequencyAmount { + newFrequencyItem := newFrequencyItem[K, V]() + newFrequencyItem.freq = nextFrequencyAmount + + if currentFrequencyElement == nil { + nextFrequencyElement = c.freqs.PushFront(newFrequencyItem) + } else { + nextFrequencyElement = c.freqs.InsertValueAfter(newFrequencyItem, currentFrequencyElement) + } + } + + item.frequencyParent = nextFrequencyElement + + nextFrequency = c.freqs.Value(nextFrequencyElement) + nextFrequency.entries.Set(item, struct{}{}) + + if currentFrequencyElement != nil { + c.remove(currentFrequencyElement, item) + } +} + +func (c *Cache[K, V]) remove(listItem *Element[*frequencyItem[K, V]], item *cacheItem[K, V]) { + entries := c.freqs.Value(listItem).entries + + entries.Delete(item) +} + +func (c *Cache[K, V]) atCapacity() (bool, int) { + size, capacity := c.Size(), c.Capacity() + c.log("cache stats: %d/%d", size, capacity) + return size >= capacity, size - capacity +} + +func (c *Cache[K, V]) evict(total int, skipped ...K) error { + if total == 0 { + return nil + } + + frequencyElement := c.freqs.First() + if frequencyElement == nil { + c.log("no frequency element") + return nil + } + + for evicted := 0; evicted < total; { + c.log("running eviction: [to_evict:%d, evicted: %d]", total, evicted) + + c.log("first frequency element %p", frequencyElement) + + frequencyItem := c.freqs.Value(frequencyElement) + if frequencyItem == nil { + return nil + } + + entries := frequencyItem.entries + + if entries.Len() == 0 { + c.log("no frequency entries") + frequencyElement = c.freqs.Next(frequencyElement) + continue + } + + var rangeErr error + entries.Range(func(key, v any) bool { + if evicted >= total { + c.log("evicted enough (%d >= %d), stopping", evicted, total) + return false + } + + entry, _ := key.(*cacheItem[K, V]) + + if slices.Contains(skipped, entry.key) { + c.log("skipping key '%v'", entry.key) + return true + } + + if err := c.Delete(entry.key); err != nil { + if errors.Is(err, ErrNotFound) { + c.log("key '%s' not found", entry.key) + // Cleanup obsolete frequency + c.remove(frequencyElement, entry) + return true + } + + rangeErr = errors.WithStack(err) + return false + } + + c.log("evicted key '%v' (size: %d)", entry.key, entry.size) + + evicted += int(entry.size) + + return true + }) + if rangeErr != nil { + return errors.WithStack(rangeErr) + } + } + + return nil +} + +func NewCache[K comparable, V any](store Store[K, V], funcs ...OptionsFunc[K, V]) *Cache[K, V] { + opts := DefaultOptions[K, V](funcs...) + + cache := &Cache[K, V]{ + index: NewMap[K, *cacheItem[K, V]](), + freqs: NewList[*frequencyItem[K, V]](), + capacity: opts.Capacity, + store: store, + getValueSize: opts.GetValueSize, + sync: NewSynchronizer[K](), + log: opts.Log, + ttl: opts.TTL, + } + + return cache +} diff --git a/pkg/storage/driver/cache/lfu/fs/cache_test.go b/pkg/storage/driver/cache/lfu/fs/cache_test.go new file mode 100644 index 0000000..bbbd7c3 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/cache_test.go @@ -0,0 +1,37 @@ +package fs + +import ( + "bytes" + "io" + "path/filepath" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite" + "github.com/pkg/errors" +) + +func TestCacheWithFSStore(t *testing.T) { + testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] { + dir := filepath.Join("testdata", "testsuite", testName) + store := NewStore[string, string](dir, + WithMarshalValue[string, string](func(value string) (io.Reader, error) { + return bytes.NewBuffer([]byte(value)), nil + }), + WithUnmarshalValue[string, string](func(r io.Reader) (string, error) { + data, err := io.ReadAll(r) + if err != nil { + return "", errors.WithStack(err) + } + + return string(data), nil + }), + ) + + if err := store.Clear(); err != nil { + panic(errors.WithStack(err)) + } + + return store + }) +} diff --git a/pkg/storage/driver/cache/lfu/fs/hash.go b/pkg/storage/driver/cache/lfu/fs/hash.go new file mode 100644 index 0000000..3aefe55 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/hash.go @@ -0,0 +1,19 @@ +package fs + +import ( + "strconv" + + "github.com/mitchellh/hashstructure/v2" + "github.com/pkg/errors" +) + +func DefaultGetPath[K comparable](key K) ([]string, error) { + uintHash, err := hashstructure.Hash(key, hashstructure.FormatV2, nil) + if err != nil { + return nil, errors.WithStack(err) + } + + hash := strconv.FormatUint(uintHash, 16) + + return []string{hash}, nil +} diff --git a/pkg/storage/driver/cache/lfu/fs/marshal.go b/pkg/storage/driver/cache/lfu/fs/marshal.go new file mode 100644 index 0000000..09a4eb6 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/marshal.go @@ -0,0 +1,31 @@ +package fs + +import ( + "bytes" + "encoding/gob" + "io" + + "github.com/pkg/errors" +) + +func DefaultMarshalValue[V any](value V) (io.Reader, error) { + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + + if err := encoder.Encode(value); err != nil { + return nil, errors.WithStack(err) + } + + return &buf, nil +} + +func DefaultUnmarshalValue[V any](d io.Reader) (V, error) { + var value V + encoder := gob.NewDecoder(d) + + if err := encoder.Decode(&value); err != nil { + return *new(V), errors.WithStack(err) + } + + return value, nil +} diff --git a/pkg/storage/driver/cache/lfu/fs/options.go b/pkg/storage/driver/cache/lfu/fs/options.go new file mode 100644 index 0000000..458f63a --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/options.go @@ -0,0 +1,45 @@ +package fs + +import "io" + +type GetPathFunc[K comparable] func(key K) ([]string, error) +type MarshalValueFunc[V any] func(value V) (io.Reader, error) +type UnmarshalValueFunc[V any] func(r io.Reader) (V, error) + +type Options[K comparable, V any] struct { + GetPath GetPathFunc[K] + MarshalValue MarshalValueFunc[V] + UnmarshalValue UnmarshalValueFunc[V] +} + +type OptionsFunc[K comparable, V any] func(opts *Options[K, V]) + +func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] { + opts := &Options[K, V]{ + GetPath: DefaultGetPath[K], + MarshalValue: DefaultMarshalValue[V], + UnmarshalValue: DefaultUnmarshalValue[V], + } + for _, fn := range funcs { + fn(opts) + } + return opts +} + +func WithGetPath[K comparable, V any](getKeyHash GetPathFunc[K]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.GetPath = getKeyHash + } +} + +func WithMarshalValue[K comparable, V any](marshalValue MarshalValueFunc[V]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.MarshalValue = marshalValue + } +} + +func WithUnmarshalValue[K comparable, V any](unmarshalValue UnmarshalValueFunc[V]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.UnmarshalValue = unmarshalValue + } +} diff --git a/pkg/storage/driver/cache/lfu/fs/store.go b/pkg/storage/driver/cache/lfu/fs/store.go new file mode 100644 index 0000000..0751020 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/store.go @@ -0,0 +1,165 @@ +package fs + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +type Store[K comparable, V any] struct { + baseDir string + getPath GetPathFunc[K] + marshalValue MarshalValueFunc[V] + unmarshalValue UnmarshalValueFunc[V] +} + +// Delete implements Store. +func (s *Store[K, V]) Delete(key K) error { + path, err := s.getEntryPath(key) + if err != nil { + return errors.WithStack(err) + } + + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { + return errors.WithStack(err) + } + + return nil +} + +// Get implements Store. +func (s *Store[K, V]) Get(key K) (V, error) { + path, err := s.getEntryPath(key) + if err != nil { + return *new(V), errors.WithStack(err) + } + + value, err := s.readValue(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return *new(V), errors.WithStack(lfu.ErrNotFound) + } + + return *new(V), errors.WithStack(err) + } + + return value, nil +} + +// Set implements Store. +func (s *Store[K, V]) Set(key K, value V) error { + path, err := s.getEntryPath(key) + if err != nil { + return errors.WithStack(err) + } + + if err := s.writeValue(path, value); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Store[K, V]) Clear() error { + if err := os.RemoveAll(s.baseDir); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Store[K, V]) getEntryPath(k K) (string, error) { + path, err := s.getPath(k) + if err != nil { + return "", errors.WithStack(err) + } + + path = append([]string{s.baseDir}, path...) + return filepath.Join(path...), nil +} + +func (s *Store[K, V]) writeValue(path string, value V) error { + fi, err := os.Stat(path) + if err == nil && !fi.Mode().IsRegular() { + return fmt.Errorf("%s already exists and is not a regular file", path) + } + + dir := filepath.Dir(path) + + if err := os.MkdirAll(dir, 0750); err != nil { + return errors.WithStack(err) + } + + f, err := os.CreateTemp(dir, filepath.Base(path)+".tmp") + if err != nil { + return errors.WithStack(err) + } + + tmpName := f.Name() + defer func() { + if err != nil { + f.Close() + os.Remove(tmpName) + } + }() + + reader, err := s.marshalValue(value) + if err != nil { + return errors.WithStack(err) + } + + if _, err := io.Copy(f, reader); err != nil { + return errors.WithStack(err) + } + + if err := f.Sync(); err != nil { + return errors.WithStack(err) + } + + if err := f.Close(); err != nil { + return errors.WithStack(err) + } + + if err := os.Rename(tmpName, path); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Store[K, V]) readValue(path string) (V, error) { + file, err := os.Open(path) + if err != nil { + return *new(V), errors.WithStack(err) + } + + defer func() { + if err := file.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + panic(errors.WithStack(err)) + } + }() + + value, err := s.unmarshalValue(file) + + if err != nil { + return *new(V), errors.WithStack(err) + } + + return value, nil +} + +func NewStore[K comparable, V any](baseDir string, funcs ...OptionsFunc[K, V]) *Store[K, V] { + opts := DefaultOptions[K, V](funcs...) + return &Store[K, V]{ + baseDir: baseDir, + getPath: opts.GetPath, + unmarshalValue: opts.UnmarshalValue, + marshalValue: opts.MarshalValue, + } +} + +var _ lfu.Store[string, int] = &Store[string, int]{} diff --git a/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore b/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore new file mode 100644 index 0000000..c96a04f --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore \ No newline at end of file diff --git a/pkg/storage/driver/cache/lfu/list.go b/pkg/storage/driver/cache/lfu/list.go new file mode 100644 index 0000000..f8e6e60 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/list.go @@ -0,0 +1,203 @@ +package lfu + +import ( + "sync/atomic" +) + +type List[T any] struct { + root *Element[T] + len atomic.Int32 + sync *Synchronizer[*Element[T]] +} + +func (l *List[T]) First() *Element[T] { + if l.Len() == 0 { + return nil + } + + var next *Element[T] + l.sync.ReadTx(l.root, func(upgrade func(func())) error { + next = l.root.next + return nil + }) + + return next +} + +func (l *List[T]) Last() *Element[T] { + if l.Len() == 0 { + return nil + } + + var prev *Element[T] + l.sync.ReadTx(l.root, func(upgrade func(func())) error { + prev = l.root.prev + return nil + }) + + return prev +} + +func (l *List[T]) Prev(e *Element[T]) *Element[T] { + var prev *Element[T] + l.sync.ReadTx(e, func(upgrade func(func())) error { + prev = e.prev + return nil + }) + + return prev +} + +func (l *List[T]) Next(e *Element[T]) *Element[T] { + var next *Element[T] + l.sync.ReadTx(e, func(upgrade func(func())) error { + next = e.next + return nil + }) + + return next +} + +func (l *List[T]) Value(e *Element[T]) T { + var value T + l.sync.ReadTx(e, func(upgrade func(func())) error { + value = e.value + return nil + }) + + return value +} + +func (l *List[T]) PushFront(v T) *Element[T] { + return l.InsertValueAfter(v, l.root) +} + +func (l *List[T]) PushBack(v T) *Element[T] { + return l.InsertValueAfter(v, l.root) +} + +func (l *List[T]) Remove(e *Element[T]) { + l.remove(e) +} + +func (l *List[T]) Len() int { + return int(l.len.Load()) +} + +func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] { + l.sync.ReadTx(e, func(upgrade func(fn func())) error { + var next *Element[T] + l.sync.ReadTx(at, func(upgrade func(func())) error { + next = at.next + return nil + }) + + upgrade(func() { + e.prev = at + e.next = next + e.list = l + }) + + if e.prev != nil { + l.sync.WriteTx(e.prev, func() error { + e.prev.next = e + return nil + }) + } + + if e.next != nil { + l.sync.WriteTx(e.next, func() error { + e.next.prev = e + return nil + }) + } + + return nil + }) + + l.len.Add(1) + + return e +} + +func (l *List[T]) InsertValueAfter(v T, at *Element[T]) *Element[T] { + e := NewElement[T](v) + return l.insertAfter(e, at) +} + +func (l *List[T]) remove(e *Element[T]) { + if e == nil && e == l.root { + return + } + + l.sync.ReadTx(e, func(upgrade func(fn func())) error { + if e.prev != nil { + if e.prev == e { + upgrade(func() { + e.prev.next = e.next + }) + } else { + l.sync.WriteTx(e.prev, func() error { + e.prev.next = e.next + return nil + }) + } + } + + if e.next != nil { + if e.next == e { + upgrade(func() { + e.next.prev = e.prev + }) + } else { + l.sync.WriteTx(e.next, func() error { + e.next.prev = e.prev + return nil + }) + } + } + + upgrade(func() { + e.next = nil + e.prev = nil + e.list = nil + }) + + return nil + }) + + l.sync.Remove(e) + l.len.Add(-1) +} + +func NewList[T any]() *List[T] { + root := NewElement(*new(T)) + root.next = root + root.prev = root + + list := &List[T]{ + sync: NewSynchronizer[*Element[T]](), + } + + root.list = list + list.root = root + + return list +} + +type Element[T any] struct { + prev *Element[T] + next *Element[T] + list *List[T] + value T +} + +func NewElement[T any](v T) *Element[T] { + element := &Element[T]{ + prev: nil, + next: nil, + list: nil, + value: v, + } + return element +} diff --git a/pkg/storage/driver/cache/lfu/map.go b/pkg/storage/driver/cache/lfu/map.go new file mode 100644 index 0000000..26afd21 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/map.go @@ -0,0 +1,67 @@ +package lfu + +import ( + "sync" + "sync/atomic" +) + +type Map[K comparable, V any] struct { + size atomic.Int32 + inner sync.Map +} + +func (m *Map[K, V]) Get(key K) (V, bool) { + raw, exists := m.inner.Load(key) + if !exists { + return *new(V), false + } + + value, ok := raw.(V) + if !ok { + return *new(V), false + } + + return value, true +} + +func (m *Map[K, V]) GetOrSet(key K, defaultValue V) (V, bool) { + raw, loaded := m.inner.LoadOrStore(key, defaultValue) + if !loaded { + m.size.Add(1) + } + + value, ok := raw.(V) + if !ok { + return *new(V), loaded + } + + return value, loaded +} + +func (m *Map[K, V]) Set(key K, value V) { + _, loaded := m.inner.Swap(key, value) + if !loaded { + m.size.Add(1) + } +} + +func (m *Map[K, V]) Delete(key K) { + _, existed := m.inner.LoadAndDelete(key) + if existed { + m.size.Add(-1) + } +} + +func (m *Map[K, V]) Range(fn func(key, value any) bool) { + m.inner.Range(fn) +} + +func (m *Map[K, V]) Len() int { + return int(m.size.Load()) +} + +func NewMap[K comparable, V any]() *Map[K, V] { + return &Map[K, V]{ + inner: sync.Map{}, + } +} diff --git a/pkg/storage/driver/cache/lfu/memory/cache_test.go b/pkg/storage/driver/cache/lfu/memory/cache_test.go new file mode 100644 index 0000000..4465e41 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/memory/cache_test.go @@ -0,0 +1,14 @@ +package memory + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite" +) + +func TestCacheWithMemoryStore(t *testing.T) { + testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] { + return NewStore[string, string]() + }) +} diff --git a/pkg/storage/driver/cache/lfu/memory/memory.go b/pkg/storage/driver/cache/lfu/memory/memory.go new file mode 100644 index 0000000..a6616f1 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/memory/memory.go @@ -0,0 +1,40 @@ +package memory + +import ( + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +type Store[K comparable, V any] struct { + index *lfu.Map[K, V] +} + +// Delete implements Store. +func (s *Store[K, V]) Delete(key K) error { + s.index.Delete(key) + return nil +} + +// Get implements Store. +func (s *Store[K, V]) Get(key K) (V, error) { + value, exists := s.index.Get(key) + if !exists { + return *new(V), errors.WithStack(lfu.ErrNotFound) + } + + return value, nil +} + +// Set implements Store. +func (s *Store[K, V]) Set(key K, value V) error { + s.index.Set(key, value) + return nil +} + +func NewStore[K comparable, V any]() *Store[K, V] { + return &Store[K, V]{ + index: lfu.NewMap[K, V](), + } +} + +var _ lfu.Store[string, int] = &Store[string, int]{} diff --git a/pkg/storage/driver/cache/lfu/options.go b/pkg/storage/driver/cache/lfu/options.go new file mode 100644 index 0000000..120fef9 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/options.go @@ -0,0 +1,57 @@ +package lfu + +import "time" + +type GetValueSizeFunc[V any] func(value V) (int, error) + +type LogFunc func(format string, values ...any) + +func DefaultLogFunc(format string, values ...any) { + +} + +type Options[K comparable, V any] struct { + GetValueSize GetValueSizeFunc[V] + Capacity int + Log LogFunc + TTL time.Duration +} + +type OptionsFunc[K comparable, V any] func(opts *Options[K, V]) + +func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] { + opts := &Options[K, V]{ + GetValueSize: DefaultGetValueSize[V], + Capacity: 100, + Log: DefaultLogFunc, + TTL: 0, + } + for _, fn := range funcs { + fn(opts) + } + return opts +} + +func WithCapacity[K comparable, V any](capacity int) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.Capacity = capacity + } +} + +func WithGetValueSize[K comparable, V any](getValueSize GetValueSizeFunc[V]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.GetValueSize = getValueSize + } +} + +func WithLog[K comparable, V any](fn LogFunc) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.Log = fn + } +} + +func WithTTL[K comparable, V any](ttl time.Duration) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.TTL = ttl + } +} diff --git a/pkg/storage/driver/cache/lfu/size.go b/pkg/storage/driver/cache/lfu/size.go new file mode 100644 index 0000000..19c5728 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/size.go @@ -0,0 +1,41 @@ +package lfu + +import ( + "github.com/pkg/errors" +) + +type Measurable interface { + Size() (int, error) +} + +func DefaultGetValueSize[V any](value V) (int, error) { + switch v := any(value).(type) { + case int: + return v, nil + case int8: + return int(v), nil + case int32: + return int(v), nil + case int64: + return int(v), nil + case float32: + return int(v), nil + case float64: + return int(v), nil + case []byte: + return len(v), nil + case string: + return len(v), nil + } + + if measurable, ok := any(value).(Measurable); ok { + size, err := measurable.Size() + if err != nil { + return 0, errors.WithStack(err) + } + + return size, nil + } + + return 0, errors.Errorf("could not retrieve size of type '%T'", value) +} diff --git a/pkg/storage/driver/cache/lfu/store.go b/pkg/storage/driver/cache/lfu/store.go new file mode 100644 index 0000000..7b2bd70 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/store.go @@ -0,0 +1,7 @@ +package lfu + +type Store[K comparable, V any] interface { + Delete(key K) error + Set(key K, value V) error + Get(key K) (V, error) +} diff --git a/pkg/storage/driver/cache/lfu/synchronizer.go b/pkg/storage/driver/cache/lfu/synchronizer.go new file mode 100644 index 0000000..7f7af2e --- /dev/null +++ b/pkg/storage/driver/cache/lfu/synchronizer.go @@ -0,0 +1,56 @@ +package lfu + +import ( + "sync" + + "github.com/pkg/errors" +) + +type Synchronizer[K comparable] struct { + index *Map[K, *sync.RWMutex] +} + +func (s *Synchronizer[K]) Remove(key K) { + s.index.Delete(key) +} + +func (s *Synchronizer[K]) ReadTx(key K, fn func(upgrade func(fn func())) error) error { + mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{}) + mutex.RLock() + defer mutex.RUnlock() + + upgrade := func(fn func()) { + mutex.RUnlock() + mutex.Lock() + defer func() { + mutex.Unlock() + mutex.RLock() + }() + + fn() + } + + if err := fn(upgrade); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Synchronizer[K]) WriteTx(key K, fn func() error) error { + mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{}) + mutex.Lock() + defer mutex.Unlock() + + if err := fn(); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func NewSynchronizer[K comparable]() *Synchronizer[K] { + return &Synchronizer[K]{ + index: NewMap[K, *sync.RWMutex](), + } +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/main.go b/pkg/storage/driver/cache/lfu/testsuite/main.go new file mode 100644 index 0000000..f8406ce --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/main.go @@ -0,0 +1,41 @@ +package testsuite + +import ( + "reflect" + "runtime" + "strings" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +type StoreFactory func(testName string) lfu.Store[string, string] +type testCase func(t *testing.T, store lfu.Store[string, string]) error + +var testCases = []testCase{ + testSetGetDelete, + testEviction, + testConcurrent, + testMultipleSet, + testTTL, +} + +func TestCacheWithStore(t *testing.T, factory StoreFactory) { + for _, tc := range testCases { + funcName := runtime.FuncForPC(reflect.ValueOf(tc).Pointer()).Name() + funcNameParts := strings.Split(funcName, "/") + testName := funcNameParts[len(funcNameParts)-1] + func(tc testCase) { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + store := factory(testName) + + if err := tc(t, store); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + }) + }(tc) + } +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go b/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go new file mode 100644 index 0000000..67d6813 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go @@ -0,0 +1,67 @@ +package testsuite + +import ( + "fmt" + "sync" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testConcurrent(t *testing.T, store lfu.Store[string, string]) error { + const value = "foobar" + totalKeys := 25 + totalSize := len(value) * totalKeys + capacity := totalSize / 2 + + cache := lfu.NewCache[string, string](store, + lfu.WithCapacity[string, string](capacity), + lfu.WithLog[string, string](t.Logf), + ) + + var wg sync.WaitGroup + + wg.Add(totalKeys) + + loops := totalKeys * 10 + + for i := 0; i < totalKeys; i++ { + key := fmt.Sprintf("key%d", i) + func(key string) { + go func() { + defer wg.Done() + for i := 0; i < loops; i++ { + if err := cache.Set(key, value); err != nil { + t.Errorf("%+v", errors.WithStack(err)) + } + } + }() + }(key) + } + + wg.Wait() + + t.Logf("cache before final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + if err := cache.Evict(); err != nil { + t.Errorf("%+v", errors.WithStack(err)) + } + + t.Logf("cache after final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + expectedLength := capacity / len(value) + if e, g := expectedLength, cache.Len(); e < g { + t.Errorf("cache.Len(): expected <= %d, got %d", e, g) + } + + if cache.Size() > capacity { + t.Errorf("cache.Size(): expected <= %d, got %d", capacity, cache.Size()) + } + + if e, g := expectedLength*len(value), cache.Size(); e < g { + t.Errorf("cache.Size(): expected <= %d, got %d", e, g) + } + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go b/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go new file mode 100644 index 0000000..1ed117d --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go @@ -0,0 +1,70 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testEviction(t *testing.T, store lfu.Store[string, string]) error { + cache := lfu.NewCache[string, string](store, + lfu.WithCapacity[string, string](10), + lfu.WithLog[string, string](t.Logf), + ) + + if err := cache.Set("key1", "key1"); err != nil { + return errors.WithStack(err) + } + + if err := cache.Set("key2", "key2"); err != nil { + return errors.WithStack(err) + } + + // Increment frequency of key2 + if _, err := cache.Get("key2"); err != nil { + return errors.WithStack(err) + } + + if e, g := 8, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if err := cache.Set("key3", "key3"); err != nil { + return errors.WithStack(err) + } + + t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + _, err := cache.Get("key1") + if err == nil { + t.Errorf("expected 'key1' to be evicted") + } + + if !errors.Is(err, lfu.ErrNotFound) { + t.Errorf("expected err to be 'ErrNotFound'") + } + + value, err := cache.Get("key2") + if err != nil { + return errors.WithStack(err) + } + + if e, g := "key2", value; e < g { + t.Errorf("cache.Get(\"key2\"): expected %v, got %v", e, g) + } + + if e, g := cache.Capacity(), cache.Size(); e < g { + t.Errorf("cache.Size(): expected <= %d, got %d", e, g) + } + + if e, g := 2, cache.Len(); e != g { + t.Errorf("cache.Len(): expected %d, got %d", e, g) + } + + if cache.Size() < 0 { + t.Errorf("cache.Size(): expected value >= 0, got %d", cache.Size()) + } + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go b/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go new file mode 100644 index 0000000..f1ad374 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go @@ -0,0 +1,80 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testMultipleSet(t *testing.T, store lfu.Store[string, string]) error { + const ( + key = "mykey" + firstValue = "foo" + secondValue = "bar" + thirdValue = "foobar" + ) + + cache := lfu.NewCache[string, string](store) + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, firstValue); err != nil { + return errors.WithStack(err) + } + + if e, g := len(firstValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + retrieved, err := cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := firstValue, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, secondValue); err != nil { + return errors.WithStack(err) + } + + if e, g := len(secondValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + retrieved, err = cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := secondValue, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, thirdValue); err != nil { + return errors.WithStack(err) + } + + if e, g := len(thirdValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + retrieved, err = cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := thirdValue, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if e, g := len(thirdValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go b/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go new file mode 100644 index 0000000..b6f4e14 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go @@ -0,0 +1,66 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testSetGetDelete(t *testing.T, store lfu.Store[string, string]) error { + const ( + key = "mykey" + value = "foobar" + ) + + cache := lfu.NewCache[string, string](store, lfu.WithCapacity[string, string](10)) + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, value); err != nil { + return errors.WithStack(err) + } + + if e, g := len(value), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if e, g := 1, cache.Len(); e != g { + t.Errorf("cache.Len(): expected '%v', got '%v'", e, g) + } + + retrieved, err := cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := value, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if err := cache.Delete(key); err != nil { + return errors.WithStack(err) + } + + if _, err := cache.Get(key); err == nil || !errors.Is(err, lfu.ErrNotFound) { + t.Errorf("cache.Get(key): err should be lfu.ErrNotFound, got '%v'", errors.WithStack(err)) + } + + if e, g := value, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if e, g := 0, cache.Len(); e != g { + t.Errorf("cache.Len(): expected '%v', got '%v'", e, g) + } + + t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_ttl.go b/pkg/storage/driver/cache/lfu/testsuite/test_ttl.go new file mode 100644 index 0000000..8f5f1b0 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_ttl.go @@ -0,0 +1,54 @@ +package testsuite + +import ( + "testing" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testTTL(t *testing.T, store lfu.Store[string, string]) error { + const ( + key = "mykey" + value = "foobar" + ) + + ttl := time.Second + + cache := lfu.NewCache[string, string](store, + lfu.WithTTL[string, string](ttl), + lfu.WithCapacity[string, string](10), + ) + + if err := cache.Set(key, value); err != nil { + return errors.WithStack(err) + } + + retrieved, err := cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := value, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + time.Sleep(ttl * 2) + + if _, err := cache.Get(key); !errors.Is(err, lfu.ErrNotFound) { + t.Errorf("cache.Get(key): expected err == lfu.ErrNotFound, got '%v'", err) + } + + t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if e, g := 0, cache.Len(); e != g { + t.Errorf("cache.Len(): expected '%v', got '%v'", e, g) + } + + return nil +} diff --git a/pkg/storage/driver/cache/options.go b/pkg/storage/driver/cache/options.go index 974d0c3..eae2098 100644 --- a/pkg/storage/driver/cache/options.go +++ b/pkg/storage/driver/cache/options.go @@ -3,14 +3,25 @@ package cache import ( "time" - "github.com/allegro/bigcache/v3" + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory" ) type Options struct { - CacheTTL time.Duration - BigCache bigcache.Config - BucketCacheSize int + CacheTTL time.Duration + + BlobCacheStore lfu.Store[string, []byte] + // Maximum total size of cached data + BlobCacheSize int + + BlobInfoCacheStore lfu.Store[string, storage.BlobInfo] + // Maximum number of blob infos BlobInfoCacheSize int + + BlobBucketCacheStore lfu.Store[string, storage.BlobBucket] + // Maximum number of blob bucket + BlobBucketCacheSize int } type OptionFunc func(opts *Options) @@ -18,10 +29,13 @@ type OptionFunc func(opts *Options) func NewOptions(funcs ...OptionFunc) *Options { defaultTTL := 60 * time.Minute opts := &Options{ - CacheTTL: defaultTTL, - BigCache: bigcache.DefaultConfig(defaultTTL), - BucketCacheSize: 16, - BlobInfoCacheSize: 256, + CacheTTL: defaultTTL, + BlobCacheStore: memory.NewStore[string, []byte](), + BlobCacheSize: 1e+9, // 1Gb + BlobInfoCacheStore: memory.NewStore[string, storage.BlobInfo](), + BlobInfoCacheSize: 256, + BlobBucketCacheStore: memory.NewStore[string, storage.BlobBucket](), + BlobBucketCacheSize: 16, } for _, fn := range funcs { @@ -37,15 +51,15 @@ func WithCacheTTL(ttl time.Duration) OptionFunc { } } -func WithBigCacheConfig(config bigcache.Config) OptionFunc { +func WithBlobBucketCacheSize(size int) OptionFunc { return func(opts *Options) { - opts.BigCache = config + opts.BlobBucketCacheSize = size } } -func WithBucketCacheSize(size int) OptionFunc { +func WithBlobBucketCacheStore(store lfu.Store[string, storage.BlobBucket]) OptionFunc { return func(opts *Options) { - opts.BucketCacheSize = size + opts.BlobBucketCacheStore = store } } @@ -54,3 +68,21 @@ func WithBlobInfoCacheSize(size int) OptionFunc { opts.BlobInfoCacheSize = size } } + +func WithBlobInfoCacheStore(store lfu.Store[string, storage.BlobInfo]) OptionFunc { + return func(opts *Options) { + opts.BlobInfoCacheStore = store + } +} + +func WithBlobCacheSize(size int) OptionFunc { + return func(opts *Options) { + opts.BlobCacheSize = size + } +} + +func WithBlobCacheStore(store lfu.Store[string, []byte]) OptionFunc { + return func(opts *Options) { + opts.BlobCacheStore = store + } +} diff --git a/pkg/storage/driver/cache/reader.go b/pkg/storage/driver/cache/reader.go index 3f6c350..70a3739 100644 --- a/pkg/storage/driver/cache/reader.go +++ b/pkg/storage/driver/cache/reader.go @@ -1,18 +1,21 @@ package cache import ( + "bytes" "context" "fmt" "io" - "github.com/allegro/bigcache/v3" + "cdr.dev/slog" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type readCacher struct { reader io.ReadSeekCloser - cache *bigcache.BigCache + cache *lfu.Cache[string, []byte] + buf bytes.Buffer key string } @@ -22,6 +25,21 @@ func (r *readCacher) Close() error { return errors.WithStack(err) } + if err := r.cache.Set(r.key, r.buf.Bytes()); err != nil { + var logErr slog.Field + if errors.Is(err, lfu.ErrSizeExceedCapacity) { + logErr = logger.E(errors.WithStack(err)) + } else { + logErr = logger.CapturedE(errors.WithStack(err)) + } + logger.Error(context.Background(), "could not cache buffered data", + logger.F("cacheKey", r.key), + logErr, + ) + } + + r.buf.Reset() + return nil } @@ -37,13 +55,9 @@ func (r *readCacher) Read(p []byte) (n int, err error) { } if length > 0 { - if err := r.cache.Append(r.key, p[:length]); err != nil { + if _, err := r.buf.Write(p[:length]); err != nil { ctx := logger.With(context.Background(), logger.F("cacheKey", r.key)) logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err))) - - if err := r.cache.Delete(r.key); err != nil { - logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err))) - } } }