Compare commits
No commits in common. "a268759d330fba4ea0bbe45bb8e759c92d875b09" and "b9c08f647cffcde986b67c910bb5e2d363c820b4" have entirely different histories.
a268759d33
...
b9c08f647c
|
@ -1,4 +1,4 @@
|
||||||
RUN_APP_ARGS=""
|
RUN_APP_ARGS=""
|
||||||
#EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
|
#EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
|
||||||
#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%&blobCacheStoreType=fs&blobCacheStoreBaseDir=data/cache/%APPID%&blobCacheSize=64MB"
|
#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%"
|
||||||
#EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"
|
#EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"
|
3
go.mod
3
go.mod
|
@ -3,15 +3,14 @@ module forge.cadoles.com/arcad/edge
|
||||||
go 1.21
|
go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/allegro/bigcache/v3 v3.1.0
|
||||||
github.com/getsentry/sentry-go v0.25.0
|
github.com/getsentry/sentry-go v0.25.0
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||||
github.com/hashicorp/mdns v1.0.5
|
github.com/hashicorp/mdns v1.0.5
|
||||||
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1
|
github.com/jackc/puddle/v2 v2.2.1
|
||||||
github.com/keegancsmith/rpc v1.3.0
|
github.com/keegancsmith/rpc v1.3.0
|
||||||
github.com/klauspost/compress v1.16.6
|
github.com/klauspost/compress v1.16.6
|
||||||
github.com/lestrrat-go/jwx/v2 v2.0.8
|
github.com/lestrrat-go/jwx/v2 v2.0.8
|
||||||
github.com/mitchellh/hashstructure/v2 v2.0.2
|
|
||||||
github.com/ulikunitz/xz v0.5.11
|
github.com/ulikunitz/xz v0.5.11
|
||||||
go.uber.org/goleak v1.3.0
|
go.uber.org/goleak v1.3.0
|
||||||
modernc.org/sqlite v1.20.4
|
modernc.org/sqlite v1.20.4
|
||||||
|
|
6
go.sum
6
go.sum
|
@ -37,6 +37,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
||||||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
|
github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad h1:PTOf0L/YjiVis5LYzJmi7WqttJ/h/DU6h06aJ24Kpbg=
|
||||||
|
github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad/go.mod h1:+q+mA6jGsjfsZ2HzhVSk38qDbX2/ZBJ7Yyciv75Ruo0=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||||
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
|
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
|
||||||
|
@ -206,8 +208,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
|
||||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||||
github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE=
|
github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE=
|
||||||
github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
|
github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
|
||||||
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
|
|
||||||
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
||||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
|
@ -258,8 +258,6 @@ github.com/miekg/dns v0.0.0-20161006100029-fc4e1e2843d8/go.mod h1:W1PPwlIAgtquWB
|
||||||
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
|
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
|
||||||
github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw=
|
github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw=
|
||||||
github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
|
github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
|
||||||
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
|
|
||||||
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
|
|
||||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
|
|
|
@ -6,16 +6,17 @@ import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
"github.com/allegro/bigcache/v3"
|
||||||
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"gitlab.com/wpetit/goweb/logger"
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BlobBucket struct {
|
type BlobBucket struct {
|
||||||
bucket storage.BlobBucket
|
bucket storage.BlobBucket
|
||||||
blobCache *lfu.Cache[string, []byte]
|
contentCache *bigcache.BigCache
|
||||||
bucketCache *lfu.Cache[string, storage.BlobBucket]
|
blobInfoCache *expirable.LRU[string, storage.BlobInfo]
|
||||||
blobInfoCache *lfu.Cache[string, storage.BlobInfo]
|
bucketCache *expirable.LRU[string, storage.BlobBucket]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements storage.BlobBucket.
|
// Close implements storage.BlobBucket.
|
||||||
|
@ -38,16 +39,7 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
|
||||||
// Get implements storage.BlobBucket.
|
// Get implements storage.BlobBucket.
|
||||||
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
|
||||||
key := b.getCacheKey(id)
|
key := b.getCacheKey(id)
|
||||||
blobInfo, err := b.blobInfoCache.Get(key)
|
if blobInfo, ok := b.blobInfoCache.Get(key); ok {
|
||||||
if err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
|
||||||
logger.Error(
|
|
||||||
ctx, "could not retrieve blob info from cache",
|
|
||||||
logger.F("cacheKey", key),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if blobInfo != nil {
|
|
||||||
logger.Debug(
|
logger.Debug(
|
||||||
ctx, "found blob info in cache",
|
ctx, "found blob info in cache",
|
||||||
logger.F("cacheKey", key),
|
logger.F("cacheKey", key),
|
||||||
|
@ -60,25 +52,13 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, storage.ErrBucketClosed) {
|
if errors.Is(err, storage.ErrBucketClosed) {
|
||||||
b.clearCache(ctx, id)
|
b.clearCache(ctx, id)
|
||||||
if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
b.bucketCache.Remove(b.Name())
|
||||||
logger.Error(
|
|
||||||
ctx, "could not delete bucket from cache",
|
|
||||||
logger.F("cacheKey", b.Name()),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.blobInfoCache.Set(key, info); err != nil {
|
b.blobInfoCache.Add(key, info)
|
||||||
logger.Error(
|
|
||||||
ctx, "could not set blob info in cache",
|
|
||||||
logger.F("cacheKey", key),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
@ -88,13 +68,7 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||||
infos, err := b.bucket.List(ctx)
|
infos, err := b.bucket.List(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, storage.ErrBucketClosed) {
|
if errors.Is(err, storage.ErrBucketClosed) {
|
||||||
if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
b.bucketCache.Remove(b.Name())
|
||||||
logger.Error(
|
|
||||||
ctx, "could not delete bucket from cache",
|
|
||||||
logger.F("cacheKey", b.Name()),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
|
@ -102,13 +76,7 @@ func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
|
||||||
|
|
||||||
for _, ifo := range infos {
|
for _, ifo := range infos {
|
||||||
key := b.getCacheKey(ifo.ID())
|
key := b.getCacheKey(ifo.ID())
|
||||||
if err := b.blobInfoCache.Set(key, ifo); err != nil {
|
b.blobInfoCache.Add(key, ifo)
|
||||||
logger.Error(
|
|
||||||
ctx, "could not set blob info in cache",
|
|
||||||
logger.F("cacheKey", key),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return infos, nil
|
return infos, nil
|
||||||
|
@ -125,6 +93,7 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS
|
||||||
logger.Debug(
|
logger.Debug(
|
||||||
ctx, "found blob content in cache",
|
ctx, "found blob content in cache",
|
||||||
logger.F("cacheKey", b.getCacheKey(id)),
|
logger.F("cacheKey", b.getCacheKey(id)),
|
||||||
|
logger.F("cacheStats", b.contentCache.Stats()),
|
||||||
)
|
)
|
||||||
return cached, nil
|
return cached, nil
|
||||||
}
|
}
|
||||||
|
@ -133,13 +102,7 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, storage.ErrBucketClosed) {
|
if errors.Is(err, storage.ErrBucketClosed) {
|
||||||
b.clearCache(ctx, id)
|
b.clearCache(ctx, id)
|
||||||
if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
b.bucketCache.Remove(b.Name())
|
||||||
logger.Error(
|
|
||||||
ctx, "could not delete bucket from cache",
|
|
||||||
logger.F("cacheKey", b.Name()),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
|
@ -147,7 +110,7 @@ func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadS
|
||||||
|
|
||||||
return &readCacher{
|
return &readCacher{
|
||||||
reader: reader,
|
reader: reader,
|
||||||
cache: b.blobCache,
|
cache: b.contentCache,
|
||||||
key: b.getCacheKey(id),
|
key: b.getCacheKey(id),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -158,13 +121,13 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
|
||||||
|
|
||||||
func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
|
func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
|
||||||
key := b.getCacheKey(id)
|
key := b.getCacheKey(id)
|
||||||
data, err := b.blobCache.Get(key)
|
data, err := b.contentCache.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, lfu.ErrNotFound) {
|
if errors.Is(err, bigcache.ErrEntryNotFound) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Error(context.Background(), "could not retrieve cached value", logger.CapturedE(errors.WithStack(err)))
|
logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err)))
|
||||||
|
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
@ -177,17 +140,11 @@ func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) {
|
||||||
|
|
||||||
logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key))
|
logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key))
|
||||||
|
|
||||||
if err := b.blobCache.Delete(key); err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
if err := b.contentCache.Delete(key); err != nil && !errors.Is(err, bigcache.ErrEntryNotFound) {
|
||||||
logger.Error(ctx, "could not clear cache", logger.F("cacheKey", key), logger.CapturedE(errors.WithStack(err)))
|
logger.Error(ctx, "could not clear cache", logger.CapturedE(errors.WithStack(err)))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.blobInfoCache.Delete(key); err != nil {
|
b.blobInfoCache.Remove(key)
|
||||||
logger.Error(
|
|
||||||
ctx, "could not delete blob info from cache",
|
|
||||||
logger.F("cacheKey", key),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWriter implements storage.BlobBucket.
|
// NewWriter implements storage.BlobBucket.
|
||||||
|
@ -197,13 +154,7 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write
|
||||||
writer, err := b.bucket.NewWriter(ctx, id)
|
writer, err := b.bucket.NewWriter(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, storage.ErrBucketClosed) {
|
if errors.Is(err, storage.ErrBucketClosed) {
|
||||||
if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
b.bucketCache.Remove(b.Name())
|
||||||
logger.Error(
|
|
||||||
ctx, "could not delete bucket from cache",
|
|
||||||
logger.F("cacheKey", b.Name()),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
|
@ -217,13 +168,7 @@ func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
|
||||||
size, err := b.bucket.Size(ctx)
|
size, err := b.bucket.Size(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, storage.ErrBucketClosed) {
|
if errors.Is(err, storage.ErrBucketClosed) {
|
||||||
if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
b.bucketCache.Remove(b.Name())
|
||||||
logger.Error(
|
|
||||||
ctx, "could not delete bucket from cache",
|
|
||||||
logger.F("cacheKey", b.Name()),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, errors.WithStack(err)
|
return 0, errors.WithStack(err)
|
||||||
|
|
|
@ -4,16 +4,17 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
"github.com/allegro/bigcache/v3"
|
||||||
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"gitlab.com/wpetit/goweb/logger"
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BlobStore struct {
|
type BlobStore struct {
|
||||||
store storage.BlobStore
|
store storage.BlobStore
|
||||||
blobCache *lfu.Cache[string, []byte]
|
contentCache *bigcache.BigCache
|
||||||
bucketCache *lfu.Cache[string, storage.BlobBucket]
|
bucketCache *expirable.LRU[string, storage.BlobBucket]
|
||||||
blobInfoCache *lfu.Cache[string, storage.BlobInfo]
|
blobInfoCache *expirable.LRU[string, storage.BlobInfo]
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBucket implements storage.BlobStore.
|
// DeleteBucket implements storage.BlobStore.
|
||||||
|
@ -22,7 +23,7 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.bucketCache.Delete(name)
|
s.bucketCache.Remove(name)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -39,40 +40,28 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
|
||||||
|
|
||||||
// OpenBucket implements storage.BlobStore.
|
// OpenBucket implements storage.BlobStore.
|
||||||
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
|
||||||
bucket, err := s.bucketCache.Get(name)
|
bucket, ok := s.bucketCache.Get(name)
|
||||||
if err == nil {
|
if ok {
|
||||||
logger.Debug(ctx, "found bucket in cache", logger.F("name", name))
|
logger.Debug(ctx, "found bucket in cache", logger.F("name", name))
|
||||||
|
|
||||||
return &BlobBucket{
|
return &BlobBucket{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
blobCache: s.blobCache,
|
contentCache: s.contentCache,
|
||||||
blobInfoCache: s.blobInfoCache,
|
blobInfoCache: s.blobInfoCache,
|
||||||
bucketCache: s.bucketCache,
|
bucketCache: s.bucketCache,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil && !errors.Is(err, lfu.ErrNotFound) {
|
bucket, err := s.store.OpenBucket(ctx, name)
|
||||||
logger.Error(ctx, "could not retrieve bucket from cache",
|
|
||||||
logger.F("cacheKey", name),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
bucket, err = s.store.OpenBucket(ctx, name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.bucketCache.Set(name, bucket); err != nil {
|
s.bucketCache.Add(name, bucket)
|
||||||
logger.Error(ctx, "could not set bucket in cache",
|
|
||||||
logger.F("cacheKey", name),
|
|
||||||
logger.CapturedE(errors.WithStack(err)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &BlobBucket{
|
return &BlobBucket{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
blobCache: s.blobCache,
|
contentCache: s.contentCache,
|
||||||
blobInfoCache: s.blobInfoCache,
|
blobInfoCache: s.blobInfoCache,
|
||||||
bucketCache: s.bucketCache,
|
bucketCache: s.bucketCache,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -81,36 +70,27 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
|
||||||
func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) {
|
func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) {
|
||||||
options := NewOptions(funcs...)
|
options := NewOptions(funcs...)
|
||||||
|
|
||||||
blobCache := lfu.NewCache[string, []byte](
|
contentCache, err := bigcache.New(context.Background(), options.BigCache)
|
||||||
options.BlobCacheStore,
|
if err != nil {
|
||||||
lfu.WithTTL[string, []byte](options.CacheTTL),
|
return nil, errors.WithStack(err)
|
||||||
lfu.WithCapacity[string, []byte](options.BlobCacheSize),
|
}
|
||||||
lfu.WithGetValueSize[string, []byte](func(value []byte) (int, error) {
|
|
||||||
return len(value), nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
blobBucketCache := lfu.NewCache[string, storage.BlobBucket](
|
onBlobBucketEvict := func(key string, bucket storage.BlobBucket) {
|
||||||
options.BlobBucketCacheStore,
|
ctx := context.Background()
|
||||||
lfu.WithCapacity[string, storage.BlobBucket](options.BlobBucketCacheSize),
|
logger.Debug(ctx, "evicting blob bucket from cache", logger.F("cacheKey", key))
|
||||||
lfu.WithGetValueSize[string, storage.BlobBucket](func(value storage.BlobBucket) (int, error) {
|
|
||||||
return 1, nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
blobInfoCache := lfu.NewCache[string, storage.BlobInfo](
|
if err := bucket.Close(); err != nil {
|
||||||
options.BlobInfoCacheStore,
|
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
|
||||||
lfu.WithTTL[string, storage.BlobInfo](options.CacheTTL),
|
}
|
||||||
lfu.WithCapacity[string, storage.BlobInfo](options.BlobInfoCacheSize),
|
}
|
||||||
lfu.WithGetValueSize[string, storage.BlobInfo](func(value storage.BlobInfo) (int, error) {
|
|
||||||
return 1, nil
|
bucketCache := expirable.NewLRU[string, storage.BlobBucket](options.BucketCacheSize, onBlobBucketEvict, options.CacheTTL)
|
||||||
}),
|
blobInfoCache := expirable.NewLRU[string, storage.BlobInfo](options.BlobInfoCacheSize, nil, options.CacheTTL)
|
||||||
)
|
|
||||||
|
|
||||||
return &BlobStore{
|
return &BlobStore{
|
||||||
store: store,
|
store: store,
|
||||||
blobCache: blobCache,
|
contentCache: contentCache,
|
||||||
bucketCache: blobBucketCache,
|
bucketCache: bucketCache,
|
||||||
blobInfoCache: blobInfoCache,
|
blobInfoCache: blobInfoCache,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,19 +1,17 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"context"
|
||||||
"io"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
"github.com/allegro/bigcache/v3"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/fs"
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
|
|
||||||
"github.com/inhies/go-bytesize"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -43,6 +41,13 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(cacheTTL))
|
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(cacheTTL))
|
||||||
|
|
||||||
|
cacheConfig, err := parseBigCacheConfig(&query, cacheTTL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not parse big cache config")
|
||||||
|
}
|
||||||
|
|
||||||
|
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBigCacheConfig(*cacheConfig))
|
||||||
|
|
||||||
blobBucketCacheSize, err := parseInt(&query, "blobBucketCacheSize")
|
blobBucketCacheSize, err := parseInt(&query, "blobBucketCacheSize")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, errNotFound) {
|
if !errors.Is(err, errNotFound) {
|
||||||
|
@ -52,15 +57,7 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||||
blobBucketCacheSize = 16
|
blobBucketCacheSize = 16
|
||||||
}
|
}
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobBucketCacheSize(int(blobBucketCacheSize)))
|
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize)))
|
||||||
|
|
||||||
blobBucketCacheStorePrefix := "blobBucketCacheStore"
|
|
||||||
blobBucketCacheStore, err := parseCacheStore[string, storage.BlobBucket](&query, blobBucketCacheStorePrefix)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobBucketCacheStore(blobBucketCacheStore))
|
|
||||||
|
|
||||||
bloInfoCacheSize, err := parseInt(&query, "bloInfoCacheSize")
|
bloInfoCacheSize, err := parseInt(&query, "bloInfoCacheSize")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -73,46 +70,6 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(bloInfoCacheSize)))
|
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(bloInfoCacheSize)))
|
||||||
|
|
||||||
blobInfoCacheStorePrefix := "blobInfoCacheStore"
|
|
||||||
blobInfoCacheStore, err := parseCacheStore[string, storage.BlobInfo](&query, blobInfoCacheStorePrefix)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheStore(blobInfoCacheStore))
|
|
||||||
|
|
||||||
blobCacheSize, err := parseByteSize(&query, "blobCacheSize")
|
|
||||||
if err != nil {
|
|
||||||
if !errors.Is(err, errNotFound) {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
blobCacheSize = 256e+6
|
|
||||||
}
|
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheSize(int(blobCacheSize)))
|
|
||||||
|
|
||||||
blobCacheStorePrefix := "blobCacheStore"
|
|
||||||
blobCacheStore, err := parseCacheStore[string, []byte](
|
|
||||||
&query, blobCacheStorePrefix,
|
|
||||||
fs.WithMarshalValue[string, []byte](func(value []byte) (io.Reader, error) {
|
|
||||||
return bytes.NewBuffer(value), nil
|
|
||||||
}),
|
|
||||||
fs.WithUnmarshalValue[string, []byte](func(r io.Reader) ([]byte, error) {
|
|
||||||
data, err := io.ReadAll(r)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return data, nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheStore(blobCacheStore))
|
|
||||||
|
|
||||||
url := &url.URL{
|
url := &url.URL{
|
||||||
Scheme: rawDriver,
|
Scheme: rawDriver,
|
||||||
Host: dsn.Host,
|
Host: dsn.Host,
|
||||||
|
@ -133,34 +90,89 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
|
||||||
return store, nil
|
return store, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNotFound = errors.New("not found")
|
type cacheLogger struct{}
|
||||||
|
|
||||||
func parseString(query *url.Values, name string) (string, error) {
|
func (l *cacheLogger) Printf(format string, v ...interface{}) {
|
||||||
value := query.Get(name)
|
logger.Debug(context.Background(), fmt.Sprintf(format, v...))
|
||||||
if value != "" {
|
|
||||||
query.Del(name)
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return "", errors.WithStack(errNotFound)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseByteSize(query *url.Values, name string) (bytesize.ByteSize, error) {
|
var _ bigcache.Logger = &cacheLogger{}
|
||||||
rawValue := query.Get(name)
|
|
||||||
if rawValue != "" {
|
|
||||||
query.Del(name)
|
|
||||||
|
|
||||||
value, err := bytesize.Parse(rawValue)
|
func parseBigCacheConfig(query *url.Values, cacheTTL time.Duration) (*bigcache.Config, error) {
|
||||||
if err != nil {
|
config := bigcache.DefaultConfig(cacheTTL)
|
||||||
return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name)
|
config.Logger = &cacheLogger{}
|
||||||
|
|
||||||
|
hardMaxCacheSize, err := parseInt(query, "bigCacheHardMaxCacheSize")
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNotFound) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return value, nil
|
hardMaxCacheSize = int64(config.HardMaxCacheSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, errors.WithStack(errNotFound)
|
config.HardMaxCacheSize = int(hardMaxCacheSize)
|
||||||
|
|
||||||
|
maxEntriesInWindow, err := parseInt(query, "bigCacheMaxEntriesInWindow")
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNotFound) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
maxEntriesInWindow = int64(config.MaxEntriesInWindow)
|
||||||
|
}
|
||||||
|
|
||||||
|
config.MaxEntriesInWindow = int(maxEntriesInWindow)
|
||||||
|
|
||||||
|
shards, err := parseInt(query, "bigCacheShards")
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNotFound) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shards = int64(config.Shards)
|
||||||
|
}
|
||||||
|
|
||||||
|
config.Shards = int(shards)
|
||||||
|
|
||||||
|
maxEntrySize, err := parseInt(query, "bigCacheMaxEntrySize")
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNotFound) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
maxEntrySize = int64(config.MaxEntrySize)
|
||||||
|
}
|
||||||
|
|
||||||
|
config.MaxEntrySize = int(maxEntrySize)
|
||||||
|
|
||||||
|
cleanWindow, err := parseDuration(query, "bigCacheCleanWindow")
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNotFound) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanWindow = config.CleanWindow
|
||||||
|
}
|
||||||
|
|
||||||
|
config.CleanWindow = cleanWindow
|
||||||
|
|
||||||
|
lifeWindow, err := parseDuration(query, "bigCacheLifeWindow")
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNotFound) {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lifeWindow = config.LifeWindow
|
||||||
|
}
|
||||||
|
|
||||||
|
config.LifeWindow = lifeWindow
|
||||||
|
|
||||||
|
return &config, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errNotFound = errors.New("not found")
|
||||||
|
|
||||||
func parseInt(query *url.Values, name string) (int64, error) {
|
func parseInt(query *url.Values, name string) (int64, error) {
|
||||||
rawValue := query.Get(name)
|
rawValue := query.Get(name)
|
||||||
if rawValue != "" {
|
if rawValue != "" {
|
||||||
|
@ -192,73 +204,3 @@ func parseDuration(query *url.Values, name string) (time.Duration, error) {
|
||||||
|
|
||||||
return 0, errors.WithStack(errNotFound)
|
return 0, errors.WithStack(errNotFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
storeTypeFS string = "fs"
|
|
||||||
storeTypeMemory string = "memory"
|
|
||||||
)
|
|
||||||
|
|
||||||
func parseCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (lfu.Store[K, V], error) {
|
|
||||||
storeTypeParam := prefix + "Type"
|
|
||||||
storeType, err := parseString(query, storeTypeParam)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, errNotFound) {
|
|
||||||
storeType = storeTypeMemory
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch storeType {
|
|
||||||
case storeTypeFS:
|
|
||||||
store, err := parseFSCacheStore[K, V](query, prefix, optionFuncs...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return store, nil
|
|
||||||
|
|
||||||
case storeTypeMemory:
|
|
||||||
store, err := parseMemoryCacheStore[K, V](query, prefix, optionFuncs...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return store, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, errors.Errorf("unexpected store type value '%s' for parameter '%s'", storeType, storeTypeParam)
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseFSCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (*fs.Store[K, V], error) {
|
|
||||||
baseDirParam := prefix + "BaseDir"
|
|
||||||
baseDir, err := parseString(query, baseDirParam)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, errNotFound) {
|
|
||||||
return nil, errors.Wrapf(err, "missing required url parameter '%s'", baseDirParam)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
funcs := make([]fs.OptionsFunc[K, V], 0)
|
|
||||||
|
|
||||||
for _, anyFn := range optionFuncs {
|
|
||||||
fn, ok := anyFn.(fs.OptionsFunc[K, V])
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
funcs = append(funcs, fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
store := fs.NewStore[K, V](baseDir, funcs...)
|
|
||||||
|
|
||||||
if err := store.Clear(); err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return store, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseMemoryCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (*memory.Store[K, V], error) {
|
|
||||||
return memory.NewStore[K, V](), nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,349 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
import (
|
|
||||||
"slices"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrNotFound = errors.New("not found")
|
|
||||||
ErrSizeExceedCapacity = errors.New("size exceed capacity")
|
|
||||||
errExpired = errors.New("expired")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Cache[K comparable, V any] struct {
|
|
||||||
index *Map[K, *cacheItem[K, V]]
|
|
||||||
freqs *List[*frequencyItem[K, V]]
|
|
||||||
|
|
||||||
size atomic.Int32
|
|
||||||
|
|
||||||
capacity int
|
|
||||||
store Store[K, V]
|
|
||||||
getValueSize GetValueSizeFunc[V]
|
|
||||||
sync *Synchronizer[K]
|
|
||||||
|
|
||||||
log LogFunc
|
|
||||||
ttl time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type cacheItem[K any, V any] struct {
|
|
||||||
key K
|
|
||||||
size int
|
|
||||||
time atomic.Int64
|
|
||||||
frequencyParent *Element[*frequencyItem[K, V]]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *cacheItem[K, V]) Expired(ttl time.Duration) bool {
|
|
||||||
if ttl == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
itemTime := time.Unix(i.time.Load(), 0)
|
|
||||||
|
|
||||||
// If item has expired, mark it as not found
|
|
||||||
return itemTime.Add(ttl).Before(time.Now())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *cacheItem[K, V]) Refresh() {
|
|
||||||
i.time.Store(time.Now().Unix())
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCacheItem[K any, V any](key K, size int) *cacheItem[K, V] {
|
|
||||||
item := &cacheItem[K, V]{
|
|
||||||
key: key,
|
|
||||||
size: size,
|
|
||||||
}
|
|
||||||
item.time.Store(time.Now().Unix())
|
|
||||||
return item
|
|
||||||
}
|
|
||||||
|
|
||||||
type frequencyItem[K any, V any] struct {
|
|
||||||
entries *Map[*cacheItem[K, V], struct{}]
|
|
||||||
freq int
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFrequencyItem[K any, V any]() *frequencyItem[K, V] {
|
|
||||||
frequencyItem := &frequencyItem[K, V]{}
|
|
||||||
frequencyItem.entries = NewMap[*cacheItem[K, V], struct{}]()
|
|
||||||
return frequencyItem
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Set(key K, value V) error {
|
|
||||||
newItemSize, err := c.getValueSize(value)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.log("setting '%v' (size: %d)", key, newItemSize)
|
|
||||||
|
|
||||||
if newItemSize > int(c.capacity) {
|
|
||||||
return errors.Wrapf(ErrSizeExceedCapacity, "item size '%d' exceed cache total capacity of '%v'", newItemSize, c.capacity)
|
|
||||||
}
|
|
||||||
|
|
||||||
var sizeDelta int
|
|
||||||
|
|
||||||
err = c.sync.WriteTx(key, func() error {
|
|
||||||
if err := c.store.Set(key, value); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
item, ok := c.index.Get(key)
|
|
||||||
if ok {
|
|
||||||
oldItemSize := item.size
|
|
||||||
sizeDelta = -int(oldItemSize) + newItemSize
|
|
||||||
item.Refresh()
|
|
||||||
} else {
|
|
||||||
item = newCacheItem[K, V](key, newItemSize)
|
|
||||||
c.index.Set(key, item)
|
|
||||||
sizeDelta = newItemSize
|
|
||||||
}
|
|
||||||
|
|
||||||
c.size.Add(int32(sizeDelta))
|
|
||||||
c.increment(item)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Eviction, if needed
|
|
||||||
if err := c.Evict(key); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Get(key K) (V, error) {
|
|
||||||
var value V
|
|
||||||
err := c.sync.ReadTx(key, func(upgrade func(func())) error {
|
|
||||||
c.log("getting '%v'", key)
|
|
||||||
|
|
||||||
e, ok := c.index.Get(key)
|
|
||||||
if !ok {
|
|
||||||
return errors.WithStack(ErrNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.Expired(c.ttl) {
|
|
||||||
return errors.WithStack(errExpired)
|
|
||||||
}
|
|
||||||
|
|
||||||
v, err := c.store.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
upgrade(func() {
|
|
||||||
c.increment(e)
|
|
||||||
})
|
|
||||||
|
|
||||||
value = v
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, errExpired) {
|
|
||||||
if err := c.Delete(key); err != nil {
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return *new(V), errors.WithStack(ErrNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Delete(key K) error {
|
|
||||||
err := c.sync.WriteTx(key, func() error {
|
|
||||||
c.log("deleting '%v'", key)
|
|
||||||
|
|
||||||
item, exists := c.index.Get(key)
|
|
||||||
if !exists {
|
|
||||||
return errors.WithStack(ErrNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.store.Delete(key); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.size.Add(-int32(item.size))
|
|
||||||
|
|
||||||
c.remove(item.frequencyParent, item)
|
|
||||||
c.index.Delete(key)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Evict(skipped ...K) error {
|
|
||||||
exceed, delta := c.atCapacity()
|
|
||||||
if exceed && delta > 0 {
|
|
||||||
if err := c.evict(delta, skipped...); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Len() int {
|
|
||||||
return c.index.Len()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Size() int {
|
|
||||||
return int(c.size.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) Capacity() int {
|
|
||||||
return c.capacity
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) increment(item *cacheItem[K, V]) {
|
|
||||||
currentFrequencyElement := item.frequencyParent
|
|
||||||
var nextFrequencyAmount int
|
|
||||||
var nextFrequencyElement *Element[*frequencyItem[K, V]]
|
|
||||||
|
|
||||||
if currentFrequencyElement == nil {
|
|
||||||
nextFrequencyAmount = 1
|
|
||||||
nextFrequencyElement = c.freqs.First()
|
|
||||||
} else {
|
|
||||||
atomicFrequencyItem := c.freqs.Value(currentFrequencyElement)
|
|
||||||
nextFrequencyAmount = atomicFrequencyItem.freq + 1
|
|
||||||
nextFrequencyElement = c.freqs.Next(currentFrequencyElement)
|
|
||||||
}
|
|
||||||
|
|
||||||
var nextFrequency *frequencyItem[K, V]
|
|
||||||
if nextFrequencyElement != nil {
|
|
||||||
nextFrequency = c.freqs.Value(nextFrequencyElement)
|
|
||||||
}
|
|
||||||
|
|
||||||
if nextFrequencyElement == nil || nextFrequency == nil || nextFrequency.freq != nextFrequencyAmount {
|
|
||||||
newFrequencyItem := newFrequencyItem[K, V]()
|
|
||||||
newFrequencyItem.freq = nextFrequencyAmount
|
|
||||||
|
|
||||||
if currentFrequencyElement == nil {
|
|
||||||
nextFrequencyElement = c.freqs.PushFront(newFrequencyItem)
|
|
||||||
} else {
|
|
||||||
nextFrequencyElement = c.freqs.InsertValueAfter(newFrequencyItem, currentFrequencyElement)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
item.frequencyParent = nextFrequencyElement
|
|
||||||
|
|
||||||
nextFrequency = c.freqs.Value(nextFrequencyElement)
|
|
||||||
nextFrequency.entries.Set(item, struct{}{})
|
|
||||||
|
|
||||||
if currentFrequencyElement != nil {
|
|
||||||
c.remove(currentFrequencyElement, item)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) remove(listItem *Element[*frequencyItem[K, V]], item *cacheItem[K, V]) {
|
|
||||||
entries := c.freqs.Value(listItem).entries
|
|
||||||
|
|
||||||
entries.Delete(item)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) atCapacity() (bool, int) {
|
|
||||||
size, capacity := c.Size(), c.Capacity()
|
|
||||||
c.log("cache stats: %d/%d", size, capacity)
|
|
||||||
return size >= capacity, size - capacity
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache[K, V]) evict(total int, skipped ...K) error {
|
|
||||||
if total == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
frequencyElement := c.freqs.First()
|
|
||||||
if frequencyElement == nil {
|
|
||||||
c.log("no frequency element")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for evicted := 0; evicted < total; {
|
|
||||||
c.log("running eviction: [to_evict:%d, evicted: %d]", total, evicted)
|
|
||||||
|
|
||||||
c.log("first frequency element %p", frequencyElement)
|
|
||||||
|
|
||||||
frequencyItem := c.freqs.Value(frequencyElement)
|
|
||||||
if frequencyItem == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
entries := frequencyItem.entries
|
|
||||||
|
|
||||||
if entries.Len() == 0 {
|
|
||||||
c.log("no frequency entries")
|
|
||||||
frequencyElement = c.freqs.Next(frequencyElement)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var rangeErr error
|
|
||||||
entries.Range(func(key, v any) bool {
|
|
||||||
if evicted >= total {
|
|
||||||
c.log("evicted enough (%d >= %d), stopping", evicted, total)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
entry, _ := key.(*cacheItem[K, V])
|
|
||||||
|
|
||||||
if slices.Contains(skipped, entry.key) {
|
|
||||||
c.log("skipping key '%v'", entry.key)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.Delete(entry.key); err != nil {
|
|
||||||
if errors.Is(err, ErrNotFound) {
|
|
||||||
c.log("key '%s' not found", entry.key)
|
|
||||||
// Cleanup obsolete frequency
|
|
||||||
c.remove(frequencyElement, entry)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
rangeErr = errors.WithStack(err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
c.log("evicted key '%v' (size: %d)", entry.key, entry.size)
|
|
||||||
|
|
||||||
evicted += int(entry.size)
|
|
||||||
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if rangeErr != nil {
|
|
||||||
return errors.WithStack(rangeErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCache[K comparable, V any](store Store[K, V], funcs ...OptionsFunc[K, V]) *Cache[K, V] {
|
|
||||||
opts := DefaultOptions[K, V](funcs...)
|
|
||||||
|
|
||||||
cache := &Cache[K, V]{
|
|
||||||
index: NewMap[K, *cacheItem[K, V]](),
|
|
||||||
freqs: NewList[*frequencyItem[K, V]](),
|
|
||||||
capacity: opts.Capacity,
|
|
||||||
store: store,
|
|
||||||
getValueSize: opts.GetValueSize,
|
|
||||||
sync: NewSynchronizer[K](),
|
|
||||||
log: opts.Log,
|
|
||||||
ttl: opts.TTL,
|
|
||||||
}
|
|
||||||
|
|
||||||
return cache
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
package fs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestCacheWithFSStore(t *testing.T) {
|
|
||||||
testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] {
|
|
||||||
dir := filepath.Join("testdata", "testsuite", testName)
|
|
||||||
store := NewStore[string, string](dir,
|
|
||||||
WithMarshalValue[string, string](func(value string) (io.Reader, error) {
|
|
||||||
return bytes.NewBuffer([]byte(value)), nil
|
|
||||||
}),
|
|
||||||
WithUnmarshalValue[string, string](func(r io.Reader) (string, error) {
|
|
||||||
data, err := io.ReadAll(r)
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return string(data), nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err := store.Clear(); err != nil {
|
|
||||||
panic(errors.WithStack(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return store
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,19 +0,0 @@
|
||||||
package fs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/mitchellh/hashstructure/v2"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func DefaultGetPath[K comparable](key K) ([]string, error) {
|
|
||||||
uintHash, err := hashstructure.Hash(key, hashstructure.FormatV2, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
hash := strconv.FormatUint(uintHash, 16)
|
|
||||||
|
|
||||||
return []string{hash}, nil
|
|
||||||
}
|
|
|
@ -1,31 +0,0 @@
|
||||||
package fs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/gob"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func DefaultMarshalValue[V any](value V) (io.Reader, error) {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
encoder := gob.NewEncoder(&buf)
|
|
||||||
|
|
||||||
if err := encoder.Encode(value); err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &buf, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultUnmarshalValue[V any](d io.Reader) (V, error) {
|
|
||||||
var value V
|
|
||||||
encoder := gob.NewDecoder(d)
|
|
||||||
|
|
||||||
if err := encoder.Decode(&value); err != nil {
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, nil
|
|
||||||
}
|
|
|
@ -1,45 +0,0 @@
|
||||||
package fs
|
|
||||||
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
type GetPathFunc[K comparable] func(key K) ([]string, error)
|
|
||||||
type MarshalValueFunc[V any] func(value V) (io.Reader, error)
|
|
||||||
type UnmarshalValueFunc[V any] func(r io.Reader) (V, error)
|
|
||||||
|
|
||||||
type Options[K comparable, V any] struct {
|
|
||||||
GetPath GetPathFunc[K]
|
|
||||||
MarshalValue MarshalValueFunc[V]
|
|
||||||
UnmarshalValue UnmarshalValueFunc[V]
|
|
||||||
}
|
|
||||||
|
|
||||||
type OptionsFunc[K comparable, V any] func(opts *Options[K, V])
|
|
||||||
|
|
||||||
func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] {
|
|
||||||
opts := &Options[K, V]{
|
|
||||||
GetPath: DefaultGetPath[K],
|
|
||||||
MarshalValue: DefaultMarshalValue[V],
|
|
||||||
UnmarshalValue: DefaultUnmarshalValue[V],
|
|
||||||
}
|
|
||||||
for _, fn := range funcs {
|
|
||||||
fn(opts)
|
|
||||||
}
|
|
||||||
return opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithGetPath[K comparable, V any](getKeyHash GetPathFunc[K]) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.GetPath = getKeyHash
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithMarshalValue[K comparable, V any](marshalValue MarshalValueFunc[V]) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.MarshalValue = marshalValue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithUnmarshalValue[K comparable, V any](unmarshalValue UnmarshalValueFunc[V]) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.UnmarshalValue = unmarshalValue
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,165 +0,0 @@
|
||||||
package fs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Store[K comparable, V any] struct {
|
|
||||||
baseDir string
|
|
||||||
getPath GetPathFunc[K]
|
|
||||||
marshalValue MarshalValueFunc[V]
|
|
||||||
unmarshalValue UnmarshalValueFunc[V]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete implements Store.
|
|
||||||
func (s *Store[K, V]) Delete(key K) error {
|
|
||||||
path, err := s.getEntryPath(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get implements Store.
|
|
||||||
func (s *Store[K, V]) Get(key K) (V, error) {
|
|
||||||
path, err := s.getEntryPath(key)
|
|
||||||
if err != nil {
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
value, err := s.readValue(path)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, os.ErrNotExist) {
|
|
||||||
return *new(V), errors.WithStack(lfu.ErrNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set implements Store.
|
|
||||||
func (s *Store[K, V]) Set(key K, value V) error {
|
|
||||||
path, err := s.getEntryPath(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.writeValue(path, value); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Store[K, V]) Clear() error {
|
|
||||||
if err := os.RemoveAll(s.baseDir); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Store[K, V]) getEntryPath(k K) (string, error) {
|
|
||||||
path, err := s.getPath(k)
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
path = append([]string{s.baseDir}, path...)
|
|
||||||
return filepath.Join(path...), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Store[K, V]) writeValue(path string, value V) error {
|
|
||||||
fi, err := os.Stat(path)
|
|
||||||
if err == nil && !fi.Mode().IsRegular() {
|
|
||||||
return fmt.Errorf("%s already exists and is not a regular file", path)
|
|
||||||
}
|
|
||||||
|
|
||||||
dir := filepath.Dir(path)
|
|
||||||
|
|
||||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := os.CreateTemp(dir, filepath.Base(path)+".tmp")
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tmpName := f.Name()
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
f.Close()
|
|
||||||
os.Remove(tmpName)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
reader, err := s.marshalValue(value)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := io.Copy(f, reader); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := f.Sync(); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := f.Close(); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.Rename(tmpName, path); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Store[K, V]) readValue(path string) (V, error) {
|
|
||||||
file, err := os.Open(path)
|
|
||||||
if err != nil {
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if err := file.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
|
||||||
panic(errors.WithStack(err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
value, err := s.unmarshalValue(file)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return *new(V), errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewStore[K comparable, V any](baseDir string, funcs ...OptionsFunc[K, V]) *Store[K, V] {
|
|
||||||
opts := DefaultOptions[K, V](funcs...)
|
|
||||||
return &Store[K, V]{
|
|
||||||
baseDir: baseDir,
|
|
||||||
getPath: opts.GetPath,
|
|
||||||
unmarshalValue: opts.UnmarshalValue,
|
|
||||||
marshalValue: opts.MarshalValue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ lfu.Store[string, int] = &Store[string, int]{}
|
|
|
@ -1,2 +0,0 @@
|
||||||
*
|
|
||||||
!.gitignore
|
|
|
@ -1,203 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
type List[T any] struct {
|
|
||||||
root *Element[T]
|
|
||||||
len atomic.Int32
|
|
||||||
sync *Synchronizer[*Element[T]]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) First() *Element[T] {
|
|
||||||
if l.Len() == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var next *Element[T]
|
|
||||||
l.sync.ReadTx(l.root, func(upgrade func(func())) error {
|
|
||||||
next = l.root.next
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return next
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) Last() *Element[T] {
|
|
||||||
if l.Len() == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var prev *Element[T]
|
|
||||||
l.sync.ReadTx(l.root, func(upgrade func(func())) error {
|
|
||||||
prev = l.root.prev
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return prev
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) Prev(e *Element[T]) *Element[T] {
|
|
||||||
var prev *Element[T]
|
|
||||||
l.sync.ReadTx(e, func(upgrade func(func())) error {
|
|
||||||
prev = e.prev
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return prev
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) Next(e *Element[T]) *Element[T] {
|
|
||||||
var next *Element[T]
|
|
||||||
l.sync.ReadTx(e, func(upgrade func(func())) error {
|
|
||||||
next = e.next
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return next
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) Value(e *Element[T]) T {
|
|
||||||
var value T
|
|
||||||
l.sync.ReadTx(e, func(upgrade func(func())) error {
|
|
||||||
value = e.value
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) PushFront(v T) *Element[T] {
|
|
||||||
return l.InsertValueAfter(v, l.root)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) PushBack(v T) *Element[T] {
|
|
||||||
return l.InsertValueAfter(v, l.root)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) Remove(e *Element[T]) {
|
|
||||||
l.remove(e)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) Len() int {
|
|
||||||
return int(l.len.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] {
|
|
||||||
l.sync.ReadTx(e, func(upgrade func(fn func())) error {
|
|
||||||
var next *Element[T]
|
|
||||||
l.sync.ReadTx(at, func(upgrade func(func())) error {
|
|
||||||
next = at.next
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
upgrade(func() {
|
|
||||||
e.prev = at
|
|
||||||
e.next = next
|
|
||||||
e.list = l
|
|
||||||
})
|
|
||||||
|
|
||||||
if e.prev != nil {
|
|
||||||
l.sync.WriteTx(e.prev, func() error {
|
|
||||||
e.prev.next = e
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.next != nil {
|
|
||||||
l.sync.WriteTx(e.next, func() error {
|
|
||||||
e.next.prev = e
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
l.len.Add(1)
|
|
||||||
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) InsertValueAfter(v T, at *Element[T]) *Element[T] {
|
|
||||||
e := NewElement[T](v)
|
|
||||||
return l.insertAfter(e, at)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *List[T]) remove(e *Element[T]) {
|
|
||||||
if e == nil && e == l.root {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
l.sync.ReadTx(e, func(upgrade func(fn func())) error {
|
|
||||||
if e.prev != nil {
|
|
||||||
if e.prev == e {
|
|
||||||
upgrade(func() {
|
|
||||||
e.prev.next = e.next
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
l.sync.WriteTx(e.prev, func() error {
|
|
||||||
e.prev.next = e.next
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.next != nil {
|
|
||||||
if e.next == e {
|
|
||||||
upgrade(func() {
|
|
||||||
e.next.prev = e.prev
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
l.sync.WriteTx(e.next, func() error {
|
|
||||||
e.next.prev = e.prev
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
upgrade(func() {
|
|
||||||
e.next = nil
|
|
||||||
e.prev = nil
|
|
||||||
e.list = nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
l.sync.Remove(e)
|
|
||||||
l.len.Add(-1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewList[T any]() *List[T] {
|
|
||||||
root := NewElement(*new(T))
|
|
||||||
root.next = root
|
|
||||||
root.prev = root
|
|
||||||
|
|
||||||
list := &List[T]{
|
|
||||||
sync: NewSynchronizer[*Element[T]](),
|
|
||||||
}
|
|
||||||
|
|
||||||
root.list = list
|
|
||||||
list.root = root
|
|
||||||
|
|
||||||
return list
|
|
||||||
}
|
|
||||||
|
|
||||||
type Element[T any] struct {
|
|
||||||
prev *Element[T]
|
|
||||||
next *Element[T]
|
|
||||||
list *List[T]
|
|
||||||
value T
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewElement[T any](v T) *Element[T] {
|
|
||||||
element := &Element[T]{
|
|
||||||
prev: nil,
|
|
||||||
next: nil,
|
|
||||||
list: nil,
|
|
||||||
value: v,
|
|
||||||
}
|
|
||||||
return element
|
|
||||||
}
|
|
|
@ -1,67 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Map[K comparable, V any] struct {
|
|
||||||
size atomic.Int32
|
|
||||||
inner sync.Map
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Map[K, V]) Get(key K) (V, bool) {
|
|
||||||
raw, exists := m.inner.Load(key)
|
|
||||||
if !exists {
|
|
||||||
return *new(V), false
|
|
||||||
}
|
|
||||||
|
|
||||||
value, ok := raw.(V)
|
|
||||||
if !ok {
|
|
||||||
return *new(V), false
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Map[K, V]) GetOrSet(key K, defaultValue V) (V, bool) {
|
|
||||||
raw, loaded := m.inner.LoadOrStore(key, defaultValue)
|
|
||||||
if !loaded {
|
|
||||||
m.size.Add(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
value, ok := raw.(V)
|
|
||||||
if !ok {
|
|
||||||
return *new(V), loaded
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, loaded
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Map[K, V]) Set(key K, value V) {
|
|
||||||
_, loaded := m.inner.Swap(key, value)
|
|
||||||
if !loaded {
|
|
||||||
m.size.Add(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Map[K, V]) Delete(key K) {
|
|
||||||
_, existed := m.inner.LoadAndDelete(key)
|
|
||||||
if existed {
|
|
||||||
m.size.Add(-1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Map[K, V]) Range(fn func(key, value any) bool) {
|
|
||||||
m.inner.Range(fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Map[K, V]) Len() int {
|
|
||||||
return int(m.size.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMap[K comparable, V any]() *Map[K, V] {
|
|
||||||
return &Map[K, V]{
|
|
||||||
inner: sync.Map{},
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,14 +0,0 @@
|
||||||
package memory
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestCacheWithMemoryStore(t *testing.T) {
|
|
||||||
testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] {
|
|
||||||
return NewStore[string, string]()
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
package memory
|
|
||||||
|
|
||||||
import (
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Store[K comparable, V any] struct {
|
|
||||||
index *lfu.Map[K, V]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete implements Store.
|
|
||||||
func (s *Store[K, V]) Delete(key K) error {
|
|
||||||
s.index.Delete(key)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get implements Store.
|
|
||||||
func (s *Store[K, V]) Get(key K) (V, error) {
|
|
||||||
value, exists := s.index.Get(key)
|
|
||||||
if !exists {
|
|
||||||
return *new(V), errors.WithStack(lfu.ErrNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set implements Store.
|
|
||||||
func (s *Store[K, V]) Set(key K, value V) error {
|
|
||||||
s.index.Set(key, value)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewStore[K comparable, V any]() *Store[K, V] {
|
|
||||||
return &Store[K, V]{
|
|
||||||
index: lfu.NewMap[K, V](),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ lfu.Store[string, int] = &Store[string, int]{}
|
|
|
@ -1,57 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
|
|
||||||
type GetValueSizeFunc[V any] func(value V) (int, error)
|
|
||||||
|
|
||||||
type LogFunc func(format string, values ...any)
|
|
||||||
|
|
||||||
func DefaultLogFunc(format string, values ...any) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
type Options[K comparable, V any] struct {
|
|
||||||
GetValueSize GetValueSizeFunc[V]
|
|
||||||
Capacity int
|
|
||||||
Log LogFunc
|
|
||||||
TTL time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type OptionsFunc[K comparable, V any] func(opts *Options[K, V])
|
|
||||||
|
|
||||||
func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] {
|
|
||||||
opts := &Options[K, V]{
|
|
||||||
GetValueSize: DefaultGetValueSize[V],
|
|
||||||
Capacity: 100,
|
|
||||||
Log: DefaultLogFunc,
|
|
||||||
TTL: 0,
|
|
||||||
}
|
|
||||||
for _, fn := range funcs {
|
|
||||||
fn(opts)
|
|
||||||
}
|
|
||||||
return opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithCapacity[K comparable, V any](capacity int) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.Capacity = capacity
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithGetValueSize[K comparable, V any](getValueSize GetValueSizeFunc[V]) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.GetValueSize = getValueSize
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLog[K comparable, V any](fn LogFunc) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.Log = fn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithTTL[K comparable, V any](ttl time.Duration) OptionsFunc[K, V] {
|
|
||||||
return func(opts *Options[K, V]) {
|
|
||||||
opts.TTL = ttl
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,41 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Measurable interface {
|
|
||||||
Size() (int, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultGetValueSize[V any](value V) (int, error) {
|
|
||||||
switch v := any(value).(type) {
|
|
||||||
case int:
|
|
||||||
return v, nil
|
|
||||||
case int8:
|
|
||||||
return int(v), nil
|
|
||||||
case int32:
|
|
||||||
return int(v), nil
|
|
||||||
case int64:
|
|
||||||
return int(v), nil
|
|
||||||
case float32:
|
|
||||||
return int(v), nil
|
|
||||||
case float64:
|
|
||||||
return int(v), nil
|
|
||||||
case []byte:
|
|
||||||
return len(v), nil
|
|
||||||
case string:
|
|
||||||
return len(v), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if measurable, ok := any(value).(Measurable); ok {
|
|
||||||
size, err := measurable.Size()
|
|
||||||
if err != nil {
|
|
||||||
return 0, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return size, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, errors.Errorf("could not retrieve size of type '%T'", value)
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
type Store[K comparable, V any] interface {
|
|
||||||
Delete(key K) error
|
|
||||||
Set(key K, value V) error
|
|
||||||
Get(key K) (V, error)
|
|
||||||
}
|
|
|
@ -1,56 +0,0 @@
|
||||||
package lfu
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Synchronizer[K comparable] struct {
|
|
||||||
index *Map[K, *sync.RWMutex]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Synchronizer[K]) Remove(key K) {
|
|
||||||
s.index.Delete(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Synchronizer[K]) ReadTx(key K, fn func(upgrade func(fn func())) error) error {
|
|
||||||
mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{})
|
|
||||||
mutex.RLock()
|
|
||||||
defer mutex.RUnlock()
|
|
||||||
|
|
||||||
upgrade := func(fn func()) {
|
|
||||||
mutex.RUnlock()
|
|
||||||
mutex.Lock()
|
|
||||||
defer func() {
|
|
||||||
mutex.Unlock()
|
|
||||||
mutex.RLock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
fn()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fn(upgrade); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Synchronizer[K]) WriteTx(key K, fn func() error) error {
|
|
||||||
mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{})
|
|
||||||
mutex.Lock()
|
|
||||||
defer mutex.Unlock()
|
|
||||||
|
|
||||||
if err := fn(); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSynchronizer[K comparable]() *Synchronizer[K] {
|
|
||||||
return &Synchronizer[K]{
|
|
||||||
index: NewMap[K, *sync.RWMutex](),
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,41 +0,0 @@
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"reflect"
|
|
||||||
"runtime"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type StoreFactory func(testName string) lfu.Store[string, string]
|
|
||||||
type testCase func(t *testing.T, store lfu.Store[string, string]) error
|
|
||||||
|
|
||||||
var testCases = []testCase{
|
|
||||||
testSetGetDelete,
|
|
||||||
testEviction,
|
|
||||||
testConcurrent,
|
|
||||||
testMultipleSet,
|
|
||||||
testTTL,
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacheWithStore(t *testing.T, factory StoreFactory) {
|
|
||||||
for _, tc := range testCases {
|
|
||||||
funcName := runtime.FuncForPC(reflect.ValueOf(tc).Pointer()).Name()
|
|
||||||
funcNameParts := strings.Split(funcName, "/")
|
|
||||||
testName := funcNameParts[len(funcNameParts)-1]
|
|
||||||
func(tc testCase) {
|
|
||||||
t.Run(testName, func(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
store := factory(testName)
|
|
||||||
|
|
||||||
if err := tc(t, store); err != nil {
|
|
||||||
t.Fatalf("%+v", errors.WithStack(err))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}(tc)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,67 +0,0 @@
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func testConcurrent(t *testing.T, store lfu.Store[string, string]) error {
|
|
||||||
const value = "foobar"
|
|
||||||
totalKeys := 25
|
|
||||||
totalSize := len(value) * totalKeys
|
|
||||||
capacity := totalSize / 2
|
|
||||||
|
|
||||||
cache := lfu.NewCache[string, string](store,
|
|
||||||
lfu.WithCapacity[string, string](capacity),
|
|
||||||
lfu.WithLog[string, string](t.Logf),
|
|
||||||
)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
wg.Add(totalKeys)
|
|
||||||
|
|
||||||
loops := totalKeys * 10
|
|
||||||
|
|
||||||
for i := 0; i < totalKeys; i++ {
|
|
||||||
key := fmt.Sprintf("key%d", i)
|
|
||||||
func(key string) {
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for i := 0; i < loops; i++ {
|
|
||||||
if err := cache.Set(key, value); err != nil {
|
|
||||||
t.Errorf("%+v", errors.WithStack(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
t.Logf("cache before final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
|
|
||||||
|
|
||||||
if err := cache.Evict(); err != nil {
|
|
||||||
t.Errorf("%+v", errors.WithStack(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("cache after final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
|
|
||||||
|
|
||||||
expectedLength := capacity / len(value)
|
|
||||||
if e, g := expectedLength, cache.Len(); e < g {
|
|
||||||
t.Errorf("cache.Len(): expected <= %d, got %d", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cache.Size() > capacity {
|
|
||||||
t.Errorf("cache.Size(): expected <= %d, got %d", capacity, cache.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := expectedLength*len(value), cache.Size(); e < g {
|
|
||||||
t.Errorf("cache.Size(): expected <= %d, got %d", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,70 +0,0 @@
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func testEviction(t *testing.T, store lfu.Store[string, string]) error {
|
|
||||||
cache := lfu.NewCache[string, string](store,
|
|
||||||
lfu.WithCapacity[string, string](10),
|
|
||||||
lfu.WithLog[string, string](t.Logf),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err := cache.Set("key1", "key1"); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Set("key2", "key2"); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Increment frequency of key2
|
|
||||||
if _, err := cache.Get("key2"); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := 8, cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Set("key3", "key3"); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
|
|
||||||
|
|
||||||
_, err := cache.Get("key1")
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("expected 'key1' to be evicted")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !errors.Is(err, lfu.ErrNotFound) {
|
|
||||||
t.Errorf("expected err to be 'ErrNotFound'")
|
|
||||||
}
|
|
||||||
|
|
||||||
value, err := cache.Get("key2")
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := "key2", value; e < g {
|
|
||||||
t.Errorf("cache.Get(\"key2\"): expected %v, got %v", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := cache.Capacity(), cache.Size(); e < g {
|
|
||||||
t.Errorf("cache.Size(): expected <= %d, got %d", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := 2, cache.Len(); e != g {
|
|
||||||
t.Errorf("cache.Len(): expected %d, got %d", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cache.Size() < 0 {
|
|
||||||
t.Errorf("cache.Size(): expected value >= 0, got %d", cache.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,80 +0,0 @@
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func testMultipleSet(t *testing.T, store lfu.Store[string, string]) error {
|
|
||||||
const (
|
|
||||||
key = "mykey"
|
|
||||||
firstValue = "foo"
|
|
||||||
secondValue = "bar"
|
|
||||||
thirdValue = "foobar"
|
|
||||||
)
|
|
||||||
|
|
||||||
cache := lfu.NewCache[string, string](store)
|
|
||||||
|
|
||||||
if e, g := 0, cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Set(key, firstValue); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := len(firstValue), cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
retrieved, err := cache.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := firstValue, retrieved; e != g {
|
|
||||||
t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Set(key, secondValue); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := len(secondValue), cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
retrieved, err = cache.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := secondValue, retrieved; e != g {
|
|
||||||
t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Set(key, thirdValue); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := len(thirdValue), cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
retrieved, err = cache.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := thirdValue, retrieved; e != g {
|
|
||||||
t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := len(thirdValue), cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func testSetGetDelete(t *testing.T, store lfu.Store[string, string]) error {
|
|
||||||
const (
|
|
||||||
key = "mykey"
|
|
||||||
value = "foobar"
|
|
||||||
)
|
|
||||||
|
|
||||||
cache := lfu.NewCache[string, string](store, lfu.WithCapacity[string, string](10))
|
|
||||||
|
|
||||||
if e, g := 0, cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Set(key, value); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := len(value), cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := 1, cache.Len(); e != g {
|
|
||||||
t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
retrieved, err := cache.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := value, retrieved; e != g {
|
|
||||||
t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cache.Delete(key); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := cache.Get(key); err == nil || !errors.Is(err, lfu.ErrNotFound) {
|
|
||||||
t.Errorf("cache.Get(key): err should be lfu.ErrNotFound, got '%v'", errors.WithStack(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := value, retrieved; e != g {
|
|
||||||
t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := 0, cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := 0, cache.Len(); e != g {
|
|
||||||
t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,54 +0,0 @@
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func testTTL(t *testing.T, store lfu.Store[string, string]) error {
|
|
||||||
const (
|
|
||||||
key = "mykey"
|
|
||||||
value = "foobar"
|
|
||||||
)
|
|
||||||
|
|
||||||
ttl := time.Second
|
|
||||||
|
|
||||||
cache := lfu.NewCache[string, string](store,
|
|
||||||
lfu.WithTTL[string, string](ttl),
|
|
||||||
lfu.WithCapacity[string, string](10),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err := cache.Set(key, value); err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
retrieved, err := cache.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := value, retrieved; e != g {
|
|
||||||
t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(ttl * 2)
|
|
||||||
|
|
||||||
if _, err := cache.Get(key); !errors.Is(err, lfu.ErrNotFound) {
|
|
||||||
t.Errorf("cache.Get(key): expected err == lfu.ErrNotFound, got '%v'", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
|
|
||||||
|
|
||||||
if e, g := 0, cache.Size(); e != g {
|
|
||||||
t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, g := 0, cache.Len(); e != g {
|
|
||||||
t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -3,25 +3,14 @@ package cache
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
"github.com/allegro/bigcache/v3"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
CacheTTL time.Duration
|
CacheTTL time.Duration
|
||||||
|
BigCache bigcache.Config
|
||||||
BlobCacheStore lfu.Store[string, []byte]
|
BucketCacheSize int
|
||||||
// Maximum total size of cached data
|
|
||||||
BlobCacheSize int
|
|
||||||
|
|
||||||
BlobInfoCacheStore lfu.Store[string, storage.BlobInfo]
|
|
||||||
// Maximum number of blob infos
|
|
||||||
BlobInfoCacheSize int
|
BlobInfoCacheSize int
|
||||||
|
|
||||||
BlobBucketCacheStore lfu.Store[string, storage.BlobBucket]
|
|
||||||
// Maximum number of blob bucket
|
|
||||||
BlobBucketCacheSize int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type OptionFunc func(opts *Options)
|
type OptionFunc func(opts *Options)
|
||||||
|
@ -29,13 +18,10 @@ type OptionFunc func(opts *Options)
|
||||||
func NewOptions(funcs ...OptionFunc) *Options {
|
func NewOptions(funcs ...OptionFunc) *Options {
|
||||||
defaultTTL := 60 * time.Minute
|
defaultTTL := 60 * time.Minute
|
||||||
opts := &Options{
|
opts := &Options{
|
||||||
CacheTTL: defaultTTL,
|
CacheTTL: defaultTTL,
|
||||||
BlobCacheStore: memory.NewStore[string, []byte](),
|
BigCache: bigcache.DefaultConfig(defaultTTL),
|
||||||
BlobCacheSize: 1e+9, // 1Gb
|
BucketCacheSize: 16,
|
||||||
BlobInfoCacheStore: memory.NewStore[string, storage.BlobInfo](),
|
BlobInfoCacheSize: 256,
|
||||||
BlobInfoCacheSize: 256,
|
|
||||||
BlobBucketCacheStore: memory.NewStore[string, storage.BlobBucket](),
|
|
||||||
BlobBucketCacheSize: 16,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fn := range funcs {
|
for _, fn := range funcs {
|
||||||
|
@ -51,15 +37,15 @@ func WithCacheTTL(ttl time.Duration) OptionFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithBlobBucketCacheSize(size int) OptionFunc {
|
func WithBigCacheConfig(config bigcache.Config) OptionFunc {
|
||||||
return func(opts *Options) {
|
return func(opts *Options) {
|
||||||
opts.BlobBucketCacheSize = size
|
opts.BigCache = config
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithBlobBucketCacheStore(store lfu.Store[string, storage.BlobBucket]) OptionFunc {
|
func WithBucketCacheSize(size int) OptionFunc {
|
||||||
return func(opts *Options) {
|
return func(opts *Options) {
|
||||||
opts.BlobBucketCacheStore = store
|
opts.BucketCacheSize = size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,21 +54,3 @@ func WithBlobInfoCacheSize(size int) OptionFunc {
|
||||||
opts.BlobInfoCacheSize = size
|
opts.BlobInfoCacheSize = size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithBlobInfoCacheStore(store lfu.Store[string, storage.BlobInfo]) OptionFunc {
|
|
||||||
return func(opts *Options) {
|
|
||||||
opts.BlobInfoCacheStore = store
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithBlobCacheSize(size int) OptionFunc {
|
|
||||||
return func(opts *Options) {
|
|
||||||
opts.BlobCacheSize = size
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithBlobCacheStore(store lfu.Store[string, []byte]) OptionFunc {
|
|
||||||
return func(opts *Options) {
|
|
||||||
opts.BlobCacheStore = store
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,21 +1,18 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"cdr.dev/slog"
|
"github.com/allegro/bigcache/v3"
|
||||||
"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"gitlab.com/wpetit/goweb/logger"
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type readCacher struct {
|
type readCacher struct {
|
||||||
reader io.ReadSeekCloser
|
reader io.ReadSeekCloser
|
||||||
cache *lfu.Cache[string, []byte]
|
cache *bigcache.BigCache
|
||||||
buf bytes.Buffer
|
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,21 +22,6 @@ func (r *readCacher) Close() error {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.cache.Set(r.key, r.buf.Bytes()); err != nil {
|
|
||||||
var logErr slog.Field
|
|
||||||
if errors.Is(err, lfu.ErrSizeExceedCapacity) {
|
|
||||||
logErr = logger.E(errors.WithStack(err))
|
|
||||||
} else {
|
|
||||||
logErr = logger.CapturedE(errors.WithStack(err))
|
|
||||||
}
|
|
||||||
logger.Error(context.Background(), "could not cache buffered data",
|
|
||||||
logger.F("cacheKey", r.key),
|
|
||||||
logErr,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
r.buf.Reset()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,9 +37,13 @@ func (r *readCacher) Read(p []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if length > 0 {
|
if length > 0 {
|
||||||
if _, err := r.buf.Write(p[:length]); err != nil {
|
if err := r.cache.Append(r.key, p[:length]); err != nil {
|
||||||
ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
|
ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
|
||||||
logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
|
logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
|
||||||
|
|
||||||
|
if err := r.cache.Delete(r.key); err != nil {
|
||||||
|
logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue