diff --git a/.env.dist b/.env.dist
index 4a9a61a..0d432c4 100644
--- a/.env.dist
+++ b/.env.dist
@@ -1,4 +1,4 @@
 RUN_APP_ARGS=""
 #EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
-#EDGE_BLOBSTORE_DSN="rpc://localhost:3001/blobstore?tenant=local&appId=%APPID%"
+#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%"
 #EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"
\ No newline at end of file
diff --git a/cmd/cli/command/app/run.go b/cmd/cli/command/app/run.go
index 0058c5e..a11343a 100644
--- a/cmd/cli/command/app/run.go
+++ b/cmd/cli/command/app/run.go
@@ -44,10 +44,13 @@ import (
 	_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
 	_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"
 
-	// Register storage drivers
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
+
+	// Register storage drivers
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
 	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
 	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
+
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 )
diff --git a/cmd/storage-server/command/run.go b/cmd/storage-server/command/run.go
index d954f6e..b9f1785 100644
--- a/cmd/storage-server/command/run.go
+++ b/cmd/storage-server/command/run.go
@@ -22,13 +22,15 @@ import (
 	"github.com/urfave/cli/v2"
 
 	// Register storage drivers
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
+	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
+
 	"forge.cadoles.com/arcad/edge/cmd/storage-server/command/flag"
 	"forge.cadoles.com/arcad/edge/pkg/jwtutil"
 	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
-	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server"
-	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
 	"forge.cadoles.com/arcad/edge/pkg/storage/share"
 )
@@ -51,17 +53,17 @@ func Run() *cli.Command {
 		&cli.StringFlag{
 			Name:    "blobstore-dsn-pattern",
 			EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"},
-			Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+			Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 		},
 		&cli.StringFlag{
 			Name:    "documentstore-dsn-pattern",
 			EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
-			Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+			Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 		},
 		&cli.StringFlag{
 			Name:    "sharestore-dsn-pattern",
 			EnvVars: []string{"STORAGE_SERVER_SHARESTORE_DSN_PATTERN"},
-			Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
+			Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/sharestore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", (60 * time.Second).Milliseconds()),
 		},
 		&cli.StringFlag{
 			Name:    "sentry-dsn",
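For illustration, a minimal sketch of how a `cache://` DSN is resolved once the drivers above are registered. The DSN value here is hypothetical (the real ones come from the flags and environment variables shown above); the cache factory consumes its own `driver`, `cacheSize` and `cacheTTL` query parameters and forwards the rewritten DSN to the wrapped driver:

```go
package main

import (
	"context"
	"log"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver"

	// Register storage drivers
	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
)

func main() {
	// The cache layer keeps driver/cacheSize/cacheTTL for itself and
	// delegates sqlite://data/blobstore.sqlite to the sqlite driver.
	store, err := driver.NewBlobStore("cache://data/blobstore.sqlite?driver=sqlite&cacheSize=128&cacheTTL=10m")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	buckets, err := store.ListBuckets(context.Background())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	log.Printf("buckets: %v", buckets)
}
```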
diff --git a/pkg/module/net/module.go b/pkg/module/net/module.go
index 542b666..d26a6b3 100644
--- a/pkg/module/net/module.go
+++ b/pkg/module/net/module.go
@@ -115,7 +115,7 @@ func (m *Module) handleClientMessages() {
 		case msg := <-clientMessages:
 			clientMessage, ok := msg.(*module.ClientMessage)
 			if !ok {
-				logger.Error(
+				logger.Warn(
 					ctx,
 					"unexpected message type",
 					logger.F("message", msg),
diff --git a/pkg/storage/driver/blob_store.go b/pkg/storage/driver/blob_store.go
index c7d1df8..b87d880 100644
--- a/pkg/storage/driver/blob_store.go
+++ b/pkg/storage/driver/blob_store.go
@@ -23,7 +23,7 @@ func NewBlobStore(dsn string) (storage.BlobStore, error) {
 
 	factory, exists := blobStoreFactories[url.Scheme]
 	if !exists {
-		return nil, errors.WithStack(ErrSchemeNotRegistered)
+		return nil, errors.Wrapf(ErrSchemeNotRegistered, "no driver associated with scheme '%s'", url.Scheme)
 	}
 
 	store, err := factory(url)
diff --git a/pkg/storage/driver/cache/blob_bucket.go b/pkg/storage/driver/cache/blob_bucket.go
new file mode 100644
index 0000000..3445279
--- /dev/null
+++ b/pkg/storage/driver/cache/blob_bucket.go
@@ -0,0 +1,115 @@
+package cache
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage"
+	"github.com/hashicorp/golang-lru/v2/expirable"
+	"github.com/pkg/errors"
+	"gitlab.com/wpetit/goweb/logger"
+)
+
+type BlobBucket struct {
+	bucket storage.BlobBucket
+	cache  *expirable.LRU[string, []byte]
+}
+
+// Close implements storage.BlobBucket.
+func (b *BlobBucket) Close() error {
+	if err := b.bucket.Close(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+// Delete implements storage.BlobBucket.
+func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
+	if err := b.bucket.Delete(ctx, id); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+// Get implements storage.BlobBucket.
+func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
+	info, err := b.bucket.Get(ctx, id)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return info, nil
+}
+
+// List implements storage.BlobBucket.
+func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
+	infos, err := b.bucket.List(ctx)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return infos, nil
+}
+
+// Name implements storage.BlobBucket.
+func (b *BlobBucket) Name() string {
+	return b.bucket.Name()
+}
+
+// NewReader implements storage.BlobBucket.
+func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
+	if cached, exist := b.inCache(id); exist {
+		logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)))
+
+		return cached, nil
+	}
+
+	reader, err := b.bucket.NewReader(ctx, id)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return &readCacher{
+		reader: reader,
+		cache:  b.cache,
+		key:    b.getCacheKey(id),
+	}, nil
+}
+
+func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
+	return fmt.Sprintf("%s-%s", b.Name(), id)
+}
+
+func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
+	key := b.getCacheKey(id)
+
+	data, exist := b.cache.Get(key)
+	if !exist {
+		return nil, false
+	}
+
+	return &cachedReader{data, 0}, true
+}
+
+// NewWriter implements storage.BlobBucket.
+func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
+	writer, err := b.bucket.NewWriter(ctx, id)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return writer, nil
+}
+
+// Size implements storage.BlobBucket.
+func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
+	size, err := b.bucket.Size(ctx)
+	if err != nil {
+		return 0, errors.WithStack(err)
+	}
+
+	return size, nil
+}
+
+var _ storage.BlobBucket = &BlobBucket{}
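The bucket shares a single `expirable.LRU[string, []byte]` with its parent store, keyed by `<bucket>-<blobID>` and holding whole blob payloads. For context, a standalone sketch of the small hashicorp/golang-lru API surface relied on here (illustrative values; entries are evicted both on capacity and after the TTL):

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// 128 entries max, no eviction callback, 10 minute TTL.
	cache := expirable.NewLRU[string, []byte](128, nil, 10*time.Minute)

	cache.Add("my-bucket-blob-1", []byte("payload"))

	if data, ok := cache.Get("my-bucket-blob-1"); ok {
		fmt.Printf("cache hit: %d bytes\n", len(data))
	}
}
```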
diff --git a/pkg/storage/driver/cache/blob_store.go b/pkg/storage/driver/cache/blob_store.go
new file mode 100644
index 0000000..0f7c275
--- /dev/null
+++ b/pkg/storage/driver/cache/blob_store.go
@@ -0,0 +1,56 @@
+package cache
+
+import (
+	"context"
+	"time"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage"
+	"github.com/hashicorp/golang-lru/v2/expirable"
+	"github.com/pkg/errors"
+)
+
+type BlobStore struct {
+	store storage.BlobStore
+	cache *expirable.LRU[string, []byte]
+}
+
+// DeleteBucket implements storage.BlobStore.
+func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
+	if err := s.store.DeleteBucket(ctx, name); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+// ListBuckets implements storage.BlobStore.
+func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
+	buckets, err := s.store.ListBuckets(ctx)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return buckets, nil
+}
+
+// OpenBucket implements storage.BlobStore.
+func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
+	bucket, err := s.store.OpenBucket(ctx, name)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return &BlobBucket{
+		bucket: bucket,
+		cache:  s.cache,
+	}, nil
+}
+
+func NewBlobStore(store storage.BlobStore, cacheSize int, cacheTTL time.Duration) *BlobStore {
+	return &BlobStore{
+		store: store,
+		cache: expirable.NewLRU[string, []byte](cacheSize, nil, cacheTTL),
+	}
+}
+
+var _ storage.BlobStore = &BlobStore{}
diff --git a/pkg/storage/driver/cache/blob_store_test.go b/pkg/storage/driver/cache/blob_store_test.go
new file mode 100644
index 0000000..b1cba21
--- /dev/null
+++ b/pkg/storage/driver/cache/blob_store_test.go
@@ -0,0 +1,50 @@
+package cache
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
+	"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
+	"github.com/pkg/errors"
+	"gitlab.com/wpetit/goweb/logger"
+)
+
+func TestBlobStore(t *testing.T) {
+	t.Parallel()
+
+	if testing.Verbose() {
+		logger.SetLevel(logger.LevelDebug)
+	}
+
+	file := "./testdata/blobstore_test.sqlite"
+
+	if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
+		t.Fatalf("%+v", errors.WithStack(err))
+	}
+
+	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
+
+	backend := sqlite.NewBlobStore(dsn)
+	store := NewBlobStore(backend, 32, time.Second*1)
+
+	testsuite.TestBlobStore(context.Background(), t, store)
+}
+
+func BenchmarkBlobStore(t *testing.B) {
+	logger.SetLevel(logger.LevelError)
+
+	file := "./testdata/blobstore_test.sqlite"
+
+	if err := os.Remove(file); err != nil && !errors.Is(err, os.ErrNotExist) {
+		t.Fatalf("%+v", errors.WithStack(err))
+	}
+
+	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
+
+	backend := sqlite.NewBlobStore(dsn)
+	store := NewBlobStore(backend, 32, time.Minute)
+
+	testsuite.BenchmarkBlobStore(t, store)
+}
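A sketch of the read-through behavior the test suite exercises, assuming a local SQLite file path: the first `NewReader` is a cache miss served by `readCacher`, which fills the LRU once it reaches EOF, so the second `NewReader` on the same blob is served from memory by `cachedReader`:

```go
package main

import (
	"bytes"
	"context"
	"io"
	"log"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
)

func main() {
	ctx := context.Background()

	backend := sqlite.NewBlobStore("./blobstore.sqlite")
	store := cache.NewBlobStore(backend, 32, time.Minute)

	bucket, err := store.OpenBucket(ctx, "my-bucket")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	blobID := storage.NewBlobID()

	writer, err := bucket.NewWriter(ctx, blobID)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	if _, err := writer.Write([]byte("hello")); err != nil {
		log.Fatalf("%+v", err)
	}

	if err := writer.Close(); err != nil {
		log.Fatalf("%+v", err)
	}

	// First pass misses the cache and populates it on EOF; second pass is
	// served from the in-memory LRU without touching SQLite.
	for i := 0; i < 2; i++ {
		reader, err := bucket.NewReader(ctx, blobID)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		var buf bytes.Buffer
		if _, err := io.Copy(&buf, reader); err != nil {
			log.Fatalf("%+v", err)
		}

		if err := reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
	}

	if err := bucket.Close(); err != nil {
		log.Fatalf("%+v", err)
	}
}
```

Note that `NewWriter` delegates without touching the LRU, so an overwritten blob may be served stale from the cache until its TTL expires.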
diff --git a/pkg/storage/driver/cache/driver.go b/pkg/storage/driver/cache/driver.go
new file mode 100644
index 0000000..c94ec27
--- /dev/null
+++ b/pkg/storage/driver/cache/driver.go
@@ -0,0 +1,64 @@
+package cache
+
+import (
+	"net/url"
+	"strconv"
+	"time"
+
+	"forge.cadoles.com/arcad/edge/pkg/storage"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
+	"github.com/pkg/errors"
+)
+
+func init() {
+	driver.RegisterBlobStoreFactory("cache", blobStoreFactory)
+}
+
+func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
+	query := dsn.Query()
+
+	rawCacheSize := query.Get("cacheSize")
+	if rawCacheSize == "" {
+		rawCacheSize = "128"
+	}
+
+	cacheSize, err := strconv.ParseInt(rawCacheSize, 10, 32)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse cacheSize url parameter")
+	}
+
+	query.Del("cacheSize")
+
+	rawCacheTTL := query.Get("cacheTTL")
+	if rawCacheTTL == "" {
+		rawCacheTTL = "10m"
+	}
+
+	cacheTTL, err := time.ParseDuration(rawCacheTTL)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse cacheTTL url parameter")
+	}
+
+	query.Del("cacheTTL")
+
+	rawDriver := query.Get("driver")
+	if rawDriver == "" {
+		return nil, errors.New("missing required url parameter 'driver'")
+	}
+
+	query.Del("driver")
+
+	url := &url.URL{
+		Scheme:   rawDriver,
+		Host:     dsn.Host,
+		Path:     dsn.Path,
+		RawQuery: query.Encode(),
+	}
+
+	store, err := driver.NewBlobStore(url.String())
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return NewBlobStore(store, int(cacheSize), cacheTTL), nil
+}
diff --git a/pkg/storage/driver/cache/reader.go b/pkg/storage/driver/cache/reader.go
new file mode 100644
index 0000000..9b0117e
--- /dev/null
+++ b/pkg/storage/driver/cache/reader.go
@@ -0,0 +1,124 @@
+package cache
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/hashicorp/golang-lru/v2/expirable"
+	"github.com/pkg/errors"
+	"gitlab.com/wpetit/goweb/logger"
+)
+
+type readCacher struct {
+	reader io.ReadSeekCloser
+	cache  *expirable.LRU[string, []byte]
+	key    string
+	buffer bytes.Buffer
+}
+
+// Close implements io.ReadSeekCloser.
+func (r *readCacher) Close() error {
+	if err := r.reader.Close(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
+// Read implements io.ReadSeekCloser.
+func (r *readCacher) Read(p []byte) (n int, err error) {
+	length, err := r.reader.Read(p)
+	if err != nil {
+		if err == io.EOF {
+			if length > 0 {
+				if _, err := r.buffer.Write(p[:length]); err != nil {
+					logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
+
+					return length, io.EOF
+				}
+			}
+
+			logger.Debug(context.Background(), "caching blob", logger.F("cacheKey", r.key))
+			r.cache.Add(r.key, r.buffer.Bytes())
+
+			return length, io.EOF
+		}
+
+		return length, errors.WithStack(err)
+	}
+
+	if length > 0 {
+		if _, err := r.buffer.Write(p[:length]); err != nil {
+			logger.Error(context.Background(), "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
+		}
+	}
+
+	return length, nil
+}
+
+// Seek implements io.ReadSeekCloser.
+func (r *readCacher) Seek(offset int64, whence int) (int64, error) {
+	length, err := r.reader.Seek(offset, whence)
+	if err != nil {
+		return length, errors.WithStack(err)
+	}
+
+	return length, nil
+}
+
+var _ io.ReadSeekCloser = &readCacher{}
+
+type cachedReader struct {
+	buffer []byte
+	offset int64
+}
+
+// Read implements io.ReadSeekCloser.
+func (r *cachedReader) Read(p []byte) (n int, err error) {
+	available := len(r.buffer) - int(r.offset)
+	if available == 0 {
+		return 0, io.EOF
+	}
+
+	size := len(p)
+	if size > available {
+		size = available
+	}
+
+	copy(p, r.buffer[r.offset:r.offset+int64(size)])
+	r.offset += int64(size)
+
+	return size, nil
+}
+
+// Close implements io.ReadSeekCloser.
+func (r *cachedReader) Close() error {
+	return nil
+}
+
+// Seek implements io.ReadSeekCloser.
+func (r *cachedReader) Seek(offset int64, whence int) (int64, error) {
+	var newOffset int64
+
+	switch whence {
+	case io.SeekStart:
+		newOffset = offset
+	case io.SeekCurrent:
+		newOffset = r.offset + offset
+	case io.SeekEnd:
+		newOffset = int64(len(r.buffer)) + offset
+	default:
+		return 0, errors.Errorf("unknown seek whence '%d'", whence)
+	}
+
+	if newOffset > int64(len(r.buffer)) || newOffset < 0 {
+		return 0, fmt.Errorf("invalid offset %d", offset)
+	}
+
+	r.offset = newOffset
+
+	return newOffset, nil
+}
+
+var _ io.ReadSeekCloser = &cachedReader{}
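`cachedReader.Seek` follows the standard `io.Seeker` contract over the cached byte slice; the whence arithmetic matches `bytes.Reader`, shown standalone for reference:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	r := bytes.NewReader([]byte("0123456789"))

	pos, _ := r.Seek(2, io.SeekStart) // absolute offset from the start
	fmt.Println(pos)                  // 2

	pos, _ = r.Seek(3, io.SeekCurrent) // current offset + 3
	fmt.Println(pos)                   // 5

	pos, _ = r.Seek(-4, io.SeekEnd) // len(buffer) - 4
	fmt.Println(pos)                // 6
}
```

One difference: unlike `bytes.Reader`, `cachedReader` rejects offsets beyond the end of the cached data instead of allowing seeks past EOF.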
diff --git a/pkg/storage/driver/cache/testdata/.gitignore b/pkg/storage/driver/cache/testdata/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/pkg/storage/driver/cache/testdata/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/pkg/storage/testsuite/blob_store_benchmark.go b/pkg/storage/testsuite/blob_store_benchmark.go
index 3dc1486..0758eaa 100644
--- a/pkg/storage/testsuite/blob_store_benchmark.go
+++ b/pkg/storage/testsuite/blob_store_benchmark.go
@@ -3,8 +3,10 @@ package testsuite
 import (
 	"bytes"
 	"context"
+	"crypto/rand"
 	"fmt"
 	"io"
+	mrand "math/rand"
 	"testing"
 
 	"forge.cadoles.com/arcad/edge/pkg/storage"
@@ -13,7 +15,6 @@ import (
 
 func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
 	t.Run("BlobCreateUpdateReadDelete", func(t *testing.B) {
-
 		for i := 0; i < t.N; i++ {
 			bucketName := fmt.Sprintf("bucket-%d", i)
 			if err := runBlobCreateUpdateReadDelete(store, bucketName); err != nil {
@@ -21,6 +22,21 @@ func BenchmarkBlobStore(t *testing.B, store storage.BlobStore) {
 			}
 		}
 	})
+
+	t.Run("BlobRandomRead", func(t *testing.B) {
+		t.StopTimer()
+		if err := prepareBlobStoreRandomRead(store); err != nil {
+			t.Fatalf("%+v", errors.WithStack(err))
+		}
+		t.ResetTimer()
+
+		t.StartTimer()
+		for i := 0; i < t.N; i++ {
+			if err := doRandomRead(store); err != nil {
+				t.Fatalf("%+v", errors.WithStack(err))
+			}
+		}
+	})
 }
 
 func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
@@ -77,3 +93,115 @@ func runBlobCreateUpdateReadDelete(store storage.BlobStore, bucketName string) error {
 
 	return nil
 }
+
+func prepareBlobStoreRandomRead(store storage.BlobStore) error {
+	ctx := context.Background()
+	totalBuckets := 128
+	totalBlobs := 64
+
+	for i := 0; i < totalBuckets; i++ {
+		bucketName := fmt.Sprintf("bucket-%d", i)
+		err := func(bucketName string) error {
+			bucket, err := store.OpenBucket(ctx, bucketName)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			defer func() {
+				if err := bucket.Close(); err != nil {
+					panic(errors.WithStack(err))
+				}
+			}()
+
+			for j := 0; j < totalBlobs; j++ {
+				blobID := storage.NewBlobID()
+				err = func(blobID storage.BlobID) error {
+					writer, err := bucket.NewWriter(ctx, blobID)
+					if err != nil {
+						return errors.WithStack(err)
+					}
+
+					defer func() {
+						if err := writer.Close(); err != nil {
+							panic(errors.WithStack(err))
+						}
+					}()
+
+					data := make([]byte, j)
+					if _, err := rand.Read(data); err != nil {
+						return errors.WithStack(err)
+					}
+
+					if _, err = writer.Write(data); err != nil {
+						return errors.WithStack(err)
+					}
+
+					if err := writer.Close(); err != nil {
+						return errors.WithStack(err)
+					}
+
+					return nil
+				}(blobID)
+				if err != nil {
+					return errors.WithStack(err)
+				}
+			}
+
+			return nil
+		}(bucketName)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+	}
+
+	return nil
+}
+
+func doRandomRead(store storage.BlobStore) error {
+	ctx := context.Background()
+
+	buckets, err := store.ListBuckets(ctx)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	randBucketIndex := mrand.Int31n(int32(len(buckets)))
+	bucketName := buckets[randBucketIndex]
+
+	bucket, err := store.OpenBucket(ctx, bucketName)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	defer func() {
+		if err := bucket.Close(); err != nil {
+			panic(errors.WithStack(err))
+		}
+	}()
+
+	blobs, err := bucket.List(ctx)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	randBlobIndex := mrand.Int31n(int32(len(blobs)))
+	blobInfo := blobs[randBlobIndex]
+	blobID := blobInfo.ID()
+
+	reader, err := bucket.NewReader(ctx, blobID)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	var buf bytes.Buffer
+
+	if _, err = io.Copy(&buf, reader); err != nil {
+		return errors.WithStack(err)
+	}
+
+	if err := reader.Close(); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
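As a point of reference, `prepareBlobStoreRandomRead` builds 128 buckets of 64 blobs each, blob `j` holding `j` random bytes, i.e. 8192 blobs and 0 + 1 + ... + 63 = 2016 bytes of payload per bucket; each `doRandomRead` iteration then opens a uniformly chosen bucket and copies a uniformly chosen blob, so repeated hits on the same blob within the TTL are the reads the cache driver accelerates.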
diff --git a/pkg/storage/testsuite/blob_store_ops.go b/pkg/storage/testsuite/blob_store_ops.go
index 323fdf2..c96a159 100644
--- a/pkg/storage/testsuite/blob_store_ops.go
+++ b/pkg/storage/testsuite/blob_store_ops.go
@@ -122,6 +122,24 @@ var blobStoreTestCases = []blobStoreTestCase{
 				panic(errors.WithStack(err))
 			}
 
+			reader, err = bucket.NewReader(ctx, blobID)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			written64, err = io.Copy(&buf, reader)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+
+			if e, g := int64(len(data)), written64; e != g {
+				return errors.Errorf("length of written data: expected '%v', got '%v'", e, g)
+			}
+
+			if err := reader.Close(); err != nil {
+				panic(errors.WithStack(err))
+			}
+
 			if err := bucket.Close(); err != nil {
 				return errors.WithStack(err)
 			}
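The added assertions open a second reader on the same blob and verify the copied length again; when the shared suite runs against the cache driver, this second read is typically the cache-hit path (`cachedReader`), so both branches of `NewReader` are now exercised.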