From f4a7366aad19e327583f8ed4ea7abdfbcd522ea5 Mon Sep 17 00:00:00 2001 From: William Petit Date: Wed, 29 Nov 2023 11:10:29 +0100 Subject: [PATCH] feat(storage): rpc driver client pooling and memory-constrained cache driver ref https://forge.cadoles.com/arcad/edge/issues/20 --- go.mod | 3 + go.sum | 4 + pkg/storage/driver/cache/blob_bucket.go | 16 +++- pkg/storage/driver/cache/blob_store.go | 9 +- pkg/storage/driver/cache/blob_store_test.go | 17 +++- pkg/storage/driver/cache/driver.go | 91 +++++++++++++----- pkg/storage/driver/cache/reader.go | 25 ++--- pkg/storage/driver/rpc/client/blob_store.go | 27 +----- pkg/storage/driver/rpc/client/client_pool.go | 94 +++++++++++++++++++ .../driver/rpc/client/document_store.go | 28 ++---- 10 files changed, 217 insertions(+), 97 deletions(-) create mode 100644 pkg/storage/driver/rpc/client/client_pool.go diff --git a/go.mod b/go.mod index c01ed51..143ff60 100644 --- a/go.mod +++ b/go.mod @@ -15,11 +15,13 @@ require ( require ( cloud.google.com/go v0.75.0 // indirect + github.com/allegro/bigcache/v3 v3.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -27,6 +29,7 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.0 // indirect github.com/miekg/dns v1.1.53 // indirect + golang.org/x/sync v0.1.0 // indirect google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect google.golang.org/grpc v1.35.0 // indirect gopkg.in/go-playground/validator.v9 v9.29.1 // indirect diff --git a/go.sum b/go.sum index 3ee3437..a4f017c 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/alecthomas/kong v0.2.1-0.20190708041108-0548c6b1afae/go.mod h1:+inYUS github.com/alecthomas/kong-hcl v0.1.8-0.20190615233001-b21fea9723c8/go.mod h1:MRgZdU3vrFd05IQ89AxUZ0aYdF39BYoNFa324SodPCA= github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897 h1:p9Sln00KOTlrYkxI1zYWl1QLnEqAqEARBEYa8FQnQcY= github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ= +github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk= +github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I= github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692 h1:JW4WZlqyaNWUUahfr7MigeDW6jmtam5cTzzo1lwsFhE= github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692/go.mod h1:Au0ipPuCBA7zsOC61SnyrYetm8VT3vo1UJtwHeYke44= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -206,6 +208,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE= github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= diff --git a/pkg/storage/driver/cache/blob_bucket.go b/pkg/storage/driver/cache/blob_bucket.go index 3445279..14a3acf 100644 --- a/pkg/storage/driver/cache/blob_bucket.go +++ b/pkg/storage/driver/cache/blob_bucket.go @@ -6,14 +6,14 @@ import ( "io" "forge.cadoles.com/arcad/edge/pkg/storage" - "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/allegro/bigcache/v3" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type BlobBucket struct { bucket storage.BlobBucket - cache *expirable.LRU[string, []byte] + cache *bigcache.BigCache } // Close implements storage.BlobBucket. @@ -62,7 +62,7 @@ func (b *BlobBucket) Name() string { // NewReader implements storage.BlobBucket. func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) { if cached, exist := b.inCache(id); exist { - logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id))) + logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats())) return cached, nil } @@ -84,8 +84,14 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string { func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) { key := b.getCacheKey(id) - data, exist := b.cache.Get(key) - if !exist { + data, err := b.cache.Get(key) + if err != nil { + if errors.Is(err, bigcache.ErrEntryNotFound) { + return nil, false + } + + logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err))) + return nil, false } diff --git a/pkg/storage/driver/cache/blob_store.go b/pkg/storage/driver/cache/blob_store.go index 0f7c275..c296a7d 100644 --- a/pkg/storage/driver/cache/blob_store.go +++ b/pkg/storage/driver/cache/blob_store.go @@ -2,16 +2,15 @@ package cache import ( "context" - "time" "forge.cadoles.com/arcad/edge/pkg/storage" - "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/allegro/bigcache/v3" "github.com/pkg/errors" ) type BlobStore struct { store storage.BlobStore - cache *expirable.LRU[string, []byte] + cache *bigcache.BigCache } // DeleteBucket implements storage.BlobStore. @@ -46,10 +45,10 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu }, nil } -func NewBlobStore(store storage.BlobStore, cacheSize int, cacheTTL time.Duration) *BlobStore { +func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore { return &BlobStore{ store: store, - cache: expirable.NewLRU[string, []byte](cacheSize, nil, cacheTTL), + cache: cache, } } diff --git a/pkg/storage/driver/cache/blob_store_test.go b/pkg/storage/driver/cache/blob_store_test.go index b1cba21..8bb2f2c 100644 --- a/pkg/storage/driver/cache/blob_store_test.go +++ b/pkg/storage/driver/cache/blob_store_test.go @@ -9,6 +9,7 @@ import ( "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" "forge.cadoles.com/arcad/edge/pkg/storage/testsuite" + "github.com/allegro/bigcache/v3" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) @@ -28,7 +29,13 @@ func TestBlobStore(t *testing.T) { dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) backend := sqlite.NewBlobStore(dsn) - store := NewBlobStore(backend, 32, time.Second*1) + + cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute)) + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + store := NewBlobStore(backend, cache) testsuite.TestBlobStore(context.Background(), t, store) } @@ -44,7 +51,13 @@ func BenchmarkBlobStore(t *testing.B) { dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds()) backend := sqlite.NewBlobStore(dsn) - store := NewBlobStore(backend, 32, time.Minute) + + cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute)) + if err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + + store := NewBlobStore(backend, cache) testsuite.BenchmarkBlobStore(t, store) } diff --git a/pkg/storage/driver/cache/driver.go b/pkg/storage/driver/cache/driver.go index c94ec27..9c60d7a 100644 --- a/pkg/storage/driver/cache/driver.go +++ b/pkg/storage/driver/cache/driver.go @@ -1,13 +1,17 @@ package cache import ( + "context" + "fmt" "net/url" "strconv" "time" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/driver" + "github.com/allegro/bigcache/v3" "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" ) func init() { @@ -17,30 +21,6 @@ func init() { func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { query := dsn.Query() - rawCacheSize := query.Get("cacheSize") - if rawCacheSize == "" { - rawCacheSize = "128" - } - - cacheSize, err := strconv.ParseInt(rawCacheSize, 10, 32) - if err != nil { - return nil, errors.Wrap(err, "could not parse cacheSize url parameter") - } - - query.Del("cacheSize") - - rawCacheTTL := query.Get("cacheTTL") - if rawCacheTTL == "" { - rawCacheTTL = "10m" - } - - cacheTTL, err := time.ParseDuration(rawCacheTTL) - if err != nil { - return nil, errors.Wrap(err, "could not parse cacheTTL url parameter") - } - - query.Del("cacheTTL") - rawDriver := query.Get("driver") if rawDriver == "" { return nil, errors.New("missing required url parameter 'driver'") @@ -48,6 +28,54 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { query.Del("driver") + cacheTTL := time.Minute * 60 + + rawCacheTTL := query.Get("cacheTTL") + if rawCacheTTL != "" { + query.Del("cacheTTL") + + ttl, err := time.ParseDuration(rawCacheTTL) + if err != nil { + return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'") + } + + cacheTTL = ttl + } + + cacheConfig := bigcache.DefaultConfig(cacheTTL) + cacheConfig.Logger = &cacheLogger{} + + rawCacheShards := query.Get("cacheShards") + if rawCacheShards != "" { + query.Del("cacheShards") + + cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32) + if err != nil { + return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'") + } + + cacheConfig.Shards = int(cacheShards) + } + + rawMaxCacheSize := query.Get("maxCacheSize") + if rawMaxCacheSize != "" { + query.Del("maxCacheSize") + + maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32) + if err != nil { + return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'") + } + + // See cacheConfig.HardMaxCacheSize documentation + var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000 + + if maxCacheSize < minCacheSize { + return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize) + } + + cacheConfig.HardMaxCacheSize = int(maxCacheSize) + } + url := &url.URL{ Scheme: rawDriver, Host: dsn.Host, @@ -60,5 +88,18 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { return nil, errors.WithStack(err) } - return NewBlobStore(store, int(cacheSize), cacheTTL), nil + cache, err := bigcache.New(context.Background(), cacheConfig) + if err != nil { + return nil, errors.WithStack(err) + } + + return NewBlobStore(store, cache), nil } + +type cacheLogger struct{} + +func (l *cacheLogger) Printf(format string, v ...interface{}) { + logger.Debug(context.Background(), fmt.Sprintf(format, v...)) +} + +var _ bigcache.Logger = &cacheLogger{} diff --git a/pkg/storage/driver/cache/reader.go b/pkg/storage/driver/cache/reader.go index 9b0117e..3f6c350 100644 --- a/pkg/storage/driver/cache/reader.go +++ b/pkg/storage/driver/cache/reader.go @@ -1,21 +1,19 @@ package cache import ( - "bytes" "context" "fmt" "io" - "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/allegro/bigcache/v3" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type readCacher struct { reader io.ReadSeekCloser - cache *expirable.LRU[string, []byte] + cache *bigcache.BigCache key string - buffer bytes.Buffer } // Close implements io.ReadSeekCloser. @@ -32,16 +30,6 @@ func (r *readCacher) Read(p []byte) (n int, err error) { length, err := r.reader.Read(p) if err != nil { if err == io.EOF { - if length > 0 { - if _, err := r.buffer.Write(p[:length]); err != nil { - logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err))) - return length, io.EOF - } - } - - logger.Debug(context.Background(), "caching blob", logger.F("cacheKey", r.key)) - r.cache.Add(r.key, r.buffer.Bytes()) - return length, io.EOF } @@ -49,8 +37,13 @@ func (r *readCacher) Read(p []byte) (n int, err error) { } if length > 0 { - if _, err := r.buffer.Write(p[:length]); err != nil { - logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err))) + if err := r.cache.Append(r.key, p[:length]); err != nil { + ctx := logger.With(context.Background(), logger.F("cacheKey", r.key)) + logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err))) + + if err := r.cache.Delete(r.key); err != nil { + logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err))) + } } } diff --git a/pkg/storage/driver/rpc/client/blob_store.go b/pkg/storage/driver/rpc/client/blob_store.go index 5a62aae..73292a8 100644 --- a/pkg/storage/driver/rpc/client/blob_store.go +++ b/pkg/storage/driver/rpc/client/blob_store.go @@ -5,7 +5,6 @@ import ( "net/url" "github.com/keegancsmith/rpc" - "gitlab.com/wpetit/goweb/logger" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/blob" @@ -13,7 +12,7 @@ import ( ) type BlobStore struct { - serverURL *url.URL + withClient WithClientFunc } // DeleteBucket implements storage.BlobStore. @@ -75,27 +74,11 @@ func (s *BlobStore) call(ctx context.Context, serviceMethod string, args any, re return nil } -func (s *BlobStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { - client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) - if err != nil { - return errors.WithStack(err) - } - - defer func() { - if err := client.Close(); err != nil { - logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err))) - } - }() - - if err := fn(ctx, client); err != nil { - return errors.WithStack(err) - } - - return nil -} - func NewBlobStore(serverURL *url.URL) *BlobStore { - return &BlobStore{serverURL} + withClient := WithPooledClient(serverURL) + return &BlobStore{ + withClient: withClient, + } } var _ storage.BlobStore = &BlobStore{} diff --git a/pkg/storage/driver/rpc/client/client_pool.go b/pkg/storage/driver/rpc/client/client_pool.go new file mode 100644 index 0000000..7646d54 --- /dev/null +++ b/pkg/storage/driver/rpc/client/client_pool.go @@ -0,0 +1,94 @@ +package client + +import ( + "context" + "net/url" + "strconv" + "sync" + + "github.com/jackc/puddle/v2" + "github.com/keegancsmith/rpc" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func NewClientPool(serverURL *url.URL, poolSize int) (*puddle.Pool[*rpc.Client], error) { + constructor := func(context.Context) (*rpc.Client, error) { + client, err := rpc.DialHTTPPath("tcp", serverURL.Host, serverURL.Path+"?"+serverURL.RawQuery) + if err != nil { + return nil, errors.WithStack(err) + } + + return client, nil + } + + destructor := func(client *rpc.Client) { + if err := client.Close(); err != nil { + logger.Error(context.Background(), "could not close client", logger.CapturedE(errors.WithStack(err))) + } + } + + maxPoolSize := int32(poolSize) + + pool, err := puddle.NewPool(&puddle.Config[*rpc.Client]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize}) + if err != nil { + return nil, errors.WithStack(err) + } + + return pool, nil +} + +type WithClientFunc func(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error + +func WithPooledClient(serverURL *url.URL) WithClientFunc { + var ( + pool *puddle.Pool[*rpc.Client] + createPool sync.Once + ) + + return func(ctx context.Context, fn func(context.Context, *rpc.Client) error) error { + var err error + createPool.Do(func() { + rawPoolSize := serverURL.Query().Get("clientPoolSize") + if rawPoolSize == "" { + rawPoolSize = "5" + } + + var poolSize int64 + + poolSize, err = strconv.ParseInt(rawPoolSize, 10, 32) + if err != nil { + err = errors.Wrap(err, "could not parse clientPoolSize url query parameter") + + return + } + + pool, err = NewClientPool(serverURL, int(poolSize)) + if err != nil { + err = errors.WithStack(err) + + return + } + }) + if err != nil { + return errors.WithStack(err) + } + + clientResource, err := pool.Acquire(ctx) + if err != nil { + return errors.WithStack(err) + } + + if err := fn(ctx, clientResource.Value()); err != nil { + if errors.Is(err, rpc.ErrShutdown) { + clientResource.Destroy() + } + + return errors.WithStack(err) + } + + clientResource.Release() + + return nil + } +} diff --git a/pkg/storage/driver/rpc/client/document_store.go b/pkg/storage/driver/rpc/client/document_store.go index b0aaee3..33a5a7b 100644 --- a/pkg/storage/driver/rpc/client/document_store.go +++ b/pkg/storage/driver/rpc/client/document_store.go @@ -6,7 +6,6 @@ import ( "github.com/keegancsmith/rpc" "github.com/pkg/errors" - "gitlab.com/wpetit/goweb/logger" "forge.cadoles.com/arcad/edge/pkg/storage" "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server/document" @@ -14,7 +13,7 @@ import ( ) type DocumentStore struct { - serverURL *url.URL + withClient WithClientFunc } // Delete implements storage.DocumentStore. @@ -108,27 +107,12 @@ func (s *DocumentStore) call(ctx context.Context, serviceMethod string, args any return nil } -func (s *DocumentStore) withClient(ctx context.Context, fn func(ctx context.Context, client *rpc.Client) error) error { - client, err := rpc.DialHTTPPath("tcp", s.serverURL.Host, s.serverURL.Path+"?"+s.serverURL.RawQuery) - if err != nil { - return errors.WithStack(err) +func NewDocumentStore(serverURL *url.URL) *DocumentStore { + withClient := WithPooledClient(serverURL) + + return &DocumentStore{ + withClient: withClient, } - - defer func() { - if err := client.Close(); err != nil { - logger.Error(ctx, "could not close rpc client", logger.CapturedE(errors.WithStack(err))) - } - }() - - if err := fn(ctx, client); err != nil { - return errors.WithStack(err) - } - - return nil -} - -func NewDocumentStore(url *url.URL) *DocumentStore { - return &DocumentStore{url} } var _ storage.DocumentStore = &DocumentStore{}