diff --git a/.gitignore b/.gitignore index c86f74d..feccf6d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ /bin /.env /tools -*.sqlite +*.sqlite* /.gitea-release /.edge /data diff --git a/cmd/blobstore-test/main.go b/cmd/blobstore-test/main.go new file mode 100644 index 0000000..7eca01a --- /dev/null +++ b/cmd/blobstore-test/main.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "crypto/rand" + "flag" + "io" + mrand "math/rand" + "runtime" + "time" + + "forge.cadoles.com/arcad/edge/pkg/storage" + "forge.cadoles.com/arcad/edge/pkg/storage/driver" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" + + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache" + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc" + _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" +) + +var ( + dsn string +) + +func init() { + flag.StringVar(&dsn, "dsn", "cache://./test-cache.sqlite?driver=sqlite&_pragma=foreign_keys(1)&_pragma=journal_mode=wal&bigCacheShards=32&bigCacheHardMaxCacheSize=128&bigCacheMaxEntrySize=125&bigCacheMaxEntriesInWindow=200000", "blobstore dsn") +} + +func main() { + flag.Parse() + + ctx := context.Background() + + logger.SetLevel(logger.LevelDebug) + + blobStore, err := driver.NewBlobStore(dsn) + if err != nil { + logger.Fatal(ctx, "could not create blobstore", logger.CapturedE(errors.WithStack(err))) + } + + bucket, err := blobStore.OpenBucket(ctx, "default") + if err != nil { + logger.Fatal(ctx, "could not open bucket", logger.CapturedE(errors.WithStack(err))) + } + + defer func() { + if err := bucket.Close(); err != nil { + logger.Fatal(ctx, "could not close bucket", logger.CapturedE(errors.WithStack(err))) + } + }() + + go readRandomBlobs(ctx, bucket) + + for { + writeRandomBlob(ctx, bucket) + + time.Sleep(1 * time.Second) + + size, err := bucket.Size(ctx) + if err != nil { + logger.Fatal(ctx, "could not retrieve bucket size", logger.CapturedE(errors.WithStack(err))) + } + + logger.Debug(ctx, "bucket stats", logger.F("size", size)) + } +} + +func readRandomBlobs(ctx context.Context, bucket storage.BlobBucket) { + for { + infos, err := bucket.List(ctx) + if err != nil { + logger.Fatal(ctx, "could not list blobs", logger.CapturedE(errors.WithStack(err))) + } + + total := len(infos) + if total == 0 { + logger.Debug(ctx, "no blob yet") + + continue + } + + blob := infos[mrand.Intn(total)] + + readBlob(ctx, bucket, blob.ID()) + + time.Sleep(250 * time.Millisecond) + } +} + +func readBlob(ctx context.Context, bucket storage.BlobBucket, blobID storage.BlobID) { + ctx = logger.With(ctx, logger.F("blobID", blobID)) + + reader, err := bucket.NewReader(ctx, blobID) + if err != nil { + logger.Fatal(ctx, "could not create reader", logger.CapturedE(errors.WithStack(err))) + } + + defer func() { + if err := reader.Close(); err != nil { + logger.Fatal(ctx, "could not close reader", logger.CapturedE(errors.WithStack(err))) + } + }() + + if _, err := io.ReadAll(reader); err != nil { + logger.Fatal(ctx, "could not read blob", logger.CapturedE(errors.WithStack(err))) + } +} + +func writeRandomBlob(ctx context.Context, bucket storage.BlobBucket) { + blobID := storage.NewBlobID() + buff := make([]byte, 10*1024) + + writer, err := bucket.NewWriter(ctx, blobID) + if err != nil { + logger.Fatal(ctx, "could not create writer", logger.CapturedE(errors.WithStack(err))) + } + + defer func() { + if err := writer.Close(); err != nil { + logger.Fatal(ctx, "could not close writer", logger.CapturedE(errors.WithStack(err))) + } + }() + + if _, err := rand.Read(buff); err != nil { + logger.Fatal(ctx, "could not read random data", logger.CapturedE(errors.WithStack(err))) + } + + if _, err := writer.Write(buff); err != nil { + logger.Fatal(ctx, "could not write blob", logger.CapturedE(errors.WithStack(err))) + } + + printMemUsage(ctx) +} + +func printMemUsage(ctx context.Context) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + logger.Debug( + ctx, "memory usage", + logger.F("alloc", m.Alloc/1024/1024), + logger.F("totalAlloc", m.TotalAlloc/1024/1024), + logger.F("sys", m.Sys/1024/1024), + logger.F("numGC", m.NumGC), + ) +} diff --git a/pkg/storage/driver/cache/blob_bucket.go b/pkg/storage/driver/cache/blob_bucket.go index 34827b1..3e3981d 100644 --- a/pkg/storage/driver/cache/blob_bucket.go +++ b/pkg/storage/driver/cache/blob_bucket.go @@ -140,7 +140,7 @@ func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) { logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key)) - if err := b.contentCache.Delete(key); err != nil { + if err := b.contentCache.Delete(key); err != nil && !errors.Is(err, bigcache.ErrEntryNotFound) { logger.Error(ctx, "could not clear cache", logger.CapturedE(errors.WithStack(err))) } diff --git a/pkg/storage/driver/cache/blob_store.go b/pkg/storage/driver/cache/blob_store.go index d51d78a..a6bdf9f 100644 --- a/pkg/storage/driver/cache/blob_store.go +++ b/pkg/storage/driver/cache/blob_store.go @@ -70,12 +70,7 @@ func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBu func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) { options := NewOptions(funcs...) - cacheConfig := bigcache.DefaultConfig(options.CacheTTL) - cacheConfig.Logger = &cacheLogger{} - cacheConfig.HardMaxCacheSize = options.BlobCacheMaxMemorySize - cacheConfig.Shards = options.BlobCacheShards - - contentCache, err := bigcache.New(context.Background(), cacheConfig) + contentCache, err := bigcache.New(context.Background(), options.BigCache) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/storage/driver/cache/driver.go b/pkg/storage/driver/cache/driver.go index b02e8d8..a09b69c 100644 --- a/pkg/storage/driver/cache/driver.go +++ b/pkg/storage/driver/cache/driver.go @@ -30,65 +30,45 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) { blobStoreOptionFuncs := make([]OptionFunc, 0) - rawCacheTTL := query.Get("cacheTTL") - if rawCacheTTL != "" { - query.Del("cacheTTL") - - ttl, err := time.ParseDuration(rawCacheTTL) - if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'") + cacheTTL, err := parseDuration(&query, "cacheTTL") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) } - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(ttl)) + cacheTTL = time.Hour } - rawCacheShards := query.Get("blobCacheShards") - if rawCacheShards != "" { - query.Del("blobCacheShards") + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(cacheTTL)) - cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32) - if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'blobCacheShards'") + cacheConfig, err := parseBigCacheConfig(&query, cacheTTL) + if err != nil { + return nil, errors.Wrap(err, "could not parse big cache config") + } + + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBigCacheConfig(*cacheConfig)) + + blobBucketCacheSize, err := parseInt(&query, "blobBucketCacheSize") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) } - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheShards(int(cacheShards))) + blobBucketCacheSize = 16 } - rawBlobCacheMaxMemorySize := query.Get("blobCacheMaxMemorySize") - if rawBlobCacheMaxMemorySize != "" { - query.Del("blobCacheMaxMemorySize") + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize))) - blobCacheMaxMemorySize, err := strconv.ParseInt(rawBlobCacheMaxMemorySize, 10, 32) - if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'blobCacheMaxMemorySize'") + bloInfoCacheSize, err := parseInt(&query, "bloInfoCacheSize") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) } - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheMaxMemorySize(int(blobCacheMaxMemorySize))) + bloInfoCacheSize = 16 } - rawBlobBucketCacheSize := query.Get("blobBucketCacheSize") - if rawBlobBucketCacheSize != "" { - query.Del("blobBucketCacheSize") - - blobBucketCacheSize, err := strconv.ParseInt(rawBlobBucketCacheSize, 10, 32) - if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'blobBucketCacheSize'") - } - - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize))) - } - - rawBlobInfoCacheSize := query.Get("blobInfoCacheSize") - if rawBlobInfoCacheSize != "" { - query.Del("blobInfoCacheSize") - - blobInfoCacheSize, err := strconv.ParseInt(rawBlobInfoCacheSize, 10, 32) - if err != nil { - return nil, errors.Wrap(err, "could not parse url parameter 'blobInfoCacheSize'") - } - - blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(blobInfoCacheSize))) - } + blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(bloInfoCacheSize))) url := &url.URL{ Scheme: rawDriver, @@ -117,3 +97,110 @@ func (l *cacheLogger) Printf(format string, v ...interface{}) { } var _ bigcache.Logger = &cacheLogger{} + +func parseBigCacheConfig(query *url.Values, cacheTTL time.Duration) (*bigcache.Config, error) { + config := bigcache.DefaultConfig(cacheTTL) + config.Logger = &cacheLogger{} + + hardMaxCacheSize, err := parseInt(query, "bigCacheHardMaxCacheSize") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + hardMaxCacheSize = int64(config.HardMaxCacheSize) + } + + config.HardMaxCacheSize = int(hardMaxCacheSize) + + maxEntriesInWindow, err := parseInt(query, "bigCacheMaxEntriesInWindow") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + maxEntriesInWindow = int64(config.MaxEntriesInWindow) + } + + config.MaxEntriesInWindow = int(maxEntriesInWindow) + + shards, err := parseInt(query, "bigCacheShards") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + shards = int64(config.Shards) + } + + config.Shards = int(shards) + + maxEntrySize, err := parseInt(query, "bigCacheMaxEntrySize") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + maxEntrySize = int64(config.MaxEntrySize) + } + + config.MaxEntrySize = int(maxEntrySize) + + cleanWindow, err := parseDuration(query, "bigCacheCleanWindow") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + cleanWindow = config.CleanWindow + } + + config.CleanWindow = cleanWindow + + lifeWindow, err := parseDuration(query, "bigCacheLifeWindow") + if err != nil { + if !errors.Is(err, errNotFound) { + return nil, errors.WithStack(err) + } + + lifeWindow = config.LifeWindow + } + + config.LifeWindow = lifeWindow + + return &config, nil +} + +var errNotFound = errors.New("not found") + +func parseInt(query *url.Values, name string) (int64, error) { + rawValue := query.Get(name) + if rawValue != "" { + query.Del(name) + + value, err := strconv.ParseInt(rawValue, 10, 32) + if err != nil { + return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name) + } + + return value, nil + } + + return 0, errors.WithStack(errNotFound) +} + +func parseDuration(query *url.Values, name string) (time.Duration, error) { + rawValue := query.Get(name) + if rawValue != "" { + query.Del(name) + + value, err := time.ParseDuration(rawValue) + if err != nil { + return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name) + } + + return value, nil + } + + return 0, errors.WithStack(errNotFound) +} diff --git a/pkg/storage/driver/cache/options.go b/pkg/storage/driver/cache/options.go index 3424f90..974d0c3 100644 --- a/pkg/storage/driver/cache/options.go +++ b/pkg/storage/driver/cache/options.go @@ -1,24 +1,27 @@ package cache -import "time" +import ( + "time" + + "github.com/allegro/bigcache/v3" +) type Options struct { - CacheTTL time.Duration - BlobCacheMaxMemorySize int - BlobCacheShards int - BucketCacheSize int - BlobInfoCacheSize int + CacheTTL time.Duration + BigCache bigcache.Config + BucketCacheSize int + BlobInfoCacheSize int } type OptionFunc func(opts *Options) func NewOptions(funcs ...OptionFunc) *Options { + defaultTTL := 60 * time.Minute opts := &Options{ - CacheTTL: 60 * time.Minute, - BlobCacheMaxMemorySize: 256, - BlobCacheShards: 1024, - BucketCacheSize: 16, - BlobInfoCacheSize: 512, + CacheTTL: defaultTTL, + BigCache: bigcache.DefaultConfig(defaultTTL), + BucketCacheSize: 16, + BlobInfoCacheSize: 256, } for _, fn := range funcs { @@ -34,15 +37,9 @@ func WithCacheTTL(ttl time.Duration) OptionFunc { } } -func WithBlobCacheMaxMemorySize(size int) OptionFunc { +func WithBigCacheConfig(config bigcache.Config) OptionFunc { return func(opts *Options) { - opts.BlobCacheMaxMemorySize = size - } -} - -func WithBlobCacheShards(shards int) OptionFunc { - return func(opts *Options) { - opts.BlobCacheShards = shards + opts.BigCache = config } } diff --git a/pkg/storage/driver/sqlite/driver.go b/pkg/storage/driver/sqlite/driver.go index 97dae13..19e2d56 100644 --- a/pkg/storage/driver/sqlite/driver.go +++ b/pkg/storage/driver/sqlite/driver.go @@ -20,7 +20,7 @@ func init() { func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) { dir := filepath.Dir(url.Host + url.Path) - if dir != ":memory:" { + if dir != "." { if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { return nil, errors.WithStack(err) } @@ -39,7 +39,7 @@ func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) { func blobStoreFactory(url *url.URL) (storage.BlobStore, error) { dir := filepath.Dir(url.Host + url.Path) - if dir != ":memory:" { + if dir != "." { if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { return nil, errors.WithStack(err) } @@ -58,7 +58,7 @@ func blobStoreFactory(url *url.URL) (storage.BlobStore, error) { func shareStoreFactory(url *url.URL) (share.Store, error) { dir := filepath.Dir(url.Host + url.Path) - if dir != ":memory:" { + if dir != "." { if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil { return nil, errors.WithStack(err) }