edge/cmd/cli/command/app/run.go
William Petit 32f04af138
All checks were successful
arcad/edge/pipeline/head This commit looks good
feat(storage): improve caching in cache driver
ref #20
2023-11-30 19:09:51 +01:00

492 lines
12 KiB
Go

package app
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
appHTTP "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/jwtutil"
"forge.cadoles.com/arcad/edge/pkg/module"
appModule "forge.cadoles.com/arcad/edge/pkg/module/app"
appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory"
authModule "forge.cadoles.com/arcad/edge/pkg/module/auth"
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
authModuleMiddleware "forge.cadoles.com/arcad/edge/pkg/module/auth/middleware"
blobModule "forge.cadoles.com/arcad/edge/pkg/module/blob"
castModule "forge.cadoles.com/arcad/edge/pkg/module/cast"
fetchModule "forge.cadoles.com/arcad/edge/pkg/module/fetch"
netModule "forge.cadoles.com/arcad/edge/pkg/module/net"
rpcModule "forge.cadoles.com/arcad/edge/pkg/module/rpc"
shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
"forge.cadoles.com/arcad/edge/pkg/storage"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/bundle"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
_ "embed"
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"
"forge.cadoles.com/arcad/edge/pkg/storage/driver"
// Register storage drivers
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/cache"
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/rpc"
_ "forge.cadoles.com/arcad/edge/pkg/storage/driver/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage/share"
)
// dummySecret is the hard-coded secret from which this command derives its
// symmetric JWT key (runApp) and JWT key sets (runApp, getServerModules).
// NOTE(review): presumably acceptable only because this command is meant for
// local development — confirm before reusing it anywhere else.
var dummySecret = []byte("not_so_secret")
// RunCommand returns the "run" CLI command.
//
// The command loads the manifest of every app bundle given with --path, then
// serves each bundle concurrently on its own HTTP listener. The first bundle
// listens on the port given in --address and every following bundle on the
// next consecutive port. The command blocks until every app server has
// stopped; a failure of a single app is logged and does not make the command
// return an error.
func RunCommand() *cli.Command {
	return &cli.Command{
		Name:  "run",
		Usage: "Run the specified app bundle",
		Flags: []cli.Flag{
			&cli.StringSliceFlag{
				Name:    "path",
				Usage:   "use `PATH` as app bundle (zipped bundle or directory)",
				Aliases: []string{"p"},
				Value:   cli.NewStringSlice("."),
			},
			&cli.StringFlag{
				Name:    "address",
				Usage:   "use `ADDRESS` as http server base listening address",
				Aliases: []string{"a"},
				Value:   ":8080",
			},
			&cli.StringFlag{
				Name:  "log-format",
				Usage: "use `LOG-FORMAT` ('json' or 'human')",
				Value: "human",
			},
			&cli.IntFlag{
				Name:  "log-level",
				Usage: "use `LOG-LEVEL` (0: debug -> 5: fatal)",
				Value: 0,
			},
			&cli.StringFlag{
				Name:    "blobstore-dsn",
				Usage:   "use `DSN` for blob storage",
				EnvVars: []string{"EDGE_BLOBSTORE_DSN"},
				Value:   "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
			},
			&cli.StringFlag{
				Name:    "documentstore-dsn",
				Usage:   "use `DSN` for document storage",
				EnvVars: []string{"EDGE_DOCUMENTSTORE_DSN"},
				Value:   "sqlite://.edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
			},
			&cli.StringFlag{
				Name:    "sharestore-dsn",
				Usage:   "use `DSN` for share storage",
				EnvVars: []string{"EDGE_SHARESTORE_DSN"},
				Value:   "sqlite://.edge/share.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
			},
			&cli.StringFlag{
				Name:  "accounts-file",
				Usage: "use `FILE` as local accounts",
				Value: ".edge/%APPID%/accounts.json",
			},
			&cli.Int64Flag{
				Name:  "max-upload-size",
				Usage: "use `MAX-UPLOAD-SIZE` as blob max upload size",
				Value: 128 << (10 * 2), // 128 MiB
			},
		},
		Action: func(ctx *cli.Context) error {
			address := ctx.String("address")
			paths := ctx.StringSlice("path")
			logFormat := ctx.String("log-format")
			logLevel := ctx.Int("log-level")
			blobstoreDSN := ctx.String("blobstore-dsn")
			documentstoreDSN := ctx.String("documentstore-dsn")
			shareStoreDSN := ctx.String("sharestore-dsn")
			accountsFile := ctx.String("accounts-file")
			maxUploadSize := ctx.Int64("max-upload-size")

			logger.SetFormat(logger.Format(logFormat))
			logger.SetLevel(logger.Level(logLevel))

			cmdCtx := ctx.Context

			host, portStr, err := net.SplitHostPort(address)
			if err != nil {
				return errors.WithStack(err)
			}

			// TCP ports are 16-bit values; reject anything larger up front
			// instead of letting every listener fail later on.
			port, err := strconv.ParseUint(portStr, 10, 16)
			if err != nil {
				return errors.WithStack(err)
			}

			// Every manifest is loaded before any app starts so that each app
			// repository knows about all the apps being served.
			manifests := make([]*app.Manifest, len(paths))

			for idx, pth := range paths {
				bdl, err := bundle.FromPath(pth)
				if err != nil {
					return errors.WithStack(err)
				}

				manifest, err := app.LoadManifest(bdl)
				if err != nil {
					return errors.WithStack(err)
				}

				manifests[idx] = manifest
			}

			var wg sync.WaitGroup

			for idx, p := range paths {
				wg.Add(1)

				go func(path string, basePort uint64, appIndex int) {
					defer wg.Done()

					appPort := basePort + uint64(appIndex)
					// JoinHostPort brackets IPv6 hosts, which a plain
					// "%s:%d" format would turn into an invalid address.
					appAddress := net.JoinHostPort(host, strconv.FormatUint(appPort, 10))

					appsRepository := newAppRepository(host, basePort, manifests...)

					appCtx := logger.With(cmdCtx, logger.F("address", appAddress))

					if err := runApp(appCtx, path, appAddress, documentstoreDSN, blobstoreDSN, shareStoreDSN, accountsFile, appsRepository, maxUploadSize); err != nil {
						logger.Error(appCtx, "could not run app", logger.CapturedE(errors.WithStack(err)))
					}
				}(p, port, idx)
			}

			wg.Wait()

			return nil
		},
	}
}
// runApp opens the app bundle located at path, initializes its dependencies
// (bus, datastores, local accounts, app repository) and serves it over HTTP
// on address.
//
// The "%APPID%" placeholder found in documentStoreDSN, blobStoreDSN and
// accountsFile is replaced by the manifest's app ID by the dependency
// initializers. maxUploadSize is handed to the blob module HTTP mount.
//
// The call blocks for as long as the HTTP server runs and returns an error
// when the bundle cannot be loaded, when its manifest is invalid, when a
// dependency cannot be initialized or when the server stops.
func runApp(ctx context.Context, path, address, documentStoreDSN, blobStoreDSN, shareStoreDSN, accountsFile string, appRepository appModule.Repository, maxUploadSize int64) error {
	absPath, err := filepath.Abs(path)
	if err != nil {
		return errors.Wrapf(err, "could not resolve path '%s'", path)
	}

	logger.Info(ctx, "opening app bundle", logger.F("path", absPath))

	// Named bdl so the local variable does not shadow the bundle package.
	bdl, err := bundle.FromPath(path)
	if err != nil {
		return errors.Wrapf(err, "could not open path '%s' as an app bundle", path)
	}

	manifest, err := app.LoadManifest(bdl)
	if err != nil {
		return errors.Wrap(err, "could not load manifest from app bundle")
	}

	valid, err := manifest.Validate(manifestMetadataValidators...)
	if err != nil {
		return errors.Wrap(err, "invalid app manifest")
	}

	// errors.Wrap returns nil for a nil error, so an invalid manifest that
	// comes without an error has to be reported explicitly.
	if !valid {
		return errors.New("invalid app manifest")
	}

	ctx = logger.With(ctx, logger.F("appID", manifest.ID))

	// Signing key used by the auth handler and the anonymous user middleware.
	key, err := jwtutil.NewSymmetricKey(dummySecret)
	if err != nil {
		return errors.WithStack(err)
	}

	deps := &moduleDeps{}
	funcs := []ModuleDepFunc{
		initAppID(manifest),
		initMemoryBus,
		initDatastores(documentStoreDSN, blobStoreDSN, shareStoreDSN, manifest.ID),
		initAccounts(accountsFile, manifest.ID),
		initAppRepository(appRepository),
	}

	for _, fn := range funcs {
		if err := fn(deps); err != nil {
			return errors.WithStack(err)
		}
	}

	handler := appHTTP.NewHandler(
		appHTTP.WithBus(deps.Bus),
		appHTTP.WithServerModules(getServerModules(deps)...),
		appHTTP.WithHTTPMounts(
			appModule.Mount(appRepository),
			authModule.Mount(
				authHTTP.NewLocalHandler(
					key,
					jwa.HS256,
					authHTTP.WithRoutePrefix("/auth"),
					authHTTP.WithAccounts(deps.Accounts...),
				),
				authModule.WithJWT(func() (jwk.Set, error) {
					return jwtutil.NewSymmetricKeySet(dummySecret)
				}),
			),
			blobModule.Mount(maxUploadSize),
			fetchModule.Mount(),
		),
		appHTTP.WithHTTPMiddlewares(
			authModuleMiddleware.AnonymousUser(key, jwa.HS256),
		),
	)

	if err := handler.Load(ctx, bdl); err != nil {
		return errors.Wrap(err, "could not load app bundle")
	}

	router := chi.NewRouter()
	router.Use(middleware.Logger)
	router.Use(middleware.Compress(5))

	// Every request is delegated to the app handler.
	router.Handle("/*", handler)

	logger.Info(ctx, "listening", logger.F("address", address))

	if err := http.ListenAndServe(address, router); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// moduleDeps gathers the dependencies needed to build the server modules and
// the HTTP handler of one app. runApp fills it by running a list of
// ModuleDepFunc against an empty value.
type moduleDeps struct {
// AppID is the manifest ID of the app, set by initAppID.
AppID app.ID
// Bus is the message bus shared by the modules, set by initMemoryBus.
Bus bus.Bus
// DocumentStore is the document store opened by initDatastores.
DocumentStore storage.DocumentStore
// BlobStore is the blob store opened by initDatastores.
BlobStore storage.BlobStore
// AppRepository is the app repository set by initAppRepository.
AppRepository appModule.Repository
// ShareStore is the share store opened by initDatastores.
ShareStore share.Store
// Accounts lists the local accounts loaded by initAccounts.
Accounts []authHTTP.LocalAccount
}
// ModuleDepFunc initializes part of the given moduleDeps. runApp runs a list
// of them in order and aborts on the first one that returns an error.
type ModuleDepFunc func(*moduleDeps) error
// getServerModules builds the list of server module factories of an app,
// wiring in the bus, stores, app repository and app ID found in deps.
func getServerModules(deps *moduleDeps) []app.ServerModuleFactory {
	// Key set provider handed to the auth module to verify tokens.
	keySet := func() (jwk.Set, error) {
		return jwtutil.NewSymmetricKeySet(dummySecret)
	}

	factories := []app.ServerModuleFactory{
		module.LifecycleModuleFactory(),
		module.ContextModuleFactory(),
		module.ConsoleModuleFactory(),
		castModule.CastModuleFactory(),
		netModule.ModuleFactory(deps.Bus),
		rpcModule.ModuleFactory(deps.Bus),
		module.StoreModuleFactory(deps.DocumentStore),
		blobModule.ModuleFactory(deps.Bus, deps.BlobStore),
		authModule.ModuleFactory(authModule.WithJWT(keySet)),
		appModule.ModuleFactory(deps.AppRepository),
		fetchModule.ModuleFactory(deps.Bus),
		shareModule.ModuleFactory(deps.AppID, deps.ShareStore),
	}

	return factories
}
// ensureDir creates the parent directory of path, along with any missing
// ancestor, using os.ModePerm as permission bits.
func ensureDir(path string) error {
	parent := filepath.Dir(path)

	err := os.MkdirAll(parent, os.ModePerm)
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// injectAppID returns str with every occurrence of the "%APPID%" placeholder
// replaced by appID.
func injectAppID(str string, appID app.ID) string {
	const placeholder = "%APPID%"

	id := string(appID)

	return strings.ReplaceAll(str, placeholder, id)
}
// defaultAccounts holds the content of default-accounts.json, embedded at
// build time. loadLocalAccounts writes it to the accounts file when that file
// does not exist yet and uses it as the accounts data.
//
//go:embed default-accounts.json
var defaultAccounts []byte
// loadLocalAccounts reads the JSON-encoded local accounts stored at path.
//
// The parent directory of path is created when missing. When the file itself
// does not exist, it is created with the embedded default accounts, which are
// then the ones returned.
func loadLocalAccounts(path string) ([]authHTTP.LocalAccount, error) {
	if err := ensureDir(path); err != nil {
		return nil, errors.WithStack(err)
	}

	raw, readErr := os.ReadFile(path)

	switch {
	case readErr == nil:
		// The file exists, use its content as is.
	case errors.Is(readErr, os.ErrNotExist):
		// First run: seed the file with the embedded defaults.
		if writeErr := os.WriteFile(path, defaultAccounts, 0o640); writeErr != nil {
			return nil, errors.WithStack(writeErr)
		}

		raw = defaultAccounts
	default:
		return nil, errors.WithStack(readErr)
	}

	var accounts []authHTTP.LocalAccount

	if decodeErr := json.Unmarshal(raw, &accounts); decodeErr != nil {
		return nil, errors.WithStack(decodeErr)
	}

	return accounts, nil
}
// findMatchingDeviceAddress returns the IPv4 address of the first local
// network interface address whose network contains the from IP address.
//
// defaultAddr is returned when from is empty or is not a valid IP address,
// and when no interface address matches. Interfaces whose addresses cannot be
// listed and addresses that cannot be parsed are logged and skipped. An error
// is returned only when the network interfaces themselves cannot be listed.
func findMatchingDeviceAddress(ctx context.Context, from string, defaultAddr string) (string, error) {
	// net.ParseIP yields nil for an empty string as well as for an invalid
	// address, so a single check covers both cases.
	fromIP := net.ParseIP(from)
	if fromIP == nil {
		return defaultAddr, nil
	}

	interfaces, err := net.Interfaces()
	if err != nil {
		return "", errors.WithStack(err)
	}

	for _, iface := range interfaces {
		ifaceAddrs, err := iface.Addrs()
		if err != nil {
			logger.Error(
				ctx, "could not retrieve iface adresses",
				logger.CapturedE(errors.WithStack(err)), logger.F("iface", iface.Name),
			)

			continue
		}

		for _, ifaceAddr := range ifaceAddrs {
			rawAddr := ifaceAddr.String()

			ip, network, err := net.ParseCIDR(rawAddr)
			if err != nil {
				logger.Error(
					ctx, "could not parse address",
					logger.CapturedE(errors.WithStack(err)), logger.F("address", ifaceAddr.String()),
				)

				continue
			}

			if !network.Contains(fromIP) {
				continue
			}

			// Only IPv4 addresses are eligible.
			if ipv4 := ip.To4(); ipv4 != nil {
				return ipv4.String(), nil
			}
		}
	}

	return defaultAddr, nil
}
// newAppRepository creates an in-memory app repository for the given
// manifests.
//
// The repository resolves the URL of an app as
// "http://<address>:<basePort + index>", where index is the position of the
// app's manifest in manifests and address is the local device address
// matching the requester (see findMatchingDeviceAddress), falling back to
// host. An empty host defaults to "127.0.0.1".
//
// NOTE(review): an app ID matching no manifest resolves to index 0, i.e. to
// the URL of the first app — confirm that this fallback is intended.
func newAppRepository(host string, basePort uint64, manifests ...*app.Manifest) *appModuleMemory.Repository {
	if host == "" {
		host = "127.0.0.1"
	}

	resolveURL := func(ctx context.Context, id app.ID, from string) (string, error) {
		appIndex := 0

		for idx, manifest := range manifests {
			if manifest.ID == id {
				appIndex = idx

				break
			}
		}

		addr, err := findMatchingDeviceAddress(ctx, from, host)
		if err != nil {
			return "", errors.WithStack(err)
		}

		// JoinHostPort brackets IPv6 literals so that the resulting URL
		// stays valid when host is an IPv6 address.
		port := strconv.Itoa(int(basePort) + appIndex)

		return "http://" + net.JoinHostPort(addr, port), nil
	}

	return appModuleMemory.NewRepository(resolveURL, manifests...)
}
// initAppID returns a ModuleDepFunc storing the ID of manifest into the
// dependencies.
func initAppID(manifest *app.Manifest) ModuleDepFunc {
	var setAppID ModuleDepFunc = func(deps *moduleDeps) error {
		deps.AppID = manifest.ID

		return nil
	}

	return setAppID
}
// initAppRepository returns a ModuleDepFunc storing repo into the
// dependencies.
func initAppRepository(repo appModule.Repository) ModuleDepFunc {
	var setRepository ModuleDepFunc = func(deps *moduleDeps) error {
		deps.AppRepository = repo

		return nil
	}

	return setRepository
}
// initMemoryBus is a ModuleDepFunc giving the dependencies a new in-memory
// bus. It never fails.
func initMemoryBus(deps *moduleDeps) error {
	memoryBus := memory.NewBus()
	deps.Bus = memoryBus

	return nil
}
// initDatastores returns a ModuleDepFunc opening the document, blob and
// share stores from their respective DSN.
//
// The "%APPID%" placeholder is replaced by appID in the document and blob
// store DSNs; the share store DSN is used as given. The stores are opened in
// that order and the function stops at the first failure, leaving the stores
// already opened assigned to the dependencies.
func initDatastores(documentStoreDSN, blobStoreDSN, shareStoreDSN string, appID app.ID) ModuleDepFunc {
	return func(deps *moduleDeps) error {
		documentStoreDSN = injectAppID(documentStoreDSN, appID)

		documents, docErr := driver.NewDocumentStore(documentStoreDSN)
		if docErr != nil {
			return errors.WithStack(docErr)
		}

		deps.DocumentStore = documents

		blobStoreDSN = injectAppID(blobStoreDSN, appID)

		blobs, blobErr := driver.NewBlobStore(blobStoreDSN)
		if blobErr != nil {
			return errors.WithStack(blobErr)
		}

		deps.BlobStore = blobs

		shares, shareErr := driver.NewShareStore(shareStoreDSN)
		if shareErr != nil {
			return errors.WithStack(shareErr)
		}

		deps.ShareStore = shares

		return nil
	}
}
// initAccounts returns a ModuleDepFunc loading the local accounts from
// accountsFile, once its "%APPID%" placeholder has been replaced by appID,
// and storing them into the dependencies.
func initAccounts(accountsFile string, appID app.ID) ModuleDepFunc {
	return func(deps *moduleDeps) error {
		accountsFile = injectAppID(accountsFile, appID)

		loaded, loadErr := loadLocalAccounts(accountsFile)
		if loadErr != nil {
			return errors.Wrap(loadErr, "could not load local accounts")
		}

		deps.Accounts = loaded

		return nil
	}
}