feat(storage): rpc driver client pooling and memory-constrained cache
All checks were successful
arcad/edge/pipeline/head This commit looks good

driver

ref #20
This commit is contained in:
2023-11-29 11:10:29 +01:00
parent 02c74b6f8d
commit f4a7366aad
10 changed files with 217 additions and 97 deletions

View File

@ -6,14 +6,14 @@ import (
"io"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobBucket struct {
bucket storage.BlobBucket
cache *expirable.LRU[string, []byte]
cache *bigcache.BigCache
}
// Close implements storage.BlobBucket.
@ -62,7 +62,7 @@ func (b *BlobBucket) Name() string {
// NewReader implements storage.BlobBucket.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if cached, exist := b.inCache(id); exist {
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)))
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats()))
return cached, nil
}
@ -84,8 +84,14 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
key := b.getCacheKey(id)
data, exist := b.cache.Get(key)
if !exist {
data, err := b.cache.Get(key)
if err != nil {
if errors.Is(err, bigcache.ErrEntryNotFound) {
return nil, false
}
logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err)))
return nil, false
}

View File

@ -2,16 +2,15 @@ package cache
import (
"context"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
)
type BlobStore struct {
store storage.BlobStore
cache *expirable.LRU[string, []byte]
cache *bigcache.BigCache
}
// DeleteBucket implements storage.BlobStore.
@ -46,10 +45,10 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu
}, nil
}
func NewBlobStore(store storage.BlobStore, cacheSize int, cacheTTL time.Duration) *BlobStore {
func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore {
return &BlobStore{
store: store,
cache: expirable.NewLRU[string, []byte](cacheSize, nil, cacheTTL),
cache: cache,
}
}

View File

@ -9,6 +9,7 @@ import (
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
@ -28,7 +29,13 @@ func TestBlobStore(t *testing.T) {
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
store := NewBlobStore(backend, 32, time.Second*1)
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
store := NewBlobStore(backend, cache)
testsuite.TestBlobStore(context.Background(), t, store)
}
@ -44,7 +51,13 @@ func BenchmarkBlobStore(t *testing.B) {
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
store := NewBlobStore(backend, 32, time.Minute)
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
store := NewBlobStore(backend, cache)
testsuite.BenchmarkBlobStore(t, store)
}

View File

@ -1,13 +1,17 @@
package cache
import (
"context"
"fmt"
"net/url"
"strconv"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func init() {
@ -17,30 +21,6 @@ func init() {
func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
query := dsn.Query()
rawCacheSize := query.Get("cacheSize")
if rawCacheSize == "" {
rawCacheSize = "128"
}
cacheSize, err := strconv.ParseInt(rawCacheSize, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse cacheSize url parameter")
}
query.Del("cacheSize")
rawCacheTTL := query.Get("cacheTTL")
if rawCacheTTL == "" {
rawCacheTTL = "10m"
}
cacheTTL, err := time.ParseDuration(rawCacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse cacheTTL url parameter")
}
query.Del("cacheTTL")
rawDriver := query.Get("driver")
if rawDriver == "" {
return nil, errors.New("missing required url parameter 'driver'")
@ -48,6 +28,54 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
query.Del("driver")
cacheTTL := time.Minute * 60
rawCacheTTL := query.Get("cacheTTL")
if rawCacheTTL != "" {
query.Del("cacheTTL")
ttl, err := time.ParseDuration(rawCacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'")
}
cacheTTL = ttl
}
cacheConfig := bigcache.DefaultConfig(cacheTTL)
cacheConfig.Logger = &cacheLogger{}
rawCacheShards := query.Get("cacheShards")
if rawCacheShards != "" {
query.Del("cacheShards")
cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'")
}
cacheConfig.Shards = int(cacheShards)
}
rawMaxCacheSize := query.Get("maxCacheSize")
if rawMaxCacheSize != "" {
query.Del("maxCacheSize")
maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'")
}
// See cacheConfig.HardMaxCacheSize documentation
var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000
if maxCacheSize < minCacheSize {
return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize)
}
cacheConfig.HardMaxCacheSize = int(maxCacheSize)
}
url := &url.URL{
Scheme: rawDriver,
Host: dsn.Host,
@ -60,5 +88,18 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
return nil, errors.WithStack(err)
}
return NewBlobStore(store, int(cacheSize), cacheTTL), nil
cache, err := bigcache.New(context.Background(), cacheConfig)
if err != nil {
return nil, errors.WithStack(err)
}
return NewBlobStore(store, cache), nil
}
type cacheLogger struct{}
func (l *cacheLogger) Printf(format string, v ...interface{}) {
logger.Debug(context.Background(), fmt.Sprintf(format, v...))
}
var _ bigcache.Logger = &cacheLogger{}

View File

@ -1,21 +1,19 @@
package cache
import (
"bytes"
"context"
"fmt"
"io"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type readCacher struct {
reader io.ReadSeekCloser
cache *expirable.LRU[string, []byte]
cache *bigcache.BigCache
key string
buffer bytes.Buffer
}
// Close implements io.ReadSeekCloser.
@ -32,16 +30,6 @@ func (r *readCacher) Read(p []byte) (n int, err error) {
length, err := r.reader.Read(p)
if err != nil {
if err == io.EOF {
if length > 0 {
if _, err := r.buffer.Write(p[:length]); err != nil {
logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
return length, io.EOF
}
}
logger.Debug(context.Background(), "caching blob", logger.F("cacheKey", r.key))
r.cache.Add(r.key, r.buffer.Bytes())
return length, io.EOF
}
@ -49,8 +37,13 @@ func (r *readCacher) Read(p []byte) (n int, err error) {
}
if length > 0 {
if _, err := r.buffer.Write(p[:length]); err != nil {
logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
if err := r.cache.Append(r.key, p[:length]); err != nil {
ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
if err := r.cache.Delete(r.key); err != nil {
logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err)))
}
}
}

View File

@ -5,7 +5,6 @@ import (
"net/url"
"github.com/keegancsmith/rpc"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob"
@ -13,7 +12,7 @@ import (
)
type BlobStore struct {
serverURL *url.URL
withClient WithClientFunc
}
// DeleteBucket implements storage.BlobStore.
@ -75,27 +74,11 @@ func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, re
return nil
}
func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
if err != nil {
return errors.WithStack(err)
}
defer func() {
if err := client.Close(); err != nil {
logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
}
}()
if err := fn(ctx, client); err != nil {
return errors.WithStack(err)
}
return nil
}
func NewBlobStore(serverURL *url.URL) *BlobStore {
return &BlobStore{serverURL}
withClient := WithPooledClient(serverURL)
return &BlobStore{
withClient: withClient,
}
}
var _ storage.BlobStore = &BlobStore{}

View File

@ -0,0 +1,94 @@
package client
import (
"context"
"net/url"
"strconv"
"sync"
"github.com/jackc/puddle/v2"
"github.com/keegancsmith/rpc"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func NewClientPool(serverURL *url.URL, poolSize int) (*puddle.Pool[*rpc.Client], error) {
constructor := func(context.Context) (*rpc.Client, error) {
client, err := rpc.DialHTTPPath("tcp", serverURL.Host, serverURL.Path+"?"+serverURL.RawQuery)
if err != nil {
return nil, errors.WithStack(err)
}
return client, nil
}
destructor := func(client *rpc.Client) {
if err := client.Close(); err != nil {
logger.Error(context.Background(), "could not close client", logger.CapturedE(errors.WithStack(err)))
}
}
maxPoolSize := int32(poolSize)
pool, err := puddle.NewPool(&puddle.Config[*rpc.Client]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize})
if err != nil {
return nil, errors.WithStack(err)
}
return pool, nil
}
type WithClientFunc func(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error
func WithPooledClient(serverURL *url.URL) WithClientFunc {
var (
pool *puddle.Pool[*rpc.Client]
createPool sync.Once
)
return func(ctx context.Context, fn func(context.Context, *rpc.Client) error) error {
var err error
createPool.Do(func() {
rawPoolSize := serverURL.Query().Get("clientPoolSize")
if rawPoolSize == "" {
rawPoolSize = "5"
}
var poolSize int64
poolSize, err = strconv.ParseInt(rawPoolSize, 10, 32)
if err != nil {
err = errors.Wrap(err, "could not parse clientPoolSize url query parameter")
return
}
pool, err = NewClientPool(serverURL, int(poolSize))
if err != nil {
err = errors.WithStack(err)
return
}
})
if err != nil {
return errors.WithStack(err)
}
clientResource, err := pool.Acquire(ctx)
if err != nil {
return errors.WithStack(err)
}
if err := fn(ctx, clientResource.Value()); err != nil {
if errors.Is(err, rpc.ErrShutdown) {
clientResource.Destroy()
}
return errors.WithStack(err)
}
clientResource.Release()
return nil
}
}

View File

@ -6,7 +6,6 @@ import (
"github.com/keegancsmith/rpc"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document"
@ -14,7 +13,7 @@ import (
)
type DocumentStore struct {
serverURL *url.URL
withClient WithClientFunc
}
// Delete implements storage.DocumentStore.
@ -108,27 +107,12 @@ func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any
return nil
}
func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error {
client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery)
if err != nil {
return errors.WithStack(err)
func NewDocumentStore(serverURL *url.URL) *DocumentStore {
withClient := WithPooledClient(serverURL)
return &DocumentStore{
withClient: withClient,
}
defer func() {
if err := client.Close(); err != nil {
logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err)))
}
}()
if err := fn(ctx, client); err != nil {
return errors.WithStack(err)
}
return nil
}
func NewDocumentStore(url *url.URL) *DocumentStore {
return &DocumentStore{url}
}
var _ storage.DocumentStore = &DocumentStore{}