package app import ( "context" "io" "net/http" "os" "path/filepath" "forge.cadoles.com/Cadoles/emissary/internal/agent" "forge.cadoles.com/Cadoles/emissary/internal/agent/controller/app/spec" "forge.cadoles.com/arcad/edge/pkg/app" "forge.cadoles.com/arcad/edge/pkg/bundle" "github.com/mitchellh/hashstructure/v2" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type serverEntry struct { AppDefHash uint64 Server *Server } type Controller struct { client *http.Client downloadDir string dataDir string servers map[string]*serverEntry appRepository *AppRepository } // Name implements node.Controller. func (c *Controller) Name() string { return "app-controller" } // Reconcile implements node.Controller. func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error { appSpec := spec.NewSpec() if err := state.GetSpec(spec.Name, spec.Version, appSpec); err != nil { if errors.Is(err, agent.ErrSpecNotFound) { logger.Info(ctx, "could not find app spec") c.stopAllApps(ctx, appSpec) return nil } return errors.WithStack(err) } logger.Info( ctx, "retrieved spec", logger.F("name", appSpec.SpecDefinitionName()), logger.F("version", appSpec.SpecDefinitionVersion()), logger.F("revision", appSpec.SpecRevision()), ) c.updateApps(ctx, appSpec) return nil } func (c *Controller) stopAllApps(ctx context.Context, spec *spec.Spec) { if len(c.servers) > 0 { logger.Info(ctx, "stopping all apps") } for appID, entry := range c.servers { logger.Info(ctx, "stopping app", logger.F("appID", appID)) if err := entry.Server.Stop(); err != nil { err = errors.WithStack(err) logger.Error( ctx, "error while stopping app", logger.F("appID", appID), logger.CapturedE(err), ) delete(c.servers, appID) } } } func (c *Controller) updateApps(ctx context.Context, specs *spec.Spec) { // Stop and remove obsolete apps for appKey, server := range c.servers { if _, exists := specs.Apps[appKey]; exists { continue } logger.Info(ctx, "stopping app", logger.F("appKey", appKey)) if err := server.Server.Stop(); err != nil { err = errors.WithStack(err) logger.Error( ctx, "error while stopping app", logger.F("appKey", appKey), logger.CapturedE(err), ) delete(c.servers, appKey) } } if err := c.updateAppRepository(ctx, specs); err != nil { err = errors.WithStack(err) logger.Error( ctx, "could not update app repository", logger.CapturedE(err), ) return } // (Re)start apps if necessary for appKey := range specs.Apps { appCtx := logger.With(ctx, logger.F("appKey", appKey)) if err := c.updateApp(ctx, specs, appKey); err != nil { err = errors.WithStack(err) logger.Error(appCtx, "could not update app", logger.CapturedE(err)) continue } } } func (c *Controller) updateAppRepository(ctx context.Context, specs *spec.Spec) error { bundles := make([]string, 0, len(specs.Apps)) for appKey, app := range specs.Apps { path := c.getAppBundlePath(appKey, app.Format) bundles = append(bundles, path) } resolveAppURL, err := createResolveAppURL(specs) if err != nil { return errors.WithStack(err) } c.appRepository.Update(resolveAppURL, bundles) return nil } func (c *Controller) updateApp(ctx context.Context, specs *spec.Spec, appKey string) (err error) { appEntry := specs.Apps[appKey] appDef := struct { App spec.AppEntry Config *spec.Config }{ App: appEntry, Config: specs.Config, } newAppDefHash, err := hashstructure.Hash(appDef, hashstructure.FormatV2, nil) if err != nil { return errors.WithStack(err) } bundle, sha256sum, err := c.ensureAppBundle(ctx, appKey, appEntry) if err != nil { return errors.Wrap(err, "could not download app bundle") } server, exists := c.servers[appKey] if !exists { logger.Info(ctx, "app currently not running") } else if sha256sum != appEntry.SHA256Sum { logger.Info( ctx, "bundle hash mismatch, stopping app", logger.F("currentHash", sha256sum), logger.F("specHash", appEntry.SHA256Sum), ) if err := server.Server.Stop(); err != nil { return errors.Wrap(err, "could not stop app") } server = nil } newServerEntry := func() (*serverEntry, error) { options, err := c.getHandlerOptions(ctx, appKey, specs) if err != nil { return nil, errors.Wrap(err, "could not create handler options") } server = &serverEntry{ Server: NewServer(bundle, specs.Config, options...), AppDefHash: 0, } return server, nil } if server == nil { serverEntry, err := newServerEntry() if err != nil { return errors.WithStack(err) } c.servers[appKey] = serverEntry } defChanged := newAppDefHash != server.AppDefHash if server.Server.Running() && !defChanged { return nil } ctx = logger.With(ctx, logger.F("appKey", appKey), logger.F("address", appEntry.Address), ) if defChanged && server.AppDefHash != 0 { logger.Info(ctx, "restarting app") if err := server.Server.Stop(); err != nil { return errors.WithStack(err) } serverEntry, err := newServerEntry() if err != nil { return errors.WithStack(err) } c.servers[appKey] = serverEntry } else { logger.Info(ctx, "starting app") } if err := server.Server.Start(ctx, appEntry.Address); err != nil { delete(c.servers, appKey) return errors.Wrap(err, "could not start app") } server.AppDefHash = newAppDefHash return nil } func (c *Controller) ensureAppBundle(ctx context.Context, appID string, spec spec.AppEntry) (bundle.Bundle, string, error) { if err := os.MkdirAll(c.downloadDir, os.ModePerm); err != nil { return nil, "", errors.WithStack(err) } bundlePath := c.getAppBundlePath(appID, spec.Format) _, err := os.Stat(bundlePath) if err != nil && !errors.Is(err, os.ErrNotExist) { return nil, "", errors.WithStack(err) } if errors.Is(err, os.ErrNotExist) { if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil { return nil, "", errors.WithStack(err) } } sha256sum, err := hash(bundlePath) if err != nil { return nil, "", errors.WithStack(err) } if sha256sum == spec.SHA256Sum { bdle, err := bundle.FromPath(bundlePath) if err != nil { return nil, "", errors.WithStack(err) } return bdle, sha256sum, nil } logger.Info(ctx, "bundle hash mismatch, downloading app", logger.F("url", spec.URL)) if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil { return nil, "", errors.WithStack(err) } bdle, err := bundle.FromPath(bundlePath) if err != nil { return nil, "", errors.WithStack(err) } manifest, err := app.LoadManifest(bdle) if err != nil { return nil, "", errors.WithStack(err) } valid, err := validateManifest(manifest) if err != nil { return nil, "", errors.WithStack(err) } if !valid { return nil, "", errors.New("bundle's manifest is invalid") } return bdle, spec.SHA256Sum, nil } func (c *Controller) downloadFile(url string, sha256sum string, dest string) error { res, err := c.client.Get(url) if err != nil { return errors.WithStack(err) } defer func() { if err := res.Body.Close(); err != nil && !errors.Is(err, os.ErrClosed) { panic(errors.WithStack(err)) } }() tmp, err := os.CreateTemp(filepath.Dir(dest), "download_") if err != nil { return errors.WithStack(err) } defer func() { if err := os.Remove(tmp.Name()); err != nil && !os.IsNotExist(err) { panic(errors.WithStack(err)) } }() if _, err := io.Copy(tmp, res.Body); err != nil { return errors.WithStack(err) } tmpFileHash, err := hash(tmp.Name()) if err != nil { return errors.WithStack(err) } if tmpFileHash != sha256sum { return errors.Errorf("sha256 sum mismatch: expected '%s', got '%s'", sha256sum, tmpFileHash) } if err := os.Rename(tmp.Name(), dest); err != nil { return errors.WithStack(err) } return nil } func (c *Controller) ensureAppDataDir(ctx context.Context, appID string) (string, error) { dataDir := filepath.Join(c.dataDir, appID) if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { return "", errors.WithStack(err) } return dataDir, nil } func (c *Controller) getAppBundlePath(appKey string, format string) string { return filepath.Join(c.downloadDir, appKey+"."+format) } func NewController(funcs ...OptionFunc) *Controller { opts := defaultOptions() for _, fn := range funcs { fn(opts) } return &Controller{ client: opts.Client, downloadDir: opts.DownloadDir, dataDir: opts.DataDir, servers: make(map[string]*serverEntry), appRepository: NewAppRepository(), } } var _ agent.Controller = &Controller{}