Compare commits

...

7 Commits

Author SHA1 Message Date
753a6c9708 fix: temporarily write blob directly as response body without http.ServeContent
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-12-05 14:18:22 +01:00
b120e590b6 fix: do not use goja.Value outside of run loop 2023-12-05 14:14:08 +01:00
242bf379a8 feat: rewrite cache blobstore driver parameters parsing
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-12-03 14:26:57 +01:00
065a9002a0 fix(storage): use missing cache driver options
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-12-01 15:20:12 +01:00
83a1e89665 feat: use forked version of bigcache to prevent 64bits misalignment problems
All checks were successful
arcad/edge/pipeline/head This commit looks good
See https://github.com/allegro/bigcache/issues/368
See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
2023-12-01 12:22:53 +01:00
d9e8aac458 feat(packaging): rotate storage-server log files on alpine
All checks were successful
arcad/edge/pipeline/head This commit looks good
2023-11-30 19:54:00 +01:00
32f04af138 feat(storage): improve caching in cache driver
All checks were successful
arcad/edge/pipeline/head This commit looks good
ref #20
2023-11-30 19:09:51 +01:00
24 changed files with 593 additions and 131 deletions

2
.gitignore vendored
View File

@ -2,7 +2,7 @@
/bin
/.env
/tools
*.sqlite
*.sqlite*
/.gitea-release
/.edge
/data

View File

@ -108,10 +108,17 @@ nfpms:
file_info:
mode: 0640
packager: apk
- src: misc/packaging/openrc/storage-server.logrotate.conf
dst: /etc/logrotate.d/storage-server
packager: apk
- dst: /var/lib/storage-server
type: dir
file_info:
mode: 0700
packager: apk
- dst: /var/log/storage-server
type: dir
file_info:
mode: 0700
scripts:
postinstall: "misc/packaging/common/postinstall-storage-server.sh"

146
cmd/blobstore-test/main.go Normal file
View File

@ -0,0 +1,146 @@
package main
import (
"context"
"crypto/rand"
"flag"
"io"
mrand "math/rand"
"runtime"
"time"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
)
var (
dsn string
)
func init() {
flag.StringVar(&dsn, "dsn", "cache://./test-cache.sqlite?driver=sqlite&_pragma=foreign_keys(1)&_pragma=journal_mode=wal&bigCacheShards=32&bigCacheHardMaxCacheSize=128&bigCacheMaxEntrySize=125&bigCacheMaxEntriesInWindow=200000", "blobstore dsn")
}
func main() {
flag.Parse()
ctx := context.Background()
logger.SetLevel(logger.LevelDebug)
blobStore, err := driver.NewBlobStore(dsn)
if err != nil {
logger.Fatal(ctx, "could not create blobstore", logger.CapturedE(errors.WithStack(err)))
}
bucket, err := blobStore.OpenBucket(ctx, "default")
if err != nil {
logger.Fatal(ctx, "could not open bucket", logger.CapturedE(errors.WithStack(err)))
}
defer func() {
if err := bucket.Close(); err != nil {
logger.Fatal(ctx, "could not close bucket", logger.CapturedE(errors.WithStack(err)))
}
}()
go readRandomBlobs(ctx, bucket)
for {
writeRandomBlob(ctx, bucket)
time.Sleep(1 * time.Second)
size, err := bucket.Size(ctx)
if err != nil {
logger.Fatal(ctx, "could not retrieve bucket size", logger.CapturedE(errors.WithStack(err)))
}
logger.Debug(ctx, "bucket stats", logger.F("size", size))
}
}
func readRandomBlobs(ctx context.Context, bucket storage.BlobBucket) {
for {
infos, err := bucket.List(ctx)
if err != nil {
logger.Fatal(ctx, "could not list blobs", logger.CapturedE(errors.WithStack(err)))
}
total := len(infos)
if total == 0 {
logger.Debug(ctx, "no blob yet")
continue
}
blob := infos[mrand.Intn(total)]
readBlob(ctx, bucket, blob.ID())
time.Sleep(250 * time.Millisecond)
}
}
func readBlob(ctx context.Context, bucket storage.BlobBucket, blobID storage.BlobID) {
ctx = logger.With(ctx, logger.F("blobID", blobID))
reader, err := bucket.NewReader(ctx, blobID)
if err != nil {
logger.Fatal(ctx, "could not create reader", logger.CapturedE(errors.WithStack(err)))
}
defer func() {
if err := reader.Close(); err != nil {
logger.Fatal(ctx, "could not close reader", logger.CapturedE(errors.WithStack(err)))
}
}()
if _, err := io.ReadAll(reader); err != nil {
logger.Fatal(ctx, "could not read blob", logger.CapturedE(errors.WithStack(err)))
}
}
func writeRandomBlob(ctx context.Context, bucket storage.BlobBucket) {
blobID := storage.NewBlobID()
buff := make([]byte, 10*1024)
writer, err := bucket.NewWriter(ctx, blobID)
if err != nil {
logger.Fatal(ctx, "could not create writer", logger.CapturedE(errors.WithStack(err)))
}
defer func() {
if err := writer.Close(); err != nil {
logger.Fatal(ctx, "could not close writer", logger.CapturedE(errors.WithStack(err)))
}
}()
if _, err := rand.Read(buff); err != nil {
logger.Fatal(ctx, "could not read random data", logger.CapturedE(errors.WithStack(err)))
}
if _, err := writer.Write(buff); err != nil {
logger.Fatal(ctx, "could not write blob", logger.CapturedE(errors.WithStack(err)))
}
printMemUsage(ctx)
}
func printMemUsage(ctx context.Context) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
logger.Debug(
ctx, "memory usage",
logger.F("alloc", m.Alloc/1024/1024),
logger.F("totalAlloc", m.TotalAlloc/1024/1024),
logger.F("sys", m.Sys/1024/1024),
logger.F("numGC", m.NumGC),
)
}

View File

@ -107,10 +107,10 @@ func RunCommand() *cli.Command {
Usage: "use `FILE` as local accounts",
Value: ".edge/%APPID%/accounts.json",
},
&cli.IntFlag{
&cli.Int64Flag{
Name: "max-upload-size",
Usage: "use `MAX-UPLOAD-SIZE` as blob max upload size",
Value: 10 << (10 * 2), // 10Mb
Value: 128 << (10 * 2), // 128Mb
},
},
Action: func(ctx *cli.Context) error {
@ -123,7 +123,7 @@ func RunCommand() *cli.Command {
documentstoreDSN := ctx.String("documentstore-dsn")
shareStoreDSN := ctx.String("sharestore-dsn")
accountsFile := ctx.String("accounts-file")
maxUploadSize := ctx.Int("max-upload-size")
maxUploadSize := ctx.Int64("max-upload-size")
logger.SetFormat(logger.Format(logFormat))
logger.SetLevel(logger.Level(logLevel))
@ -182,7 +182,7 @@ func RunCommand() *cli.Command {
}
}
func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int) error {
func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int64) error {
absPath, err := filepath.Abs(path)
if err != nil {
return errors.Wrapf(err, "could not resolve path '%s'", path)

8
go.mod
View File

@ -3,25 +3,26 @@ module forge.cadoles.com/arcad/edge
go 1.21
require (
github.com/allegro/bigcache/v3 v3.1.0
github.com/getsentry/sentry-go v0.25.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/mdns v1.0.5
github.com/jackc/puddle/v2 v2.2.1
github.com/keegancsmith/rpc v1.3.0
github.com/klauspost/compress v1.16.6
github.com/lestrrat-go/jwx/v2 v2.0.8
github.com/ulikunitz/xz v0.5.11
go.uber.org/goleak v1.3.0
modernc.org/sqlite v1.20.4
)
require (
cloud.google.com/go v0.75.0 // indirect
github.com/allegro/bigcache/v3 v3.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/gogo/protobuf v0.0.0-20161014173244-50d1bd39ce4e // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
@ -29,7 +30,6 @@ require (
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/option v1.0.0 // indirect
github.com/miekg/dns v1.1.53 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/sync v0.1.0 // indirect
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705 // indirect
google.golang.org/grpc v1.35.0 // indirect
@ -87,3 +87,5 @@ require (
modernc.org/strutil v1.1.3 // indirect
modernc.org/token v1.0.1 // indirect
)
replace github.com/allegro/bigcache/v3 v3.1.0 => github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad

4
go.sum
View File

@ -37,6 +37,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad h1:PTOf0L/YjiVis5LYzJmi7WqttJ/h/DU6h06aJ24Kpbg=
github.com/Bornholm/bigcache v0.0.0-20231201111725-1ddf51584cad/go.mod h1:+q+mA6jGsjfsZ2HzhVSk38qDbX2/ZBJ7Yyciv75Ruo0=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
@ -53,8 +55,6 @@ github.com/alecthomas/kong v0.2.1-0.20190708041108-0548c6b1afae/go.mod h1:+inYUS
github.com/alecthomas/kong-hcl v0.1.8-0.20190615233001-b21fea9723c8/go.mod h1:MRgZdU3vrFd05IQ89AxUZ0aYdF39BYoNFa324SodPCA=
github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897 h1:p9Sln00KOTlrYkxI1zYWl1QLnEqAqEARBEYa8FQnQcY=
github.com/alecthomas/repr v0.0.0-20180818092828-117648cd9897/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ=
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=
github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692 h1:JW4WZlqyaNWUUahfr7MigeDW6jmtam5cTzzo1lwsFhE=
github.com/barnybug/go-cast v0.0.0-20201201064555-a87ccbc26692/go.mod h1:Au0ipPuCBA7zsOC61SnyrYetm8VT3vo1UJtwHeYke44=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=

View File

@ -0,0 +1,9 @@
/var/log/storage-server/storage-server.log {
missingok
sharedscripts
compress
rotate 7
postrotate
/etc/init.d/storage-server restart
endscript
}

View File

@ -3,7 +3,7 @@
command="/usr/bin/storage-server"
command_args="run"
supervisor=supervise-daemon
output_log="/var/log/storage-server.log"
output_log="/var/log/storage-server/storage-server.log"
error_log="$output_log"
depend() {

View File

@ -132,17 +132,17 @@ func (s *Server) Exec(ctx context.Context, callableOrFuncname any, args ...inter
value := result.value
if promise, ok := IsPromise(value); ok {
value = s.waitForPromise(promise)
return s.waitForPromise(promise), nil
}
return value.Export(), nil
}
}
func (s *Server) waitForPromise(promise *goja.Promise) goja.Value {
func (s *Server) waitForPromise(promise *goja.Promise) any {
var (
wg sync.WaitGroup
value goja.Value
value any
)
wg.Add(1)
@ -162,7 +162,7 @@ func (s *Server) waitForPromise(promise *goja.Promise) goja.Value {
return
}
value = promise.Result()
value = promise.Result().Export()
breakLoop = true
})

View File

@ -6,7 +6,6 @@ import (
"net/http"
"forge.cadoles.com/arcad/edge/pkg/bus"
"github.com/pkg/errors"
)
type contextKey string
@ -15,15 +14,16 @@ var (
contextKeyBus contextKey = "bus"
contextKeyHTTPRequest contextKey = "httpRequest"
contextKeyHTTPClient contextKey = "httpClient"
contextKeySessionID contextKey = "sessionId"
)
func (h *Handler) contextMiddleware(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = context.WithValue(ctx, contextKeyBus, h.bus)
ctx = context.WithValue(ctx, contextKeyHTTPRequest, r)
ctx = context.WithValue(ctx, contextKeyHTTPClient, h.httpClient)
ctx = WithContextBus(ctx, h.bus)
ctx = WithContextHTTPRequest(ctx, r)
ctx = WithContextHTTPClient(ctx, h.httpClient)
r = r.WithContext(ctx)
@ -33,23 +33,43 @@ func (h *Handler) contextMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(fn)
}
func ContextBus(ctx context.Context) bus.Bus {
func ContextBus(ctx context.Context) (bus.Bus, bool) {
return contextValue[bus.Bus](ctx, contextKeyBus)
}
func ContextHTTPRequest(ctx context.Context) *http.Request {
func WithContextBus(parent context.Context, bus bus.Bus) context.Context {
return context.WithValue(parent, contextKeyBus, bus)
}
func ContextHTTPRequest(ctx context.Context) (*http.Request, bool) {
return contextValue[*http.Request](ctx, contextKeyHTTPRequest)
}
func ContextHTTPClient(ctx context.Context) *http.Client {
func WithContextHTTPRequest(parent context.Context, request *http.Request) context.Context {
return context.WithValue(parent, contextKeyHTTPRequest, request)
}
func ContextHTTPClient(ctx context.Context) (*http.Client, bool) {
return contextValue[*http.Client](ctx, contextKeyHTTPClient)
}
func contextValue[T any](ctx context.Context, key any) T {
func WithContextHTTPClient(parent context.Context, client *http.Client) context.Context {
return context.WithValue(parent, contextKeyHTTPClient, client)
}
func ContextSessionID(ctx context.Context) (string, bool) {
return contextValue[string](ctx, contextKeySessionID)
}
func WithContextSessionID(parent context.Context, sessionID string) context.Context {
return context.WithValue(parent, contextKeySessionID, sessionID)
}
func contextValue[T any](ctx context.Context, key any) (T, bool) {
value, ok := ctx.Value(key).(T)
if !ok {
panic(errors.Errorf("could not find key '%v' on context", key))
return *new(T), false
}
return value
return value, true
}

View File

@ -102,14 +102,12 @@ func NewHandler(funcs ...HandlerOptionFunc) *Handler {
r.Get("/client.js.map", handler.handleSDKClientMap)
})
r.Group(func(r chi.Router) {
r.Use(handler.contextMiddleware)
for _, fn := range opts.HTTPMounts {
r.Group(func(r chi.Router) {
fn(r)
})
}
})
for _, fn := range opts.HTTPMounts {
r.Group(func(r chi.Router) {
r.Use(handler.contextMiddleware)
fn(r)
})
}
r.HandleFunc("/sock/*", handler.handleSockJS)
})

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"net/http"
"forge.cadoles.com/arcad/edge/pkg/module"
"github.com/igm/sockjs-go/v3/sockjs"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
@ -15,11 +14,6 @@ const (
statusChannelClosed = iota
)
const (
ContextKeySessionID module.ContextKey = "sessionId"
ContextKeyOriginRequest module.ContextKey = "originRequest"
)
func (h *Handler) handleSockJS(w http.ResponseWriter, r *http.Request) {
h.mutex.RLock()
defer h.mutex.RUnlock()
@ -181,10 +175,8 @@ func (h *Handler) handleIncomingMessages(ctx context.Context, sess sockjs.Sessio
}
ctx := logger.With(ctx, logger.F("payload", payload))
ctx = module.WithContext(ctx, map[module.ContextKey]any{
ContextKeySessionID: sess.ID(),
ContextKeyOriginRequest: sess.Request(),
})
ctx = WithContextHTTPRequest(ctx, sess.Request())
ctx = WithContextSessionID(ctx, sess.ID())
incomingMessage := NewIncomingMessageEnvelope(ctx, payload)

View File

@ -1,10 +1,8 @@
package auth
import (
"net/http"
"forge.cadoles.com/arcad/edge/pkg/app"
edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/jwtutil"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja"
@ -68,7 +66,7 @@ func (m *Module) getClaim(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
ctx := util.AssertContext(call.Argument(0), rt)
claimName := util.AssertString(call.Argument(1), rt)
req, ok := ctx.Value(edgeHTTP.ContextKeyOriginRequest).(*http.Request)
req, ok := edgehttp.ContextHTTPRequest(ctx)
if !ok {
panic(rt.ToValue(errors.New("could not find http request in context")))
}

View File

@ -9,7 +9,7 @@ import (
"cdr.dev/slog"
"forge.cadoles.com/arcad/edge/pkg/app"
edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/jwtutil"
"forge.cadoles.com/arcad/edge/pkg/module"
"github.com/lestrrat-go/jwx/v2/jwa"
@ -71,7 +71,7 @@ func TestAuthModule(t *testing.T) {
req.Header.Add("Authorization", "Bearer "+string(rawToken))
ctx = context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req)
ctx = edgehttp.WithContextHTTPRequest(context.Background(), req)
if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))
@ -111,7 +111,7 @@ func TestAuthAnonymousModule(t *testing.T) {
t.Fatalf("%+v", errors.WithStack(err))
}
ctx = context.WithValue(context.Background(), edgeHTTP.ContextKeyOriginRequest, req)
ctx = edgehttp.WithContextHTTPRequest(context.Background(), req)
if _, err := server.ExecFuncByName(ctx, "testAuth", ctx); err != nil {
t.Fatalf("%+v", errors.WithStack(err))

View File

@ -2,6 +2,7 @@ package blob
import (
"encoding/json"
"io"
"io/fs"
"mime/multipart"
"net/http"
@ -21,19 +22,17 @@ type uploadResponse struct {
BlobID storage.BlobID `json:"blobId"`
}
func Mount(uploadMaxFileSize int) func(r chi.Router) {
func Mount(uploadMaxFileSize int64) func(r chi.Router) {
return func(r chi.Router) {
r.Post("/api/v1/upload", getAppUploadHandler(uploadMaxFileSize))
r.Get("/api/v1/download/{bucket}/{blobID}", handleAppDownload)
}
}
func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http.Request) {
func getAppUploadHandler(uploadMaxFileSize int64) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
uploadMaxFileSize := int64(8000)
r.Body = http.MaxBytesReader(w, r.Body, uploadMaxFileSize)
if err := r.ParseMultipartForm(uploadMaxFileSize); err != nil {
@ -65,7 +64,13 @@ func getAppUploadHandler(fileMaxUpload int) func(w http.ResponseWriter, r *http.
requestEnv := NewUploadRequestEnvelope(ctx, fileHeader, metadata)
bus := edgehttp.ContextBus(ctx)
bus, ok := edgehttp.ContextBus(ctx)
if !ok {
logger.Error(ctx, "could find bus on context")
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
reply, err := bus.Request(ctx, requestEnv)
if err != nil {
@ -114,7 +119,13 @@ func handleAppDownload(w http.ResponseWriter, r *http.Request) {
requestMsg := NewDownloadRequestEnvelope(ctx, bucket, storage.BlobID(blobID))
bs := edgehttp.ContextBus(ctx)
bs, ok := edgehttp.ContextBus(ctx)
if !ok {
logger.Error(ctx, "could find bus on context")
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
reply, err := bs.Request(ctx, requestMsg)
if err != nil {
@ -154,7 +165,14 @@ func handleAppDownload(w http.ResponseWriter, r *http.Request) {
}
}()
http.ServeContent(w, r, string(replyMessage.BlobInfo.ID()), replyMessage.BlobInfo.ModTime(), replyMessage.Blob)
// TODO Fix usage of ServeContent
// http.ServeContent(w, r, string(replyMessage.BlobInfo.ID()), replyMessage.BlobInfo.ModTime(), replyMessage.Blob)
w.Header().Add("Content-Type", replyMessage.BlobInfo.ContentType())
if _, err := io.Copy(w, replyMessage.Blob); err != nil {
logger.Error(ctx, "could not write blob", logger.CapturedE(errors.WithStack(err)))
}
}
type uploadedFile struct {

View File

@ -31,7 +31,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {
requestMsg := NewFetchRequestEnvelope(ctx, r.RemoteAddr, url)
bus := edgehttp.ContextBus(ctx)
bus, ok := edgehttp.ContextBus(ctx)
if !ok {
logger.Error(ctx, "could find bus on context")
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
reply, err := bus.Request(ctx, requestMsg)
if err != nil {
@ -79,7 +85,13 @@ func handleAppFetch(w http.ResponseWriter, r *http.Request) {
proxyReq.Header.Add("X-Forwarded-From", r.RemoteAddr)
httpClient := edgehttp.ContextHTTPClient(ctx)
httpClient, ok := edgehttp.ContextHTTPClient(ctx)
if !ok {
logger.Error(ctx, "could find http client on context")
edgehttp.JSONError(w, http.StatusInternalServerError, edgehttp.ErrCodeInternalError)
return
}
res, err := httpClient.Do(proxyReq)
if err != nil {

View File

@ -6,7 +6,6 @@ import (
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja"
"github.com/pkg/errors"
@ -57,7 +56,10 @@ func (m *Module) send(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
sessionID, ok := firstArg.Export().(string)
if !ok {
ctx := util.AssertContext(firstArg, rt)
sessionID = module.ContextValue[string](ctx, edgehttp.ContextKeySessionID)
sessionID, ok = edgehttp.ContextSessionID(ctx)
if !ok {
panic(rt.ToValue(errors.New("could not find session id in context")))
}
}
data := call.Argument(1).Export()

View File

@ -7,7 +7,6 @@ import (
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
edgehttp "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/module"
"forge.cadoles.com/arcad/edge/pkg/module/util"
"github.com/dop251/goja"
"github.com/pkg/errors"
@ -143,10 +142,15 @@ func (m *Module) handleIncomingHTTPMessages(ctx context.Context, incoming <-chan
continue
}
requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))
sessionID, ok := edgehttp.ContextSessionID(msg.Context)
if !ok {
logger.Error(ctx, "could not find session id in context")
continue
}
request := NewRequestEnvelope(msg.Context, jsonReq.Method, jsonReq.Params)
sessionID := module.ContextValue[string](msg.Context, edgehttp.ContextKeySessionID)
requestCtx := logger.With(msg.Context, logger.F("rpcRequestMethod", jsonReq.Method), logger.F("rpcRequestID", jsonReq.ID))
reply, err := m.bus.Request(requestCtx, request)
if err != nil {

View File

@ -7,26 +7,28 @@ import (
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/allegro/bigcache/v3"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobBucket struct {
bucket storage.BlobBucket
cache *bigcache.BigCache
bucket storage.BlobBucket
contentCache *bigcache.BigCache
blobInfoCache *expirable.LRU[string, storage.BlobInfo]
bucketCache *expirable.LRU[string, storage.BlobBucket]
}
// Close implements storage.BlobBucket.
func (b *BlobBucket) Close() error {
if err := b.bucket.Close(); err != nil {
return errors.WithStack(err)
}
// Close only when bucket is evicted from cache
return nil
}
// Delete implements storage.BlobBucket.
func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
defer b.clearCache(ctx, id)
if err := b.bucket.Delete(ctx, id); err != nil {
return errors.WithStack(err)
}
@ -36,11 +38,28 @@ func (b *BlobBucket) Delete(ctx context.Context, id storage.BlobID) error {
// Get implements storage.BlobBucket.
func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobInfo, error) {
key := b.getCacheKey(id)
if blobInfo, ok := b.blobInfoCache.Get(key); ok {
logger.Debug(
ctx, "found blob info in cache",
logger.F("cacheKey", key),
)
return blobInfo, nil
}
info, err := b.bucket.Get(ctx, id)
if err != nil {
if errors.Is(err, storage.ErrBucketClosed) {
b.clearCache(ctx, id)
b.bucketCache.Remove(b.Name())
}
return nil, errors.WithStack(err)
}
b.blobInfoCache.Add(key, info)
return info, nil
}
@ -48,9 +67,18 @@ func (b *BlobBucket) Get(ctx context.Context, id storage.BlobID) (storage.BlobIn
func (b *BlobBucket) List(ctx context.Context) ([]storage.BlobInfo, error) {
infos, err := b.bucket.List(ctx)
if err != nil {
if errors.Is(err, storage.ErrBucketClosed) {
b.bucketCache.Remove(b.Name())
}
return nil, errors.WithStack(err)
}
for _, ifo := range infos {
key := b.getCacheKey(ifo.ID())
b.blobInfoCache.Add(key, ifo)
}
return infos, nil
}
@ -61,19 +89,28 @@ func (b *BlobBucket) Name() string {
// NewReader implements storage.BlobBucket.
func (b *BlobBucket) NewReader(ctx context.Context, id storage.BlobID) (io.ReadSeekCloser, error) {
if cached, exist := b.inCache(id); exist {
logger.Debug(ctx, "found blob in cache", logger.F("cacheKey", b.getCacheKey(id)), logger.F("cacheStats", b.cache.Stats()))
if cached, exist := b.inContentCache(id); exist {
logger.Debug(
ctx, "found blob content in cache",
logger.F("cacheKey", b.getCacheKey(id)),
logger.F("cacheStats", b.contentCache.Stats()),
)
return cached, nil
}
reader, err := b.bucket.NewReader(ctx, id)
if err != nil {
if errors.Is(err, storage.ErrBucketClosed) {
b.clearCache(ctx, id)
b.bucketCache.Remove(b.Name())
}
return nil, errors.WithStack(err)
}
return &readCacher{
reader: reader,
cache: b.cache,
cache: b.contentCache,
key: b.getCacheKey(id),
}, nil
}
@ -82,9 +119,9 @@ func (b *BlobBucket) getCacheKey(id storage.BlobID) string {
return fmt.Sprintf("%s-%s", b.Name(), id)
}
func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
func (b *BlobBucket) inContentCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
key := b.getCacheKey(id)
data, err := b.cache.Get(key)
data, err := b.contentCache.Get(key)
if err != nil {
if errors.Is(err, bigcache.ErrEntryNotFound) {
return nil, false
@ -98,10 +135,28 @@ func (b *BlobBucket) inCache(id storage.BlobID) (io.ReadSeekCloser, bool) {
return &cachedReader{data, 0}, true
}
func (b *BlobBucket) clearCache(ctx context.Context, id storage.BlobID) {
key := b.getCacheKey(id)
logger.Debug(ctx, "clearing cache", logger.F("cacheKey", key))
if err := b.contentCache.Delete(key); err != nil && !errors.Is(err, bigcache.ErrEntryNotFound) {
logger.Error(ctx, "could not clear cache", logger.CapturedE(errors.WithStack(err)))
}
b.blobInfoCache.Remove(key)
}
// NewWriter implements storage.BlobBucket.
func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.WriteCloser, error) {
defer b.clearCache(ctx, id)
writer, err := b.bucket.NewWriter(ctx, id)
if err != nil {
if errors.Is(err, storage.ErrBucketClosed) {
b.bucketCache.Remove(b.Name())
}
return nil, errors.WithStack(err)
}
@ -112,6 +167,10 @@ func (b *BlobBucket) NewWriter(ctx context.Context, id storage.BlobID) (io.Write
func (b *BlobBucket) Size(ctx context.Context) (int64, error) {
size, err := b.bucket.Size(ctx)
if err != nil {
if errors.Is(err, storage.ErrBucketClosed) {
b.bucketCache.Remove(b.Name())
}
return 0, errors.WithStack(err)
}

View File

@ -5,12 +5,16 @@ import (
"forge.cadoles.com/arcad/edge/pkg/storage"
"github.com/allegro/bigcache/v3"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
type BlobStore struct {
store storage.BlobStore
cache *bigcache.BigCache
store storage.BlobStore
contentCache *bigcache.BigCache
bucketCache *expirable.LRU[string, storage.BlobBucket]
blobInfoCache *expirable.LRU[string, storage.BlobInfo]
}
// DeleteBucket implements storage.BlobStore.
@ -19,6 +23,8 @@ func (s *BlobStore) DeleteBucket(ctx context.Context, name string) error {
return errors.WithStack(err)
}
s.bucketCache.Remove(name)
return nil
}
@ -34,22 +40,59 @@ func (s *BlobStore) ListBuckets(ctx context.Context) ([]string, error) {
// OpenBucket implements storage.BlobStore.
func (s *BlobStore) OpenBucket(ctx context.Context, name string) (storage.BlobBucket, error) {
bucket, ok := s.bucketCache.Get(name)
if ok {
logger.Debug(ctx, "found bucket in cache", logger.F("name", name))
return &BlobBucket{
bucket: bucket,
contentCache: s.contentCache,
blobInfoCache: s.blobInfoCache,
bucketCache: s.bucketCache,
}, nil
}
bucket, err := s.store.OpenBucket(ctx, name)
if err != nil {
return nil, errors.WithStack(err)
}
s.bucketCache.Add(name, bucket)
return &BlobBucket{
bucket: bucket,
cache: s.cache,
bucket: bucket,
contentCache: s.contentCache,
blobInfoCache: s.blobInfoCache,
bucketCache: s.bucketCache,
}, nil
}
func NewBlobStore(store storage.BlobStore, cache *bigcache.BigCache) *BlobStore {
return &BlobStore{
store: store,
cache: cache,
func NewBlobStore(store storage.BlobStore, funcs ...OptionFunc) (*BlobStore, error) {
options := NewOptions(funcs...)
contentCache, err := bigcache.New(context.Background(), options.BigCache)
if err != nil {
return nil, errors.WithStack(err)
}
onBlobBucketEvict := func(key string, bucket storage.BlobBucket) {
ctx := context.Background()
logger.Debug(ctx, "evicting blob bucket from cache", logger.F("cacheKey", key))
if err := bucket.Close(); err != nil {
logger.Error(ctx, "could not close bucket", logger.E(errors.WithStack(err)))
}
}
bucketCache := expirable.NewLRU[string, storage.BlobBucket](options.BucketCacheSize, onBlobBucketEvict, options.CacheTTL)
blobInfoCache := expirable.NewLRU[string, storage.BlobInfo](options.BlobInfoCacheSize, nil, options.CacheTTL)
return &BlobStore{
store: store,
contentCache: contentCache,
bucketCache: bucketCache,
blobInfoCache: blobInfoCache,
}, nil
}
var _ storage.BlobStore = &BlobStore{}

View File

@ -9,7 +9,6 @@ import (
"forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/testsuite"
"github.com/allegro/bigcache/v3"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
@ -30,13 +29,11 @@ func TestBlobStore(t *testing.T) {
backend := sqlite.NewBlobStore(dsn)
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
store, err := NewBlobStore(backend)
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
store := NewBlobStore(backend, cache)
testsuite.TestBlobStore(context.Background(), t, store)
}
@ -52,12 +49,10 @@ func BenchmarkBlobStore(t *testing.B) {
dsn := fmt.Sprintf("%s?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", file, (60 * time.Second).Milliseconds())
backend := sqlite.NewBlobStore(dsn)
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
store, err := NewBlobStore(backend)
if err != nil {
t.Fatalf("%+v", errors.WithStack(err))
}
store := NewBlobStore(backend, cache)
testsuite.BenchmarkBlobStore(t, store)
}

View File

@ -28,54 +28,48 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
query.Del("driver")
cacheTTL := time.Minute * 60
blobStoreOptionFuncs := make([]OptionFunc, 0)
rawCacheTTL := query.Get("cacheTTL")
if rawCacheTTL != "" {
query.Del("cacheTTL")
ttl, err := time.ParseDuration(rawCacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'cacheTTL'")
cacheTTL, err := parseDuration(&query, "cacheTTL")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
cacheTTL = ttl
cacheTTL = time.Hour
}
cacheConfig := bigcache.DefaultConfig(cacheTTL)
cacheConfig.Logger = &cacheLogger{}
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithCacheTTL(cacheTTL))
rawCacheShards := query.Get("cacheShards")
if rawCacheShards != "" {
query.Del("cacheShards")
cacheShards, err := strconv.ParseInt(rawCacheShards, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'cacheShards'")
}
cacheConfig.Shards = int(cacheShards)
cacheConfig, err := parseBigCacheConfig(&query, cacheTTL)
if err != nil {
return nil, errors.Wrap(err, "could not parse big cache config")
}
rawMaxCacheSize := query.Get("maxCacheSize")
if rawMaxCacheSize != "" {
query.Del("maxCacheSize")
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBigCacheConfig(*cacheConfig))
maxCacheSize, err := strconv.ParseInt(rawMaxCacheSize, 10, 32)
if err != nil {
return nil, errors.Wrap(err, "could not parse url parameter 'maxCacheSize'")
blobBucketCacheSize, err := parseInt(&query, "blobBucketCacheSize")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
// See cacheConfig.HardMaxCacheSize documentation
var minCacheSize int64 = (2 * (64 + 32) * int64(cacheConfig.Shards)) / 1000
if maxCacheSize < minCacheSize {
return nil, errors.Errorf("max cache size can not be set to a value below '%d'", minCacheSize)
}
cacheConfig.HardMaxCacheSize = int(maxCacheSize)
blobBucketCacheSize = 16
}
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBucketCacheSize(int(blobBucketCacheSize)))
bloInfoCacheSize, err := parseInt(&query, "bloInfoCacheSize")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
bloInfoCacheSize = 16
}
blobStoreOptionFuncs = append(blobStoreOptionFuncs, WithBlobInfoCacheSize(int(bloInfoCacheSize)))
url := &url.URL{
Scheme: rawDriver,
Host: dsn.Host,
@ -83,17 +77,17 @@ func blobStoreFactory(dsn *url.URL) (storage.BlobStore, error) {
RawQuery: query.Encode(),
}
store, err := driver.NewBlobStore(url.String())
backend, err := driver.NewBlobStore(url.String())
if err != nil {
return nil, errors.WithStack(err)
}
cache, err := bigcache.New(context.Background(), cacheConfig)
store, err := NewBlobStore(backend, blobStoreOptionFuncs...)
if err != nil {
return nil, errors.WithStack(err)
}
return NewBlobStore(store, cache), nil
return store, nil
}
type cacheLogger struct{}
@ -103,3 +97,110 @@ func (l *cacheLogger) Printf(format string, v ...interface{}) {
}
var _ bigcache.Logger = &cacheLogger{}
func parseBigCacheConfig(query *url.Values, cacheTTL time.Duration) (*bigcache.Config, error) {
config := bigcache.DefaultConfig(cacheTTL)
config.Logger = &cacheLogger{}
hardMaxCacheSize, err := parseInt(query, "bigCacheHardMaxCacheSize")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
hardMaxCacheSize = int64(config.HardMaxCacheSize)
}
config.HardMaxCacheSize = int(hardMaxCacheSize)
maxEntriesInWindow, err := parseInt(query, "bigCacheMaxEntriesInWindow")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
maxEntriesInWindow = int64(config.MaxEntriesInWindow)
}
config.MaxEntriesInWindow = int(maxEntriesInWindow)
shards, err := parseInt(query, "bigCacheShards")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
shards = int64(config.Shards)
}
config.Shards = int(shards)
maxEntrySize, err := parseInt(query, "bigCacheMaxEntrySize")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
maxEntrySize = int64(config.MaxEntrySize)
}
config.MaxEntrySize = int(maxEntrySize)
cleanWindow, err := parseDuration(query, "bigCacheCleanWindow")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
cleanWindow = config.CleanWindow
}
config.CleanWindow = cleanWindow
lifeWindow, err := parseDuration(query, "bigCacheLifeWindow")
if err != nil {
if !errors.Is(err, errNotFound) {
return nil, errors.WithStack(err)
}
lifeWindow = config.LifeWindow
}
config.LifeWindow = lifeWindow
return &config, nil
}
var errNotFound = errors.New("not found")
func parseInt(query *url.Values, name string) (int64, error) {
rawValue := query.Get(name)
if rawValue != "" {
query.Del(name)
value, err := strconv.ParseInt(rawValue, 10, 32)
if err != nil {
return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name)
}
return value, nil
}
return 0, errors.WithStack(errNotFound)
}
func parseDuration(query *url.Values, name string) (time.Duration, error) {
rawValue := query.Get(name)
if rawValue != "" {
query.Del(name)
value, err := time.ParseDuration(rawValue)
if err != nil {
return 0, errors.Wrapf(err, "could not parse url parameter '%s'", name)
}
return value, nil
}
return 0, errors.WithStack(errNotFound)
}

56
pkg/storage/driver/cache/options.go vendored Normal file
View File

@ -0,0 +1,56 @@
package cache
import (
"time"
"github.com/allegro/bigcache/v3"
)
type Options struct {
CacheTTL time.Duration
BigCache bigcache.Config
BucketCacheSize int
BlobInfoCacheSize int
}
type OptionFunc func(opts *Options)
func NewOptions(funcs ...OptionFunc) *Options {
defaultTTL := 60 * time.Minute
opts := &Options{
CacheTTL: defaultTTL,
BigCache: bigcache.DefaultConfig(defaultTTL),
BucketCacheSize: 16,
BlobInfoCacheSize: 256,
}
for _, fn := range funcs {
fn(opts)
}
return opts
}
func WithCacheTTL(ttl time.Duration) OptionFunc {
return func(opts *Options) {
opts.CacheTTL = ttl
}
}
func WithBigCacheConfig(config bigcache.Config) OptionFunc {
return func(opts *Options) {
opts.BigCache = config
}
}
func WithBucketCacheSize(size int) OptionFunc {
return func(opts *Options) {
opts.BucketCacheSize = size
}
}
func WithBlobInfoCacheSize(size int) OptionFunc {
return func(opts *Options) {
opts.BlobInfoCacheSize = size
}
}

View File

@ -20,7 +20,7 @@ func init() {
func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
dir := filepath.Dir(url.Host + url.Path)
if dir != ":memory:" {
if dir != "." {
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
@ -39,7 +39,7 @@ func documentStoreFactory(url *url.URL) (storage.DocumentStore, error) {
func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
dir := filepath.Dir(url.Host + url.Path)
if dir != ":memory:" {
if dir != "." {
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}
@ -58,7 +58,7 @@ func blobStoreFactory(url *url.URL) (storage.BlobStore, error) {
func shareStoreFactory(url *url.URL) (share.Store, error) {
dir := filepath.Dir(url.Host + url.Path)
if dir != ":memory:" {
if dir != "." {
if err := os.MkdirAll(dir, os.FileMode(0750)); err != nil {
return nil, errors.WithStack(err)
}