feat(controller,app): add edge apps controller
This commit is contained in:
286
internal/agent/controller/app/controller.go
Normal file
286
internal/agent/controller/app/controller.go
Normal file
@ -0,0 +1,286 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/agent"
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bundle"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
currentSpecRevision int
|
||||
client *http.Client
|
||||
downloadDir string
|
||||
dataDir string
|
||||
servers map[string]*Server
|
||||
}
|
||||
|
||||
// Name implements node.Controller.
|
||||
func (c *Controller) Name() string {
|
||||
return "app-controller"
|
||||
}
|
||||
|
||||
// Reconcile implements node.Controller.
|
||||
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
|
||||
appSpec := app.NewSpec()
|
||||
|
||||
if err := state.GetSpec(app.NameApp, appSpec); err != nil {
|
||||
if errors.Is(err, agent.ErrSpecNotFound) {
|
||||
logger.Info(ctx, "could not find app spec, stopping all remaining apps")
|
||||
|
||||
c.stopAllApps(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "retrieved spec", logger.F("spec", appSpec.SpecName()), logger.F("revision", appSpec.SpecRevision()))
|
||||
|
||||
if c.currentSpecRevision == appSpec.SpecRevision() {
|
||||
logger.Info(ctx, "spec revision did not change, doing nothing")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
c.updateApps(ctx, appSpec)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) stopAllApps(ctx context.Context) {
|
||||
for appID, server := range c.servers {
|
||||
logger.Info(ctx, "stopping app", logger.F("appID", appID))
|
||||
|
||||
if err := server.Stop(); err != nil {
|
||||
logger.Error(
|
||||
ctx, "error while stopping app",
|
||||
logger.F("appID", appID),
|
||||
logger.E(errors.WithStack(err)),
|
||||
)
|
||||
|
||||
delete(c.servers, appID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) updateApps(ctx context.Context, spec *app.Spec) {
|
||||
hadError := false
|
||||
|
||||
// Stop and remove obsolete apps
|
||||
for appID, server := range c.servers {
|
||||
if _, exists := spec.Apps[appID]; exists {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info(ctx, "stopping app", logger.F("appID", appID))
|
||||
|
||||
if err := server.Stop(); err != nil {
|
||||
logger.Error(
|
||||
ctx, "error while stopping app",
|
||||
logger.F("gatewayID", appID),
|
||||
logger.E(errors.WithStack(err)),
|
||||
)
|
||||
|
||||
delete(c.servers, appID)
|
||||
|
||||
hadError = true
|
||||
}
|
||||
}
|
||||
|
||||
// (Re)start apps
|
||||
for appID, appSpec := range spec.Apps {
|
||||
appCtx := logger.With(ctx, logger.F("appID", appID))
|
||||
|
||||
if err := c.updateApp(ctx, appID, appSpec); err != nil {
|
||||
logger.Error(appCtx, "could not update app", logger.E(errors.WithStack(err)))
|
||||
|
||||
hadError = true
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if !hadError {
|
||||
c.currentSpecRevision = spec.SpecRevision()
|
||||
logger.Info(ctx, "updating current spec revision", logger.F("revision", c.currentSpecRevision))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) updateApp(ctx context.Context, appID string, appSpec app.AppEntry) error {
|
||||
bundle, sha256sum, err := c.ensureAppBundle(ctx, appID, appSpec)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not download app bundle")
|
||||
}
|
||||
|
||||
dataDir, err := c.ensureAppDataDir(ctx, appID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not retrieve app data dir")
|
||||
}
|
||||
|
||||
server, exists := c.servers[appID]
|
||||
if !exists {
|
||||
logger.Info(ctx, "app currently not running")
|
||||
} else if sha256sum != appSpec.SHA256Sum {
|
||||
logger.Info(
|
||||
ctx, "bundle hash mismatch, stopping app",
|
||||
logger.F("currentHash", sha256sum),
|
||||
logger.F("specHash", appSpec.SHA256Sum),
|
||||
)
|
||||
|
||||
if err := server.Stop(); err != nil {
|
||||
return errors.Wrap(err, "could not stop app")
|
||||
}
|
||||
|
||||
server = nil
|
||||
}
|
||||
|
||||
if server == nil {
|
||||
dbFile := filepath.Join(dataDir, appID+".sqlite")
|
||||
db, err := sqlite.Open(dbFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not opend database file '%s'", dbFile)
|
||||
}
|
||||
|
||||
server = NewServer(bundle, db)
|
||||
c.servers[appID] = server
|
||||
}
|
||||
|
||||
logger.Info(
|
||||
ctx, "starting app",
|
||||
logger.F("address", appSpec.Address),
|
||||
)
|
||||
|
||||
if err := server.Start(ctx, appSpec.Address); err != nil {
|
||||
delete(c.servers, appID)
|
||||
|
||||
return errors.Wrap(err, "could not start app")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) ensureAppBundle(ctx context.Context, appID string, spec app.AppEntry) (bundle.Bundle, string, error) {
|
||||
if err := os.MkdirAll(c.downloadDir, os.ModePerm); err != nil {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
bundlePath := filepath.Join(c.downloadDir, appID+"."+spec.Format)
|
||||
|
||||
_, err := os.Stat(bundlePath)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
sha256sum, err := hash(bundlePath)
|
||||
if err != nil {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
if sha256sum == spec.SHA256Sum {
|
||||
bdle, err := bundle.FromPath(bundlePath)
|
||||
if err != nil {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
return bdle, sha256sum, nil
|
||||
}
|
||||
|
||||
logger.Info(ctx, "bundle hash mismatch, downloading app", logger.F("url", spec.URL))
|
||||
|
||||
if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
bdle, err := bundle.FromPath(bundlePath)
|
||||
if err != nil {
|
||||
return nil, "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
return bdle, "", nil
|
||||
}
|
||||
|
||||
func (c *Controller) downloadFile(url string, sha256sum string, dest string) error {
|
||||
res, err := c.client.Get(url)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
tmp, err := os.CreateTemp(filepath.Dir(dest), "download_")
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := os.Remove(tmp.Name()); err != nil && !os.IsNotExist(err) {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(tmp, res.Body); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
tmpFileHash, err := hash(tmp.Name())
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if tmpFileHash != sha256sum {
|
||||
return errors.Errorf("sha256 sum mismatch: expected '%s', got '%s'", sha256sum, tmpFileHash)
|
||||
}
|
||||
|
||||
if err := os.Rename(tmp.Name(), dest); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) ensureAppDataDir(ctx context.Context, appID string) (string, error) {
|
||||
dataDir := filepath.Join(c.dataDir, appID)
|
||||
|
||||
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
return dataDir, nil
|
||||
}
|
||||
|
||||
func NewController(funcs ...OptionFunc) *Controller {
|
||||
opts := defaultOptions()
|
||||
for _, fn := range funcs {
|
||||
fn(opts)
|
||||
}
|
||||
|
||||
return &Controller{
|
||||
client: opts.Client,
|
||||
downloadDir: opts.DownloadDir,
|
||||
dataDir: opts.DataDir,
|
||||
currentSpecRevision: -1,
|
||||
servers: make(map[string]*Server),
|
||||
}
|
||||
}
|
||||
|
||||
var _ agent.Controller = &Controller{}
|
31
internal/agent/controller/app/hash.go
Normal file
31
internal/agent/controller/app/hash.go
Normal file
@ -0,0 +1,31 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func hash(path string) (string, error) {
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
hasher := sha256.New()
|
||||
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := io.Copy(hasher, file); err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
||||
return hex.EncodeToString(hasher.Sum(nil)), nil
|
||||
}
|
37
internal/agent/controller/app/options.go
Normal file
37
internal/agent/controller/app/options.go
Normal file
@ -0,0 +1,37 @@
|
||||
package app
|
||||
|
||||
import "net/http"
|
||||
|
||||
type Options struct {
|
||||
DataDir string
|
||||
DownloadDir string
|
||||
Client *http.Client
|
||||
}
|
||||
|
||||
func defaultOptions() *Options {
|
||||
return &Options{
|
||||
DataDir: "apps/data",
|
||||
DownloadDir: "apps/download",
|
||||
Client: &http.Client{},
|
||||
}
|
||||
}
|
||||
|
||||
type OptionFunc func(*Options)
|
||||
|
||||
func WithDataDir(dataDir string) OptionFunc {
|
||||
return func(o *Options) {
|
||||
o.DataDir = dataDir
|
||||
}
|
||||
}
|
||||
|
||||
func WithDownloadDir(downloadDir string) OptionFunc {
|
||||
return func(o *Options) {
|
||||
o.DownloadDir = downloadDir
|
||||
}
|
||||
}
|
||||
|
||||
func WithClient(client *http.Client) OptionFunc {
|
||||
return func(o *Options) {
|
||||
o.Client = client
|
||||
}
|
||||
}
|
125
internal/agent/controller/app/server.go
Normal file
125
internal/agent/controller/app/server.go
Normal file
@ -0,0 +1,125 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"net/http"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/app"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus"
|
||||
"forge.cadoles.com/arcad/edge/pkg/bus/memory"
|
||||
edgeHTTP "forge.cadoles.com/arcad/edge/pkg/http"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/cast"
|
||||
"forge.cadoles.com/arcad/edge/pkg/module/net"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage"
|
||||
"forge.cadoles.com/arcad/edge/pkg/storage/sqlite"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/bundle"
|
||||
"github.com/go-chi/chi/middleware"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
bundle bundle.Bundle
|
||||
db *sql.DB
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
func (s *Server) Start(ctx context.Context, addr string) error {
|
||||
if s.server != nil {
|
||||
if err := s.Stop(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
router := chi.NewRouter()
|
||||
|
||||
router.Use(middleware.Logger)
|
||||
|
||||
bus := memory.NewBus()
|
||||
ds := sqlite.NewDocumentStoreWithDB(s.db)
|
||||
bs := sqlite.NewBlobStoreWithDB(s.db)
|
||||
|
||||
handler := edgeHTTP.NewHandler(
|
||||
edgeHTTP.WithBus(bus),
|
||||
edgeHTTP.WithServerModules(s.getAppModules(bus, ds, bs)...),
|
||||
)
|
||||
if err := handler.Load(s.bundle); err != nil {
|
||||
return errors.Wrap(err, "could not load app bundle")
|
||||
}
|
||||
|
||||
router.Handle("/*", handler)
|
||||
|
||||
server := &http.Server{
|
||||
Addr: addr,
|
||||
Handler: router,
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := s.Stop(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
}()
|
||||
|
||||
s.server = server
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) Stop() error {
|
||||
if s.server == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
s.server = nil
|
||||
}()
|
||||
|
||||
if err := s.server.Close(); err != nil {
|
||||
panic(errors.WithStack(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) getAppModules(bus bus.Bus, ds storage.DocumentStore, bs storage.BlobStore) []app.ServerModuleFactory {
|
||||
return []app.ServerModuleFactory{
|
||||
module.ContextModuleFactory(),
|
||||
module.ConsoleModuleFactory(),
|
||||
cast.CastModuleFactory(),
|
||||
module.LifecycleModuleFactory(),
|
||||
net.ModuleFactory(bus),
|
||||
module.RPCModuleFactory(bus),
|
||||
module.StoreModuleFactory(ds),
|
||||
module.BlobModuleFactory(bus, bs),
|
||||
// module.Extends(
|
||||
// auth.ModuleFactory(
|
||||
// auth.WithJWT(dummyKeyFunc),
|
||||
// ),
|
||||
// func(o *goja.Object) {
|
||||
// if err := o.Set("CLAIM_ROLE", "role"); err != nil {
|
||||
// panic(errors.New("could not set 'CLAIM_ROLE' property"))
|
||||
// }
|
||||
|
||||
// if err := o.Set("CLAIM_PREFERRED_USERNAME", "preferred_username"); err != nil {
|
||||
// panic(errors.New("could not set 'CLAIM_PREFERRED_USERNAME' property"))
|
||||
// }
|
||||
// },
|
||||
// ),
|
||||
}
|
||||
}
|
||||
|
||||
func NewServer(bundle bundle.Bundle, db *sql.DB) *Server {
|
||||
return &Server{
|
||||
bundle: bundle,
|
||||
db: db,
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user