feat(blobstore): add cache driver

This commit is contained in:
2023-10-24 22:52:33 +02:00
parent 2fc590d708
commit 6a99409a15
13 changed files with 572 additions and 10 deletions

View File

@ -23,7 +23,7 @@ func NewBlobStore(dsn string) (storage.BlobStore, error) {
factory, exists := blobStoreFactories[url.Scheme]
if !exists {
return nil, errors.WithStack(ErrSchemeNotRegistered)
return nil, errors.Wrapf(ErrSchemeNotRegistered, "no driver associated with scheme '%s'", url.Scheme)
}
store, err := factory(url)

115
pkg/storage/driver/cache/blob_bucket.go vendored Normal file
View File

@ -0,0 +1,115 @@
package cache
import (
"context"
"fmt"
"io"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobBucket struct {
bucket storage.BlobBucket
cache *expirable.LRU[string, []byte]
}
// Close implements storage.BlobBucket.
func (b *BlobBucket) Close() error {
if err := b.bucket.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}
// Delete implements storage.BlobBucket.
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
if err := b.bucket.Delete(ctx, id); err != nil {
return errors.WithStack(err)
}
return nil
}
// Get implements storage.BlobBucket.
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
info, err := b.bucket.Get(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return info, nil
}
// List implements storage.BlobBucket.
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
infos, err := b.bucket.List(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return infos, nil
}
// Name implements storage.BlobBucket.
func (b *BlobBucket) Name() string {
return b.bucket.Name()
}
// NewReader implements storage.BlobBucket.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if cached, exist := b.inCache(id); exist {
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)))
return cached, nil
}
reader, err := b.bucket.NewReader(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return &readCacher{
reader: reader,
cache: b.cache,
key: b.getCacheKey(id),
}, nil
}
func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
return fmt.Sprintf("%s-%s", b.Name(), id)
}
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
key := b.getCacheKey(id)
data, exist := b.cache.Get(key)
if !exist {
return nil, false
}
return &cachedReader{data, 0}, true
}
// NewWriter implements storage.BlobBucket.
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
writer, err := b.bucket.NewWriter(ctx, id)
if err != nil {
return nil, errors.WithStack(err)
}
return writer, nil
}
// Size implements storage.BlobBucket.
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
size, err := b.bucket.Size(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
return size, nil
}
var _ storage.BlobBucket = &BlobBucket{}

56
pkg/storage/driver/cache/blob_store.go vendored Normal file
View File

@ -0,0 +1,56 @@
package cache
import (
"context"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
)
type BlobStore struct {
store storage.BlobStore
cache *expirable.LRU[string, []byte]
}
// DeleteBucket implements storage.BlobStore.
func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
if err := s.store.DeleteBucket(ctx, name); err != nil {
return errors.WithStack(err)
}
return nil
}
// ListBuckets implements storage.BlobStore.
func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
buckets, err := s.store.ListBuckets(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return buckets, nil
}
// OpenBucket implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
bucket, err := s.store.OpenBucket(ctx, name)
if err != nil {
return nil, errors.WithStack(err)
}
return &BlobBucket{
bucket: bucket,
cache: s.cache,
}, nil
}
func NewBlobStore(store storage.BlobStore, cacheSize int, cacheTTL time.Duration) *BlobStore {
return &BlobStore{
store: store,
cache: expirable.NewLRU[string, []byte](cacheSize, nil, cacheTTL),
}
}
var _ storage.BlobStore = &BlobStore{}

View File

@ -0,0 +1,50 @@
package cache
import (
"context"
"fmt"
"os"
"testing"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func TestBlobStore(t *testing.T) {
t.Parallel()
if testing.Verbose() {
logger.SetLevel(logger.LevelDebug)
}
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
store := NewBlobStore(backend, 32, time.Second*1)
testsuite.TestBlobStore(context.Background(), t, store)
}
func BenchmarkBlobStore(t *testing.B) {
logger.SetLevel(logger.LevelError)
file := "./testdata/blobstore_test.sqlite"
if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%+v", errors.WithStack(err))
}
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
store := NewBlobStore(backend, 32, time.Minute)
testsuite.BenchmarkBlobStore(t, store)
}

64
pkg/storage/driver/cache/driver.go vendored Normal file
View File

@ -0,0 +1,64 @@
package cache
import (
"net/url"
"strconv"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
"github.com/pkg/errors"
)
func init() {
driver.RegisterBlobStoreFactory("cache", blobStoreFactory)
}
func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
query := dsn.Query()
rawCacheSize := query.Get("cacheSize")
if rawCacheSize == "" {
rawCacheSize = "128"
}
cacheSize, err := strconv.ParseInt(rawCacheSize, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse cacheSize url parameter")
}
query.Del("cacheSize")
rawCacheTTL := query.Get("cacheTTL")
if rawCacheTTL == "" {
rawCacheTTL = "10m"
}
cacheTTL, err := time.ParseDuration(rawCacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse cacheTTL url parameter")
}
query.Del("cacheTTL")
rawDriver := query.Get("driver")
if rawDriver == "" {
return nil, errors.New("missing required url parameter 'driver'")
}
query.Del("driver")
url := &url.URL{
Scheme: rawDriver,
Host: dsn.Host,
Path: dsn.Path,
RawQuery: query.Encode(),
}
store, err := driver.NewBlobStore(url.String())
if err != nil {
return nil, errors.WithStack(err)
}
return NewBlobStore(store, int(cacheSize), cacheTTL), nil
}

124
pkg/storage/driver/cache/reader.go vendored Normal file
View File

@ -0,0 +1,124 @@
package cache
import (
"bytes"
"context"
"fmt"
"io"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type readCacher struct {
reader io.ReadSeekCloser
cache *expirable.LRU[string, []byte]
key string
buffer bytes.Buffer
}
// Close implements io.ReadSeekCloser.
func (r *readCacher) Close() error {
if err := r.reader.Close(); err != nil {
return errors.WithStack(err)
}
return nil
}
// Read implements io.ReadSeekCloser.
func (r *readCacher) Read(p []byte) (n int, err error) {
length, err := r.reader.Read(p)
if err != nil {
if err == io.EOF {
if length > 0 {
if _, err := r.buffer.Write(p[:length]); err != nil {
logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
return length, io.EOF
}
}
logger.Debug(context.Background(), "caching blob", logger.F("cacheKey", r.key))
r.cache.Add(r.key, r.buffer.Bytes())
return length, io.EOF
}
return length, errors.WithStack(err)
}
if length > 0 {
if _, err := r.buffer.Write(p[:length]); err != nil {
logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
}
}
return length, nil
}
// Seek implements io.ReadSeekCloser.
func (r *readCacher) Seek(offset int64, whence int) (int64, error) {
length, err := r.reader.Seek(offset, whence)
if err != nil {
return length, errors.WithStack(err)
}
return length, nil
}
var _ io.ReadSeekCloser = &readCacher{}
type cachedReader struct {
buffer []byte
offset int64
}
// Read implements io.ReadSeekCloser.
func (r *cachedReader) Read(p []byte) (n int, err error) {
available := len(r.buffer) - int(r.offset)
if available == 0 {
return 0, io.EOF
}
size := len(p)
if size > available {
size = available
}
copy(p, r.buffer[r.offset:r.offset+int64(size)])
r.offset += int64(size)
return size, nil
}
// Close implements io.ReadSeekCloser.
func (r *cachedReader) Close() error {
return nil
}
// Seek implements io.ReadSeekCloser.
func (r *cachedReader) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = r.offset + offset
case io.SeekEnd:
newOffset = int64(len(r.buffer)) + offset
default:
return 0, errors.Errorf("unknown seek whence '%d'", whence)
}
if newOffset > int64(len(r.buffer)) || newOffset < 0 {
return 0, fmt.Errorf("invalid offset %d", offset)
}
r.offset = newOffset
return newOffset, nil
}
var _ io.ReadSeekCloser = &cachedReader{}

View File

@ -0,0 +1,2 @@
*
!.gitignore