package command import ( "fmt" "net/http" "strings" "sync" "time" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/keegancsmith/rpc" "gitlab.com/wpetit/goweb/logger" "forge.cadoles.com/arcad/edge/pkg/storage/driver" "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc/server" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/pkg/errors" "github.com/urfave/cli/v2" // Register storage drivers _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc" _ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite" ) func Run() *cli.Command { return &cli.Command{ Name: "run", Usage: "Run server", Flags: []cli.Flag{ &cli.StringFlag{ Name: "address", Aliases: []string{"addr"}, Value: ":3001", }, &cli.StringFlag{ Name: "blobstore-dsn-pattern", EnvVars: []string{"STORAGE_SERVER_BLOBSTORE_DSN_PATTERN"}, Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/blobstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()), }, &cli.StringFlag{ Name: "documentstore-dsn-pattern", EnvVars: []string{"STORAGE_SERVER_DOCUMENTSTORE_DSN_PATTERN"}, Value: fmt.Sprintf("sqlite://data/%%TENANT%%/%%APPID%%/documentstore.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=%d", (60 * time.Second).Milliseconds()), }, &cli.DurationFlag{ Name: "cache-ttl", EnvVars: []string{"STORAGE_SERVER_CACHE_TTL"}, Value: time.Hour, }, &cli.IntFlag{ Name: "cache-size", EnvVars: []string{"STORAGE_SERVER_CACHE_SIZE"}, Value: 32, }, }, Action: func(ctx *cli.Context) error { addr := ctx.String("address") blobstoreDSNPattern := ctx.String("blobstore-dsn-pattern") documentstoreDSNPattern := ctx.String("documentstore-dsn-pattern") cacheSize := ctx.Int("cache-size") cacheTTL := ctx.Duration("cache-ttl") router := chi.NewRouter() router.Use(middleware.RealIP) router.Use(middleware.Logger) router.Handle("/blobstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { tenant := r.URL.Query().Get("tenant") if tenant == "" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } appID := r.URL.Query().Get("appId") if tenant == "" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } server, err := getBlobStoreStoreServer(cacheSize, cacheTTL, tenant, appID, blobstoreDSNPattern) if err != nil { logger.Error(r.Context(), "could not retrieve blob store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant)) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } server.ServeHTTP(w, r) })) router.Handle("/documentstore", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { tenant := r.URL.Query().Get("tenant") if tenant == "" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } appID := r.URL.Query().Get("appId") if tenant == "" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } server, err := getDocumentStoreServer(cacheSize, cacheTTL, tenant, appID, documentstoreDSNPattern) if err != nil { logger.Error(r.Context(), "could not retrieve document store server", logger.E(errors.WithStack(err)), logger.F("tenant", tenant)) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } server.ServeHTTP(w, r) })) if err := http.ListenAndServe(addr, router); err != nil { return errors.WithStack(err) } return nil }, } } var ( documentStoreCache *expirable.LRU[string, *rpc.Server] initDocumentStoreCache sync.Once ) func getDocumentStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) { initDocumentStoreCache.Do(func() { documentStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL) }) key := fmt.Sprintf("%s:%s", tenant, appID) documentStoreServer, _ := documentStoreCache.Get(key) if documentStoreServer != nil { return documentStoreServer, nil } dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant) dsn = strings.ReplaceAll(dsn, "%APPID%", appID) documentStore, err := driver.NewDocumentStore(dsn) if err != nil { return nil, errors.WithStack(err) } documentStoreServer = server.NewDocumentStoreServer(documentStore) documentStoreCache.Add(key, documentStoreServer) return documentStoreServer, nil } var ( blobStoreCache *expirable.LRU[string, *rpc.Server] initBlobStoreCache sync.Once ) func getBlobStoreStoreServer(cacheSize int, cacheTTL time.Duration, tenant, appID, dsnPattern string) (*rpc.Server, error) { initBlobStoreCache.Do(func() { blobStoreCache = expirable.NewLRU[string, *rpc.Server](cacheSize, nil, cacheTTL) }) key := fmt.Sprintf("%s:%s", tenant, appID) blobStoreServer, _ := blobStoreCache.Get(key) if blobStoreServer != nil { return blobStoreServer, nil } dsn := strings.ReplaceAll(dsnPattern, "%TENANT%", tenant) dsn = strings.ReplaceAll(dsn, "%APPID%", appID) blobStore, err := driver.NewBlobStore(dsn) if err != nil { return nil, errors.WithStack(err) } blobStoreServer = server.NewBlobStoreServer(blobStore) blobStoreCache.Add(key, blobStoreServer) return blobStoreServer, nil }