Compare commits: 2023.11.30 ... 2024.1.12

13 commits (author and date columns were empty in the source; SHA1 only):

776dbba5b0
8f9428b3f3
a268759d33
a276b92a03
b9c08f647c
59f023a7d9
753a6c9708
b120e590b6
242bf379a8
065a9002a0
83a1e89665
d9e8aac458
32f04af138
@@ -1,4 +1,4 @@
 RUN_APP_ARGS=""
 #EDGE_DOCUMENTSTORE_DSN="rpc://localhost:3001/documentstore?tenant=local&appId=%APPID%"
-#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%"
+#EDGE_BLOBSTORE_DSN="cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%&blobCacheStoreType=fs&blobCacheStoreBaseDir=data/cache/%APPID%&blobCacheSize=64MB"
 #EDGE_SHARESTORE_DSN="rpc://localhost:3001/sharestore?tenant=local"
.gitignore (vendored): 2 changes

@@ -2,7 +2,7 @@
 /bin
 /.env
 /tools
-*.sqlite
+*.sqlite*
 /.gitea-release
 /.edge
 /data
@@ -108,10 +108,17 @@ nfpms:
       file_info:
         mode: 0640
       packager: apk
+    - src: misc/packaging/openrc/storage-server.logrotate.conf
+      dst: /etc/logrotate.d/storage-server
+      packager: apk
     - dst: /var/lib/storage-server
       type: dir
       file_info:
         mode: 0700
       packager: apk
+    - dst: /var/log/storage-server
+      type: dir
+      file_info:
+        mode: 0700
   scripts:
     postinstall: "misc/packaging/common/postinstall-storage-server.sh"
cmd/blobstore-test/main.go (new file): 146 lines

@@ -0,0 +1,146 @@
package main

import (
	"context"
	"crypto/rand"
	"flag"
	"io"
	mrand "math/rand"
	"runtime"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"

	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
	_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
)

var (
	dsn string
)

func init() {
	flag.StringVar(&dsn, "dsn", "cache://./test-cache.sqlite?driver=sqlite&_pragma=foreign_keys(1)&_pragma=journal_mode=wal&bigCacheShards=32&bigCacheHardMaxCacheSize=128&bigCacheMaxEntrySize=125&bigCacheMaxEntriesInWindow=200000", "blobstore dsn")
}

func main() {
	flag.Parse()

	ctx := context.Background()

	logger.SetLevel(logger.LevelDebug)

	blobStore, err := driver.NewBlobStore(dsn)
	if err != nil {
		logger.Fatal(ctx, "could not create blobstore", logger.CapturedE(errors.WithStack(err)))
	}

	bucket, err := blobStore.OpenBucket(ctx, "default")
	if err != nil {
		logger.Fatal(ctx, "could not open bucket", logger.CapturedE(errors.WithStack(err)))
	}

	defer func() {
		if err := bucket.Close(); err != nil {
			logger.Fatal(ctx, "could not close bucket", logger.CapturedE(errors.WithStack(err)))
		}
	}()

	go readRandomBlobs(ctx, bucket)

	for {
		writeRandomBlob(ctx, bucket)

		time.Sleep(1 * time.Second)

		size, err := bucket.Size(ctx)
		if err != nil {
			logger.Fatal(ctx, "could not retrieve bucket size", logger.CapturedE(errors.WithStack(err)))
		}

		logger.Debug(ctx, "bucket stats", logger.F("size", size))
	}
}

func readRandomBlobs(ctx context.Context, bucket storage.BlobBucket) {
	for {
		infos, err := bucket.List(ctx)
		if err != nil {
			logger.Fatal(ctx, "could not list blobs", logger.CapturedE(errors.WithStack(err)))
		}

		total := len(infos)
		if total == 0 {
			logger.Debug(ctx, "no blob yet")

			continue
		}

		blob := infos[mrand.Intn(total)]

		readBlob(ctx, bucket, blob.ID())

		time.Sleep(250 * time.Millisecond)
	}
}

func readBlob(ctx context.Context, bucket storage.BlobBucket, blobID storage.BlobID) {
	ctx = logger.With(ctx, logger.F("blobID", blobID))

	reader, err := bucket.NewReader(ctx, blobID)
	if err != nil {
		logger.Fatal(ctx, "could not create reader", logger.CapturedE(errors.WithStack(err)))
	}

	defer func() {
		if err := reader.Close(); err != nil {
			logger.Fatal(ctx, "could not close reader", logger.CapturedE(errors.WithStack(err)))
		}
	}()

	if _, err := io.ReadAll(reader); err != nil {
		logger.Fatal(ctx, "could not read blob", logger.CapturedE(errors.WithStack(err)))
	}
}

func writeRandomBlob(ctx context.Context, bucket storage.BlobBucket) {
	blobID := storage.NewBlobID()
	buff := make([]byte, 10*1024)

	writer, err := bucket.NewWriter(ctx, blobID)
	if err != nil {
		logger.Fatal(ctx, "could not create writer", logger.CapturedE(errors.WithStack(err)))
	}

	defer func() {
		if err := writer.Close(); err != nil {
			logger.Fatal(ctx, "could not close writer", logger.CapturedE(errors.WithStack(err)))
		}
	}()

	if _, err := rand.Read(buff); err != nil {
		logger.Fatal(ctx, "could not read random data", logger.CapturedE(errors.WithStack(err)))
	}

	if _, err := writer.Write(buff); err != nil {
		logger.Fatal(ctx, "could not write blob", logger.CapturedE(errors.WithStack(err)))
	}

	printMemUsage(ctx)
}

func printMemUsage(ctx context.Context) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	logger.Debug(
		ctx, "memory usage",
		logger.F("alloc", m.Alloc/1024/1024),
		logger.F("totalAlloc", m.TotalAlloc/1024/1024),
		logger.F("sys", m.Sys/1024/1024),
		logger.F("numGC", m.NumGC),
	)
}
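The harness can be pointed at any registered blobstore driver through its -dsn flag; a hedged invocation using the cache parameters this changeset introduces:

    go run ./cmd/blobstore-test -dsn "cache://./test-cache.sqlite?driver=sqlite&blobCacheSize=64MB"

It writes a 10 KiB random blob every second, reads a random blob every 250 ms, and logs the bucket size plus Go memory statistics, which makes cache growth and eviction visible over time.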
@@ -107,10 +107,10 @@ func RunCommand() *cli.Command {
 			Usage: "use `FILE` as local accounts",
 			Value: ".edge/%APPID%/accounts.json",
 		},
-		&cli.IntFlag{
+		&cli.Int64Flag{
 			Name:  "max-upload-size",
 			Usage: "use `MAX-UPLOAD-SIZE` as blob max upload size",
-			Value: 10 << (10 * 2), // 10Mb
+			Value: 128 << (10 * 2), // 128Mb
 		},
 	},
 	Action: func(ctx *cli.Context) error {
@@ -123,7 +123,7 @@ func RunCommand() *cli.Command {
 		documentstoreDSN := ctx.String("documentstore-dsn")
 		shareStoreDSN := ctx.String("sharestore-dsn")
 		accountsFile := ctx.String("accounts-file")
-		maxUploadSize := ctx.Int("max-upload-size")
+		maxUploadSize := ctx.Int64("max-upload-size")

 		logger.SetFormat(logger.Format(logFormat))
 		logger.SetLevel(logger.Level(logLevel))
@@ -182,7 +182,7 @@ func RunCommand() *cli.Command {
 	}
 }

-func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int) error {
+func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int64) error {
 	absPath, err := filepath.Abs(path)
 	if err != nil {
 		return errors.Wrapf(err, "could not resolve path '%s'", path)
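The flag moves from cli.IntFlag to cli.Int64Flag (and the default from 10 MiB to 128 MiB) so the limit can flow into http.MaxBytesReader, which takes an int64, without a conversion. A minimal sketch of that pattern; limitUpload is a hypothetical helper, not part of the project:

    package main

    import "net/http"

    // Sketch: carrying the upload limit as int64 end to end. MaxBytesReader
    // rejects request bodies larger than maxUploadSize.
    func limitUpload(next http.Handler, maxUploadSize int64) http.Handler {
    	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		r.Body = http.MaxBytesReader(w, r.Body, maxUploadSize)
    		next.ServeHTTP(w, r)
    	})
    }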
go.mod: 9 changes

@@ -6,22 +6,24 @@ require (
 	github.com/getsentry/sentry-go v0.25.0
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/hashicorp/mdns v1.0.5
+	github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
+	github.com/jackc/puddle/v2 v2.2.1
 	github.com/keegancsmith/rpc v1.3.0
 	github.com/klauspost/compress v1.16.6
 	github.com/lestrrat-go/jwx/v2 v2.0.8
 	github.com/mitchellh/hashstructure/v2 v2.0.2
 	github.com/ulikunitz/xz v0.5.11
+	go.uber.org/goleak v1.3.0
 	modernc.org/sqlite v1.20.4
 )

 require (
 	cloud.google.com/go v0.75.0 // indirect
 	github.com/allegro/bigcache/v3 v3.1.0 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/goccy/go-json v0.9.11 // indirect
 	github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
-	github.com/jackc/puddle/v2 v2.2.1 // indirect
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/lestrrat-go/blackmagic v1.0.1 // indirect
 	github.com/lestrrat-go/httpcc v1.0.1 // indirect
@@ -29,7 +31,6 @@ require (
 	github.com/lestrrat-go/iter v1.0.2 // indirect
 	github.com/lestrrat-go/option v1.0.0 // indirect
 	github.com/miekg/dns v1.1.53 // indirect
-	go.uber.org/goleak v1.3.0 // indirect
 	golang.org/x/sync v0.1.0 // indirect
 	google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
 	google.golang.org/grpc v1.35.0 // indirect
@@ -87,3 +88,5 @@ require (
 	modernc.org/strutil v1.1.3 // indirect
 	modernc.org/token v1.0.1 // indirect
 )
+
+replace github.com/allegro/bigcache/v3 v3.1.0 => github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad
go.sum: 6 changes

@@ -53,8 +53,6 @@ github.com/alecthomas/kong v0.2.1-0.20190708041108-0548c6b1afae/go.mod h1:+inYUS
 github.com/alecthomas/kong-hcl v0.1.8-0.20190615233001-b21fea9723c8/go.mod h1:MRgZdU3vrFd05IQ89AxUZ0aYdF39BYoNFa324SodPCA=
 github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897 h1:p9Sln00KOTlrYkxI1zYWl1QLnEqAqEARBEYa8FQnQcY=
 github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ=
-github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=
-github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
 github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692 h1:JW4WZlqyaNWUUahfr7MigeDW6jmtam5cTzzo1lwsFhE=
 github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692/go.mod h1:Au0ipPuCBA7zsOC61SnyrYetm8VT3vo1UJtwHeYke44=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -208,6 +206,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE=
 github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
+github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
+github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
 github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
 github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
 github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@@ -258,6 +258,8 @@ github.com/miekg/dns v0.0.0-20161006100029-fc4e1e2843d8/go.mod h1:W1PPwlIAgtquWB
 github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
 github.com/miekg/dns v1.1.53 h1:ZBkuHr5dxHtB1caEOlZTLPo7D3L3TWckgUUs/RHfDxw=
 github.com/miekg/dns v1.1.53/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
+github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
+github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
 github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
 github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
@@ -4,7 +4,7 @@ ARG HTTP_PROXY=
 ARG HTTPS_PROXY=
 ARG http_proxy=
 ARG https_proxy=
-ARG GO_VERSION=1.21.2
+ARG GO_VERSION=1.21.5

 # Install dev environment dependencies
 RUN export DEBIAN_FRONTEND=noninteractive &&\
misc/packaging/openrc/storage-server.logrotate.conf (new file): 9 lines

@@ -0,0 +1,9 @@
/var/log/storage-server/storage-server.log {
	missingok
	sharedscripts
	compress
	rotate 7
	postrotate
		/etc/init.d/storage-server restart
	endscript
}
@@ -3,7 +3,7 @@
 command="/usr/bin/storage-server"
 command_args="run"
 supervisor=supervise-daemon
-output_log="/var/log/storage-server.log"
+output_log="/var/log/storage-server/storage-server.log"
 error_log="$output_log"

 depend() {
@@ -46,11 +46,11 @@ func NewPromiseProxyFrom(rt *goja.Runtime) *PromiseProxy {
 	return NewPromiseProxy(promise, resolve, reject)
 }

-func IsPromise(v goja.Value) (*goja.Promise, bool) {
+func isPromise(v any) (*goja.Promise, bool) {
 	if v == nil {
 		return nil, false
 	}

-	promise, ok := v.Export().(*goja.Promise)
+	promise, ok := v.(*goja.Promise)
 	return promise, ok
 }
@@ -23,7 +23,7 @@ type Server struct {
 	modules []ServerModule
 }

-func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...interface{}) (any, error) {
+func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...any) (any, error) {
 	ctx = logger.With(ctx, logger.F("function", funcName), logger.F("args", args))

 	ret, err := s.Exec(ctx, funcName, args...)
@@ -34,9 +34,9 @@ func (s *Server) ExecFuncByName(ctx context.Context, funcName string, args ...in
 	return ret, nil
 }

-func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...interface{}) (any, error) {
+func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...any) (any, error) {
 	type result struct {
-		value goja.Value
+		value any
 		err   error
 	}

@@ -110,7 +110,7 @@ func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...inter
 			}

 			done <- result{
-				value: value,
+				value: value.Export(),
 			}

 			logger.Debug(ctx, "executed callable", logger.F("callable", callableOrFuncname), logger.F("duration", time.Since(start).String()))
@@ -129,20 +129,18 @@ func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...inter
 			return nil, errors.WithStack(result.err)
 		}

-		value := result.value
-
-		if promise, ok := IsPromise(value); ok {
-			value = s.waitForPromise(promise)
+		if promise, ok := isPromise(result.value); ok {
+			return s.waitForPromise(promise), nil
 		}

-		return value.Export(), nil
+		return result.value, nil
 	}
 }

-func (s *Server) waitForPromise(promise *goja.Promise) goja.Value {
+func (s *Server) waitForPromise(promise *goja.Promise) any {
 	var (
 		wg    sync.WaitGroup
-		value goja.Value
+		value any
 	)

 	wg.Add(1)
@@ -162,7 +160,7 @@ func (s *Server) waitForPromise(promise *goja.Promise) goja.Value {
 			return
 		}

-		value = promise.Result()
+		value = promise.Result().Export()

 		breakLoop = true
 	})
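Taken together, these changes keep goja values from escaping the script goroutine: Exec exports the result before sending it over the channel, isPromise inspects the exported any, and waitForPromise exports the settled result. A hedged sketch of the waiting step, using a simple polling loop instead of the server's internal scheduler:

    package main

    import (
    	"time"

    	"github.com/dop251/goja"
    )

    // Sketch: wait for a goja promise to settle, then hand back a plain Go
    // value. Assumes nothing else touches the runtime concurrently; the real
    // implementation synchronizes through its event loop.
    func waitForPromise(promise *goja.Promise) any {
    	for promise.State() == goja.PromiseStatePending {
    		time.Sleep(time.Millisecond)
    	}

    	// Export while the promise is settled so only a plain value escapes.
    	return promise.Result().Export()
    }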
@ -6,7 +6,6 @@ import (
|
||||
"net/http"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type contextKey string
|
||||
@ -15,15 +14,16 @@ var (
|
||||
contextKeyBus contextKey = "bus"
|
||||
contextKeyHTTPRequest contextKey = "httpRequest"
|
||||
contextKeyHTTPClient contextKey = "httpClient"
|
||||
contextKeySessionID contextKey = "sessionId"
|
||||
)
|
||||
|
||||
func (h *Handler) contextMiddleware(next http.Handler) http.Handler {
|
||||
fn := func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
ctx = context.WithValue(ctx, contextKeyBus, h.bus)
|
||||
ctx = context.WithValue(ctx, contextKeyHTTPRequest, r)
|
||||
ctx = context.WithValue(ctx, contextKeyHTTPClient, h.httpClient)
|
||||
ctx = WithContextBus(ctx, h.bus)
|
||||
ctx = WithContextHTTPRequest(ctx, r)
|
||||
ctx = WithContextHTTPClient(ctx, h.httpClient)
|
||||
|
||||
r = r.WithContext(ctx)
|
||||
|
||||
@ -33,23 +33,43 @@ func (h *Handler) contextMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(fn)
|
||||
}
|
||||
|
||||
func ContextBus(ctx context.Context) bus.Bus {
|
||||
func ContextBus(ctx context.Context) (bus.Bus, bool) {
|
||||
return contextValue[bus.Bus](ctx, contextKeyBus)
|
||||
}
|
||||
|
||||
func ContextHTTPRequest(ctx context.Context) *http.Request {
|
||||
func WithContextBus(parent context.Context, bus bus.Bus) context.Context {
|
||||
return context.WithValue(parent, contextKeyBus, bus)
|
||||
}
|
||||
|
||||
func ContextHTTPRequest(ctx context.Context) (*http.Request, bool) {
|
||||
return contextValue[*http.Request](ctx, contextKeyHTTPRequest)
|
||||
}
|
||||
|
||||
func ContextHTTPClient(ctx context.Context) *http.Client {
|
||||
func WithContextHTTPRequest(parent context.Context, request *http.Request) context.Context {
|
||||
return context.WithValue(parent, contextKeyHTTPRequest, request)
|
||||
}
|
||||
|
||||
func ContextHTTPClient(ctx context.Context) (*http.Client, bool) {
|
||||
return contextValue[*http.Client](ctx, contextKeyHTTPClient)
|
||||
}
|
||||
|
||||
func contextValue[T any](ctx context.Context, key any) T {
|
||||
func WithContextHTTPClient(parent context.Context, client *http.Client) context.Context {
|
||||
return context.WithValue(parent, contextKeyHTTPClient, client)
|
||||
}
|
||||
|
||||
func ContextSessionID(ctx context.Context) (string, bool) {
|
||||
return contextValue[string](ctx, contextKeySessionID)
|
||||
}
|
||||
|
||||
func WithContextSessionID(parent context.Context, sessionID string) context.Context {
|
||||
return context.WithValue(parent, contextKeySessionID, sessionID)
|
||||
}
|
||||
|
||||
func contextValue[T any](ctx context.Context, key any) (T, bool) {
|
||||
value, ok := ctx.Value(key).(T)
|
||||
if !ok {
|
||||
panic(errors.Errorf("could not find key '%v' on context", key))
|
||||
return *new(T), false
|
||||
}
|
||||
|
||||
return value
|
||||
return value, true
|
||||
}
|
||||
|
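The accessors now return a (value, ok) pair instead of panicking, so each call site decides how to fail. A usage sketch in a handler; JSONError and ErrCodeInternalError are the package helpers used elsewhere in this diff, while the handler itself is hypothetical:

    package main

    import (
    	"net/http"

    	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
    )

    // Sketch: consuming the two-value context accessors.
    func handleSomething(w http.ResponseWriter, r *http.Request) {
    	ctx := r.Context()

    	b, ok := edgehttp.ContextBus(ctx)
    	if !ok {
    		// Middleware not mounted: fail this request instead of
    		// panicking the whole process, as the old contextValue did.
    		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)

    		return
    	}

    	_ = b // use the bus
    }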
@@ -102,14 +102,12 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
 		r.Get("/client.js.map", handler.handleSDKClientMap)
 	})

-	r.Group(func(r chi.Router) {
-		r.Use(handler.contextMiddleware)
-		for _, fn := range opts.HTTPMounts {
-			r.Group(func(r chi.Router) {
-				fn(r)
-			})
-		}
-	})
+	for _, fn := range opts.HTTPMounts {
+		r.Group(func(r chi.Router) {
+			r.Use(handler.contextMiddleware)
+			fn(r)
+		})
+	}

 	r.HandleFunc("/sock/*", handler.handleSockJS)
 })
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"net/http"

-	"forge.cadoles.com/arcad/edge/pkg/module"
 	"github.com/igm/sockjs-go/v3/sockjs"
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
@@ -15,11 +14,6 @@ const (
 	statusChannelClosed = iota
 )

-const (
-	ContextKeySessionID     module.ContextKey = "sessionId"
-	ContextKeyOriginRequest module.ContextKey = "originRequest"
-)
-
 func (h *Handler) handleSockJS(w http.ResponseWriter, r *http.Request) {
 	h.mutex.RLock()
 	defer h.mutex.RUnlock()
@@ -181,10 +175,8 @@ func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Sessio
 		}

 		ctx := logger.With(ctx, logger.F("payload", payload))
-		ctx = module.WithContext(ctx, map[module.ContextKey]any{
-			ContextKeySessionID:     sess.ID(),
-			ContextKeyOriginRequest: sess.Request(),
-		})
+		ctx = WithContextHTTPRequest(ctx, sess.Request())
+		ctx = WithContextSessionID(ctx, sess.ID())

 		incomingMessage := NewIncomingMessageEnvelope(ctx, payload)

@@ -1,10 +1,8 @@
 package auth

 import (
-	"net/http"
-
 	"forge.cadoles.com/arcad/edge/pkg/app"
-	edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http"
+	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
 	"forge.cadoles.com/arcad/edge/pkg/jwtutil"
 	"forge.cadoles.com/arcad/edge/pkg/module/util"
 	"github.com/dop251/goja"
@@ -68,7 +66,7 @@ func (m *Module) getClaim(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
 	ctx := util.AssertContext(call.Argument(0), rt)
 	claimName := util.AssertString(call.Argument(1), rt)

-	req, ok := ctx.Value(edgeHTTP.ContextKeyOriginRequest).(*http.Request)
+	req, ok := edgehttp.ContextHTTPRequest(ctx)
 	if !ok {
 		panic(rt.ToValue(errors.New("could not find http request in context")))
 	}
@@ -9,7 +9,7 @@ import (

 	"cdr.dev/slog"
 	"forge.cadoles.com/arcad/edge/pkg/app"
-	edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http"
+	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
 	"forge.cadoles.com/arcad/edge/pkg/jwtutil"
 	"forge.cadoles.com/arcad/edge/pkg/module"
 	"github.com/lestrrat-go/jwx/v2/jwa"
@@ -71,7 +71,7 @@ func TestAuthModule(t *testing.T) {

 	req.Header.Add("Authorization", "Bearer "+string(rawToken))

-	ctx = context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req)
+	ctx = edgehttp.WithContextHTTPRequest(context.Background(), req)

 	if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil {
 		t.Fatalf("%+v", errors.WithStack(err))
@@ -111,7 +111,7 @@ func TestAuthAnonymousModule(t *testing.T) {
 		t.Fatalf("%+v", errors.WithStack(err))
 	}

-	ctx = context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req)
+	ctx = edgehttp.WithContextHTTPRequest(context.Background(), req)

 	if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil {
 		t.Fatalf("%+v", errors.WithStack(err))
@@ -2,6 +2,7 @@ package blob

 import (
 	"encoding/json"
+	"io"
 	"io/fs"
 	"mime/multipart"
 	"net/http"
@@ -21,19 +22,17 @@ type uploadResponse struct {
 	BlobID storage.BlobID `json:"blobId"`
 }

-func Mount(uploadMaxFileSize int) func(r chi.Router) {
+func Mount(uploadMaxFileSize int64) func(r chi.Router) {
 	return func(r chi.Router) {
 		r.Post("/api/v1/upload", getAppUploadHandler(uploadMaxFileSize))
 		r.Get("/api/v1/download/{bucket}/{blobID}", handleAppDownload)
 	}
 }

-func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http.Request) {
+func getAppUploadHandler(uploadMaxFileSize int64) func(w http.ResponseWriter, r *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()

-		uploadMaxFileSize := int64(8000)
-
 		r.Body = http.MaxBytesReader(w, r.Body, uploadMaxFileSize)

 		if err := r.ParseMultipartForm(uploadMaxFileSize); err != nil {
@@ -65,7 +64,13 @@ func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http.

 		requestEnv := NewUploadRequestEnvelope(ctx, fileHeader, metadata)

-		bus := edgehttp.ContextBus(ctx)
+		bus, ok := edgehttp.ContextBus(ctx)
+		if !ok {
+			logger.Error(ctx, "could not find bus on context")
+			edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+			return
+		}

 		reply, err := bus.Request(ctx, requestEnv)
 		if err != nil {
@@ -114,7 +119,13 @@ func handleAppDownload(w http.ResponseWriter, r *http.Request) {

 	requestMsg := NewDownloadRequestEnvelope(ctx, bucket, storage.BlobID(blobID))

-	bs := edgehttp.ContextBus(ctx)
+	bs, ok := edgehttp.ContextBus(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find bus on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

 	reply, err := bs.Request(ctx, requestMsg)
 	if err != nil {
@@ -154,7 +165,14 @@ func handleAppDownload(w http.ResponseWriter, r *http.Request) {
 		}
 	}()

-	http.ServeContent(w, r, string(replyMessage.BlobInfo.ID()), replyMessage.BlobInfo.ModTime(), replyMessage.Blob)
+	// TODO Fix usage of ServeContent
+	// http.ServeContent(w, r, string(replyMessage.BlobInfo.ID()), replyMessage.BlobInfo.ModTime(), replyMessage.Blob)
+
+	w.Header().Add("Content-Type", replyMessage.BlobInfo.ContentType())
+
+	if _, err := io.Copy(w, replyMessage.Blob); err != nil {
+		logger.Error(ctx, "could not write blob", logger.CapturedE(errors.WithStack(err)))
+	}
 }

 type uploadedFile struct {
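The download handler swaps http.ServeContent for a plain io.Copy with an explicit Content-Type: ServeContent needs Seek to answer Range requests and probe the content size, which a cache-backed blob reader may not support reliably, hence the TODO. One possible follow-up, sketched under the assumption that blobs are small enough to buffer; bytes.Reader implements the io.ReadSeeker that ServeContent requires:

    package main

    import (
    	"bytes"
    	"io"
    	"net/http"
    	"time"
    )

    // Sketch: restore Range support by buffering the blob in memory first.
    // Only viable for small blobs, since the whole payload is held in RAM.
    func serveBuffered(w http.ResponseWriter, r *http.Request, name string, modTime time.Time, blob io.Reader) error {
    	data, err := io.ReadAll(blob)
    	if err != nil {
    		return err
    	}

    	http.ServeContent(w, r, name, modTime, bytes.NewReader(data))

    	return nil
    }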
@@ -31,7 +31,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {

 	requestMsg := NewFetchRequestEnvelope(ctx, r.RemoteAddr, url)

-	bus := edgehttp.ContextBus(ctx)
+	bus, ok := edgehttp.ContextBus(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find bus on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

 	reply, err := bus.Request(ctx, requestMsg)
 	if err != nil {
@@ -79,7 +85,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {

 	proxyReq.Header.Add("X-Forwarded-From", r.RemoteAddr)

-	httpClient := edgehttp.ContextHTTPClient(ctx)
+	httpClient, ok := edgehttp.ContextHTTPClient(ctx)
+	if !ok {
+		logger.Error(ctx, "could not find http client on context")
+		edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
+
+		return
+	}

 	res, err := httpClient.Do(proxyReq)
 	if err != nil {
@@ -6,7 +6,6 @@ import (
 	"forge.cadoles.com/arcad/edge/pkg/app"
 	"forge.cadoles.com/arcad/edge/pkg/bus"
 	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
-	"forge.cadoles.com/arcad/edge/pkg/module"
 	"forge.cadoles.com/arcad/edge/pkg/module/util"
 	"github.com/dop251/goja"
 	"github.com/pkg/errors"
@@ -57,7 +56,10 @@ func (m *Module) send(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
 	sessionID, ok := firstArg.Export().(string)
 	if !ok {
 		ctx := util.AssertContext(firstArg, rt)
-		sessionID = module.ContextValue[string](ctx, edgehttp.ContextKeySessionID)
+		sessionID, ok = edgehttp.ContextSessionID(ctx)
+		if !ok {
+			panic(rt.ToValue(errors.New("could not find session id in context")))
+		}
 	}

 	data := call.Argument(1).Export()
@@ -7,7 +7,6 @@ import (
 	"forge.cadoles.com/arcad/edge/pkg/app"
 	"forge.cadoles.com/arcad/edge/pkg/bus"
 	edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
-	"forge.cadoles.com/arcad/edge/pkg/module"
 	"forge.cadoles.com/arcad/edge/pkg/module/util"
 	"github.com/dop251/goja"
 	"github.com/pkg/errors"
@@ -143,10 +142,15 @@ func (m *Module) handleIncomingHTTPMessages(ctx context.Context, incoming <-chan
 			continue
 		}

-		requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))
+		sessionID, ok := edgehttp.ContextSessionID(msg.Context)
+		if !ok {
+			logger.Error(ctx, "could not find session id in context")
+
+			continue
+		}

 		request := NewRequestEnvelope(msg.Context, jsonReq.Method, jsonReq.Params)
-		sessionID := module.ContextValue[string](msg.Context, edgehttp.ContextKeySessionID)

+		requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))

 		reply, err := m.bus.Request(requestCtx, request)
 		if err != nil {
@@ -9,12 +9,16 @@ import (
 	"github.com/oklog/ulid/v2"
 )

-var ErrDocumentNotFound = errors.New("document not found")
+var (
+	ErrDocumentNotFound         = errors.New("document not found")
+	ErrDocumentRevisionConflict = errors.New("document revision conflict")
+)

 type DocumentID string

 const (
 	DocumentAttrID        = "_id"
+	DocumentAttrRevision  = "_revision"
 	DocumentAttrCreatedAt = "_createdAt"
 	DocumentAttrUpdatedAt = "_updatedAt"
 )
@@ -44,6 +48,20 @@ func (d Document) ID() (DocumentID, bool) {
 	return "", false
 }

+func (d Document) Revision() (int, bool) {
+	rawRevision, exists := d[DocumentAttrRevision]
+	if !exists {
+		return 0, false
+	}
+
+	revision, ok := rawRevision.(int)
+	if ok {
+		return revision, true
+	}
+
+	return 0, false
+}
+
 func (d Document) CreatedAt() (time.Time, bool) {
 	return d.timeAttr(DocumentAttrCreatedAt)
 }
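The new _revision attribute together with ErrDocumentRevisionConflict points at optimistic concurrency control: a write based on a stale revision gets rejected rather than silently overwriting. A hedged sketch of the check a store implementation might run before applying an update (the surrounding update flow is assumed, only Document, Revision() and the error come from this diff):

    // Sketch (same package as Document): optimistic-locking check.
    func checkRevision(current, incoming Document) error {
    	currentRev, _ := current.Revision()

    	incomingRev, ok := incoming.Revision()
    	if ok && incomingRev != currentRev {
    		// The caller read an older revision; make it retry.
    		return errors.WithStack(ErrDocumentRevisionConflict)
    	}

    	return nil
    }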
pkg/storage/driver/cache/blob_bucket.go (vendored): 142 changes

@@ -6,27 +6,28 @@ import (
 	"io"

 	"forge.cadoles.com/arcad/edge/pkg/storage"
-	"github.com/allegro/bigcache/v3"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
 )

 type BlobBucket struct {
-	bucket storage.BlobBucket
-	cache  *bigcache.BigCache
+	bucket        storage.BlobBucket
+	blobCache     *lfu.Cache[string, []byte]
+	bucketCache   *lfu.Cache[string, storage.BlobBucket]
+	blobInfoCache *lfu.Cache[string, storage.BlobInfo]
 }

 // Close implements storage.BlobBucket.
 func (b *BlobBucket) Close() error {
-	if err := b.bucket.Close(); err != nil {
-		return errors.WithStack(err)
-	}
-
+	// Close only when bucket is evicted from cache
 	return nil
 }

 // Delete implements storage.BlobBucket.
 func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
 	defer b.clearCache(ctx, id)

 	if err := b.bucket.Delete(ctx, id); err != nil {
 		return errors.WithStack(err)
 	}
@@ -36,11 +37,49 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {

 // Get implements storage.BlobBucket.
 func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
+	key := b.getCacheKey(id)
+	blobInfo, err := b.blobInfoCache.Get(key)
+	if err != nil && !errors.Is(err, lfu.ErrNotFound) {
+		logger.Error(
+			ctx, "could not retrieve blob info from cache",
+			logger.F("cacheKey", key),
+			logger.CapturedE(errors.WithStack(err)),
+		)
+	}
+
+	if blobInfo != nil {
+		logger.Debug(
+			ctx, "found blob info in cache",
+			logger.F("cacheKey", key),
+		)
+
+		return blobInfo, nil
+	}
+
 	info, err := b.bucket.Get(ctx, id)
 	if err != nil {
 		if errors.Is(err, storage.ErrBucketClosed) {
-			b.clearCache(ctx, id)
+			if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
+				logger.Error(
+					ctx, "could not delete bucket from cache",
+					logger.F("cacheKey", b.Name()),
+					logger.CapturedE(errors.WithStack(err)),
+				)
+			}
 		}

 		return nil, errors.WithStack(err)
 	}

+	if err := b.blobInfoCache.Set(key, info); err != nil {
+		logger.Error(
+			ctx, "could not set blob info in cache",
+			logger.F("cacheKey", key),
+			logger.CapturedE(errors.WithStack(err)),
+		)
+	}
+
 	return info, nil
 }

@@ -48,9 +87,30 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
 func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
 	infos, err := b.bucket.List(ctx)
 	if err != nil {
+		if errors.Is(err, storage.ErrBucketClosed) {
+			if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
+				logger.Error(
+					ctx, "could not delete bucket from cache",
+					logger.F("cacheKey", b.Name()),
+					logger.CapturedE(errors.WithStack(err)),
+				)
+			}
+		}
+
 		return nil, errors.WithStack(err)
 	}

+	for _, ifo := range infos {
+		key := b.getCacheKey(ifo.ID())
+		if err := b.blobInfoCache.Set(key, ifo); err != nil {
+			logger.Error(
+				ctx, "could not set blob info in cache",
+				logger.F("cacheKey", key),
+				logger.CapturedE(errors.WithStack(err)),
+			)
+		}
+	}
+
 	return infos, nil
 }

@@ -61,19 +121,33 @@ func (b *BlobBucket) Name() string {

 // NewReader implements storage.BlobBucket.
 func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
-	if cached, exist := b.inCache(id); exist {
-		logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats()))
+	if cached, exist := b.inContentCache(id); exist {
+		logger.Debug(
+			ctx, "found blob content in cache",
+			logger.F("cacheKey", b.getCacheKey(id)),
+		)

 		return cached, nil
 	}

 	reader, err := b.bucket.NewReader(ctx, id)
 	if err != nil {
 		if errors.Is(err, storage.ErrBucketClosed) {
-			b.clearCache(ctx, id)
+			if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
+				logger.Error(
+					ctx, "could not delete bucket from cache",
+					logger.F("cacheKey", b.Name()),
+					logger.CapturedE(errors.WithStack(err)),
+				)
+			}
 		}

 		return nil, errors.WithStack(err)
 	}

 	return &readCacher{
 		reader: reader,
-		cache:  b.cache,
+		cache:  b.blobCache,
 		key:    b.getCacheKey(id),
 	}, nil
 }
@@ -82,15 +156,15 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
 	return fmt.Sprintf("%s-%s", b.Name(), id)
 }

-func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
+func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
 	key := b.getCacheKey(id)
-	data, err := b.cache.Get(key)
+	data, err := b.blobCache.Get(key)
 	if err != nil {
-		if errors.Is(err, bigcache.ErrEntryNotFound) {
+		if errors.Is(err, lfu.ErrNotFound) {
 			return nil, false
 		}

-		logger.Error(context.Background(), "could not retrieve cache value", logger.CapturedE(errors.WithStack(err)))
+		logger.Error(context.Background(), "could not retrieve cached value", logger.CapturedE(errors.WithStack(err)))

 		return nil, false
 	}
@@ -98,10 +172,40 @@ func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
 	return &cachedReader{data, 0}, true
 }

 func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) {
 	key := b.getCacheKey(id)

 	logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key))

-	if err := b.cache.Delete(key); err != nil {
+	if err := b.blobCache.Delete(key); err != nil && !errors.Is(err, lfu.ErrNotFound) {
 		logger.Error(ctx, "could not clear cache", logger.F("cacheKey", key), logger.CapturedE(errors.WithStack(err)))
 	}
+
+	if err := b.blobInfoCache.Delete(key); err != nil {
+		logger.Error(
+			ctx, "could not delete blob info from cache",
+			logger.F("cacheKey", key),
+			logger.CapturedE(errors.WithStack(err)),
+		)
+	}
 }

 // NewWriter implements storage.BlobBucket.
 func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
 	defer b.clearCache(ctx, id)

 	writer, err := b.bucket.NewWriter(ctx, id)
 	if err != nil {
+		if errors.Is(err, storage.ErrBucketClosed) {
+			if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
+				logger.Error(
+					ctx, "could not delete bucket from cache",
+					logger.F("cacheKey", b.Name()),
+					logger.CapturedE(errors.WithStack(err)),
+				)
+			}
+		}
+
 		return nil, errors.WithStack(err)
 	}

@@ -112,6 +216,16 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write
 func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
 	size, err := b.bucket.Size(ctx)
 	if err != nil {
+		if errors.Is(err, storage.ErrBucketClosed) {
+			if err := b.bucketCache.Delete(b.Name()); err != nil && !errors.Is(err, lfu.ErrNotFound) {
+				logger.Error(
+					ctx, "could not delete bucket from cache",
+					logger.F("cacheKey", b.Name()),
+					logger.CapturedE(errors.WithStack(err)),
+				)
+			}
+		}
+
 		return 0, errors.WithStack(err)
 	}
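NewReader wraps the backend reader in a readCacher keyed on bucket plus blob ID. The type itself is not part of this diff, but the shape is a read-through cache: tee bytes into a buffer as the caller reads and store the buffer once the stream is exhausted. A hedged sketch with assumed logic (Seek and Close forwarding omitted for brevity; field names follow the diff):

    // Sketch of a readCacher-style read-through wrapper.
    type teeCacher struct {
    	reader io.ReadSeekCloser
    	cache  *lfu.Cache[string, []byte]
    	key    string
    	buf    bytes.Buffer
    }

    func (r *teeCacher) Read(p []byte) (int, error) {
    	n, err := r.reader.Read(p)
    	if n > 0 {
    		// Accumulate every byte handed to the caller.
    		r.buf.Write(p[:n])
    	}

    	if errors.Is(err, io.EOF) {
    		// Whole blob streamed: cache it for subsequent readers.
    		_ = r.cache.Set(r.key, r.buf.Bytes())
    	}

    	return n, err
    }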
pkg/storage/driver/cache/blob_store.go (vendored): 83 changes

@@ -4,13 +4,16 @@ import (
 	"context"

 	"forge.cadoles.com/arcad/edge/pkg/storage"
-	"github.com/allegro/bigcache/v3"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
 )

 type BlobStore struct {
-	store storage.BlobStore
-	cache *bigcache.BigCache
+	store         storage.BlobStore
+	blobCache     *lfu.Cache[string, []byte]
+	bucketCache   *lfu.Cache[string, storage.BlobBucket]
+	blobInfoCache *lfu.Cache[string, storage.BlobInfo]
 }

 // DeleteBucket implements storage.BlobStore.
@@ -19,6 +22,8 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
 		return errors.WithStack(err)
 	}

+	s.bucketCache.Delete(name)
+
 	return nil
 }

@@ -34,22 +39,80 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {

 // OpenBucket implements storage.BlobStore.
 func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
-	bucket, err := s.store.OpenBucket(ctx, name)
+	bucket, err := s.bucketCache.Get(name)
+	if err == nil {
+		logger.Debug(ctx, "found bucket in cache", logger.F("name", name))
+
+		return &BlobBucket{
+			bucket:        bucket,
+			blobCache:     s.blobCache,
+			blobInfoCache: s.blobInfoCache,
+			bucketCache:   s.bucketCache,
+		}, nil
+	}
+
+	if err != nil && !errors.Is(err, lfu.ErrNotFound) {
+		logger.Error(ctx, "could not retrieve bucket from cache",
+			logger.F("cacheKey", name),
+			logger.CapturedE(errors.WithStack(err)),
+		)
+	}
+
+	bucket, err = s.store.OpenBucket(ctx, name)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}

+	if err := s.bucketCache.Set(name, bucket); err != nil {
+		logger.Error(ctx, "could not set bucket in cache",
+			logger.F("cacheKey", name),
+			logger.CapturedE(errors.WithStack(err)),
+		)
+	}
+
 	return &BlobBucket{
-		bucket: bucket,
-		cache:  s.cache,
+		bucket:        bucket,
+		blobCache:     s.blobCache,
+		blobInfoCache: s.blobInfoCache,
+		bucketCache:   s.bucketCache,
 	}, nil
 }

-func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore {
+func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) {
+	options := NewOptions(funcs...)
+
+	blobCache := lfu.NewCache[string, []byte](
+		options.BlobCacheStore,
+		lfu.WithTTL[string, []byte](options.CacheTTL),
+		lfu.WithCapacity[string, []byte](options.BlobCacheSize),
+		lfu.WithGetValueSize[string, []byte](func(value []byte) (int, error) {
+			return len(value), nil
+		}),
+	)
+
+	blobBucketCache := lfu.NewCache[string, storage.BlobBucket](
+		options.BlobBucketCacheStore,
+		lfu.WithCapacity[string, storage.BlobBucket](options.BlobBucketCacheSize),
+		lfu.WithGetValueSize[string, storage.BlobBucket](func(value storage.BlobBucket) (int, error) {
+			return 1, nil
+		}),
+	)
+
+	blobInfoCache := lfu.NewCache[string, storage.BlobInfo](
+		options.BlobInfoCacheStore,
+		lfu.WithTTL[string, storage.BlobInfo](options.CacheTTL),
+		lfu.WithCapacity[string, storage.BlobInfo](options.BlobInfoCacheSize),
+		lfu.WithGetValueSize[string, storage.BlobInfo](func(value storage.BlobInfo) (int, error) {
+			return 1, nil
+		}),
+	)
+
 	return &BlobStore{
-		store: store,
-		cache: cache,
-	}
+		store:         store,
+		blobCache:     blobCache,
+		bucketCache:   blobBucketCache,
+		blobInfoCache: blobInfoCache,
+	}, nil
 }

 var _ storage.BlobStore = &BlobStore{}
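NewBlobStore now takes functional options instead of a pre-built bigcache instance, and returns an error. A hedged construction example; the option names appear in this diff, the chosen values are only illustrative:

    package main

    import (
    	"time"

    	cache "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
    	"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
    )

    // Sketch: wrapping a backend store with the cache driver directly.
    func newCachedStore(dsn string) (*cache.BlobStore, error) {
    	backend := sqlite.NewBlobStore(dsn)

    	return cache.NewBlobStore(
    		backend,
    		cache.WithCacheTTL(time.Hour),
    		cache.WithBlobCacheSize(64<<20), // ~64MB of blob content
    	)
    }

Omitting the options falls back to the defaults, which is exactly what the updated tests below rely on.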
pkg/storage/driver/cache/blob_store_test.go (vendored): 9 changes

@@ -9,7 +9,6 @@ import (

 	"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
 	"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
-	"github.com/allegro/bigcache/v3"
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
 )
@@ -30,13 +29,11 @@ func TestBlobStore(t *testing.T) {

 	backend := sqlite.NewBlobStore(dsn)

-	cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
+	store, err := NewBlobStore(backend)
 	if err != nil {
 		t.Fatalf("%+v", errors.WithStack(err))
 	}

-	store := NewBlobStore(backend, cache)
-
 	testsuite.TestBlobStore(context.Background(), t, store)
 }

@@ -52,12 +49,10 @@ func BenchmarkBlobStore(t *testing.B) {
 	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
 	backend := sqlite.NewBlobStore(dsn)

-	cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
+	store, err := NewBlobStore(backend)
 	if err != nil {
 		t.Fatalf("%+v", errors.WithStack(err))
 	}

-	store := NewBlobStore(backend, cache)
-
 	testsuite.BenchmarkBlobStore(t, store)
 }
pkg/storage/driver/cache/driver.go (vendored): 251 changes

@@ -1,17 +1,19 @@
 package cache

 import (
-	"context"
-	"fmt"
+	"bytes"
+	"io"
 	"net/url"
 	"strconv"
 	"time"

 	"forge.cadoles.com/arcad/edge/pkg/storage"
 	"forge.cadoles.com/arcad/edge/pkg/storage/driver"
-	"github.com/allegro/bigcache/v3"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/fs"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
+	"github.com/inhies/go-bytesize"
 	"github.com/pkg/errors"
-	"gitlab.com/wpetit/goweb/logger"
 )

 func init() {
@@ -28,54 +30,89 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {

 	query.Del("driver")

-	cacheTTL := time.Minute * 60
+	blobStoreOptionFuncs := make([]OptionFunc, 0)

-	rawCacheTTL := query.Get("cacheTTL")
-	if rawCacheTTL != "" {
-		query.Del("cacheTTL")
-
-		ttl, err := time.ParseDuration(rawCacheTTL)
-		if err != nil {
-			return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'")
+	cacheTTL, err := parseDuration(&query, "cacheTTL")
+	if err != nil {
+		if !errors.Is(err, errNotFound) {
+			return nil, errors.WithStack(err)
 		}

-		cacheTTL = ttl
+		cacheTTL = time.Hour
 	}

-	cacheConfig := bigcache.DefaultConfig(cacheTTL)
-	cacheConfig.Logger = &cacheLogger{}
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(cacheTTL))

-	rawCacheShards := query.Get("cacheShards")
-	if rawCacheShards != "" {
-		query.Del("cacheShards")
-
-		cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32)
-		if err != nil {
-			return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'")
+	blobBucketCacheSize, err := parseInt(&query, "blobBucketCacheSize")
+	if err != nil {
+		if !errors.Is(err, errNotFound) {
+			return nil, errors.WithStack(err)
 		}

-		cacheConfig.Shards = int(cacheShards)
+		blobBucketCacheSize = 16
 	}

-	rawMaxCacheSize := query.Get("maxCacheSize")
-	if rawMaxCacheSize != "" {
-		query.Del("maxCacheSize")
-
-		maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32)
-		if err != nil {
-			return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'")
-		}
-
-		// See cacheConfig.HardMaxCacheSize documentation
-		var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000
-
-		if maxCacheSize < minCacheSize {
-			return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize)
-		}
-
-		cacheConfig.HardMaxCacheSize = int(maxCacheSize)
-	}
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobBucketCacheSize(int(blobBucketCacheSize)))
+
+	blobBucketCacheStorePrefix := "blobBucketCacheStore"
+	blobBucketCacheStore, err := parseCacheStore[string, storage.BlobBucket](&query, blobBucketCacheStorePrefix)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobBucketCacheStore(blobBucketCacheStore))
+
+	bloInfoCacheSize, err := parseInt(&query, "bloInfoCacheSize")
+	if err != nil {
+		if !errors.Is(err, errNotFound) {
+			return nil, errors.WithStack(err)
+		}
+
+		bloInfoCacheSize = 16
+	}
+
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(bloInfoCacheSize)))
+
+	blobInfoCacheStorePrefix := "blobInfoCacheStore"
+	blobInfoCacheStore, err := parseCacheStore[string, storage.BlobInfo](&query, blobInfoCacheStorePrefix)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheStore(blobInfoCacheStore))
+
+	blobCacheSize, err := parseByteSize(&query, "blobCacheSize")
+	if err != nil {
+		if !errors.Is(err, errNotFound) {
+			return nil, errors.WithStack(err)
+		}
+
+		blobCacheSize = 256e+6
+	}
+
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheSize(int(blobCacheSize)))
+
+	blobCacheStorePrefix := "blobCacheStore"
+	blobCacheStore, err := parseCacheStore[string, []byte](
+		&query, blobCacheStorePrefix,
+		fs.WithMarshalValue[string, []byte](func(value []byte) (io.Reader, error) {
+			return bytes.NewBuffer(value), nil
+		}),
+		fs.WithUnmarshalValue[string, []byte](func(r io.Reader) ([]byte, error) {
+			data, err := io.ReadAll(r)
+			if err != nil {
+				return nil, errors.WithStack(err)
+			}
+
+			return data, nil
+		}),
+	)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobCacheStore(blobCacheStore))

 	url := &url.URL{
 		Scheme:   rawDriver,
 		Host:     dsn.Host,
@@ -83,23 +120,145 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
 		RawQuery: query.Encode(),
 	}

-	store, err := driver.NewBlobStore(url.String())
+	backend, err := driver.NewBlobStore(url.String())
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}

-	cache, err := bigcache.New(context.Background(), cacheConfig)
+	store, err := NewBlobStore(backend, blobStoreOptionFuncs...)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}

-	return NewBlobStore(store, cache), nil
+	return store, nil
 }

-type cacheLogger struct{}
+var errNotFound = errors.New("not found")

-func (l *cacheLogger) Printf(format string, v ...interface{}) {
-	logger.Debug(context.Background(), fmt.Sprintf(format, v...))
+func parseString(query *url.Values, name string) (string, error) {
+	value := query.Get(name)
+	if value != "" {
+		query.Del(name)
+
+		return value, nil
+	}
+
+	return "", errors.WithStack(errNotFound)
 }

-var _ bigcache.Logger = &cacheLogger{}
+func parseByteSize(query *url.Values, name string) (bytesize.ByteSize, error) {
+	rawValue := query.Get(name)
+	if rawValue != "" {
+		query.Del(name)
+
+		value, err := bytesize.Parse(rawValue)
+		if err != nil {
+			return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name)
+		}
+
+		return value, nil
+	}
+
+	return 0, errors.WithStack(errNotFound)
+}
+
+func parseInt(query *url.Values, name string) (int64, error) {
+	rawValue := query.Get(name)
+	if rawValue != "" {
+		query.Del(name)
+
+		value, err := strconv.ParseInt(rawValue, 10, 32)
+		if err != nil {
+			return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name)
+		}
+
+		return value, nil
+	}
+
+	return 0, errors.WithStack(errNotFound)
+}
+
+func parseDuration(query *url.Values, name string) (time.Duration, error) {
+	rawValue := query.Get(name)
+	if rawValue != "" {
+		query.Del(name)
+
+		value, err := time.ParseDuration(rawValue)
+		if err != nil {
+			return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name)
+		}
+
+		return value, nil
+	}
+
+	return 0, errors.WithStack(errNotFound)
+}
+
+const (
+	storeTypeFS     string = "fs"
+	storeTypeMemory string = "memory"
+)
+
+func parseCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (lfu.Store[K, V], error) {
+	storeTypeParam := prefix + "Type"
+	storeType, err := parseString(query, storeTypeParam)
+	if err != nil {
+		if errors.Is(err, errNotFound) {
+			storeType = storeTypeMemory
+		}
+	}
+
+	switch storeType {
+	case storeTypeFS:
+		store, err := parseFSCacheStore[K, V](query, prefix, optionFuncs...)
+		if err != nil {
+			return nil, errors.WithStack(err)
+		}
+
+		return store, nil
+
+	case storeTypeMemory:
+		store, err := parseMemoryCacheStore[K, V](query, prefix, optionFuncs...)
+		if err != nil {
+			return nil, errors.WithStack(err)
+		}
+
+		return store, nil
+	}
+
+	return nil, errors.Errorf("unexpected store type value '%s' for parameter '%s'", storeType, storeTypeParam)
+}
+
+func parseFSCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (*fs.Store[K, V], error) {
+	baseDirParam := prefix + "BaseDir"
+	baseDir, err := parseString(query, baseDirParam)
+	if err != nil {
+		if errors.Is(err, errNotFound) {
+			return nil, errors.Wrapf(err, "missing required url parameter '%s'", baseDirParam)
+		}
+
+		return nil, errors.WithStack(err)
+	}
+
+	funcs := make([]fs.OptionsFunc[K, V], 0)
+
+	for _, anyFn := range optionFuncs {
+		fn, ok := anyFn.(fs.OptionsFunc[K, V])
+		if !ok {
+			continue
+		}
+
+		funcs = append(funcs, fn)
+	}
+
+	store := fs.NewStore[K, V](baseDir, funcs...)
+
+	if err := store.Clear(); err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return store, nil
+}
+
+func parseMemoryCacheStore[K comparable, V any](query *url.Values, prefix string, optionFuncs ...any) (*memory.Store[K, V], error) {
+	return memory.NewStore[K, V](), nil
+}
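Putting the parameters together, a cache DSN can now pick the backing store per cache (memory by default, fs with a base directory) and size the content cache with human-readable units via go-bytesize. This mirrors the commented example in the .env template at the top of this compare; the host and tenant values are illustrative:

    cache://localhost:3001/blobstore?driver=rpc&tenant=local&appId=%APPID%&blobCacheStoreType=fs&blobCacheStoreBaseDir=data/cache/%APPID%&blobCacheSize=64MB&cacheTTL=30m

Unrecognized cache parameters are stripped from the query before the remaining DSN is handed to the wrapped driver, so each layer only sees its own options.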
349 pkg/storage/driver/cache/lfu/cache.go vendored Normal file
@@ -0,0 +1,349 @@
package lfu

import (
	"slices"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
)

var (
	ErrNotFound           = errors.New("not found")
	ErrSizeExceedCapacity = errors.New("size exceed capacity")
	errExpired            = errors.New("expired")
)

type Cache[K comparable, V any] struct {
	index *Map[K, *cacheItem[K, V]]
	freqs *List[*frequencyItem[K, V]]

	size atomic.Int32

	capacity     int
	store        Store[K, V]
	getValueSize GetValueSizeFunc[V]
	sync         *Synchronizer[K]

	log LogFunc
	ttl time.Duration
}

type cacheItem[K any, V any] struct {
	key             K
	size            int
	time            atomic.Int64
	frequencyParent *Element[*frequencyItem[K, V]]
}

func (i *cacheItem[K, V]) Expired(ttl time.Duration) bool {
	if ttl == 0 {
		return false
	}

	itemTime := time.Unix(i.time.Load(), 0)

	// If item has expired, mark it as not found
	return itemTime.Add(ttl).Before(time.Now())
}

func (i *cacheItem[K, V]) Refresh() {
	i.time.Store(time.Now().Unix())
}

func newCacheItem[K any, V any](key K, size int) *cacheItem[K, V] {
	item := &cacheItem[K, V]{
		key:  key,
		size: size,
	}
	item.time.Store(time.Now().Unix())
	return item
}

type frequencyItem[K any, V any] struct {
	entries *Map[*cacheItem[K, V], struct{}]
	freq    int
}

func newFrequencyItem[K any, V any]() *frequencyItem[K, V] {
	frequencyItem := &frequencyItem[K, V]{}
	frequencyItem.entries = NewMap[*cacheItem[K, V], struct{}]()
	return frequencyItem
}

func (c *Cache[K, V]) Set(key K, value V) error {
	newItemSize, err := c.getValueSize(value)
	if err != nil {
		return errors.WithStack(err)
	}

	c.log("setting '%v' (size: %d)", key, newItemSize)

	if newItemSize > int(c.capacity) {
		return errors.Wrapf(ErrSizeExceedCapacity, "item size '%d' exceeds cache total capacity of '%v'", newItemSize, c.capacity)
	}

	var sizeDelta int

	err = c.sync.WriteTx(key, func() error {
		if err := c.store.Set(key, value); err != nil {
			return errors.WithStack(err)
		}

		item, ok := c.index.Get(key)
		if ok {
			oldItemSize := item.size
			sizeDelta = -int(oldItemSize) + newItemSize
			item.Refresh()
		} else {
			item = newCacheItem[K, V](key, newItemSize)
			c.index.Set(key, item)
			sizeDelta = newItemSize
		}

		c.size.Add(int32(sizeDelta))
		c.increment(item)

		return nil
	})
	if err != nil {
		return errors.WithStack(err)
	}

	// Eviction, if needed
	if err := c.Evict(key); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (c *Cache[K, V]) Get(key K) (V, error) {
	var value V
	err := c.sync.ReadTx(key, func(upgrade func(func())) error {
		c.log("getting '%v'", key)

		e, ok := c.index.Get(key)
		if !ok {
			return errors.WithStack(ErrNotFound)
		}

		if e.Expired(c.ttl) {
			return errors.WithStack(errExpired)
		}

		v, err := c.store.Get(key)
		if err != nil {
			return errors.WithStack(err)
		}

		upgrade(func() {
			c.increment(e)
		})

		value = v

		return nil
	})
	if err != nil {
		if errors.Is(err, errExpired) {
			if err := c.Delete(key); err != nil {
				return *new(V), errors.WithStack(err)
			}

			return *new(V), errors.WithStack(ErrNotFound)
		}

		return *new(V), errors.WithStack(err)
	}

	return value, nil
}

func (c *Cache[K, V]) Delete(key K) error {
	err := c.sync.WriteTx(key, func() error {
		c.log("deleting '%v'", key)

		item, exists := c.index.Get(key)
		if !exists {
			return errors.WithStack(ErrNotFound)
		}

		if err := c.store.Delete(key); err != nil {
			return errors.WithStack(err)
		}

		c.size.Add(-int32(item.size))

		c.remove(item.frequencyParent, item)
		c.index.Delete(key)

		return nil
	})
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (c *Cache[K, V]) Evict(skipped ...K) error {
	exceed, delta := c.atCapacity()
	if exceed && delta > 0 {
		if err := c.evict(delta, skipped...); err != nil {
			return errors.WithStack(err)
		}
	}

	return nil
}

func (c *Cache[K, V]) Len() int {
	return c.index.Len()
}

func (c *Cache[K, V]) Size() int {
	return int(c.size.Load())
}

func (c *Cache[K, V]) Capacity() int {
	return c.capacity
}

func (c *Cache[K, V]) increment(item *cacheItem[K, V]) {
	currentFrequencyElement := item.frequencyParent
	var nextFrequencyAmount int
	var nextFrequencyElement *Element[*frequencyItem[K, V]]

	if currentFrequencyElement == nil {
		nextFrequencyAmount = 1
		nextFrequencyElement = c.freqs.First()
	} else {
		atomicFrequencyItem := c.freqs.Value(currentFrequencyElement)
		nextFrequencyAmount = atomicFrequencyItem.freq + 1
		nextFrequencyElement = c.freqs.Next(currentFrequencyElement)
	}

	var nextFrequency *frequencyItem[K, V]
	if nextFrequencyElement != nil {
		nextFrequency = c.freqs.Value(nextFrequencyElement)
	}

	if nextFrequencyElement == nil || nextFrequency == nil || nextFrequency.freq != nextFrequencyAmount {
		newFrequencyItem := newFrequencyItem[K, V]()
		newFrequencyItem.freq = nextFrequencyAmount

		if currentFrequencyElement == nil {
			nextFrequencyElement = c.freqs.PushFront(newFrequencyItem)
		} else {
			nextFrequencyElement = c.freqs.InsertValueAfter(newFrequencyItem, currentFrequencyElement)
		}
	}

	item.frequencyParent = nextFrequencyElement

	nextFrequency = c.freqs.Value(nextFrequencyElement)
	nextFrequency.entries.Set(item, struct{}{})

	if currentFrequencyElement != nil {
		c.remove(currentFrequencyElement, item)
	}
}

func (c *Cache[K, V]) remove(listItem *Element[*frequencyItem[K, V]], item *cacheItem[K, V]) {
	entries := c.freqs.Value(listItem).entries

	entries.Delete(item)
}

func (c *Cache[K, V]) atCapacity() (bool, int) {
	size, capacity := c.Size(), c.Capacity()
	c.log("cache stats: %d/%d", size, capacity)
	return size >= capacity, size - capacity
}

func (c *Cache[K, V]) evict(total int, skipped ...K) error {
	if total == 0 {
		return nil
	}

	frequencyElement := c.freqs.First()
	if frequencyElement == nil {
		c.log("no frequency element")
		return nil
	}

	for evicted := 0; evicted < total; {
		c.log("running eviction: [to_evict:%d, evicted: %d]", total, evicted)

		c.log("first frequency element %p", frequencyElement)

		frequencyItem := c.freqs.Value(frequencyElement)
		if frequencyItem == nil {
			return nil
		}

		entries := frequencyItem.entries

		if entries.Len() == 0 {
			c.log("no frequency entries")
			frequencyElement = c.freqs.Next(frequencyElement)
			continue
		}

		var rangeErr error
		entries.Range(func(key, v any) bool {
			if evicted >= total {
				c.log("evicted enough (%d >= %d), stopping", evicted, total)
				return false
			}

			entry, _ := key.(*cacheItem[K, V])

			if slices.Contains(skipped, entry.key) {
				c.log("skipping key '%v'", entry.key)
				return true
			}

			if err := c.Delete(entry.key); err != nil {
				if errors.Is(err, ErrNotFound) {
					c.log("key '%v' not found", entry.key)
					// Cleanup obsolete frequency
					c.remove(frequencyElement, entry)
					return true
				}

				rangeErr = errors.WithStack(err)
				return false
			}

			c.log("evicted key '%v' (size: %d)", entry.key, entry.size)

			evicted += int(entry.size)

			return true
		})
		if rangeErr != nil {
			return errors.WithStack(rangeErr)
		}
	}

	return nil
}

func NewCache[K comparable, V any](store Store[K, V], funcs ...OptionsFunc[K, V]) *Cache[K, V] {
	opts := DefaultOptions[K, V](funcs...)

	cache := &Cache[K, V]{
		index:        NewMap[K, *cacheItem[K, V]](),
		freqs:        NewList[*frequencyItem[K, V]](),
		capacity:     opts.Capacity,
		store:        store,
		getValueSize: opts.GetValueSize,
		sync:         NewSynchronizer[K](),
		log:          opts.Log,
		ttl:          opts.TTL,
	}

	return cache
}
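The cache keeps a per-key index plus a linked list of frequency buckets, so eviction always drains the least-frequently-used bucket first, and `Set` triggers eviction while skipping the key just written. A minimal usage sketch (import paths are the ones introduced by this changeset; the 12-byte capacity is an arbitrary example value):

package main

import (
	"fmt"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
)

func main() {
	// 12 bytes of capacity: storing three 6-byte values forces an eviction.
	cache := lfu.NewCache[string, string](
		memory.NewStore[string, string](),
		lfu.WithCapacity[string, string](12),
	)

	_ = cache.Set("a", "foobar")
	_ = cache.Set("b", "foobar")
	_, _ = cache.Get("b")        // bump frequency of "b"
	_ = cache.Set("c", "foobar") // over capacity: evicts "a", the least frequently used

	if _, err := cache.Get("a"); err != nil {
		fmt.Println("a was evicted:", err)
	}
}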
37 pkg/storage/driver/cache/lfu/fs/cache_test.go vendored Normal file
@@ -0,0 +1,37 @@
package fs

import (
	"bytes"
	"io"
	"path/filepath"
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite"
	"github.com/pkg/errors"
)

func TestCacheWithFSStore(t *testing.T) {
	testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] {
		dir := filepath.Join("testdata", "testsuite", testName)
		store := NewStore[string, string](dir,
			WithMarshalValue[string, string](func(value string) (io.Reader, error) {
				return bytes.NewBuffer([]byte(value)), nil
			}),
			WithUnmarshalValue[string, string](func(r io.Reader) (string, error) {
				data, err := io.ReadAll(r)
				if err != nil {
					return "", errors.WithStack(err)
				}

				return string(data), nil
			}),
		)

		if err := store.Clear(); err != nil {
			panic(errors.WithStack(err))
		}

		return store
	})
}
19 pkg/storage/driver/cache/lfu/fs/hash.go vendored Normal file
@@ -0,0 +1,19 @@
package fs

import (
	"strconv"

	"github.com/mitchellh/hashstructure/v2"
	"github.com/pkg/errors"
)

func DefaultGetPath[K comparable](key K) ([]string, error) {
	uintHash, err := hashstructure.Hash(key, hashstructure.FormatV2, nil)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	hash := strconv.FormatUint(uintHash, 16)

	return []string{hash}, nil
}
31 pkg/storage/driver/cache/lfu/fs/marshal.go vendored Normal file
@@ -0,0 +1,31 @@
package fs

import (
	"bytes"
	"encoding/gob"
	"io"

	"github.com/pkg/errors"
)

func DefaultMarshalValue[V any](value V) (io.Reader, error) {
	var buf bytes.Buffer
	encoder := gob.NewEncoder(&buf)

	if err := encoder.Encode(value); err != nil {
		return nil, errors.WithStack(err)
	}

	return &buf, nil
}

func DefaultUnmarshalValue[V any](d io.Reader) (V, error) {
	var value V
	decoder := gob.NewDecoder(d)

	if err := decoder.Decode(&value); err != nil {
		return *new(V), errors.WithStack(err)
	}

	return value, nil
}
45 pkg/storage/driver/cache/lfu/fs/options.go vendored Normal file
@@ -0,0 +1,45 @@
package fs

import "io"

type GetPathFunc[K comparable] func(key K) ([]string, error)
type MarshalValueFunc[V any] func(value V) (io.Reader, error)
type UnmarshalValueFunc[V any] func(r io.Reader) (V, error)

type Options[K comparable, V any] struct {
	GetPath        GetPathFunc[K]
	MarshalValue   MarshalValueFunc[V]
	UnmarshalValue UnmarshalValueFunc[V]
}

type OptionsFunc[K comparable, V any] func(opts *Options[K, V])

func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] {
	opts := &Options[K, V]{
		GetPath:        DefaultGetPath[K],
		MarshalValue:   DefaultMarshalValue[V],
		UnmarshalValue: DefaultUnmarshalValue[V],
	}
	for _, fn := range funcs {
		fn(opts)
	}
	return opts
}

func WithGetPath[K comparable, V any](getPath GetPathFunc[K]) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.GetPath = getPath
	}
}

func WithMarshalValue[K comparable, V any](marshalValue MarshalValueFunc[V]) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.MarshalValue = marshalValue
	}
}

func WithUnmarshalValue[K comparable, V any](unmarshalValue UnmarshalValueFunc[V]) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.UnmarshalValue = unmarshalValue
	}
}
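Custom (de)serialization can be swapped in through these options, as the test above does for raw strings. A sketch plugging in JSON instead of the gob default (`MyValue` and the `data/json-store` path are hypothetical):

type MyValue struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

func newJSONStore() *fsstore.Store[string, MyValue] {
	return fsstore.NewStore[string, MyValue]("data/json-store",
		fsstore.WithMarshalValue[string, MyValue](func(value MyValue) (io.Reader, error) {
			data, err := json.Marshal(value)
			if err != nil {
				return nil, errors.WithStack(err)
			}

			return bytes.NewReader(data), nil
		}),
		fsstore.WithUnmarshalValue[string, MyValue](func(r io.Reader) (MyValue, error) {
			var value MyValue
			if err := json.NewDecoder(r).Decode(&value); err != nil {
				return MyValue{}, errors.WithStack(err)
			}

			return value, nil
		}),
	)
}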
165 pkg/storage/driver/cache/lfu/fs/store.go vendored Normal file
@@ -0,0 +1,165 @@
package fs

import (
	"fmt"
	"io"
	"os"
	"path/filepath"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

type Store[K comparable, V any] struct {
	baseDir        string
	getPath        GetPathFunc[K]
	marshalValue   MarshalValueFunc[V]
	unmarshalValue UnmarshalValueFunc[V]
}

// Delete implements Store.
func (s *Store[K, V]) Delete(key K) error {
	path, err := s.getEntryPath(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
		return errors.WithStack(err)
	}

	return nil
}

// Get implements Store.
func (s *Store[K, V]) Get(key K) (V, error) {
	path, err := s.getEntryPath(key)
	if err != nil {
		return *new(V), errors.WithStack(err)
	}

	value, err := s.readValue(path)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return *new(V), errors.WithStack(lfu.ErrNotFound)
		}

		return *new(V), errors.WithStack(err)
	}

	return value, nil
}

// Set implements Store.
func (s *Store[K, V]) Set(key K, value V) error {
	path, err := s.getEntryPath(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if err := s.writeValue(path, value); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (s *Store[K, V]) Clear() error {
	if err := os.RemoveAll(s.baseDir); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (s *Store[K, V]) getEntryPath(k K) (string, error) {
	path, err := s.getPath(k)
	if err != nil {
		return "", errors.WithStack(err)
	}

	path = append([]string{s.baseDir}, path...)
	return filepath.Join(path...), nil
}

func (s *Store[K, V]) writeValue(path string, value V) error {
	fi, err := os.Stat(path)
	if err == nil && !fi.Mode().IsRegular() {
		return fmt.Errorf("%s already exists and is not a regular file", path)
	}

	dir := filepath.Dir(path)

	if err := os.MkdirAll(dir, 0750); err != nil {
		return errors.WithStack(err)
	}

	f, err := os.CreateTemp(dir, filepath.Base(path)+".tmp")
	if err != nil {
		return errors.WithStack(err)
	}

	tmpName := f.Name()
	// Remove the temporary file if anything below fails; failures are
	// assigned to the captured err so the cleanup actually triggers.
	defer func() {
		if err != nil {
			f.Close()
			os.Remove(tmpName)
		}
	}()

	reader, err := s.marshalValue(value)
	if err != nil {
		return errors.WithStack(err)
	}

	if _, err = io.Copy(f, reader); err != nil {
		return errors.WithStack(err)
	}

	if err = f.Sync(); err != nil {
		return errors.WithStack(err)
	}

	if err = f.Close(); err != nil {
		return errors.WithStack(err)
	}

	if err = os.Rename(tmpName, path); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (s *Store[K, V]) readValue(path string) (V, error) {
	file, err := os.Open(path)
	if err != nil {
		return *new(V), errors.WithStack(err)
	}

	defer func() {
		if err := file.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
			panic(errors.WithStack(err))
		}
	}()

	value, err := s.unmarshalValue(file)
	if err != nil {
		return *new(V), errors.WithStack(err)
	}

	return value, nil
}

func NewStore[K comparable, V any](baseDir string, funcs ...OptionsFunc[K, V]) *Store[K, V] {
	opts := DefaultOptions[K, V](funcs...)
	return &Store[K, V]{
		baseDir:        baseDir,
		getPath:        opts.GetPath,
		unmarshalValue: opts.UnmarshalValue,
		marshalValue:   opts.MarshalValue,
	}
}

var _ lfu.Store[string, int] = &Store[string, int]{}
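Values are written atomically: each `Set` streams the marshalled value into a temporary file in the destination directory, fsyncs it, and renames it over the final path, so readers never observe a half-written entry. A short usage sketch with the default gob encoding and hash-derived paths (the `testdata/fs-store` directory is an arbitrary example):

package main

import (
	"fmt"

	fsstore "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/fs"
)

func main() {
	store := fsstore.NewStore[string, int]("testdata/fs-store")

	if err := store.Set("answer", 42); err != nil {
		panic(err)
	}

	value, err := store.Get("answer")
	if err != nil {
		panic(err)
	}

	fmt.Println(value) // 42
}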
2 pkg/storage/driver/cache/lfu/fs/testdata/.gitignore vendored Normal file
@@ -0,0 +1,2 @@
*
!.gitignore
203 pkg/storage/driver/cache/lfu/list.go vendored Normal file
@@ -0,0 +1,203 @@
package lfu

import (
	"sync/atomic"
)

type List[T any] struct {
	root *Element[T]
	len  atomic.Int32
	sync *Synchronizer[*Element[T]]
}

func (l *List[T]) First() *Element[T] {
	if l.Len() == 0 {
		return nil
	}

	var next *Element[T]
	l.sync.ReadTx(l.root, func(upgrade func(func())) error {
		next = l.root.next
		return nil
	})

	return next
}

func (l *List[T]) Last() *Element[T] {
	if l.Len() == 0 {
		return nil
	}

	var prev *Element[T]
	l.sync.ReadTx(l.root, func(upgrade func(func())) error {
		prev = l.root.prev
		return nil
	})

	return prev
}

func (l *List[T]) Prev(e *Element[T]) *Element[T] {
	var prev *Element[T]
	l.sync.ReadTx(e, func(upgrade func(func())) error {
		prev = e.prev
		return nil
	})

	return prev
}

func (l *List[T]) Next(e *Element[T]) *Element[T] {
	var next *Element[T]
	l.sync.ReadTx(e, func(upgrade func(func())) error {
		next = e.next
		return nil
	})

	return next
}

func (l *List[T]) Value(e *Element[T]) T {
	var value T
	l.sync.ReadTx(e, func(upgrade func(func())) error {
		value = e.value
		return nil
	})

	return value
}

func (l *List[T]) PushFront(v T) *Element[T] {
	return l.InsertValueAfter(v, l.root)
}

// PushBack inserts after the current last element; on an empty list
// this falls back to inserting right after the sentinel root.
func (l *List[T]) PushBack(v T) *Element[T] {
	if last := l.Last(); last != nil {
		return l.InsertValueAfter(v, last)
	}

	return l.InsertValueAfter(v, l.root)
}

func (l *List[T]) Remove(e *Element[T]) {
	l.remove(e)
}

func (l *List[T]) Len() int {
	return int(l.len.Load())
}

func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] {
	l.sync.ReadTx(e, func(upgrade func(fn func())) error {
		var next *Element[T]
		l.sync.ReadTx(at, func(upgrade func(func())) error {
			next = at.next
			return nil
		})

		upgrade(func() {
			e.prev = at
			e.next = next
			e.list = l
		})

		if e.prev != nil {
			l.sync.WriteTx(e.prev, func() error {
				e.prev.next = e
				return nil
			})
		}

		if e.next != nil {
			l.sync.WriteTx(e.next, func() error {
				e.next.prev = e
				return nil
			})
		}

		return nil
	})

	l.len.Add(1)

	return e
}

func (l *List[T]) InsertValueAfter(v T, at *Element[T]) *Element[T] {
	e := NewElement[T](v)
	return l.insertAfter(e, at)
}

func (l *List[T]) remove(e *Element[T]) {
	if e == nil || e == l.root {
		return
	}

	l.sync.ReadTx(e, func(upgrade func(fn func())) error {
		if e.prev != nil {
			if e.prev == e {
				upgrade(func() {
					e.prev.next = e.next
				})
			} else {
				l.sync.WriteTx(e.prev, func() error {
					e.prev.next = e.next
					return nil
				})
			}
		}

		if e.next != nil {
			if e.next == e {
				upgrade(func() {
					e.next.prev = e.prev
				})
			} else {
				l.sync.WriteTx(e.next, func() error {
					e.next.prev = e.prev
					return nil
				})
			}
		}

		upgrade(func() {
			e.next = nil
			e.prev = nil
			e.list = nil
		})

		return nil
	})

	l.sync.Remove(e)
	l.len.Add(-1)
}

func NewList[T any]() *List[T] {
	root := NewElement(*new(T))
	root.next = root
	root.prev = root

	list := &List[T]{
		sync: NewSynchronizer[*Element[T]](),
	}

	root.list = list
	list.root = root

	return list
}

type Element[T any] struct {
	prev  *Element[T]
	next  *Element[T]
	list  *List[T]
	value T
}

func NewElement[T any](v T) *Element[T] {
	element := &Element[T]{
		prev:  nil,
		next:  nil,
		list:  nil,
		value: v,
	}
	return element
}
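The list is circular around a sentinel root: `First` and `Last` simply read `root.next` and `root.prev`, and `Next` of the last element yields the root again rather than nil. A traversal therefore stops by element count, as in this sketch:

// dump walks the list front to back; since the list is circular,
// iteration stops after Len() elements rather than on a nil Next.
func dump[T any](l *List[T]) []T {
	values := make([]T, 0, l.Len())
	e := l.First()
	for i := 0; i < l.Len() && e != nil; i++ {
		values = append(values, l.Value(e))
		e = l.Next(e)
	}
	return values
}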
67 pkg/storage/driver/cache/lfu/map.go vendored Normal file
@@ -0,0 +1,67 @@
package lfu

import (
	"sync"
	"sync/atomic"
)

type Map[K comparable, V any] struct {
	size  atomic.Int32
	inner sync.Map
}

func (m *Map[K, V]) Get(key K) (V, bool) {
	raw, exists := m.inner.Load(key)
	if !exists {
		return *new(V), false
	}

	value, ok := raw.(V)
	if !ok {
		return *new(V), false
	}

	return value, true
}

func (m *Map[K, V]) GetOrSet(key K, defaultValue V) (V, bool) {
	raw, loaded := m.inner.LoadOrStore(key, defaultValue)
	if !loaded {
		m.size.Add(1)
	}

	value, ok := raw.(V)
	if !ok {
		return *new(V), loaded
	}

	return value, loaded
}

func (m *Map[K, V]) Set(key K, value V) {
	_, loaded := m.inner.Swap(key, value)
	if !loaded {
		m.size.Add(1)
	}
}

func (m *Map[K, V]) Delete(key K) {
	_, existed := m.inner.LoadAndDelete(key)
	if existed {
		m.size.Add(-1)
	}
}

func (m *Map[K, V]) Range(fn func(key, value any) bool) {
	m.inner.Range(fn)
}

func (m *Map[K, V]) Len() int {
	return int(m.size.Load())
}

func NewMap[K comparable, V any]() *Map[K, V] {
	return &Map[K, V]{
		inner: sync.Map{},
	}
}
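`sync.Map` exposes no length, so the wrapper keeps its own atomic counter and only adjusts it when a key is actually inserted or removed; overwriting an existing key leaves `Len` unchanged. A short illustrative fragment:

m := NewMap[string, int]()
m.Set("a", 1)
m.Set("a", 2)        // overwrite: the counter is not incremented
fmt.Println(m.Len()) // 1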
14 pkg/storage/driver/cache/lfu/memory/cache_test.go vendored Normal file
@@ -0,0 +1,14 @@
package memory

import (
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/testsuite"
)

func TestCacheWithMemoryStore(t *testing.T) {
	testsuite.TestCacheWithStore(t, func(testName string) lfu.Store[string, string] {
		return NewStore[string, string]()
	})
}
40 pkg/storage/driver/cache/lfu/memory/memory.go vendored Normal file
@@ -0,0 +1,40 @@
package memory

import (
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

type Store[K comparable, V any] struct {
	index *lfu.Map[K, V]
}

// Delete implements Store.
func (s *Store[K, V]) Delete(key K) error {
	s.index.Delete(key)
	return nil
}

// Get implements Store.
func (s *Store[K, V]) Get(key K) (V, error) {
	value, exists := s.index.Get(key)
	if !exists {
		return *new(V), errors.WithStack(lfu.ErrNotFound)
	}

	return value, nil
}

// Set implements Store.
func (s *Store[K, V]) Set(key K, value V) error {
	s.index.Set(key, value)
	return nil
}

func NewStore[K comparable, V any]() *Store[K, V] {
	return &Store[K, V]{
		index: lfu.NewMap[K, V](),
	}
}

var _ lfu.Store[string, int] = &Store[string, int]{}
57 pkg/storage/driver/cache/lfu/options.go vendored Normal file
@@ -0,0 +1,57 @@
package lfu

import "time"

type GetValueSizeFunc[V any] func(value V) (int, error)

type LogFunc func(format string, values ...any)

func DefaultLogFunc(format string, values ...any) {
}

type Options[K comparable, V any] struct {
	GetValueSize GetValueSizeFunc[V]
	Capacity     int
	Log          LogFunc
	TTL          time.Duration
}

type OptionsFunc[K comparable, V any] func(opts *Options[K, V])

func DefaultOptions[K comparable, V any](funcs ...OptionsFunc[K, V]) *Options[K, V] {
	opts := &Options[K, V]{
		GetValueSize: DefaultGetValueSize[V],
		Capacity:     100,
		Log:          DefaultLogFunc,
		TTL:          0,
	}
	for _, fn := range funcs {
		fn(opts)
	}
	return opts
}

func WithCapacity[K comparable, V any](capacity int) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.Capacity = capacity
	}
}

func WithGetValueSize[K comparable, V any](getValueSize GetValueSizeFunc[V]) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.GetValueSize = getValueSize
	}
}

func WithLog[K comparable, V any](fn LogFunc) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.Log = fn
	}
}

func WithTTL[K comparable, V any](ttl time.Duration) OptionsFunc[K, V] {
	return func(opts *Options[K, V]) {
		opts.TTL = ttl
	}
}
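Functional options are applied over the defaults in order, so later options win. A sketch composing them (capacity is expressed in the units produced by the configured `GetValueSizeFunc`, bytes here; `memory.NewStore` is the in-memory store from this changeset):

package main

import (
	"log"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
)

func newByteCache() *lfu.Cache[string, []byte] {
	return lfu.NewCache[string, []byte](
		memory.NewStore[string, []byte](),
		lfu.WithCapacity[string, []byte](64<<20),    // 64MB, measured by DefaultGetValueSize
		lfu.WithTTL[string, []byte](10*time.Minute), // entries expire 10 minutes after last write
		lfu.WithLog[string, []byte](log.Printf),
	)
}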
41 pkg/storage/driver/cache/lfu/size.go vendored Normal file
@@ -0,0 +1,41 @@
package lfu

import (
	"github.com/pkg/errors"
)

type Measurable interface {
	Size() (int, error)
}

func DefaultGetValueSize[V any](value V) (int, error) {
	switch v := any(value).(type) {
	case int:
		return v, nil
	case int8:
		return int(v), nil
	case int32:
		return int(v), nil
	case int64:
		return int(v), nil
	case float32:
		return int(v), nil
	case float64:
		return int(v), nil
	case []byte:
		return len(v), nil
	case string:
		return len(v), nil
	}

	if measurable, ok := any(value).(Measurable); ok {
		size, err := measurable.Size()
		if err != nil {
			return 0, errors.WithStack(err)
		}

		return size, nil
	}

	return 0, errors.Errorf("could not retrieve size of type '%T'", value)
}
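Types not covered by the switch can opt in by implementing `Measurable`. For instance (a hypothetical type for illustration):

type Payload struct {
	Header []byte
	Body   []byte
}

// Size implements Measurable so DefaultGetValueSize can account for Payload values.
func (p Payload) Size() (int, error) {
	return len(p.Header) + len(p.Body), nil
}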
7 pkg/storage/driver/cache/lfu/store.go vendored Normal file
@@ -0,0 +1,7 @@
package lfu

type Store[K comparable, V any] interface {
	Delete(key K) error
	Set(key K, value V) error
	Get(key K) (V, error)
}
56 pkg/storage/driver/cache/lfu/synchronizer.go vendored Normal file
@@ -0,0 +1,56 @@
package lfu

import (
	"sync"

	"github.com/pkg/errors"
)

type Synchronizer[K comparable] struct {
	index *Map[K, *sync.RWMutex]
}

func (s *Synchronizer[K]) Remove(key K) {
	s.index.Delete(key)
}

func (s *Synchronizer[K]) ReadTx(key K, fn func(upgrade func(fn func())) error) error {
	mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{})
	mutex.RLock()
	defer mutex.RUnlock()

	upgrade := func(fn func()) {
		mutex.RUnlock()
		mutex.Lock()
		defer func() {
			mutex.Unlock()
			mutex.RLock()
		}()

		fn()
	}

	if err := fn(upgrade); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (s *Synchronizer[K]) WriteTx(key K, fn func() error) error {
	mutex, _ := s.index.GetOrSet(key, &sync.RWMutex{})
	mutex.Lock()
	defer mutex.Unlock()

	if err := fn(); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func NewSynchronizer[K comparable]() *Synchronizer[K] {
	return &Synchronizer[K]{
		index: NewMap[K, *sync.RWMutex](),
	}
}
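`ReadTx` hands the callback an `upgrade` helper that swaps the read lock for the write lock and back. Note that the mutex is released between `RUnlock` and `Lock`, so another writer may slip in; anything read before the upgrade should be re-validated inside it. A sketch of the intended shape (`counter` is a hypothetical shared value, always accessed under the same key):

func bump(s *Synchronizer[string], counter *int) error {
	return s.ReadTx("counter", func(upgrade func(func())) error {
		_ = *counter // reads happen under the shared (read) lock

		upgrade(func() {
			*counter++ // writes happen under the exclusive (write) lock
		})

		return nil
	})
}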
41 pkg/storage/driver/cache/lfu/testsuite/main.go vendored Normal file
@@ -0,0 +1,41 @@
package testsuite

import (
	"reflect"
	"runtime"
	"strings"
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

type StoreFactory func(testName string) lfu.Store[string, string]
type testCase func(t *testing.T, store lfu.Store[string, string]) error

var testCases = []testCase{
	testSetGetDelete,
	testEviction,
	testConcurrent,
	testMultipleSet,
	testTTL,
}

func TestCacheWithStore(t *testing.T, factory StoreFactory) {
	for _, tc := range testCases {
		funcName := runtime.FuncForPC(reflect.ValueOf(tc).Pointer()).Name()
		funcNameParts := strings.Split(funcName, "/")
		testName := funcNameParts[len(funcNameParts)-1]
		func(tc testCase) {
			t.Run(testName, func(t *testing.T) {
				t.Parallel()

				store := factory(testName)

				if err := tc(t, store); err != nil {
					t.Fatalf("%+v", errors.WithStack(err))
				}
			})
		}(tc)
	}
}
67 pkg/storage/driver/cache/lfu/testsuite/test_concurrent.go vendored Normal file
@@ -0,0 +1,67 @@
package testsuite

import (
	"fmt"
	"sync"
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

func testConcurrent(t *testing.T, store lfu.Store[string, string]) error {
	const value = "foobar"
	totalKeys := 25
	totalSize := len(value) * totalKeys
	capacity := totalSize / 2

	cache := lfu.NewCache[string, string](store,
		lfu.WithCapacity[string, string](capacity),
		lfu.WithLog[string, string](t.Logf),
	)

	var wg sync.WaitGroup

	wg.Add(totalKeys)

	loops := totalKeys * 10

	for i := 0; i < totalKeys; i++ {
		key := fmt.Sprintf("key%d", i)
		func(key string) {
			go func() {
				defer wg.Done()
				for i := 0; i < loops; i++ {
					if err := cache.Set(key, value); err != nil {
						t.Errorf("%+v", errors.WithStack(err))
					}
				}
			}()
		}(key)
	}

	wg.Wait()

	t.Logf("cache before final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())

	if err := cache.Evict(); err != nil {
		t.Errorf("%+v", errors.WithStack(err))
	}

	t.Logf("cache after final evict [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())

	expectedLength := capacity / len(value)
	if e, g := expectedLength, cache.Len(); e < g {
		t.Errorf("cache.Len(): expected <= %d, got %d", e, g)
	}

	if cache.Size() > capacity {
		t.Errorf("cache.Size(): expected <= %d, got %d", capacity, cache.Size())
	}

	if e, g := expectedLength*len(value), cache.Size(); e < g {
		t.Errorf("cache.Size(): expected <= %d, got %d", e, g)
	}

	return nil
}
70 pkg/storage/driver/cache/lfu/testsuite/test_eviction.go vendored Normal file
@@ -0,0 +1,70 @@
package testsuite

import (
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

func testEviction(t *testing.T, store lfu.Store[string, string]) error {
	cache := lfu.NewCache[string, string](store,
		lfu.WithCapacity[string, string](10),
		lfu.WithLog[string, string](t.Logf),
	)

	if err := cache.Set("key1", "key1"); err != nil {
		return errors.WithStack(err)
	}

	if err := cache.Set("key2", "key2"); err != nil {
		return errors.WithStack(err)
	}

	// Increment frequency of key2
	if _, err := cache.Get("key2"); err != nil {
		return errors.WithStack(err)
	}

	if e, g := 8, cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	if err := cache.Set("key3", "key3"); err != nil {
		return errors.WithStack(err)
	}

	t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())

	_, err := cache.Get("key1")
	if err == nil {
		t.Errorf("expected 'key1' to be evicted")
	}

	if !errors.Is(err, lfu.ErrNotFound) {
		t.Errorf("expected err to be 'ErrNotFound'")
	}

	value, err := cache.Get("key2")
	if err != nil {
		return errors.WithStack(err)
	}

	if e, g := "key2", value; e != g {
		t.Errorf("cache.Get(\"key2\"): expected %v, got %v", e, g)
	}

	if e, g := cache.Capacity(), cache.Size(); e < g {
		t.Errorf("cache.Size(): expected <= %d, got %d", e, g)
	}

	if e, g := 2, cache.Len(); e != g {
		t.Errorf("cache.Len(): expected %d, got %d", e, g)
	}

	if cache.Size() < 0 {
		t.Errorf("cache.Size(): expected value >= 0, got %d", cache.Size())
	}

	return nil
}
80 pkg/storage/driver/cache/lfu/testsuite/test_multiple_set.go vendored Normal file
@@ -0,0 +1,80 @@
package testsuite

import (
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

func testMultipleSet(t *testing.T, store lfu.Store[string, string]) error {
	const (
		key         = "mykey"
		firstValue  = "foo"
		secondValue = "bar"
		thirdValue  = "foobar"
	)

	cache := lfu.NewCache[string, string](store)

	if e, g := 0, cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	if err := cache.Set(key, firstValue); err != nil {
		return errors.WithStack(err)
	}

	if e, g := len(firstValue), cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	retrieved, err := cache.Get(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if e, g := firstValue, retrieved; e != g {
		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
	}

	if err := cache.Set(key, secondValue); err != nil {
		return errors.WithStack(err)
	}

	if e, g := len(secondValue), cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	retrieved, err = cache.Get(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if e, g := secondValue, retrieved; e != g {
		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
	}

	if err := cache.Set(key, thirdValue); err != nil {
		return errors.WithStack(err)
	}

	if e, g := len(thirdValue), cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	retrieved, err = cache.Get(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if e, g := thirdValue, retrieved; e != g {
		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
	}

	if e, g := len(thirdValue), cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	return nil
}
66 pkg/storage/driver/cache/lfu/testsuite/test_set_get_delete.go vendored Normal file
@@ -0,0 +1,66 @@
package testsuite

import (
	"testing"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

func testSetGetDelete(t *testing.T, store lfu.Store[string, string]) error {
	const (
		key   = "mykey"
		value = "foobar"
	)

	cache := lfu.NewCache[string, string](store, lfu.WithCapacity[string, string](10))

	if e, g := 0, cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	if err := cache.Set(key, value); err != nil {
		return errors.WithStack(err)
	}

	if e, g := len(value), cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	if e, g := 1, cache.Len(); e != g {
		t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
	}

	retrieved, err := cache.Get(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if e, g := value, retrieved; e != g {
		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
	}

	if err := cache.Delete(key); err != nil {
		return errors.WithStack(err)
	}

	if _, err := cache.Get(key); err == nil || !errors.Is(err, lfu.ErrNotFound) {
		t.Errorf("cache.Get(key): err should be lfu.ErrNotFound, got '%v'", errors.WithStack(err))
	}

	if e, g := value, retrieved; e != g {
		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
	}

	if e, g := 0, cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	if e, g := 0, cache.Len(); e != g {
		t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
	}

	t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())

	return nil
}
54 pkg/storage/driver/cache/lfu/testsuite/test_ttl.go vendored Normal file
@@ -0,0 +1,54 @@
package testsuite

import (
	"testing"
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"github.com/pkg/errors"
)

func testTTL(t *testing.T, store lfu.Store[string, string]) error {
	const (
		key   = "mykey"
		value = "foobar"
	)

	ttl := time.Second

	cache := lfu.NewCache[string, string](store,
		lfu.WithTTL[string, string](ttl),
		lfu.WithCapacity[string, string](10),
	)

	if err := cache.Set(key, value); err != nil {
		return errors.WithStack(err)
	}

	retrieved, err := cache.Get(key)
	if err != nil {
		return errors.WithStack(err)
	}

	if e, g := value, retrieved; e != g {
		t.Errorf("cache.Get(key): expected '%v', got '%v'", e, g)
	}

	time.Sleep(ttl * 2)

	if _, err := cache.Get(key); !errors.Is(err, lfu.ErrNotFound) {
		t.Errorf("cache.Get(key): expected err == lfu.ErrNotFound, got '%v'", err)
	}

	t.Logf("cache [capacity: %d, size: %d, len: %d]", cache.Capacity(), cache.Size(), cache.Len())

	if e, g := 0, cache.Size(); e != g {
		t.Errorf("cache.Size(): expected '%v', got '%v'", e, g)
	}

	if e, g := 0, cache.Len(); e != g {
		t.Errorf("cache.Len(): expected '%v', got '%v'", e, g)
	}

	return nil
}
88 pkg/storage/driver/cache/options.go vendored Normal file
@@ -0,0 +1,88 @@
package cache

import (
	"time"

	"forge.cadoles.com/arcad/edge/pkg/storage"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/memory"
)

type Options struct {
	CacheTTL time.Duration

	BlobCacheStore lfu.Store[string, []byte]
	// Maximum total size of cached data
	BlobCacheSize int

	BlobInfoCacheStore lfu.Store[string, storage.BlobInfo]
	// Maximum number of blob infos
	BlobInfoCacheSize int

	BlobBucketCacheStore lfu.Store[string, storage.BlobBucket]
	// Maximum number of blob buckets
	BlobBucketCacheSize int
}

type OptionFunc func(opts *Options)

func NewOptions(funcs ...OptionFunc) *Options {
	defaultTTL := 60 * time.Minute
	opts := &Options{
		CacheTTL:             defaultTTL,
		BlobCacheStore:       memory.NewStore[string, []byte](),
		BlobCacheSize:        1e+9, // 1GB
		BlobInfoCacheStore:   memory.NewStore[string, storage.BlobInfo](),
		BlobInfoCacheSize:    256,
		BlobBucketCacheStore: memory.NewStore[string, storage.BlobBucket](),
		BlobBucketCacheSize:  16,
	}

	for _, fn := range funcs {
		fn(opts)
	}

	return opts
}

func WithCacheTTL(ttl time.Duration) OptionFunc {
	return func(opts *Options) {
		opts.CacheTTL = ttl
	}
}

func WithBlobBucketCacheSize(size int) OptionFunc {
	return func(opts *Options) {
		opts.BlobBucketCacheSize = size
	}
}

func WithBlobBucketCacheStore(store lfu.Store[string, storage.BlobBucket]) OptionFunc {
	return func(opts *Options) {
		opts.BlobBucketCacheStore = store
	}
}

func WithBlobInfoCacheSize(size int) OptionFunc {
	return func(opts *Options) {
		opts.BlobInfoCacheSize = size
	}
}

func WithBlobInfoCacheStore(store lfu.Store[string, storage.BlobInfo]) OptionFunc {
	return func(opts *Options) {
		opts.BlobInfoCacheStore = store
	}
}

func WithBlobCacheSize(size int) OptionFunc {
	return func(opts *Options) {
		opts.BlobCacheSize = size
	}
}

func WithBlobCacheStore(store lfu.Store[string, []byte]) OptionFunc {
	return func(opts *Options) {
		opts.BlobCacheStore = store
	}
}
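A sketch wiring these options together, replacing the default in-memory blob store with the filesystem-backed one from this changeset (the `data/cache/blobs` path is an arbitrary example):

import (
	"time"

	fsstore "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu/fs"
)

func newCacheOptions() *Options {
	return NewOptions(
		WithCacheTTL(30*time.Minute),
		WithBlobCacheSize(64<<20), // cap cached blob bytes at 64MB
		WithBlobCacheStore(fsstore.NewStore[string, []byte]("data/cache/blobs")),
	)
}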
28 pkg/storage/driver/cache/reader.go vendored
@@ -1,18 +1,21 @@
 package cache
 
 import (
 	"bytes"
 	"context"
 	"fmt"
 	"io"
 
-	"github.com/allegro/bigcache/v3"
+	"cdr.dev/slog"
+	"forge.cadoles.com/arcad/edge/pkg/storage/driver/cache/lfu"
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
 )
 
 type readCacher struct {
 	reader io.ReadSeekCloser
-	cache  *bigcache.BigCache
+	cache  *lfu.Cache[string, []byte]
+	buf    bytes.Buffer
 	key    string
 }
 
@@ -22,6 +25,21 @@ func (r *readCacher) Close() error {
 		return errors.WithStack(err)
 	}
 
+	if err := r.cache.Set(r.key, r.buf.Bytes()); err != nil {
+		var logErr slog.Field
+		if errors.Is(err, lfu.ErrSizeExceedCapacity) {
+			logErr = logger.E(errors.WithStack(err))
+		} else {
+			logErr = logger.CapturedE(errors.WithStack(err))
+		}
+		logger.Error(context.Background(), "could not cache buffered data",
+			logger.F("cacheKey", r.key),
+			logErr,
+		)
+	}
+
+	r.buf.Reset()
+
 	return nil
 }
 
@@ -37,13 +55,9 @@ func (r *readCacher) Read(p []byte) (n int, err error) {
 	}
 
 	if length > 0 {
-		if err := r.cache.Append(r.key, p[:length]); err != nil {
+		if _, err := r.buf.Write(p[:length]); err != nil {
 			ctx := logger.With(context.Background(), logger.F("cacheKey", r.key))
 			logger.Error(ctx, "could not write to buffer", logger.CapturedE(errors.WithStack(err)))
 
 			if err := r.cache.Delete(r.key); err != nil {
 				logger.Error(ctx, "could not delete cache key", logger.CapturedE(errors.WithStack(err)))
 			}
 		}
 	}
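The reworked reader follows a buffered read-through pattern: bytes served to the caller are mirrored into an in-memory buffer, and the whole blob is committed to the LFU cache only on `Close`, where an oversized blob is logged and skipped rather than treated as a failure. Conceptually (a simplified sketch, not the struct above; `upstream`, `dst`, `cache`, and `key` stand for the surrounding state):

var buf bytes.Buffer
tee := io.TeeReader(upstream, &buf) // every byte read is also buffered

if _, err := io.Copy(dst, tee); err != nil {
	// handle the read error
}

_ = cache.Set(key, buf.Bytes()) // best effort: a failure here only costs a cache miss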
@@ -24,7 +24,7 @@ func TestBlobStore(t *testing.T) {
 		t.Fatalf("%+v", errors.WithStack(err))
 	}
 
-	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
+	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds())
 	store := NewBlobStore(dsn)
 
 	testsuite.TestBlobStore(context.Background(), t, store)
@@ -13,9 +13,14 @@ import (
 	"github.com/pkg/errors"
 	"gitlab.com/wpetit/goweb/logger"
 
+	_ "embed"
+
 	_ "modernc.org/sqlite"
 )
 
+//go:embed document_store.sql
+var documentStoreSchema string
+
 type DocumentStore struct {
 	getDB GetDBFunc
 }
@@ -48,7 +53,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
 
 	err := s.withTx(ctx, func(tx *sql.Tx) error {
 		query := `
-			SELECT id, data, created_at, updated_at
+			SELECT id, revision, data, created_at, updated_at
 			FROM documents
 			WHERE collection = $1 AND id = $2
 		`
@@ -59,9 +64,10 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
 			createdAt time.Time
 			updatedAt time.Time
 			data      JSONMap
+			revision  int
 		)
 
-		err := row.Scan(&id, &data, &createdAt, &updatedAt)
+		err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt)
 		if err != nil {
 			if errors.Is(err, sql.ErrNoRows) {
 				return errors.WithStack(storage.ErrDocumentNotFound)
@@ -77,6 +83,7 @@ func (s *DocumentStore) Get(ctx context.Context, collection string, id storage.D
 		document = storage.Document(data)
 
 		document[storage.DocumentAttrID] = id
+		document[storage.DocumentAttrRevision] = revision
 		document[storage.DocumentAttrCreatedAt] = createdAt
 		document[storage.DocumentAttrUpdatedAt] = updatedAt
 
@@ -119,7 +126,7 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
 	}
 
 	query := `
-		SELECT id, data, created_at, updated_at
+		SELECT id, revision, data, created_at, updated_at
 		FROM documents
 		WHERE collection = $1 AND (` + criteria + `)
 	`
@@ -171,17 +178,19 @@ func (s *DocumentStore) Query(ctx context.Context, collection string, filter *fi
 	for rows.Next() {
 		var (
 			id        storage.DocumentID
+			revision  int
 			createdAt time.Time
 			updatedAt time.Time
 			data      JSONMap
 		)
 
-		if err := rows.Scan(&id, &data, &createdAt, &updatedAt); err != nil {
+		if err := rows.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil {
 			return errors.WithStack(err)
 		}
 
 		document := storage.Document(data)
 		document[storage.DocumentAttrID] = id
+		document[storage.DocumentAttrRevision] = revision
 		document[storage.DocumentAttrCreatedAt] = createdAt
 		document[storage.DocumentAttrUpdatedAt] = updatedAt
 
@@ -206,22 +215,16 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
 	var upsertedDocument storage.Document
 
 	err := s.withTx(ctx, func(tx *sql.Tx) error {
-		query := `
-			INSERT INTO documents (id, collection, data, created_at, updated_at)
-			VALUES($1, $2, $3, $4, $4)
-			ON CONFLICT (id, collection) DO UPDATE SET
-				data = $3, updated_at = $4
-			RETURNING "id", "data", "created_at", "updated_at"
-		`
-
-		now := time.Now().UTC()
-
 		id, exists := document.ID()
 		if !exists || id == "" {
 			id = storage.NewDocumentID()
 		}
 
-		args := []any{id, collection, JSONMap(document), now, now}
+		query := `
+			SELECT revision FROM documents WHERE id = $1
+		`
+
+		args := []any{id}
 
 		logger.Debug(
 			ctx, "executing query",
@@ -231,14 +234,44 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
 
 		row := tx.QueryRowContext(ctx, query, args...)
 
+		var storedRevision int
+
+		if err := row.Scan(&storedRevision); err != nil && !errors.Is(err, sql.ErrNoRows) {
+			return errors.WithStack(err)
+		}
+
+		revision, found := document.Revision()
+		if found && storedRevision != revision {
+			return errors.Wrapf(storage.ErrDocumentRevisionConflict, "document revision '%d' does not match stored '%d'", revision, storedRevision)
+		}
+
+		query = `
+			INSERT INTO documents (id, collection, data, created_at, updated_at)
+			VALUES($1, $2, $3, $4, $4)
+			ON CONFLICT (id, collection) DO UPDATE SET
+				data = $3, updated_at = $4, revision = revision + 1
+			RETURNING "id", "revision", "data", "created_at", "updated_at"
+		`
+
+		now := time.Now().UTC()
+
+		args = []any{id, collection, JSONMap(document), now, now}
+
+		logger.Debug(
+			ctx, "executing query",
+			logger.F("query", query),
+			logger.F("args", args),
+		)
+
+		row = tx.QueryRowContext(ctx, query, args...)
+
 		var (
 			createdAt time.Time
 			updatedAt time.Time
 			data      JSONMap
 		)
 
-		err := row.Scan(&id, &data, &createdAt, &updatedAt)
-		if err != nil {
+		if err := row.Scan(&id, &revision, &data, &createdAt, &updatedAt); err != nil {
 			return errors.WithStack(err)
 		}
 
@@ -249,6 +282,7 @@ func (s *DocumentStore) Upsert(ctx context.Context, collection string, document
 		upsertedDocument = storage.Document(data)
 
 		upsertedDocument[storage.DocumentAttrID] = id
+		upsertedDocument[storage.DocumentAttrRevision] = revision
 		upsertedDocument[storage.DocumentAttrCreatedAt] = createdAt
 		upsertedDocument[storage.DocumentAttrUpdatedAt] = updatedAt
 
@@ -276,29 +310,13 @@ func (s *DocumentStore) withTx(ctx context.Context, fn func(tx *sql.Tx) error) e
 	return nil
 }
 
-func ensureDocumentTables(ctx context.Context, db *sql.DB) error {
+func migrateSchema(ctx context.Context, db *sql.DB) error {
 	err := WithTx(ctx, db, func(tx *sql.Tx) error {
-		query := `
-			CREATE TABLE IF NOT EXISTS documents (
-				id TEXT PRIMARY KEY,
-				collection TEXT NOT NULL,
-				data TEXT,
-				created_at TIMESTAMP NOT NULL,
-				updated_at TIMESTAMP NOT NULL,
-				UNIQUE(id, collection) ON CONFLICT REPLACE
-			);
-		`
-		if _, err := tx.ExecContext(ctx, query); err != nil {
-			return errors.WithStack(err)
+		for _, migr := range documentStoreMigrations {
+			if err := migr(ctx, tx); err != nil {
+				return errors.WithStack(err)
+			}
 		}
 
-		query = `
-			CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
-		`
-		if _, err := tx.ExecContext(ctx, query); err != nil {
-			return errors.WithStack(err)
-		}
-
 		return nil
 	})
 	if err != nil {
@@ -344,7 +362,7 @@ func withLimitOffsetClause(query string, args []any, limit int, offset int) (str
 }
 
 func NewDocumentStore(path string) *DocumentStore {
-	getDB := NewGetDBFunc(path, ensureDocumentTables)
+	getDB := NewGetDBFunc(path, migrateSchema)
 
 	return &DocumentStore{
 		getDB: getDB,
@@ -352,7 +370,7 @@ func NewDocumentStore(path string) *DocumentStore {
 }
 
 func NewDocumentStoreWithDB(db *sql.DB) *DocumentStore {
-	getDB := NewGetDBFuncFromDB(db, ensureDocumentTables)
+	getDB := NewGetDBFuncFromDB(db, migrateSchema)
 
 	return &DocumentStore{
 		getDB: getDB,
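Together these changes give the document store optimistic concurrency control: an upsert carrying a stale revision fails with `ErrDocumentRevisionConflict`, and every successful update increments the stored revision. A caller-side retry loop might look like this (a sketch against the API shown in this diff; `mutate` is a hypothetical callback):

func upsertWithRetry(ctx context.Context, store storage.DocumentStore, collection string, id storage.DocumentID, mutate func(storage.Document)) error {
	for {
		doc, err := store.Get(ctx, collection, id)
		if err != nil {
			return errors.WithStack(err)
		}

		mutate(doc)

		if _, err := store.Upsert(ctx, collection, doc); err != nil {
			if errors.Is(err, storage.ErrDocumentRevisionConflict) {
				continue // a concurrent writer won; reload and retry with the fresh revision
			}

			return errors.WithStack(err)
		}

		return nil
	}
}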
16 pkg/storage/driver/sqlite/document_store.sql Normal file
@@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS documents (
	id TEXT PRIMARY KEY,
	collection TEXT NOT NULL,
	data TEXT,
	created_at TIMESTAMP NOT NULL,
	updated_at TIMESTAMP NOT NULL,
	UNIQUE(id, collection) ON CONFLICT REPLACE
);

---

CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);

---

ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0;
98 pkg/storage/driver/sqlite/document_store_migrations.go Normal file
@@ -0,0 +1,98 @@
package sqlite

import (
	"context"
	"database/sql"

	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

type MigrateFunc func(ctx context.Context, tx *sql.Tx) error

var documentStoreMigrations = []MigrateFunc{
	documentStoreMigrationBaseSchema,
	documentStoreMigrationAddIDIndex,
	documentStoreMigrationAddRevisionColumn,
}

func documentStoreMigrationBaseSchema(ctx context.Context, tx *sql.Tx) error {
	query := `
		CREATE TABLE IF NOT EXISTS documents (
			id TEXT PRIMARY KEY,
			collection TEXT NOT NULL,
			data TEXT,
			created_at TIMESTAMP NOT NULL,
			updated_at TIMESTAMP NOT NULL,
			UNIQUE(id, collection) ON CONFLICT REPLACE
		);
	`

	if _, err := tx.ExecContext(ctx, query); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func documentStoreMigrationAddIDIndex(ctx context.Context, tx *sql.Tx) error {
	query := `
		CREATE INDEX IF NOT EXISTS collection_idx ON documents (collection);
	`

	if _, err := tx.ExecContext(ctx, query); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func documentStoreMigrationAddRevisionColumn(ctx context.Context, tx *sql.Tx) error {
	query := `PRAGMA table_info(documents)`

	rows, err := tx.QueryContext(ctx, query)
	if err != nil {
		return errors.WithStack(err)
	}

	defer func() {
		if err := rows.Close(); err != nil {
			logger.Error(ctx, "could not close rows", logger.CapturedE(errors.WithStack(err)))
		}
	}()

	var hasRevisionColumn bool

	for rows.Next() {
		var (
			id           int
			name         string
			dataType     string
			nullable     int
			defaultValue any
			primaryKey   int
		)

		if err := rows.Scan(&id, &name, &dataType, &nullable, &defaultValue, &primaryKey); err != nil {
			return errors.WithStack(err)
		}

		if name == "revision" {
			hasRevisionColumn = true
			break
		}
	}

	if err := rows.Err(); err != nil {
		return errors.WithStack(err)
	}

	if !hasRevisionColumn {
		query = `ALTER TABLE documents ADD COLUMN revision INTEGER DEFAULT 0`
		if _, err := tx.ExecContext(ctx, query); err != nil {
			return errors.WithStack(err)
		}
	}

	return nil
}
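Every migration is idempotent (`IF NOT EXISTS` guards, a column-presence check), so the whole list can safely run inside `migrateSchema` at each startup; new migrations are appended to the slice so they execute after the existing ones. A hypothetical next entry:

// A hypothetical future migration, appended to documentStoreMigrations
// after the existing entries.
func documentStoreMigrationAddCollectionUpdatedIndex(ctx context.Context, tx *sql.Tx) error {
	query := `CREATE INDEX IF NOT EXISTS collection_updated_idx ON documents (collection, updated_at);`

	if _, err := tx.ExecContext(ctx, query); err != nil {
		return errors.WithStack(err)
	}

	return nil
}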
@@ -22,7 +22,7 @@ func TestDocumentStore(t *testing.T) {
 		t.Fatalf("%+v", errors.WithStack(err))
 	}
 
-	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
+	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds())
 	store := NewDocumentStore(dsn)
 
 	testsuite.TestDocumentStore(context.Background(), t, store)
@@ -20,7 +20,7 @@ func init() {
 func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
 	dir := filepath.Dir(url.Host + url.Path)
 
-	if dir != ":memory:" {
+	if dir != "." {
 		if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
 			return nil, errors.WithStack(err)
 		}
@@ -39,7 +39,7 @@ func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
 func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
 	dir := filepath.Dir(url.Host + url.Path)
 
-	if dir != ":memory:" {
+	if dir != "." {
 		if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
 			return nil, errors.WithStack(err)
 		}
@@ -58,7 +58,7 @@ func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
 func shareStoreFactory(url *url.URL) (share.Store, error) {
 	dir := filepath.Dir(url.Host + url.Path)
 
-	if dir != ":memory:" {
+	if dir != "." {
 		if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
 			return nil, errors.WithStack(err)
 		}
@@ -26,7 +26,7 @@ func newTestStore(testName string) (share.Store, error) {
 		return nil, errors.WithStack(err)
 	}
 
-	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
+	dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d&_pragma=journal_mode=wal", file, (60 * time.Second).Milliseconds())
 	store := NewShareStore(dsn)
 
 	return store, nil
@@ -185,8 +185,13 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
 				return errors.Errorf("upsertedDoc[\"attr1\"]: expected '%v', got '%v'", e, g)
 			}
 
+			upsertedDocRevision, _ := upsertedDoc.Revision()
+			if e, g := 0, upsertedDocRevision; e != g {
+				return errors.Errorf("upsertedDoc.Revision(): expected '%v', got '%v'", e, g)
+			}
+
 			// Check that document does not have unexpected properties
-			if e, g := 4, len(upsertedDoc); e != g {
+			if e, g := 5, len(upsertedDoc); e != g {
 				return errors.Errorf("len(upsertedDoc): expected '%v', got '%v'", e, g)
 			}
 
@@ -217,6 +222,11 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
 				return errors.New("upsertedDoc2.UpdatedAt() should have been different than upsertedDoc.UpdatedAt()")
 			}
 
+			upsertedDoc2Revision, _ := upsertedDoc2.Revision()
+			if e, g := 1, upsertedDoc2Revision; e != g {
+				return errors.Errorf("upsertedDoc2.Revision(): expected '%v', got '%v'", e, g)
+			}
+
 			// Verify that there is no additional created document in the collection
 
 			results, err := store.Query(ctx, collection, nil)
@@ -228,6 +238,11 @@ var documentStoreOpsTestCases = []documentStoreOpsTestCase{
 				return errors.Errorf("len(results): expected '%v', got '%v'", e, g)
 			}
 
+			firstResultRevision, _ := results[0].Revision()
+			if e, g := 1, firstResultRevision; e != g {
+				return errors.Errorf("results[0].Revision(): expected '%v', got '%v'", e, g)
+			}
+
 			return nil
 		},
 	},
@@ -437,7 +452,6 @@ func testDocumentStoreOps(ctx context.Context, t *testing.T, store storage.Docum
 	for _, tc := range documentStoreOpsTestCases {
 		func(tc documentStoreOpsTestCase) {
 			t.Run(tc.Name, func(t *testing.T) {
-				t.Parallel()
 				if err := tc.Run(ctx, store); err != nil {
 					t.Errorf("%+v", errors.WithStack(err))
 				}
Block a user