edge/cmd/cli/command/app/run.go
William Petit 19539ec34a
All checks were successful
arcad/edge/pipeline/pr-master This commit looks good
feat(storage): rpc based implementation
2023-09-26 22:28:25 -06:00

512 lines
12 KiB
Go

package app
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/keegancsmith/rpc"
"forge.cadoles.com/arcad/edge/pkg/app"
"forge.cadoles.com/arcad/edge/pkg/bus"
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
appHTTP "forge.cadoles.com/arcad/edge/pkg/http"
"forge.cadoles.com/arcad/edge/pkg/module"
appModule "forge.cadoles.com/arcad/edge/pkg/module/app"
appModuleMemory "forge.cadoles.com/arcad/edge/pkg/module/app/memory"
authModule "forge.cadoles.com/arcad/edge/pkg/module/auth"
authHTTP "forge.cadoles.com/arcad/edge/pkg/module/auth/http"
authModuleMiddleware "forge.cadoles.com/arcad/edge/pkg/module/auth/middleware"
"forge.cadoles.com/arcad/edge/pkg/module/blob"
"forge.cadoles.com/arcad/edge/pkg/module/cast"
"forge.cadoles.com/arcad/edge/pkg/module/fetch"
netModule "forge.cadoles.com/arcad/edge/pkg/module/net"
shareModule "forge.cadoles.com/arcad/edge/pkg/module/share"
shareSqlite "forge.cadoles.com/arcad/edge/pkg/module/share/sqlite"
"forge.cadoles.com/arcad/edge/pkg/storage"
"forge.cadoles.com/arcad/edge/pkg/storage/rpc/client"
"gitlab.com/wpetit/goweb/logger"
"forge.cadoles.com/arcad/edge/pkg/bundle"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
_ "embed"
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/argon2id"
_ "forge.cadoles.com/arcad/edge/pkg/module/auth/http/passwd/plain"
)
// RunCommand returns the "run" CLI command, which serves one or more app
// bundles over HTTP.
//
// Every bundle given through --path gets its own HTTP server: the first one
// listens on the port of --address, the following ones on the next ports
// (base port + index of the bundle). The command blocks until every server
// has returned; a failing app is logged and does not change the returned
// error.
func RunCommand() *cli.Command {
	flags := []cli.Flag{
		&cli.StringSliceFlag{
			Name:    "path",
			Usage:   "use `PATH` as app bundle (zipped bundle or directory)",
			Aliases: []string{"p"},
			Value:   cli.NewStringSlice("."),
		},
		&cli.StringFlag{
			Name:    "address",
			Usage:   "use `ADDRESS` as http server base listening address",
			Aliases: []string{"a"},
			Value:   ":8080",
		},
		&cli.StringFlag{
			Name:  "log-format",
			Usage: "use `LOG-FORMAT` ('json' or 'human')",
			Value: "human",
		},
		&cli.IntFlag{
			Name:  "log-level",
			Usage: "use `LOG-LEVEL` (0: debug -> 5: fatal)",
			Value: 0,
		},
		&cli.StringFlag{
			Name:  "storage-file",
			Usage: "use `FILE` for SQLite storage database",
			Value: ".edge/%APPID%/data.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
		},
		&cli.StringFlag{
			Name:  "shared-resources-file",
			Usage: "use `FILE` for SQLite shared resources database",
			Value: ".edge/shared-resources.sqlite?_pragma=foreign_keys(1)&_pragma=busy_timeout=60000",
		},
		&cli.StringFlag{
			Name:  "accounts-file",
			Usage: "use `FILE` as local accounts",
			Value: ".edge/%APPID%/accounts.json",
		},
	}

	action := func(ctx *cli.Context) error {
		bundlePaths := ctx.StringSlice("path")
		storageFile := ctx.String("storage-file")
		accountsFile := ctx.String("accounts-file")
		sharedResourcesFile := ctx.String("shared-resources-file")

		logger.SetFormat(logger.Format(ctx.String("log-format")))
		logger.SetLevel(logger.Level(ctx.Int("log-level")))

		cmdCtx := ctx.Context

		// Split the base address once; each app derives its own port from it.
		host, rawPort, err := net.SplitHostPort(ctx.String("address"))
		if err != nil {
			return errors.WithStack(err)
		}

		basePort, err := strconv.ParseUint(rawPort, 10, 32)
		if err != nil {
			return errors.WithStack(err)
		}

		// Load every manifest up front so that each app repository knows
		// about all the apps started by this command.
		manifests := make([]*app.Manifest, 0, len(bundlePaths))
		for _, bundlePath := range bundlePaths {
			bdl, err := bundle.FromPath(bundlePath)
			if err != nil {
				return errors.WithStack(err)
			}

			manifest, err := app.LoadManifest(bdl)
			if err != nil {
				return errors.WithStack(err)
			}

			manifests = append(manifests, manifest)
		}

		var running sync.WaitGroup

		running.Add(len(bundlePaths))

		for i := range bundlePaths {
			// Per-iteration copies, safe to capture in the goroutine below.
			appIndex, bundlePath := i, bundlePaths[i]

			go func() {
				defer running.Done()

				listenAddr := fmt.Sprintf("%s:%d", host, basePort+uint64(appIndex))
				repository := newAppRepository(host, basePort, manifests...)
				appCtx := logger.With(cmdCtx, logger.F("address", listenAddr))

				err := runApp(appCtx, bundlePath, listenAddr, storageFile, accountsFile, repository, sharedResourcesFile)
				if err != nil {
					logger.Error(appCtx, "could not run app", logger.E(errors.WithStack(err)))
				}
			}()
		}

		running.Wait()

		return nil
	}

	return &cli.Command{
		Name:   "run",
		Usage:  "Run the specified app bundle",
		Flags:  flags,
		Action: action,
	}
}
// runApp loads the app bundle found at path, initializes its module
// dependencies and serves it over HTTP on address.
//
// storageFile, accountsFile and sharedResourcesFile are handed to the
// dependency initializers; appRepository is both mounted on the HTTP handler
// and exposed to the server modules.
//
// The function blocks while the HTTP server is running and returns an error
// when the bundle cannot be opened, its manifest is invalid, a dependency
// cannot be initialized or the server stops.
func runApp(ctx context.Context, path string, address string, storageFile string, accountsFile string, appRepository appModule.Repository, sharedResourcesFile string) error {
	absPath, err := filepath.Abs(path)
	if err != nil {
		return errors.Wrapf(err, "could not resolve path '%s'", path)
	}

	logger.Info(ctx, "opening app bundle", logger.F("path", absPath))

	// Named bdl so that the local variable does not shadow the bundle package.
	bdl, err := bundle.FromPath(path)
	if err != nil {
		return errors.Wrapf(err, "could not open path '%s' as an app bundle", path)
	}

	manifest, err := app.LoadManifest(bdl)
	if err != nil {
		return errors.Wrap(err, "could not load manifest from app bundle")
	}

	// errors.Wrap returns nil when given a nil error, so the "invalid without
	// error" case must be reported explicitly or it would be silently
	// swallowed and runApp would return nil without serving anything.
	valid, err := manifest.Validate(manifestMetadataValidators...)
	if err != nil {
		return errors.Wrap(err, "invalid app manifest")
	}

	if !valid {
		return errors.New("invalid app manifest")
	}

	ctx = logger.With(ctx, logger.F("appID", manifest.ID))

	// Signing key shared by the local auth handler and the anonymous user
	// middleware.
	key, err := dummyKey()
	if err != nil {
		return errors.WithStack(err)
	}

	// AppID is read by getServerModules for the share module and no
	// initializer below assigns it, so it is set here.
	deps := &moduleDeps{AppID: manifest.ID}

	funcs := []ModuleDepFunc{
		initMemoryBus,
		initDatastores(storageFile, manifest.ID),
		initAccounts(accountsFile, manifest.ID),
		initShareRepository(sharedResourcesFile),
		initAppRepository(appRepository),
	}

	for _, fn := range funcs {
		if err := fn(deps); err != nil {
			return errors.WithStack(err)
		}
	}

	handler := appHTTP.NewHandler(
		appHTTP.WithBus(deps.Bus),
		appHTTP.WithServerModules(getServerModules(deps)...),
		appHTTP.WithHTTPMounts(
			appModule.Mount(appRepository),
			authModule.Mount(
				authHTTP.NewLocalHandler(
					jwa.HS256, key,
					authHTTP.WithRoutePrefix("/auth"),
					authHTTP.WithAccounts(deps.Accounts...),
				),
				authModule.WithJWT(dummyKeySet),
			),
		),
		appHTTP.WithHTTPMiddlewares(
			authModuleMiddleware.AnonymousUser(
				jwa.HS256, key,
			),
		),
	)

	if err := handler.Load(bdl); err != nil {
		return errors.Wrap(err, "could not load app bundle")
	}

	router := chi.NewRouter()
	router.Use(middleware.Logger)
	router.Use(middleware.Compress(5))

	// Every request is delegated to the app handler.
	router.Handle("/*", handler)

	logger.Info(ctx, "listening", logger.F("address", address))

	if err := http.ListenAndServe(address, router); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// moduleDeps gathers the dependencies handed to the server modules and HTTP
// mounts of a single app. Its fields are filled in by ModuleDepFunc
// initializers and read by getServerModules and runApp.
type moduleDeps struct {
	// AppID is passed to the share module factory by getServerModules.
	AppID app.ID
	// Bus is the message bus shared by the HTTP handler and the bus-backed modules.
	Bus bus.Bus
	// DocumentStore backs the store module.
	DocumentStore storage.DocumentStore
	// BlobStore backs the blob module.
	BlobStore storage.BlobStore
	// AppRepository backs the app module.
	AppRepository appModule.Repository
	// ShareRepository backs the share module.
	ShareRepository shareModule.Repository
	// Accounts are the local accounts given to the local auth handler.
	Accounts []authHTTP.LocalAccount
}
// ModuleDepFunc initializes one or more fields of the given moduleDeps and
// returns an error when the dependency cannot be set up.
type ModuleDepFunc func(*moduleDeps) error
// getServerModules builds the list of server module factories of an app from
// the given dependencies. deps must already be initialized: its fields are
// read at call time.
func getServerModules(deps *moduleDeps) []app.ServerModuleFactory {
	factories := make([]app.ServerModuleFactory, 0, 12)

	// Modules without external dependencies.
	factories = append(factories,
		module.LifecycleModuleFactory(),
		module.ContextModuleFactory(),
		module.ConsoleModuleFactory(),
		cast.CastModuleFactory(),
	)

	// Bus and storage backed modules.
	factories = append(factories,
		netModule.ModuleFactory(deps.Bus),
		module.RPCModuleFactory(deps.Bus),
		module.StoreModuleFactory(deps.DocumentStore),
		blob.ModuleFactory(deps.Bus, deps.BlobStore),
	)

	// Authentication, app registry, fetch and sharing.
	factories = append(factories,
		authModule.ModuleFactory(
			authModule.WithJWT(dummyKeySet),
		),
		appModule.ModuleFactory(deps.AppRepository),
		fetch.ModuleFactory(deps.Bus),
		shareModule.ModuleFactory(deps.AppID, deps.ShareRepository),
	)

	return factories
}
// dummySecret is the hard-coded raw secret from which dummyKey builds the
// key used with HS256 by the local auth handler and middleware of this
// command. As its value suggests, it is not confidential.
var dummySecret = []byte("not_so_secret")
// dummyKey builds a JWK from the hard-coded dummySecret.
func dummyKey() (jwk.Key, error) {
	secretKey, err := jwk.FromRaw(dummySecret)
	if err == nil {
		return secretKey, nil
	}

	return nil, errors.WithStack(err)
}
// dummyKeySet returns a key set holding only the dummy key, with its
// algorithm set to HS256.
func dummyKeySet() (jwk.Set, error) {
	signingKey, err := dummyKey()
	if err != nil {
		return nil, errors.WithStack(err)
	}

	err = signingKey.Set(jwk.AlgorithmKey, jwa.HS256)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	keys := jwk.NewSet()

	err = keys.AddKey(signingKey)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return keys, nil
}
// ensureDir creates the parent directory of path, along with any missing
// ancestors. The last element of path itself is not created.
func ensureDir(path string) error {
	parent := filepath.Dir(path)

	err := os.MkdirAll(parent, os.ModePerm)
	if err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// injectAppID returns str with every occurrence of the "%APPID%" placeholder
// replaced by the given app identifier.
func injectAppID(str string, appID app.ID) string {
	const placeholder = "%APPID%"

	// A negative count replaces every occurrence.
	return strings.Replace(str, placeholder, string(appID), -1)
}
// defaultAccounts holds the content of default-accounts.json, embedded at
// build time. loadLocalAccounts writes it to disk and uses it when the
// accounts file does not exist yet.
//
//go:embed default-accounts.json
var defaultAccounts []byte
// loadLocalAccounts reads the local accounts stored as JSON in the file at
// path.
//
// The parent directory of path is created if needed. When the file does not
// exist, it is created with the embedded default accounts, which are then
// the ones returned.
func loadLocalAccounts(path string) ([]authHTTP.LocalAccount, error) {
	if err := ensureDir(path); err != nil {
		return nil, errors.WithStack(err)
	}

	raw, err := ioutil.ReadFile(path)

	switch {
	case err == nil:
		// File read successfully, nothing else to prepare.
	case errors.Is(err, os.ErrNotExist):
		// First run: seed the file with the default accounts.
		if err := ioutil.WriteFile(path, defaultAccounts, 0o640); err != nil {
			return nil, errors.WithStack(err)
		}

		raw = defaultAccounts
	default:
		return nil, errors.WithStack(err)
	}

	var accounts []authHTTP.LocalAccount

	if err := json.Unmarshal(raw, &accounts); err != nil {
		return nil, errors.WithStack(err)
	}

	return accounts, nil
}
// findMatchingDeviceAddress returns the IPv4 address of the first local
// network interface address whose network contains the IP given in from.
//
// defaultAddr is returned when from is empty or not a valid IP, or when no
// interface address matches. Interfaces and addresses that cannot be
// inspected are logged and skipped. An error is only returned when the
// interfaces cannot be listed.
func findMatchingDeviceAddress(ctx context.Context, from string, defaultAddr string) (string, error) {
	// net.ParseIP returns nil for an empty string as well as for an invalid
	// address, so this single guard covers both cases.
	fromIP := net.ParseIP(from)
	if fromIP == nil {
		return defaultAddr, nil
	}

	ifaces, err := net.Interfaces()
	if err != nil {
		return "", errors.WithStack(err)
	}

	for _, iface := range ifaces {
		ifaceAddrs, err := iface.Addrs()
		if err != nil {
			logger.Error(
				ctx, "could not retrieve iface adresses",
				logger.E(errors.WithStack(err)), logger.F("iface", iface.Name),
			)

			continue
		}

		for _, ifaceAddr := range ifaceAddrs {
			rawAddr := ifaceAddr.String()

			ip, network, err := net.ParseCIDR(rawAddr)
			if err != nil {
				logger.Error(
					ctx, "could not parse address",
					logger.E(errors.WithStack(err)), logger.F("address", rawAddr),
				)

				continue
			}

			// Only an IPv4 address on the same network as the caller is
			// eligible.
			if ipv4 := ip.To4(); ipv4 != nil && network.Contains(fromIP) {
				return ipv4.String(), nil
			}
		}
	}

	return defaultAddr, nil
}
// newAppRepository creates an in-memory app repository for the given
// manifests.
//
// The URL of an app is "http://<addr>:<port>", where <port> is basePort plus
// the index of the app in manifests (index 0 is used when the app ID is not
// found) and <addr> is resolved by findMatchingDeviceAddress, with host as
// fallback. An empty host defaults to "127.0.0.1".
func newAppRepository(host string, basePort uint64, manifests ...*app.Manifest) *appModuleMemory.Repository {
	if host == "" {
		host = "127.0.0.1"
	}

	resolveURL := func(ctx context.Context, id app.ID, from string) (string, error) {
		// The position of the app in manifests is its port offset.
		offset := 0

		for i, manifest := range manifests {
			if manifest.ID == id {
				offset = i

				break
			}
		}

		addr, err := findMatchingDeviceAddress(ctx, from, host)
		if err != nil {
			return "", errors.WithStack(err)
		}

		appPort := int(basePort) + offset

		return fmt.Sprintf("http://%s:%d", addr, appPort), nil
	}

	return appModuleMemory.NewRepository(resolveURL, manifests...)
}
// initAppRepository returns a ModuleDepFunc storing repo as the app
// repository of the dependencies. The returned function never fails.
func initAppRepository(repo appModule.Repository) ModuleDepFunc {
	assign := func(deps *moduleDeps) error {
		deps.AppRepository = repo

		return nil
	}

	return assign
}
// initMemoryBus is a ModuleDepFunc setting a new in-memory bus as the bus of
// the dependencies. It never fails.
func initMemoryBus(deps *moduleDeps) error {
	memoryBus := memory.NewBus()
	deps.Bus = memoryBus

	return nil
}
// initDatastores returns a ModuleDepFunc setting up the blob and document
// stores of the app identified by appID.
//
// Both stores are RPC clients dialing "localhost:3001" over HTTP; the app ID
// is sent as the "tenant" query parameter of the dialed path.
//
// NOTE(review): storageFile is currently ignored, the SQLite based
// implementation being disabled (see the commented code below); the
// "storage-file" flag therefore has no effect on these stores.
//
// Either both stores are assigned or, on error, none of them.
func initDatastores(storageFile string, appID app.ID) ModuleDepFunc {
	const storageServerAddr = "localhost:3001"

	return func(deps *moduleDeps) error {
		blobStoreClient, err := rpc.DialHTTPPath(
			"tcp",
			storageServerAddr,
			injectAppID("/blobstore?tenant=%APPID%", appID),
		)
		if err != nil {
			return errors.WithStack(err)
		}

		documentStoreClient, err := rpc.DialHTTPPath(
			"tcp",
			storageServerAddr,
			injectAppID("/documentstore?tenant=%APPID%", appID),
		)
		if err != nil {
			// Do not leak the connection already opened for the blob store.
			// The close error is deliberately dropped: the dial failure is
			// the error worth reporting.
			_ = blobStoreClient.Close()

			return errors.WithStack(err)
		}

		deps.BlobStore = client.NewBlobStore(blobStoreClient)
		deps.DocumentStore = client.NewDocumentStore(documentStoreClient)

		// Disabled SQLite based implementation:
		//
		// storageFile = injectAppID(storageFile, appID)
		// if err := ensureDir(storageFile); err != nil {
		// 	return errors.WithStack(err)
		// }
		// db, err := storageSqlite.Open(storageFile)
		// if err != nil {
		// 	return errors.WithStack(err)
		// }
		// deps.DocumentStore = storageSqlite.NewDocumentStoreWithDB(db)
		// deps.BlobStore = storageSqlite.NewBlobStoreWithDB(db)

		return nil
	}
}
// initAccounts returns a ModuleDepFunc loading the local accounts of the app
// identified by appID. The "%APPID%" placeholder of accountsFile is replaced
// by appID before the file is loaded.
func initAccounts(accountsFile string, appID app.ID) ModuleDepFunc {
	load := func(deps *moduleDeps) error {
		// The resolved path is stored back in the captured variable.
		accountsFile = injectAppID(accountsFile, appID)

		loaded, err := loadLocalAccounts(accountsFile)
		if err != nil {
			return errors.Wrap(err, "could not load local accounts")
		}

		deps.Accounts = loaded

		return nil
	}

	return load
}
// initShareRepository returns a ModuleDepFunc setting up the SQLite share
// repository stored at shareRepositoryFile. The parent directory of the file
// is created first.
func initShareRepository(shareRepositoryFile string) ModuleDepFunc {
	return func(deps *moduleDeps) error {
		err := ensureDir(shareRepositoryFile)
		if err != nil {
			return errors.WithStack(err)
		}

		deps.ShareRepository = shareSqlite.NewRepository(shareRepositoryFile)

		return nil
	}
}