From c4a9baf82b195e82dac6518e64b89f09fa504dbf Mon Sep 17 00:00:00 2001 From: William Petit Date: Sat, 6 Jan 2024 16:48:14 +0100 Subject: [PATCH] feat: implement lfu based cache strategy --- go.mod | 1 + go.sum | 2 + pkg/storage/driver/cache/lfu/cache.go | 262 ++++++++++++++++++ .../driver/cache/lfu/expirable/cache.go | 17 ++ pkg/storage/driver/cache/lfu/fs/cache_test.go | 37 +++ pkg/storage/driver/cache/lfu/fs/hash.go | 19 ++ pkg/storage/driver/cache/lfu/fs/marshal.go | 31 +++ pkg/storage/driver/cache/lfu/fs/options.go | 45 +++ pkg/storage/driver/cache/lfu/fs/store.go | 165 +++++++++++ .../driver/cache/lfu/fs/testdata/.gitignore | 2 + pkg/storage/driver/cache/lfu/list.go | 162 +++++++++++ pkg/storage/driver/cache/lfu/map.go | 51 ++++ .../driver/cache/lfu/memory/cache_test.go | 14 + pkg/storage/driver/cache/lfu/memory/memory.go | 40 +++ pkg/storage/driver/cache/lfu/options.go | 33 +++ pkg/storage/driver/cache/lfu/size.go | 41 +++ pkg/storage/driver/cache/lfu/store.go | 7 + .../driver/cache/lfu/testsuite/main.go | 40 +++ .../cache/lfu/testsuite/test_concurrent.go | 57 ++++ .../cache/lfu/testsuite/test_eviction.go | 53 ++++ .../cache/lfu/testsuite/test_multiple_set.go | 80 ++++++ .../lfu/testsuite/test_set_get_delete.go | 66 +++++ 22 files changed, 1225 insertions(+) create mode 100644 pkg/storage/driver/cache/lfu/cache.go create mode 100644 pkg/storage/driver/cache/lfu/expirable/cache.go create mode 100644 pkg/storage/driver/cache/lfu/fs/cache_test.go create mode 100644 pkg/storage/driver/cache/lfu/fs/hash.go create mode 100644 pkg/storage/driver/cache/lfu/fs/marshal.go create mode 100644 pkg/storage/driver/cache/lfu/fs/options.go create mode 100644 pkg/storage/driver/cache/lfu/fs/store.go create mode 100644 pkg/storage/driver/cache/lfu/fs/testdata/.gitignore create mode 100644 pkg/storage/driver/cache/lfu/list.go create mode 100644 pkg/storage/driver/cache/lfu/map.go create mode 100644 pkg/storage/driver/cache/lfu/memory/cache_test.go create mode 100644 pkg/storage/driver/cache/lfu/memory/memory.go create mode 100644 pkg/storage/driver/cache/lfu/options.go create mode 100644 pkg/storage/driver/cache/lfu/size.go create mode 100644 pkg/storage/driver/cache/lfu/store.go create mode 100644 pkg/storage/driver/cache/lfu/testsuite/main.go create mode 100644 pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go create mode 100644 pkg/storage/driver/cache/lfu/testsuite/test_eviction.go create mode 100644 pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go create mode 100644 pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go diff --git a/go.mod b/go.mod index 1203320..deab721 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.0 // indirect github.com/miekg/dns v1.1.53 // indirect + github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect golang.org/x/sync v0.1.0 // indirect google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect google.golang.org/grpc v1.35.0 // indirect diff --git a/go.sum b/go.sum index 371bd48..e738cf8 100644 --- a/go.sum +++ b/go.sum @@ -258,6 +258,8 @@ github.com/miekg/dns v0.0.0-20161006100029-fc4e1e2843d8/go.mod h1:W1PPwlIAgtquWB github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw= github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= diff --git a/pkg/storage/driver/cache/lfu/cache.go b/pkg/storage/driver/cache/lfu/cache.go new file mode 100644 index 0000000..3bcd055 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/cache.go @@ -0,0 +1,262 @@ +package lfu + +import ( + "log" + "sync/atomic" + + "github.com/pkg/errors" +) + +var ( + ErrNotFound = errors.New("not found") + ErrSizeExceedCapacity = errors.New("size exceed capacity") +) + +type Cache[K comparable, V any] struct { + index *Map[K, *CacheItem[K, V]] + freqs *List[*FrequencyItem[K, V]] + size atomic.Int32 + capacity int + store Store[K, V] + getValueSize GetValueSizeFunc[V] +} + +type CacheItem[K any, V any] struct { + key K + size atomic.Int32 + frequencyParent atomic.Pointer[Element[*FrequencyItem[K, V]]] +} + +func NewCacheItem[K any, V any](key K, size int32) *CacheItem[K, V] { + item := &CacheItem[K, V]{ + key: key, + } + item.size.Store(size) + return item +} + +type FrequencyItem[K any, V any] struct { + entries atomic.Pointer[Map[*CacheItem[K, V], struct{}]] + freq atomic.Int32 +} + +func NewFrequencyItem[K any, V any]() *FrequencyItem[K, V] { + frequencyItem := &FrequencyItem[K, V]{} + frequencyItem.entries.Store(NewMap[*CacheItem[K, V], struct{}]()) + return frequencyItem +} + +func (c *Cache[K, V]) Set(key K, value V) error { + newItemSize, err := c.getValueSize(value) + if err != nil { + return errors.WithStack(err) + } + + log.Printf("setting '%v' (size: %d)", key, newItemSize) + + if newItemSize > int(c.capacity) { + return errors.WithStack(ErrSizeExceedCapacity) + } + + if err := c.store.Set(key, value); err != nil { + return errors.WithStack(err) + } + + if item, ok := c.index.Get(key); ok { + // oldItemSize := item.size.Swap(int32(newItemSize)) + // delta := -int(oldItemSize) + newItemSize + // // Eviction, if needed + // if c.atCapacity(delta) { + // c.evict(delta) + // } + + // c.size.Add(int32(delta)) + c.increment(item) + } else { + item := NewCacheItem[K, V](key, int32(newItemSize)) + c.index.Set(key, item) + + for c.atCapacity(newItemSize) { + c.evict(newItemSize) + } + + c.size.Add(int32(newItemSize)) + c.increment(item) + } + + return nil +} + +func (c *Cache[K, V]) Get(key K) (V, error) { + log.Printf("getting '%v'", key) + + if e, ok := c.index.Get(key); ok { + c.increment(e) + value, err := c.store.Get(key) + if err != nil { + return *new(V), errors.WithStack(err) + } + + return value, nil + } + + return *new(V), errors.WithStack(ErrNotFound) +} + +func (c *Cache[K, V]) Delete(key K) error { + log.Printf("deleting '%v'", key) + + item, exists := c.index.Get(key) + if !exists { + return errors.WithStack(ErrNotFound) + } + + if err := c.store.Delete(key); err != nil { + return errors.WithStack(err) + } + + c.size.Add(-int32(item.size.Load())) + c.remove(item.frequencyParent.Load(), item) + c.index.Delete(key) + + return nil +} + +func (c *Cache[K, V]) Len() int { + return c.index.Len() +} + +func (c *Cache[K, V]) Size() int { + return int(c.size.Load()) +} + +func (c *Cache[K, V]) Capacity() int { + return c.capacity +} + +func (c *Cache[K, V]) increment(item *CacheItem[K, V]) { + currentFrequencyElement := item.frequencyParent.Load() + var nextFrequencyAmount int + var nextFrequencyElement *Element[*FrequencyItem[K, V]] + + if currentFrequencyElement == nil { + nextFrequencyAmount = 1 + nextFrequencyElement = c.freqs.First() + } else { + atomicFrequencyItem := currentFrequencyElement.Value() + nextFrequencyAmount = int(atomicFrequencyItem.freq.Load()) + 1 + nextFrequencyElement = currentFrequencyElement.Next() + } + + var nextFrequency *FrequencyItem[K, V] + if nextFrequencyElement != nil { + nextFrequency = nextFrequencyElement.Value() + } + + if nextFrequencyElement == nil || nextFrequency == nil || int(nextFrequency.freq.Load()) != nextFrequencyAmount { + newFrequencyItem := NewFrequencyItem[K, V]() + newFrequencyItem.freq.Store(int32(nextFrequencyAmount)) + + if currentFrequencyElement == nil { + nextFrequencyElement = c.freqs.PushFront(newFrequencyItem) + } else { + nextFrequencyElement = c.freqs.InsertValueAfter(newFrequencyItem, currentFrequencyElement) + } + } + + item.frequencyParent.Store(nextFrequencyElement) + + nextFrequency = nextFrequencyElement.Value() + nextFrequency.entries.Load().Set(item, struct{}{}) + + if currentFrequencyElement != nil { + c.remove(item.frequencyParent.Load(), item) + } +} + +func (c *Cache[K, V]) remove(listItem *Element[*FrequencyItem[K, V]], item *CacheItem[K, V]) bool { + if listItem == nil { + return false + } + + entries := listItem.Value().entries.Load() + if entries == nil { + return false + } + + entries.Delete(item) + if entries.Len() == 0 { + c.freqs.Remove(listItem) + } + + return true +} + +func (c *Cache[K, V]) atCapacity(delta int) bool { + size, capacity := c.Size(), c.Capacity() + log.Printf("at capacity: %d/%d", size, capacity) + return size+delta >= capacity +} + +func (c *Cache[K, V]) evict(total int) error { + log.Printf("evicting for %d", total) + + for evicted := 0; evicted < total; { + frequencyElement := c.freqs.First() + if frequencyElement == nil { + return nil + } + + frequencyItem := frequencyElement.Value() + if frequencyItem == nil { + return nil + } + + entries := frequencyItem.entries.Load() + if entries == nil { + return nil + } + + var rangeErr error + entries.Range(func(key, v any) bool { + if evicted >= total { + return false + } + + entry, _ := key.(*CacheItem[K, V]) + entrySize := entry.size.Load() + log.Printf("evicting key '%v' (size: %d)", entry.key, entrySize) + + if err := c.store.Delete(entry.key); err != nil { + rangeErr = errors.WithStack(err) + return false + } + + c.index.Delete(entry.key) + c.size.Add(-entrySize) + c.remove(frequencyElement, entry) + evicted += int(entrySize) + + return true + }) + if rangeErr != nil { + return errors.WithStack(rangeErr) + } + } + + return nil +} + +func New[K comparable, V any](store Store[K, V], funcs ...OptionsFunc[K, V]) *Cache[K, V] { + opts := DefaultOptions[K, V](funcs...) + + cache := &Cache[K, V]{ + index: NewMap[K, *CacheItem[K, V]](), + freqs: NewList[*FrequencyItem[K, V]](), + capacity: opts.Capacity, + store: store, + getValueSize: opts.GetValueSize, + } + + return cache +} diff --git a/pkg/storage/driver/cache/lfu/expirable/cache.go b/pkg/storage/driver/cache/lfu/expirable/cache.go new file mode 100644 index 0000000..4f9bba3 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/expirable/cache.go @@ -0,0 +1,17 @@ +package expirable + +import ( + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" +) + +type Cache[K comparable, V any] struct { + ttl time.Duration + cache *lfu.Cache[K, V] + timestamps *lfu.Map[K, time.Duration] +} + +func NewCache[K comparable, V any](cache *lfu.Cache[K, V], ttl time.Duration) *Cache[K, V] { + return &Cache[K, V]{} +} diff --git a/pkg/storage/driver/cache/lfu/fs/cache_test.go b/pkg/storage/driver/cache/lfu/fs/cache_test.go new file mode 100644 index 0000000..bbbd7c3 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/cache_test.go @@ -0,0 +1,37 @@ +package fs + +import ( + "bytes" + "io" + "path/filepath" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite" + "github.com/pkg/errors" +) + +func TestCacheWithFSStore(t *testing.T) { + testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] { + dir := filepath.Join("testdata", "testsuite", testName) + store := NewStore[string, string](dir, + WithMarshalValue[string, string](func(value string) (io.Reader, error) { + return bytes.NewBuffer([]byte(value)), nil + }), + WithUnmarshalValue[string, string](func(r io.Reader) (string, error) { + data, err := io.ReadAll(r) + if err != nil { + return "", errors.WithStack(err) + } + + return string(data), nil + }), + ) + + if err := store.Clear(); err != nil { + panic(errors.WithStack(err)) + } + + return store + }) +} diff --git a/pkg/storage/driver/cache/lfu/fs/hash.go b/pkg/storage/driver/cache/lfu/fs/hash.go new file mode 100644 index 0000000..ee90599 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/hash.go @@ -0,0 +1,19 @@ +package fs + +import ( + "strconv" + + "github.com/mitchellh/hashstructure/v2" + "github.com/pkg/errors" +) + +func DefaultGetKeyHash[K comparable](key K) ([]string, error) { + uintHash, err := hashstructure.Hash(key, hashstructure.FormatV2, nil) + if err != nil { + return nil, errors.WithStack(err) + } + + hash := strconv.FormatUint(uintHash, 16) + + return []string{hash[0:2], hash[2:4], hash[2:4], hash}, nil +} diff --git a/pkg/storage/driver/cache/lfu/fs/marshal.go b/pkg/storage/driver/cache/lfu/fs/marshal.go new file mode 100644 index 0000000..09a4eb6 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/marshal.go @@ -0,0 +1,31 @@ +package fs + +import ( + "bytes" + "encoding/gob" + "io" + + "github.com/pkg/errors" +) + +func DefaultMarshalValue[V any](value V) (io.Reader, error) { + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + + if err := encoder.Encode(value); err != nil { + return nil, errors.WithStack(err) + } + + return &buf, nil +} + +func DefaultUnmarshalValue[V any](d io.Reader) (V, error) { + var value V + encoder := gob.NewDecoder(d) + + if err := encoder.Decode(&value); err != nil { + return *new(V), errors.WithStack(err) + } + + return value, nil +} diff --git a/pkg/storage/driver/cache/lfu/fs/options.go b/pkg/storage/driver/cache/lfu/fs/options.go new file mode 100644 index 0000000..0291c9d --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/options.go @@ -0,0 +1,45 @@ +package fs + +import "io" + +type GetKeyHashFunc[K comparable] func(key K) ([]string, error) +type MarshalValueFunc[V any] func(value V) (io.Reader, error) +type UnmarshalValueFunc[V any] func(r io.Reader) (V, error) + +type Options[K comparable, V any] struct { + GetKeyHash GetKeyHashFunc[K] + MarshalValue MarshalValueFunc[V] + UnmarshalValue UnmarshalValueFunc[V] +} + +type OptionsFunc[K comparable, V any] func(opts *Options[K, V]) + +func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] { + opts := &Options[K, V]{ + GetKeyHash: DefaultGetKeyHash[K], + MarshalValue: DefaultMarshalValue[V], + UnmarshalValue: DefaultUnmarshalValue[V], + } + for _, fn := range funcs { + fn(opts) + } + return opts +} + +func WithGetKeyHash[K comparable, V any](getKeyHash GetKeyHashFunc[K]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.GetKeyHash = getKeyHash + } +} + +func WithMarshalValue[K comparable, V any](marshalValue MarshalValueFunc[V]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.MarshalValue = marshalValue + } +} + +func WithUnmarshalValue[K comparable, V any](unmarshalValue UnmarshalValueFunc[V]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.UnmarshalValue = unmarshalValue + } +} diff --git a/pkg/storage/driver/cache/lfu/fs/store.go b/pkg/storage/driver/cache/lfu/fs/store.go new file mode 100644 index 0000000..bd4431f --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/store.go @@ -0,0 +1,165 @@ +package fs + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +type Store[K comparable, V any] struct { + baseDir string + getKeyHash GetKeyHashFunc[K] + marshalValue MarshalValueFunc[V] + unmarshalValue UnmarshalValueFunc[V] +} + +// Delete implements Store. +func (s *Store[K, V]) Delete(key K) error { + path, err := s.getEntryPath(key) + if err != nil { + return errors.WithStack(err) + } + + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { + return errors.WithStack(err) + } + + return nil +} + +// Get implements Store. +func (s *Store[K, V]) Get(key K) (V, error) { + path, err := s.getEntryPath(key) + if err != nil { + return *new(V), errors.WithStack(err) + } + + value, err := s.readValue(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return *new(V), errors.WithStack(lfu.ErrNotFound) + } + + return *new(V), errors.WithStack(err) + } + + return value, nil +} + +// Set implements Store. +func (s *Store[K, V]) Set(key K, value V) error { + path, err := s.getEntryPath(key) + if err != nil { + return errors.WithStack(err) + } + + if err := s.writeValue(path, value); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Store[K, V]) Clear() error { + if err := os.RemoveAll(s.baseDir); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Store[K, V]) getEntryPath(k K) (string, error) { + hash, err := s.getKeyHash(k) + if err != nil { + return "", errors.WithStack(err) + } + + path := append([]string{s.baseDir}, hash...) + return filepath.Join(path...), nil +} + +func (s *Store[K, V]) writeValue(path string, value V) error { + fi, err := os.Stat(path) + if err == nil && !fi.Mode().IsRegular() { + return fmt.Errorf("%s already exists and is not a regular file", path) + } + + dir := filepath.Dir(path) + + if err := os.MkdirAll(dir, 0750); err != nil { + return errors.WithStack(err) + } + + f, err := os.CreateTemp(dir, filepath.Base(path)+".tmp") + if err != nil { + return errors.WithStack(err) + } + + tmpName := f.Name() + defer func() { + if err != nil { + f.Close() + os.Remove(tmpName) + } + }() + + reader, err := s.marshalValue(value) + if err != nil { + return errors.WithStack(err) + } + + if _, err := io.Copy(f, reader); err != nil { + return errors.WithStack(err) + } + + if err := f.Sync(); err != nil { + return errors.WithStack(err) + } + + if err := f.Close(); err != nil { + return errors.WithStack(err) + } + + if err := os.Rename(tmpName, path); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *Store[K, V]) readValue(path string) (V, error) { + file, err := os.Open(path) + if err != nil { + return *new(V), errors.WithStack(err) + } + + defer func() { + if err := file.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + panic(errors.WithStack(err)) + } + }() + + value, err := s.unmarshalValue(file) + + if err != nil { + return *new(V), errors.WithStack(err) + } + + return value, nil +} + +func NewStore[K comparable, V any](baseDir string, funcs ...OptionsFunc[K, V]) *Store[K, V] { + opts := DefaultOptions[K, V](funcs...) + return &Store[K, V]{ + baseDir: baseDir, + getKeyHash: opts.GetKeyHash, + unmarshalValue: opts.UnmarshalValue, + marshalValue: opts.MarshalValue, + } +} + +var _ lfu.Store[string, int] = &Store[string, int]{} diff --git a/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore b/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore new file mode 100644 index 0000000..c96a04f --- /dev/null +++ b/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore \ No newline at end of file diff --git a/pkg/storage/driver/cache/lfu/list.go b/pkg/storage/driver/cache/lfu/list.go new file mode 100644 index 0000000..da5de91 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/list.go @@ -0,0 +1,162 @@ +package lfu + +import ( + "sync/atomic" +) + +type List[T any] struct { + root atomic.Pointer[Element[T]] + len atomic.Int32 +} + +func (l *List[T]) First() *Element[T] { + if l.len.Load() == 0 { + return nil + } + + root := l.root.Load() + + return root.next.Load() +} + +func (l *List[T]) Last() *Element[T] { + if l.len.Load() == 0 { + return nil + } + + root := l.root.Load() + + return root.prev.Load() +} + +func (l *List[T]) PushFront(v T) *Element[T] { + root := l.root.Load() + return l.InsertValueAfter(v, root) +} + +func (l *List[T]) PushBack(v T) *Element[T] { + root := l.root.Load() + return l.InsertValueAfter(v, root) +} + +func (l *List[T]) Remove(e *Element[T]) { + if e.list.Load() == l { + l.remove(e) + } +} + +func (l *List[T]) Len() int { + return int(l.len.Load()) +} + +func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] { + e.prev.Store(at) + e.next.Store(at.next.Load()) + + prev := e.prev.Load() + if prev != nil { + prev.next.Store(e) + } + + next := e.next.Load() + if next != nil { + next.prev.Store(e) + } + + e.list.Store(l) + + l.len.Add(1) + + return e +} + +func (l *List[T]) InsertValueAfter(v T, at *Element[T]) *Element[T] { + e := NewElement[T](v) + return l.insertAfter(e, at) +} + +func (l *List[T]) remove(e *Element[T]) { + prev := e.prev.Load() + if prev != nil { + prev.next.Store(e.next.Load()) + } + + next := e.next.Load() + if next != nil { + next.prev.Store(e.prev.Load()) + } + + e.next.Store(nil) + e.prev.Store(nil) + e.list.Store(nil) + + l.len.Add(-1) +} + +func NewList[T any]() *List[T] { + root := NewElement(*new(T)) + root.next.Store(root) + root.prev.Store(root) + + list := &List[T]{} + + root.list.Store(list) + atomic := atomic.Pointer[Element[T]]{} + atomic.Store(root) + list.root = atomic + + return list +} + +type Element[T any] struct { + prev atomic.Pointer[Element[T]] + next atomic.Pointer[Element[T]] + list atomic.Pointer[List[T]] + value atomic.Pointer[T] +} + +func (e *Element[T]) Next() *Element[T] { + l := e.list.Load() + if l == nil { + return nil + } + + n := e.next.Load() + r := l.root.Load() + + if n == r { + return nil + } + + return n +} + +func (e *Element[T]) Prev() *Element[T] { + l := e.list.Load() + if l == nil { + return nil + } + + p := e.prev.Load() + r := l.root.Load() + if p == r { + return nil + } + + return p +} + +func (e *Element[T]) Value() T { + return *e.value.Load() +} + +func NewElement[T any](v T) *Element[T] { + value := atomic.Pointer[T]{} + value.Store(&v) + return &Element[T]{ + value: value, + prev: atomic.Pointer[Element[T]]{}, + next: atomic.Pointer[Element[T]]{}, + list: atomic.Pointer[List[T]]{}, + } +} diff --git a/pkg/storage/driver/cache/lfu/map.go b/pkg/storage/driver/cache/lfu/map.go new file mode 100644 index 0000000..000f164 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/map.go @@ -0,0 +1,51 @@ +package lfu + +import ( + "sync" + "sync/atomic" +) + +type Map[K comparable, V any] struct { + size atomic.Int32 + inner sync.Map +} + +func (m *Map[K, V]) Get(key K) (V, bool) { + raw, exists := m.inner.Load(key) + if !exists { + return *new(V), false + } + + value, ok := raw.(V) + if !ok { + return *new(V), false + } + + return value, true +} + +func (m *Map[K, V]) Set(key K, value V) { + _, loaded := m.inner.Swap(key, value) + if !loaded { + m.size.Add(1) + } +} + +func (m *Map[K, V]) Delete(key K) { + _, loaded := m.inner.LoadAndDelete(key) + if loaded { + m.size.Add(-1) + } +} + +func (m *Map[K, V]) Range(fn func(key, value any) bool) { + m.inner.Range(fn) +} + +func (m *Map[K, V]) Len() int { + return int(m.size.Load()) +} + +func NewMap[K comparable, V any]() *Map[K, V] { + return &Map[K, V]{} +} diff --git a/pkg/storage/driver/cache/lfu/memory/cache_test.go b/pkg/storage/driver/cache/lfu/memory/cache_test.go new file mode 100644 index 0000000..4465e41 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/memory/cache_test.go @@ -0,0 +1,14 @@ +package memory + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite" +) + +func TestCacheWithMemoryStore(t *testing.T) { + testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] { + return NewStore[string, string]() + }) +} diff --git a/pkg/storage/driver/cache/lfu/memory/memory.go b/pkg/storage/driver/cache/lfu/memory/memory.go new file mode 100644 index 0000000..a6616f1 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/memory/memory.go @@ -0,0 +1,40 @@ +package memory + +import ( + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +type Store[K comparable, V any] struct { + index *lfu.Map[K, V] +} + +// Delete implements Store. +func (s *Store[K, V]) Delete(key K) error { + s.index.Delete(key) + return nil +} + +// Get implements Store. +func (s *Store[K, V]) Get(key K) (V, error) { + value, exists := s.index.Get(key) + if !exists { + return *new(V), errors.WithStack(lfu.ErrNotFound) + } + + return value, nil +} + +// Set implements Store. +func (s *Store[K, V]) Set(key K, value V) error { + s.index.Set(key, value) + return nil +} + +func NewStore[K comparable, V any]() *Store[K, V] { + return &Store[K, V]{ + index: lfu.NewMap[K, V](), + } +} + +var _ lfu.Store[string, int] = &Store[string, int]{} diff --git a/pkg/storage/driver/cache/lfu/options.go b/pkg/storage/driver/cache/lfu/options.go new file mode 100644 index 0000000..01a6df7 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/options.go @@ -0,0 +1,33 @@ +package lfu + +type GetValueSizeFunc[V any] func(value V) (int, error) + +type Options[K comparable, V any] struct { + GetValueSize GetValueSizeFunc[V] + Capacity int +} + +type OptionsFunc[K comparable, V any] func(opts *Options[K, V]) + +func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] { + opts := &Options[K, V]{ + GetValueSize: DefaultGetValueSize[V], + Capacity: 100, + } + for _, fn := range funcs { + fn(opts) + } + return opts +} + +func WithCapacity[K comparable, V any](capacity int) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.Capacity = capacity + } +} + +func WithGetValueSize[K comparable, V any](getValueSize GetValueSizeFunc[V]) OptionsFunc[K, V] { + return func(opts *Options[K, V]) { + opts.GetValueSize = getValueSize + } +} diff --git a/pkg/storage/driver/cache/lfu/size.go b/pkg/storage/driver/cache/lfu/size.go new file mode 100644 index 0000000..19c5728 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/size.go @@ -0,0 +1,41 @@ +package lfu + +import ( + "github.com/pkg/errors" +) + +type Measurable interface { + Size() (int, error) +} + +func DefaultGetValueSize[V any](value V) (int, error) { + switch v := any(value).(type) { + case int: + return v, nil + case int8: + return int(v), nil + case int32: + return int(v), nil + case int64: + return int(v), nil + case float32: + return int(v), nil + case float64: + return int(v), nil + case []byte: + return len(v), nil + case string: + return len(v), nil + } + + if measurable, ok := any(value).(Measurable); ok { + size, err := measurable.Size() + if err != nil { + return 0, errors.WithStack(err) + } + + return size, nil + } + + return 0, errors.Errorf("could not retrieve size of type '%T'", value) +} diff --git a/pkg/storage/driver/cache/lfu/store.go b/pkg/storage/driver/cache/lfu/store.go new file mode 100644 index 0000000..7b2bd70 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/store.go @@ -0,0 +1,7 @@ +package lfu + +type Store[K comparable, V any] interface { + Delete(key K) error + Set(key K, value V) error + Get(key K) (V, error) +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/main.go b/pkg/storage/driver/cache/lfu/testsuite/main.go new file mode 100644 index 0000000..477a13c --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/main.go @@ -0,0 +1,40 @@ +package testsuite + +import ( + "reflect" + "runtime" + "strings" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +type StoreFactory func(testName string) lfu.Store[string, string] +type testCase func(t *testing.T, store lfu.Store[string, string]) error + +var testCases = []testCase{ + testSetGetDelete, + testEviction, + testConcurrent, + testMultipleSet, +} + +func TestCacheWithStore(t *testing.T, factory StoreFactory) { + for _, tc := range testCases { + funcName := runtime.FuncForPC(reflect.ValueOf(tc).Pointer()).Name() + funcNameParts := strings.Split(funcName, "/") + testName := funcNameParts[len(funcNameParts)-1] + func(tc testCase) { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + store := factory(testName) + + if err := tc(t, store); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + }) + }(tc) + } +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go b/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go new file mode 100644 index 0000000..41a3ae3 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go @@ -0,0 +1,57 @@ +package testsuite + +import ( + "fmt" + "sync" + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testConcurrent(t *testing.T, store lfu.Store[string, string]) error { + const value = "foo" + totalKeys := 3 + totalSize := len(value) * totalKeys + capacity := totalSize / 2 + + cache := lfu.New[string, string](store, lfu.WithCapacity[string, string](capacity)) + + var wg sync.WaitGroup + + wg.Add(totalKeys) + + loops := 10 + + for i := 0; i < totalKeys; i++ { + key := fmt.Sprintf("key%d", i) + func(key string) { + go func() { + defer wg.Done() + for i := 0; i < loops; i++ { + if err := cache.Set(key, value); err != nil { + t.Errorf("%+v", errors.WithStack(err)) + } + } + }() + }(key) + } + + wg.Wait() + + t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + if e, g := (capacity%len(value))*len(value), cache.Size(); e != g { + t.Errorf("cache.Size(): expected %d, got %d", e, g) + } + + if cache.Size() > capacity { + t.Errorf("cache.Size(): expected value <= capacity (%d), got %d", capacity, cache.Size()) + } + + if e, g := capacity%len(value), cache.Len(); e != g { + t.Errorf("cache.Len(): expected %d, got %d", e, g) + } + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go b/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go new file mode 100644 index 0000000..c8c6164 --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go @@ -0,0 +1,53 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testEviction(t *testing.T, store lfu.Store[string, string]) error { + cache := lfu.New[string, string](store, lfu.WithCapacity[string, string](10)) + + if err := cache.Set("key1", "12345"); err != nil { + return errors.WithStack(err) + } + + if err := cache.Set("key2", "123"); err != nil { + return errors.WithStack(err) + } + + // Increment frequency of key2 + if _, err := cache.Get("key2"); err != nil { + return errors.WithStack(err) + } + + if e, g := 8, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if err := cache.Set("key3", "1234"); err != nil { + return errors.WithStack(err) + } + + if err := cache.Set("key3", "1234"); err != nil { + return errors.WithStack(err) + } + + t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + if e, g := cache.Capacity(), cache.Size(); e < g { + t.Errorf("cache.Size(): expected value < capacity (%v), got '%v'", e, g) + } + + if e, g := 2, cache.Len(); e != g { + t.Errorf("cache.Len(): expected %d, got %d", e, g) + } + + if cache.Size() < 0 { + t.Errorf("cache.Size(): expected value >= 0, got %d", cache.Size()) + } + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go b/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go new file mode 100644 index 0000000..d8d97de --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go @@ -0,0 +1,80 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testMultipleSet(t *testing.T, store lfu.Store[string, string]) error { + const ( + key = "mykey" + firstValue = "foo" + secondValue = "bar" + thirdValue = "foobar" + ) + + cache := lfu.New[string, string](store) + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, firstValue); err != nil { + return errors.WithStack(err) + } + + if e, g := len(firstValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + retrieved, err := cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := firstValue, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, secondValue); err != nil { + return errors.WithStack(err) + } + + if e, g := len(secondValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + retrieved, err = cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := secondValue, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, thirdValue); err != nil { + return errors.WithStack(err) + } + + if e, g := len(thirdValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + retrieved, err = cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := thirdValue, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if e, g := len(thirdValue), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + return nil +} diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go b/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go new file mode 100644 index 0000000..582eccd --- /dev/null +++ b/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go @@ -0,0 +1,66 @@ +package testsuite + +import ( + "testing" + + "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu" + "github.com/pkg/errors" +) + +func testSetGetDelete(t *testing.T, store lfu.Store[string, string]) error { + const ( + key = "mykey" + value = "foobar" + ) + + cache := lfu.New[string, string](store, lfu.WithCapacity[string, string](10)) + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if err := cache.Set(key, value); err != nil { + return errors.WithStack(err) + } + + if e, g := len(value), cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if e, g := 1, cache.Len(); e != g { + t.Errorf("cache.Len(): expected '%v', got '%v'", e, g) + } + + retrieved, err := cache.Get(key) + if err != nil { + return errors.WithStack(err) + } + + if e, g := value, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if err := cache.Delete(key); err != nil { + return errors.WithStack(err) + } + + if _, err := cache.Get(key); err == nil || !errors.Is(err, lfu.ErrNotFound) { + t.Errorf("cache.Get(key): err should be lfu.ErrNotFound, got '%v'", errors.WithStack(err)) + } + + if e, g := value, retrieved; e != g { + t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g) + } + + if e, g := 0, cache.Size(); e != g { + t.Errorf("cache.Size(): expected '%v', got '%v'", e, g) + } + + if e, g := 0, cache.Len(); e != g { + t.Errorf("cache.Len(): expected '%v', got '%v'", e, g) + } + + t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len()) + + return nil +}