edge/cmd/storage-server/command/run.go

193 lines
5.4 KiB
Go

package command
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/keegancsmith/rpc"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/storage/registry"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/server"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
// Register storage schemes
_ "forge.cadoles.com/arcad/edge/pkg/storage/rpc/registry"
_ "forge.cadoles.com/arcad/edge/pkg/storage/sqlite/registry"
)
// Run returns the "run" CLI command, which starts the storage HTTP server.
//
// The server exposes two endpoints, "/blobstore" and "/documentstore". Every
// request must carry non-empty "tenant" and "appId" query parameters, which
// select the store server the request is delegated to. A request missing
// either parameter is answered with 400 Bad Request; a failure to obtain the
// store server is logged and answered with 500 Internal Server Error.
//
// The command blocks in http.ListenAndServe and returns its error, wrapped
// with a stack trace.
func Run() *cli.Command {
	return &cli.Command{
		Name:  "run",
		Usage: "Run server",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:    "address",
				Aliases: []string{"addr"},
				Value:   ":3001",
			},
			&cli.StringFlag{
				Name:    "blobstore-dsn-pattern",
				EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"},
				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
			},
			&cli.StringFlag{
				Name:    "documentstore-dsn-pattern",
				EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"},
				Value:   fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()),
			},
			&cli.DurationFlag{
				Name:    "cache-ttl",
				EnvVars: []string{"STORAGE_SERVER_CACHE_TTL"},
				Value:   time.Hour,
			},
			&cli.IntFlag{
				Name:    "cache-size",
				EnvVars: []string{"STORAGE_SERVER_CACHE_SIZE"},
				Value:   32,
			},
		},
		Action: func(ctx *cli.Context) error {
			addr := ctx.String("address")
			blobstoreDSNPattern := ctx.String("blobstore-dsn-pattern")
			documentstoreDSNPattern := ctx.String("documentstore-dsn-pattern")
			cacheSize := ctx.Int("cache-size")
			cacheTTL := ctx.Duration("cache-ttl")

			// storeHandler builds the HTTP handler shared by both endpoints:
			// it validates the query parameters, resolves the per-tenant/app
			// RPC server through getServer and delegates the request to it.
			// errMessage is the log message used when getServer fails.
			storeHandler := func(
				dsnPattern, errMessage string,
				getServer func(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error),
			) http.HandlerFunc {
				return func(w http.ResponseWriter, r *http.Request) {
					query := r.URL.Query()
					tenant := query.Get("tenant")
					appID := query.Get("appId")

					// Both identifiers are substituted into the DSN, so an
					// empty value for either one is a malformed request.
					// NOTE(review): the values come straight from the query
					// string and are not otherwise sanitized before being
					// placed in the DSN — confirm this endpoint is only
					// reachable by trusted callers.
					if tenant == "" || appID == "" {
						http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
						return
					}

					rpcServer, err := getServer(cacheSize, cacheTTL, tenant, appID, dsnPattern)
					if err != nil {
						logger.Error(r.Context(), errMessage, logger.E(errors.WithStack(err)), logger.F("tenant", tenant))
						http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
						return
					}

					rpcServer.ServeHTTP(w, r)
				}
			}

			router := chi.NewRouter()
			router.Use(middleware.RealIP)
			router.Use(middleware.Logger)

			router.Handle("/blobstore", storeHandler(blobstoreDSNPattern, "could not retrieve blob store server", getBlobStoreStoreServer))
			router.Handle("/documentstore", storeHandler(documentstoreDSNPattern, "could not retrieve document store server", getDocumentStoreServer))

			if err := http.ListenAndServe(addr, router); err != nil {
				return errors.WithStack(err)
			}

			return nil
		},
	}
}
var (
	// documentStoreCache holds the document store RPC servers created by
	// getDocumentStoreServer, keyed by a string derived from the tenant and
	// the app ID. It is nil until getDocumentStoreServer is first called.
	documentStoreCache *expirable.LRU[string, *rpc.Server]

	// initDocumentStoreCache guards the one-time creation of documentStoreCache.
	initDocumentStoreCache sync.Once
)

// getDocumentStoreServer returns the RPC server exposing the document store of
// the given tenant and app, creating and caching it on a cache miss.
//
// The store DSN is obtained by replacing every "%TENANT%" and "%APPID%"
// placeholder in dsnPattern with tenant and appID respectively.
//
// cacheSize and cacheTTL are only used by the very first call, which creates
// the package-level cache; later calls ignore them.
//
// An error is returned, wrapped with a stack trace, when the document store
// cannot be created from the resulting DSN.
func getDocumentStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
	initDocumentStoreCache.Do(func() {
		// NOTE(review): no eviction callback is registered, so nothing is
		// done with a store when its server is evicted or expires — confirm
		// the underlying stores do not need to be closed explicitly.
		documentStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
	})

	// Quote both components so the key stays unambiguous even when tenant or
	// appID contain the ':' separator ("a:b"+"c" must not collide with
	// "a"+"b:c", which resolve to different DSNs).
	key := fmt.Sprintf("%q:%q", tenant, appID)

	if cached, found := documentStoreCache.Get(key); found && cached != nil {
		return cached, nil
	}

	dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
	dsn = strings.ReplaceAll(dsn, "%APPID%", appID)

	documentStore, err := registry.NewDocumentStore(dsn)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	documentStoreServer := server.NewDocumentStoreServer(documentStore)
	documentStoreCache.Add(key, documentStoreServer)

	return documentStoreServer, nil
}
var (
	// blobStoreCache holds the blob store RPC servers created by
	// getBlobStoreStoreServer, keyed by a string derived from the tenant and
	// the app ID. It is nil until getBlobStoreStoreServer is first called.
	blobStoreCache *expirable.LRU[string, *rpc.Server]

	// initBlobStoreCache guards the one-time creation of blobStoreCache.
	initBlobStoreCache sync.Once
)

// getBlobStoreStoreServer returns the RPC server exposing the blob store of
// the given tenant and app, creating and caching it on a cache miss.
//
// The store DSN is obtained by replacing every "%TENANT%" and "%APPID%"
// placeholder in dsnPattern with tenant and appID respectively.
//
// cacheSize and cacheTTL are only used by the very first call, which creates
// the package-level cache; later calls ignore them.
//
// An error is returned, wrapped with a stack trace, when the blob store
// cannot be created from the resulting DSN.
func getBlobStoreStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) {
	initBlobStoreCache.Do(func() {
		// NOTE(review): no eviction callback is registered, so nothing is
		// done with a store when its server is evicted or expires — confirm
		// the underlying stores do not need to be closed explicitly.
		blobStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL)
	})

	// Quote both components so the key stays unambiguous even when tenant or
	// appID contain the ':' separator ("a:b"+"c" must not collide with
	// "a"+"b:c", which resolve to different DSNs).
	key := fmt.Sprintf("%q:%q", tenant, appID)

	if cached, found := blobStoreCache.Get(key); found && cached != nil {
		return cached, nil
	}

	dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant)
	dsn = strings.ReplaceAll(dsn, "%APPID%", appID)

	blobStore, err := registry.NewBlobStore(dsn)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	blobStoreServer := server.NewBlobStoreServer(blobStore)
	blobStoreCache.Add(key, blobStoreServer)

	return blobStoreServer, nil
}