diff --git a/go.mod b/go.mod
index 1203320..deab721 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
 	github.com/lestrrat-go/iter v1.0.2 // indirect
 	github.com/lestrrat-go/option v1.0.0 // indirect
 	github.com/miekg/dns v1.1.53 // indirect
+	github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
 	golang.org/x/sync v0.1.0 // indirect
 	google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
 	google.golang.org/grpc v1.35.0 // indirect
diff --git a/go.sum b/go.sum
index 371bd48..e738cf8 100644
--- a/go.sum
+++ b/go.sum
@@ -258,6 +258,8 @@ github.com/miekg/dns v0.0.0-20161006100029-fc4e1e2843d8/go.mod h1:W1PPwlIAgtquWB
 github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
 github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw=
 github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
+github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
+github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
 github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
 github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
diff --git a/pkg/storage/driver/cache/lfu/cache.go b/pkg/storage/driver/cache/lfu/cache.go
new file mode 100644
index 0000000..341d39f
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/cache.go
@@ -0,0 +1,313 @@
+package lfu
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"github.com/pkg/errors"
+)
+
+var (
+	ErrNotFound           = errors.New("not found")
+	ErrSizeExceedCapacity = errors.New("size exceeds capacity")
+)
+
+type Cache[K comparable, V any] struct {
+	index *Map[K, *CacheItem[K, V]]
+	freqs *List[*FrequencyItem[K, V]]
+
+	size atomic.Int32
+
+	capacity     int
+	store        Store[K, V]
+	getValueSize GetValueSizeFunc[V]
+	sync         *Synchronizer[K]
+
+	evictMutex sync.Mutex
+
+	log LogFunc
+}
+
+type CacheItem[K any, V any] struct {
+	key             K
+	size            int
+	frequencyParent *Element[*FrequencyItem[K, V]]
+}
+
+func NewCacheItem[K any, V any](key K, size int) *CacheItem[K, V] {
+	item := &CacheItem[K, V]{
+		key:  key,
+		size: size,
+	}
+	return item
+}
+
+type FrequencyItem[K any, V any] struct {
+	entries *Map[*CacheItem[K, V], struct{}]
+	freq    int
+}
+
+func NewFrequencyItem[K any, V any]() *FrequencyItem[K, V] {
+	frequencyItem := &FrequencyItem[K, V]{}
+	frequencyItem.entries = NewMap[*CacheItem[K, V], struct{}]()
+	return frequencyItem
+}
+
+func (c *Cache[K, V]) Set(key K, value V) error {
+	newItemSize, err := c.getValueSize(value)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	c.log("setting '%v' (size: %d)", key, newItemSize)
+
+	if newItemSize > int(c.capacity) {
+		return errors.WithStack(ErrSizeExceedCapacity)
+	}
+
+	var sizeDelta int
+
+	err = c.sync.WriteTx(key, func() error {
+		if err := c.store.Set(key, value); err != nil {
+			return errors.WithStack(err)
+		}
+
+		item, ok := c.index.Get(key)
+		if ok {
+			oldItemSize := item.size
+			sizeDelta = -int(oldItemSize) + newItemSize
+		} else {
+			item = NewCacheItem[K, V](key, newItemSize)
+			c.index.Set(key, item)
+			sizeDelta = newItemSize
+		}
+
+		c.size.Add(int32(sizeDelta))
+		c.increment(item)
+
+		return nil
+	})
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	// Eviction, if needed
+	if err := c.Evict(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (c *Cache[K, V]) Get(key K) (V, error) {
+	var value V
+	err := c.sync.WriteTx(key, func() error {
+		c.log("getting '%v'", key)
+
+		e, ok := c.index.Get(key)
+		if !ok {
+			return errors.WithStack(ErrNotFound)
+		}
+
+		v, err := c.store.Get(key)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+
+		c.increment(e)
+
+		value = v
+		return nil
+	})
+	if err != nil {
+		return *new(V), errors.WithStack(err)
+	}
+
+	return value, nil
+}
+
+func (c *Cache[K, V]) Delete(key K) error {
+	err := c.sync.WriteTx(key, func() error {
+		c.log("deleting '%v'", key)
+
+		item, exists := c.index.Get(key)
+		if !exists {
+			return errors.WithStack(ErrNotFound)
+		}
+
+		if err := c.store.Delete(key); err != nil {
+			return errors.WithStack(err)
+		}
+
+		c.size.Add(-int32(item.size))
+
+		c.remove(item.frequencyParent, item)
+		c.index.Delete(key)
+
+		return nil
+	})
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (c *Cache[K, V]) Len() int {
+	return c.index.Len()
+}
+
+func (c *Cache[K, V]) Size() int {
+	return int(c.size.Load())
+}
+
+func (c *Cache[K, V]) Capacity() int {
+	return c.capacity
+}
+
+func (c *Cache[K, V]) increment(item *CacheItem[K, V]) {
+	currentFrequencyElement := item.frequencyParent
+	var nextFrequencyAmount int
+	var nextFrequencyElement *Element[*FrequencyItem[K, V]]
+
+	if currentFrequencyElement == nil {
+		nextFrequencyAmount = 1
+		nextFrequencyElement = c.freqs.First()
+	} else {
+		atomicFrequencyItem := c.freqs.Value(currentFrequencyElement)
+		nextFrequencyAmount = atomicFrequencyItem.freq + 1
+		nextFrequencyElement = c.freqs.Next(currentFrequencyElement)
+	}
+
+	var nextFrequency *FrequencyItem[K, V]
+	if nextFrequencyElement != nil {
+		nextFrequency = c.freqs.Value(nextFrequencyElement)
+	}
+
+	if nextFrequencyElement == nil || nextFrequency == nil || nextFrequency.freq != nextFrequencyAmount {
+		newFrequencyItem := NewFrequencyItem[K, V]()
+		newFrequencyItem.freq = nextFrequencyAmount
+
+		if currentFrequencyElement == nil {
+			nextFrequencyElement = c.freqs.PushFront(newFrequencyItem)
+		} else {
+			nextFrequencyElement = c.freqs.InsertValueAfter(newFrequencyItem, currentFrequencyElement)
+		}
+	}
+
+	item.frequencyParent = nextFrequencyElement
+
+	nextFrequency = c.freqs.Value(nextFrequencyElement)
+	nextFrequency.entries.Set(item, struct{}{})
+
+	if currentFrequencyElement != nil {
+		c.remove(currentFrequencyElement, item)
+	}
+}
+
+func (c *Cache[K, V]) remove(listItem *Element[*FrequencyItem[K, V]], item *CacheItem[K, V]) {
+	entries := c.freqs.Value(listItem).entries
+
+	entries.Delete(item)
+	// if entries.Len() == 0 {
+	// 	c.freqs.Remove(listItem)
+	// }
+}
+
+func (c *Cache[K, V]) atCapacity() (bool, int) {
+	size, capacity := c.Size(), c.Capacity()
+	c.log("cache stats: %d/%d", size, capacity)
+	return size >= capacity, size - capacity
+}
+
+func (c *Cache[K, V]) evict(total int) error {
+	if total == 0 {
+		return nil
+	}
+
+	frequencyElement := c.freqs.First()
+	if frequencyElement == nil {
+		c.log("no frequency element")
+		return nil
+	}
+
+	for evicted := 0; evicted < total; {
+		c.log("running eviction: [to_evict:%d, evicted: %d]", total, evicted)
+
+		c.log("first frequency element %p", frequencyElement)
+
+		frequencyItem := c.freqs.Value(frequencyElement)
+		if frequencyItem == nil {
+			return nil
+		}
+
+		entries := frequencyItem.entries
+
+		if entries.Len() == 0 {
+			c.log("no frequency entries")
+			frequencyElement = c.freqs.Next(frequencyElement)
+			continue
+		}
+
+		var rangeErr error
+		entries.Range(func(key, v any) bool {
+			if evicted >= total {
+				c.log("evicted enough (%d >= %d), stopping", evicted, total)
+				return false
+			}
+
+			entry, _ := key.(*CacheItem[K, V])
+
+			if err := c.Delete(entry.key); err != nil {
+				if errors.Is(err, ErrNotFound) {
+					c.log("key '%v' not found", entry.key)
+					// Cleanup obsolete frequency
+					c.remove(frequencyElement, entry)
+					return true
+				}
+
+				rangeErr = errors.WithStack(err)
+				return false
+			}
+
+			c.log("evicted key '%v' (size: %d)", entry.key, entry.size)
+
+			evicted += int(entry.size)
+
+			return true
+		})
+		if rangeErr != nil {
+			return errors.WithStack(rangeErr)
+		}
+	}
+
+	return nil
+}
+
+func (c *Cache[K, V]) Evict() error {
+	exceed, delta := c.atCapacity()
+	if exceed && delta > 0 {
+		if err := c.evict(delta); err != nil {
+			return errors.WithStack(err)
+		}
+	}
+
+	return nil
+}
+
+func New[K comparable, V any](store Store[K, V], funcs ...OptionsFunc[K, V]) *Cache[K, V] {
+	opts := DefaultOptions[K, V](funcs...)
+
+	cache := &Cache[K, V]{
+		index:        NewMap[K, *CacheItem[K, V]](),
+		freqs:        NewList[*FrequencyItem[K, V]](),
+		capacity:     opts.Capacity,
+		store:        store,
+		getValueSize: opts.GetValueSize,
+		sync:         NewSynchronizer[K](),
+		log:          opts.Log,
+	}
+
+	return cache
+}
diff --git a/pkg/storage/driver/cache/lfu/expirable/cache.go b/pkg/storage/driver/cache/lfu/expirable/cache.go
new file mode 100644
index 0000000..4f9bba3
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/expirable/cache.go
@@ -0,0 +1,21 @@
+package expirable
+
+import (
+	"time"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+)
+
+type Cache[K comparable, V any] struct {
+	ttl        time.Duration
+	cache      *lfu.Cache[K, V]
+	timestamps *lfu.Map[K, time.Duration]
+}
+
+func NewCache[K comparable, V any](cache *lfu.Cache[K, V], ttl time.Duration) *Cache[K, V] {
+	return &Cache[K, V]{
+		ttl:        ttl,
+		cache:      cache,
+		timestamps: lfu.NewMap[K, time.Duration](),
+	}
+}
diff --git a/pkg/storage/driver/cache/lfu/fs/cache_test.go b/pkg/storage/driver/cache/lfu/fs/cache_test.go
new file mode 100644
index 0000000..bbbd7c3
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/fs/cache_test.go
@@ -0,0 +1,37 @@
+package fs
+
+import (
+	"bytes"
+	"io"
+	"path/filepath"
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite"
+	"github.com/pkg/errors"
+)
+
+func TestCacheWithFSStore(t *testing.T) {
+	testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] {
+		dir := filepath.Join("testdata", "testsuite", testName)
+		store := NewStore[string, string](dir,
+			WithMarshalValue[string, string](func(value string) (io.Reader, error) {
+				return bytes.NewBuffer([]byte(value)), nil
+			}),
+			WithUnmarshalValue[string, string](func(r io.Reader) (string, error) {
+				data, err := io.ReadAll(r)
+				if err != nil {
+					return "", errors.WithStack(err)
+				}
+
+				return string(data), nil
+			}),
+		)
+
+		if err := store.Clear(); err != nil {
+			panic(errors.WithStack(err))
+		}
+
+		return store
+	})
+}
diff --git a/pkg/storage/driver/cache/lfu/fs/hash.go b/pkg/storage/driver/cache/lfu/fs/hash.go
new file mode 100644
index 0000000..ee90599
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/fs/hash.go
@@ -0,0 +1,19 @@
+package fs
+
+import (
+	"strconv"
+
+	"github.com/mitchellh/hashstructure/v2"
+	"github.com/pkg/errors"
+)
+
+func DefaultGetKeyHash[K comparable](key K) ([]string, error) {
+	uintHash, err := hashstructure.Hash(key, hashstructure.FormatV2, nil)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	hash := strconv.FormatUint(uintHash, 16)
+
+	return []string{hash[0:2], hash[2:4], hash[4:6], hash}, nil
+}
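For context on the layout produced by `DefaultGetKeyHash` above, here is a standalone, illustrative sketch (not part of the changeset): the key is hashed with `hashstructure`, rendered as hex, then split into two-character fan-out directories followed by the full hash, which the fs store joins under its base directory. The `mirrorKeyHash` helper, the `testdata` base directory, and the sample path in the comment are hypothetical.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"

	"github.com/mitchellh/hashstructure/v2"
)

// mirrorKeyHash mirrors the logic of DefaultGetKeyHash from fs/hash.go:
// hash the key, render it as hex, then fan out into nested directories.
func mirrorKeyHash(key any) ([]string, error) {
	uintHash, err := hashstructure.Hash(key, hashstructure.FormatV2, nil)
	if err != nil {
		return nil, err
	}

	hash := strconv.FormatUint(uintHash, 16)

	return []string{hash[0:2], hash[2:4], hash[4:6], hash}, nil
}

func main() {
	segments, err := mirrorKeyHash("mykey")
	if err != nil {
		panic(err)
	}

	// Prints something like testdata/ab/cd/ef/abcdef0123456789
	// (the actual digits depend on the hash of the key).
	fmt.Println(filepath.Join(append([]string{"testdata"}, segments...)...))
}
```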
diff --git a/pkg/storage/driver/cache/lfu/fs/marshal.go b/pkg/storage/driver/cache/lfu/fs/marshal.go
new file mode 100644
index 0000000..09a4eb6
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/fs/marshal.go
@@ -0,0 +1,31 @@
+package fs
+
+import (
+	"bytes"
+	"encoding/gob"
+	"io"
+
+	"github.com/pkg/errors"
+)
+
+func DefaultMarshalValue[V any](value V) (io.Reader, error) {
+	var buf bytes.Buffer
+	encoder := gob.NewEncoder(&buf)
+
+	if err := encoder.Encode(value); err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return &buf, nil
+}
+
+func DefaultUnmarshalValue[V any](d io.Reader) (V, error) {
+	var value V
+	decoder := gob.NewDecoder(d)
+
+	if err := decoder.Decode(&value); err != nil {
+		return *new(V), errors.WithStack(err)
+	}
+
+	return value, nil
+}
diff --git a/pkg/storage/driver/cache/lfu/fs/options.go b/pkg/storage/driver/cache/lfu/fs/options.go
new file mode 100644
index 0000000..0291c9d
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/fs/options.go
@@ -0,0 +1,45 @@
+package fs
+
+import "io"
+
+type GetKeyHashFunc[K comparable] func(key K) ([]string, error)
+type MarshalValueFunc[V any] func(value V) (io.Reader, error)
+type UnmarshalValueFunc[V any] func(r io.Reader) (V, error)
+
+type Options[K comparable, V any] struct {
+	GetKeyHash     GetKeyHashFunc[K]
+	MarshalValue   MarshalValueFunc[V]
+	UnmarshalValue UnmarshalValueFunc[V]
+}
+
+type OptionsFunc[K comparable, V any] func(opts *Options[K, V])
+
+func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] {
+	opts := &Options[K, V]{
+		GetKeyHash:     DefaultGetKeyHash[K],
+		MarshalValue:   DefaultMarshalValue[V],
+		UnmarshalValue: DefaultUnmarshalValue[V],
+	}
+	for _, fn := range funcs {
+		fn(opts)
+	}
+	return opts
+}
+
+func WithGetKeyHash[K comparable, V any](getKeyHash GetKeyHashFunc[K]) OptionsFunc[K, V] {
+	return func(opts *Options[K, V]) {
+		opts.GetKeyHash = getKeyHash
+	}
+}
+
+func WithMarshalValue[K comparable, V any](marshalValue MarshalValueFunc[V]) OptionsFunc[K, V] {
+	return func(opts *Options[K, V]) {
+		opts.MarshalValue = marshalValue
+	}
+}
+
+func WithUnmarshalValue[K comparable, V any](unmarshalValue UnmarshalValueFunc[V]) OptionsFunc[K, V] {
+	return func(opts *Options[K, V]) {
+		opts.UnmarshalValue = unmarshalValue
+	}
+}
diff --git a/pkg/storage/driver/cache/lfu/fs/store.go b/pkg/storage/driver/cache/lfu/fs/store.go
new file mode 100644
index 0000000..bd4431f
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/fs/store.go
@@ -0,0 +1,165 @@
+package fs
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+type Store[K comparable, V any] struct {
+	baseDir        string
+	getKeyHash     GetKeyHashFunc[K]
+	marshalValue   MarshalValueFunc[V]
+	unmarshalValue UnmarshalValueFunc[V]
+}
+
+// Delete implements Store.
+func (s *Store[K, V]) Delete(key K) error {
+	path, err := s.getEntryPath(key)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+// Get implements Store.
+func (s *Store[K, V]) Get(key K) (V, error) {
+	path, err := s.getEntryPath(key)
+	if err != nil {
+		return *new(V), errors.WithStack(err)
+	}
+
+	value, err := s.readValue(path)
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return *new(V), errors.WithStack(lfu.ErrNotFound)
+		}
+
+		return *new(V), errors.WithStack(err)
+	}
+
+	return value, nil
+}
+
+// Set implements Store.
+func (s *Store[K, V]) Set(key K, value V) error {
+	path, err := s.getEntryPath(key)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err := s.writeValue(path, value); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (s *Store[K, V]) Clear() error {
+	if err := os.RemoveAll(s.baseDir); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (s *Store[K, V]) getEntryPath(k K) (string, error) {
+	hash, err := s.getKeyHash(k)
+	if err != nil {
+		return "", errors.WithStack(err)
+	}
+
+	path := append([]string{s.baseDir}, hash...)
+	return filepath.Join(path...), nil
+}
+
+func (s *Store[K, V]) writeValue(path string, value V) error {
+	fi, err := os.Stat(path)
+	if err == nil && !fi.Mode().IsRegular() {
+		return fmt.Errorf("%s already exists and is not a regular file", path)
+	}
+
+	dir := filepath.Dir(path)
+
+	if err := os.MkdirAll(dir, 0750); err != nil {
+		return errors.WithStack(err)
+	}
+
+	f, err := os.CreateTemp(dir, filepath.Base(path)+".tmp")
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	tmpName := f.Name()
+	defer func() {
+		// Cleanup relies on the enclosing err, so the calls below assign
+		// to it instead of shadowing it with ':='.
+		if err != nil {
+			f.Close()
+			os.Remove(tmpName)
+		}
+	}()
+
+	reader, err := s.marshalValue(value)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if _, err = io.Copy(f, reader); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err = f.Sync(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err = f.Close(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err = os.Rename(tmpName, path); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (s *Store[K, V]) readValue(path string) (V, error) {
+	file, err := os.Open(path)
+	if err != nil {
+		return *new(V), errors.WithStack(err)
+	}
+
+	defer func() {
+		if err := file.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
+			panic(errors.WithStack(err))
+		}
+	}()
+
+	value, err := s.unmarshalValue(file)
+
+	if err != nil {
+		return *new(V), errors.WithStack(err)
+	}
+
+	return value, nil
+}
+
+func NewStore[K comparable, V any](baseDir string, funcs ...OptionsFunc[K, V]) *Store[K, V] {
+	opts := DefaultOptions[K, V](funcs...)
+	return &Store[K, V]{
+		baseDir:        baseDir,
+		getKeyHash:     opts.GetKeyHash,
+		unmarshalValue: opts.UnmarshalValue,
+		marshalValue:   opts.MarshalValue,
+	}
+}
+
+var _ lfu.Store[string, int] = &Store[string, int]{}
diff --git a/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore b/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/fs/testdata/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/pkg/storage/driver/cache/lfu/list.go b/pkg/storage/driver/cache/lfu/list.go
new file mode 100644
index 0000000..f8e6e60
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/list.go
@@ -0,0 +1,203 @@
+package lfu
+
+import (
+	"sync/atomic"
+)
+
+type List[T any] struct {
+	root *Element[T]
+	len  atomic.Int32
+	sync *Synchronizer[*Element[T]]
+}
+
+func (l *List[T]) First() *Element[T] {
+	if l.Len() == 0 {
+		return nil
+	}
+
+	var next *Element[T]
+	l.sync.ReadTx(l.root, func(upgrade func(func())) error {
+		next = l.root.next
+		return nil
+	})
+
+	return next
+}
+
+func (l *List[T]) Last() *Element[T] {
+	if l.Len() == 0 {
+		return nil
+	}
+
+	var prev *Element[T]
+	l.sync.ReadTx(l.root, func(upgrade func(func())) error {
+		prev = l.root.prev
+		return nil
+	})
+
+	return prev
+}
+
+func (l *List[T]) Prev(e *Element[T]) *Element[T] {
+	var prev *Element[T]
+	l.sync.ReadTx(e, func(upgrade func(func())) error {
+		prev = e.prev
+		return nil
+	})
+
+	return prev
+}
+
+func (l *List[T]) Next(e *Element[T]) *Element[T] {
+	var next *Element[T]
+	l.sync.ReadTx(e, func(upgrade func(func())) error {
+		next = e.next
+		return nil
+	})
+
+	return next
+}
+
+func (l *List[T]) Value(e *Element[T]) T {
+	var value T
+	l.sync.ReadTx(e, func(upgrade func(func())) error {
+		value = e.value
+		return nil
+	})
+
+	return value
+}
+
+func (l *List[T]) PushFront(v T) *Element[T] {
+	return l.InsertValueAfter(v, l.root)
+}
+
+func (l *List[T]) PushBack(v T) *Element[T] {
+	return l.InsertValueAfter(v, l.Prev(l.root))
+}
+
+func (l *List[T]) Remove(e *Element[T]) {
+	l.remove(e)
+}
+
+func (l *List[T]) Len() int {
+	return int(l.len.Load())
+}
+
+func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] {
+	l.sync.ReadTx(e, func(upgrade func(fn func())) error {
+		var next *Element[T]
+		l.sync.ReadTx(at, func(upgrade func(func())) error {
+			next = at.next
+			return nil
+		})
+
+		upgrade(func() {
+			e.prev = at
+			e.next = next
+			e.list = l
+		})
+
+		if e.prev != nil {
+			l.sync.WriteTx(e.prev, func() error {
+				e.prev.next = e
+				return nil
+			})
+		}
+
+		if e.next != nil {
+			l.sync.WriteTx(e.next, func() error {
+				e.next.prev = e
+				return nil
+			})
+		}
+
+		return nil
+	})
+
+	l.len.Add(1)
+
+	return e
+}
+
+func (l *List[T]) InsertValueAfter(v T, at *Element[T]) *Element[T] {
+	e := NewElement[T](v)
+	return l.insertAfter(e, at)
+}
+
+func (l *List[T]) remove(e *Element[T]) {
+	if e == nil || e == l.root {
+		return
+	}
+
+	l.sync.ReadTx(e, func(upgrade func(fn func())) error {
+		if e.prev != nil {
+			if e.prev == e {
+				upgrade(func() {
+					e.prev.next = e.next
+				})
+			} else {
+				l.sync.WriteTx(e.prev, func() error {
+					e.prev.next = e.next
+					return nil
+				})
+			}
+		}
+
+		if e.next != nil {
+			if e.next == e {
+				upgrade(func() {
+					e.next.prev = e.prev
+				})
+			} else {
+				l.sync.WriteTx(e.next, func() error {
+					e.next.prev = e.prev
+					return nil
+				})
+			}
+		}
+
+		upgrade(func() {
+			e.next = nil
+			e.prev = nil
+			e.list = nil
+		})
+
+		return nil
+	})
+
+	l.sync.Remove(e)
+	l.len.Add(-1)
+}
+
+func NewList[T any]() *List[T] {
+	root := NewElement(*new(T))
+	root.next = root
+	root.prev = root
+
+	list := &List[T]{
+		sync: NewSynchronizer[*Element[T]](),
+	}
+
+	root.list = list
+	list.root = root
+
+	return list
+}
+
+type Element[T any] struct {
+	prev  *Element[T]
+	next  *Element[T]
+	list  *List[T]
+	value T
+}
+
+func NewElement[T any](v T) *Element[T] {
+	element := &Element[T]{
+		prev:  nil,
+		next:  nil,
+		list:  nil,
+		value: v,
+	}
+	return element
+}
diff --git a/pkg/storage/driver/cache/lfu/map.go b/pkg/storage/driver/cache/lfu/map.go
new file mode 100644
index 0000000..26afd21
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/map.go
@@ -0,0 +1,67 @@
+package lfu
+
+import (
+	"sync"
+	"sync/atomic"
+)
+
+type Map[K comparable, V any] struct {
+	size  atomic.Int32
+	inner sync.Map
+}
+
+func (m *Map[K, V]) Get(key K) (V, bool) {
+	raw, exists := m.inner.Load(key)
+	if !exists {
+		return *new(V), false
+	}
+
+	value, ok := raw.(V)
+	if !ok {
+		return *new(V), false
+	}
+
+	return value, true
+}
+
+func (m *Map[K, V]) GetOrSet(key K, defaultValue V) (V, bool) {
+	raw, loaded := m.inner.LoadOrStore(key, defaultValue)
+	if !loaded {
+		m.size.Add(1)
+	}
+
+	value, ok := raw.(V)
+	if !ok {
+		return *new(V), loaded
+	}
+
+	return value, loaded
+}
+
+func (m *Map[K, V]) Set(key K, value V) {
+	_, loaded := m.inner.Swap(key, value)
+	if !loaded {
+		m.size.Add(1)
+	}
+}
+
+func (m *Map[K, V]) Delete(key K) {
+	_, existed := m.inner.LoadAndDelete(key)
+	if existed {
+		m.size.Add(-1)
+	}
+}
+
+func (m *Map[K, V]) Range(fn func(key, value any) bool) {
+	m.inner.Range(fn)
+}
+
+func (m *Map[K, V]) Len() int {
+	return int(m.size.Load())
+}
+
+func NewMap[K comparable, V any]() *Map[K, V] {
+	return &Map[K, V]{
+		inner: sync.Map{},
+	}
+}
diff --git a/pkg/storage/driver/cache/lfu/memory/cache_test.go b/pkg/storage/driver/cache/lfu/memory/cache_test.go
new file mode 100644
index 0000000..4465e41
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/memory/cache_test.go
@@ -0,0 +1,14 @@
+package memory
+
+import (
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite"
+)
+
+func TestCacheWithMemoryStore(t *testing.T) {
+	testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] {
+		return NewStore[string, string]()
+	})
+}
diff --git a/pkg/storage/driver/cache/lfu/memory/memory.go b/pkg/storage/driver/cache/lfu/memory/memory.go
new file mode 100644
index 0000000..a6616f1
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/memory/memory.go
@@ -0,0 +1,40 @@
+package memory
+
+import (
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+type Store[K comparable, V any] struct {
+	index *lfu.Map[K, V]
+}
+
+// Delete implements Store.
+func (s *Store[K, V]) Delete(key K) error {
+	s.index.Delete(key)
+	return nil
+}
+
+// Get implements Store.
+func (s *Store[K, V]) Get(key K) (V, error) {
+	value, exists := s.index.Get(key)
+	if !exists {
+		return *new(V), errors.WithStack(lfu.ErrNotFound)
+	}
+
+	return value, nil
+}
+
+// Set implements Store.
+func (s *Store[K, V]) Set(key K, value V) error {
+	s.index.Set(key, value)
+	return nil
+}
+
+func NewStore[K comparable, V any]() *Store[K, V] {
+	return &Store[K, V]{
+		index: lfu.NewMap[K, V](),
+	}
+}
+
+var _ lfu.Store[string, int] = &Store[string, int]{}
diff --git a/pkg/storage/driver/cache/lfu/options.go b/pkg/storage/driver/cache/lfu/options.go
new file mode 100644
index 0000000..d75c195
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/options.go
@@ -0,0 +1,47 @@
+package lfu
+
+type GetValueSizeFunc[V any] func(value V) (int, error)
+
+type LogFunc func(format string, values ...any)
+
+func DefaultLogFunc(format string, values ...any) {
+
+}
+
+type Options[K comparable, V any] struct {
+	GetValueSize GetValueSizeFunc[V]
+	Capacity     int
+	Log          LogFunc
+}
+
+type OptionsFunc[K comparable, V any] func(opts *Options[K, V])
+
+func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] {
+	opts := &Options[K, V]{
+		GetValueSize: DefaultGetValueSize[V],
+		Capacity:     100,
+		Log:          DefaultLogFunc,
+	}
+	for _, fn := range funcs {
+		fn(opts)
+	}
+	return opts
+}
+
+func WithCapacity[K comparable, V any](capacity int) OptionsFunc[K, V] {
+	return func(opts *Options[K, V]) {
+		opts.Capacity = capacity
+	}
+}
+
+func WithGetValueSize[K comparable, V any](getValueSize GetValueSizeFunc[V]) OptionsFunc[K, V] {
+	return func(opts *Options[K, V]) {
+		opts.GetValueSize = getValueSize
+	}
+}
+
+func WithLog[K comparable, V any](fn LogFunc) OptionsFunc[K, V] {
+	return func(opts *Options[K, V]) {
+		opts.Log = fn
+	}
+}
diff --git a/pkg/storage/driver/cache/lfu/size.go b/pkg/storage/driver/cache/lfu/size.go
new file mode 100644
index 0000000..19c5728
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/size.go
@@ -0,0 +1,41 @@
+package lfu
+
+import (
+	"github.com/pkg/errors"
+)
+
+type Measurable interface {
+	Size() (int, error)
+}
+
+func DefaultGetValueSize[V any](value V) (int, error) {
+	switch v := any(value).(type) {
+	case int:
+		return v, nil
+	case int8:
+		return int(v), nil
+	case int32:
+		return int(v), nil
+	case int64:
+		return int(v), nil
+	case float32:
+		return int(v), nil
+	case float64:
+		return int(v), nil
+	case []byte:
+		return len(v), nil
+	case string:
+		return len(v), nil
+	}
+
+	if measurable, ok := any(value).(Measurable); ok {
+		size, err := measurable.Size()
+		if err != nil {
+			return 0, errors.WithStack(err)
+		}
+
+		return size, nil
+	}
+
+	return 0, errors.Errorf("could not retrieve size of type '%T'", value)
+}
diff --git a/pkg/storage/driver/cache/lfu/store.go b/pkg/storage/driver/cache/lfu/store.go
new file mode 100644
index 0000000..7b2bd70
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/store.go
@@ -0,0 +1,7 @@
+package lfu
+
+type Store[K comparable, V any] interface {
+	Delete(key K) error
+	Set(key K, value V) error
+	Get(key K) (V, error)
+}
diff --git a/pkg/storage/driver/cache/lfu/synchronizer.go b/pkg/storage/driver/cache/lfu/synchronizer.go
new file mode 100644
index 0000000..7f7af2e
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/synchronizer.go
@@ -0,0 +1,56 @@
+package lfu
+
+import (
+	"sync"
+
+	"github.com/pkg/errors"
+)
+
+type Synchronizer[K comparable] struct {
+	index *Map[K, *sync.RWMutex]
+}
+
+func (s *Synchronizer[K]) Remove(key K) {
+	s.index.Delete(key)
+}
+
+func (s *Synchronizer[K]) ReadTx(key K, fn func(upgrade func(fn func())) error) error {
+	mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{})
+	mutex.RLock()
+	defer mutex.RUnlock()
+
+	upgrade := func(fn func()) {
+		mutex.RUnlock()
+		mutex.Lock()
+		defer func() {
+			mutex.Unlock()
+			mutex.RLock()
+		}()
+
+		fn()
+	}
+
+	if err := fn(upgrade); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func (s *Synchronizer[K]) WriteTx(key K, fn func() error) error {
+	mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{})
+	mutex.Lock()
+	defer mutex.Unlock()
+
+	if err := fn(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+func NewSynchronizer[K comparable]() *Synchronizer[K] {
+	return &Synchronizer[K]{
+		index: NewMap[K, *sync.RWMutex](),
+	}
+}
diff --git a/pkg/storage/driver/cache/lfu/testsuite/main.go b/pkg/storage/driver/cache/lfu/testsuite/main.go
new file mode 100644
index 0000000..477a13c
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/testsuite/main.go
@@ -0,0 +1,40 @@
+package testsuite
+
+import (
+	"reflect"
+	"runtime"
+	"strings"
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+type StoreFactory func(testName string) lfu.Store[string, string]
+type testCase func(t *testing.T, store lfu.Store[string, string]) error
+
+var testCases = []testCase{
+	testSetGetDelete,
+	testEviction,
+	testConcurrent,
+	testMultipleSet,
+}
+
+func TestCacheWithStore(t *testing.T, factory StoreFactory) {
+	for _, tc := range testCases {
+		funcName := runtime.FuncForPC(reflect.ValueOf(tc).Pointer()).Name()
+		funcNameParts := strings.Split(funcName, "/")
+		testName := funcNameParts[len(funcNameParts)-1]
+		func(tc testCase) {
+			t.Run(testName, func(t *testing.T) {
+				t.Parallel()
+
+				store := factory(testName)
+
+				if err := tc(t, store); err != nil {
+					t.Fatalf("%+v", errors.WithStack(err))
+				}
+			})
+		}(tc)
+	}
+}
diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go b/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go
new file mode 100644
index 0000000..98f9fb0
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go
@@ -0,0 +1,67 @@
+package testsuite
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+func testConcurrent(t *testing.T, store lfu.Store[string, string]) error {
+	const value = "foobar"
+	totalKeys := 25
+	totalSize := len(value) * totalKeys
+	capacity := totalSize / 2
+
+	cache := lfu.New[string, string](store,
+		lfu.WithCapacity[string, string](capacity),
+		lfu.WithLog[string, string](t.Logf),
+	)
+
+	var wg sync.WaitGroup
+
+	wg.Add(totalKeys)
+
+	loops := totalKeys * 10
+
+	for i := 0; i < totalKeys; i++ {
+		key := fmt.Sprintf("key%d", i)
+		func(key string) {
+			go func() {
+				defer wg.Done()
+				for i := 0; i < loops; i++ {
+					if err := cache.Set(key, value); err != nil {
+						t.Errorf("%+v", errors.WithStack(err))
+					}
+				}
+			}()
+		}(key)
+	}
+
+	wg.Wait()
+
+	t.Logf("cache before final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
+
+	if err := cache.Evict(); err != nil {
+		t.Errorf("%+v", errors.WithStack(err))
+	}
+
+	t.Logf("cache after final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
+
+	expectedLength := capacity / len(value)
+	if e, g := expectedLength, cache.Len(); e < g {
+		t.Errorf("cache.Len(): expected <= %d, got %d", e, g)
+	}
+
+	if cache.Size() > capacity {
+		t.Errorf("cache.Size(): expected <= %d, got %d", capacity, cache.Size())
+	}
+
+	if e, g := expectedLength*len(value), cache.Size(); e < g {
+		t.Errorf("cache.Size(): expected <= %d, got %d", e, g)
+	}
+
+	return nil
+}
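The test suite uses string values, whose size under `DefaultGetValueSize` (see `size.go`) is simply their byte length. As a complement, here is a hedged, illustrative sketch of caching a custom value type by implementing the `Measurable` interface instead of passing `WithGetValueSize`; the `blob` type and the capacity figure are made up for the example.

```go
package main

import (
	"fmt"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
)

// blob is a hypothetical value type; implementing Measurable lets
// DefaultGetValueSize account for it without a custom size function.
type blob struct {
	data []byte
}

func (b blob) Size() (int, error) {
	return len(b.data), nil
}

func main() {
	store := memory.NewStore[string, blob]()

	cache := lfu.New[string, blob](store,
		// Capacity is expressed in the same unit as Size(), here bytes.
		lfu.WithCapacity[string, blob](1024),
	)

	if err := cache.Set("report", blob{data: make([]byte, 512)}); err != nil {
		panic(err)
	}

	fmt.Println(cache.Size(), "/", cache.Capacity()) // 512 / 1024
}
```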
diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go b/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go
new file mode 100644
index 0000000..fe1d83a
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/testsuite/test_eviction.go
@@ -0,0 +1,56 @@
+package testsuite
+
+import (
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+func testEviction(t *testing.T, store lfu.Store[string, string]) error {
+	cache := lfu.New[string, string](store,
+		lfu.WithCapacity[string, string](10),
+		lfu.WithLog[string, string](t.Logf),
+	)
+
+	if err := cache.Set("key1", "key1"); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err := cache.Set("key2", "key2"); err != nil {
+		return errors.WithStack(err)
+	}
+
+	// Increment frequency of key2
+	if _, err := cache.Get("key2"); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := 8, cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	if err := cache.Set("key3", "key3"); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err := cache.Set("key3", "key3"); err != nil {
+		return errors.WithStack(err)
+	}
+
+	t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
+
+	if e, g := cache.Capacity(), cache.Size(); e < g {
+		t.Errorf("cache.Size(): expected <= %d, got %d", e, g)
+	}
+
+	if e, g := 2, cache.Len(); e != g {
+		t.Errorf("cache.Len(): expected %d, got %d", e, g)
+	}
+
+	if cache.Size() < 0 {
+		t.Errorf("cache.Size(): expected value >= 0, got %d", cache.Size())
+	}
+
+	return nil
+}
diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go b/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go
new file mode 100644
index 0000000..d8d97de
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go
@@ -0,0 +1,80 @@
+package testsuite
+
+import (
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+func testMultipleSet(t *testing.T, store lfu.Store[string, string]) error {
+	const (
+		key         = "mykey"
+		firstValue  = "foo"
+		secondValue = "bar"
+		thirdValue  = "foobar"
+	)
+
+	cache := lfu.New[string, string](store)
+
+	if e, g := 0, cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	if err := cache.Set(key, firstValue); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := len(firstValue), cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	retrieved, err := cache.Get(key)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := firstValue, retrieved; e != g {
+		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
+	}
+
+	if err := cache.Set(key, secondValue); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := len(secondValue), cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	retrieved, err = cache.Get(key)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := secondValue, retrieved; e != g {
+		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
+	}
+
+	if err := cache.Set(key, thirdValue); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := len(thirdValue), cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	retrieved, err = cache.Get(key)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := thirdValue, retrieved; e != g {
+		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
+	}
+
+	if e, g := len(thirdValue), cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	return nil
+}
diff --git a/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go b/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go
new file mode 100644
index 0000000..582eccd
--- /dev/null
+++ b/pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go
@@ -0,0 +1,66 @@
+package testsuite
+
+import (
+	"testing"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"github.com/pkg/errors"
+)
+
+func testSetGetDelete(t *testing.T, store lfu.Store[string, string]) error {
+	const (
+		key   = "mykey"
+		value = "foobar"
+	)
+
+	cache := lfu.New[string, string](store, lfu.WithCapacity[string, string](10))
+
+	if e, g := 0, cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	if err := cache.Set(key, value); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := len(value), cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	if e, g := 1, cache.Len(); e != g {
+		t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
+	}
+
+	retrieved, err := cache.Get(key)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	if e, g := value, retrieved; e != g {
+		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
+	}
+
+	if err := cache.Delete(key); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if _, err := cache.Get(key); err == nil || !errors.Is(err, lfu.ErrNotFound) {
+		t.Errorf("cache.Get(key): err should be lfu.ErrNotFound, got '%v'", errors.WithStack(err))
+	}
+
+	if e, g := value, retrieved; e != g {
+		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
+	}
+
+	if e, g := 0, cache.Size(); e != g {
+		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
+	}
+
+	if e, g := 0, cache.Len(); e != g {
+		t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
+	}
+
+	t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())
+
+	return nil
+}
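Finally, a minimal usage sketch of the filesystem-backed cache introduced by this changeset, assuming the package paths added above; the cache directory, key, and capacity are illustrative only.

```go
package main

import (
	"fmt"
	"log"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/fs"
)

func main() {
	// Values are gob-encoded on disk by DefaultMarshalValue/DefaultUnmarshalValue;
	// "/tmp/edge-cache" is only an illustrative location.
	store := fs.NewStore[string, string]("/tmp/edge-cache")

	// The capacity is interpreted by DefaultGetValueSize: for string values,
	// it is a byte budget.
	cache := lfu.New[string, string](store,
		lfu.WithCapacity[string, string](1<<20),
		lfu.WithLog[string, string](log.Printf),
	)

	if err := cache.Set("greeting", "hello world"); err != nil {
		log.Fatalf("%+v", err)
	}

	value, err := cache.Get("greeting")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Println(value) // hello world
}
```

Note that the fs store writes each entry to a temporary file and then renames it into place, so an interrupted write should not leave a partially written cache entry behind.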