package app import ( "context" "io" "net/http" "os" "path/filepath" "forge.cadoles.com/Cadoles/emissary/internal/agent" "forge.cadoles.com/Cadoles/emissary/internal/agent/controller/app/spec" "forge.cadoles.com/arcad/edge/pkg/bundle" "github.com/mitchellh/hashstructure/v2" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) type serverEntry struct { SpecHash uint64 Server *Server } type Controller struct { client *http.Client downloadDir string dataDir string servers map[string]*serverEntry appRepository *AppRepository } // Name implements node.Controller. func (c *Controller) Name() string { return "app-controller" } // Reconcile implements node.Controller. func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error { appSpec := spec.NewSpec() if err := state.GetSpec(spec.Name, appSpec); err != nil { if errors.Is(err, agent.ErrSpecNotFound) { logger.Info(ctx, "could not find app spec") c.stopAllApps(ctx, appSpec) return nil } return errors.WithStack(err) } logger.Info(ctx, "retrieved spec", logger.F("spec", appSpec.SpecName()), logger.F("revision", appSpec.SpecRevision())) c.updateApps(ctx, appSpec) return nil } func (c *Controller) stopAllApps(ctx context.Context, spec *spec.Spec) { if len(c.servers) > 0 { logger.Info(ctx, "stopping all apps") } for appID, entry := range c.servers { logger.Info(ctx, "stopping app", logger.F("appID", appID)) if err := entry.Server.Stop(); err != nil { logger.Error( ctx, "error while stopping app", logger.F("appID", appID), logger.E(errors.WithStack(err)), ) delete(c.servers, appID) } } } func (c *Controller) updateApps(ctx context.Context, specs *spec.Spec) { // Stop and remove obsolete apps for appKey, server := range c.servers { if _, exists := specs.Apps[appKey]; exists { continue } logger.Info(ctx, "stopping app", logger.F("appKey", appKey)) if err := server.Server.Stop(); err != nil { logger.Error( ctx, "error while stopping app", logger.F("appKey", appKey), logger.E(errors.WithStack(err)), ) delete(c.servers, appKey) } } c.updateAppRepository(ctx, specs) // (Re)start apps if necessary for appKey := range specs.Apps { appCtx := logger.With(ctx, logger.F("appKey", appKey)) if err := c.updateApp(ctx, specs, appKey); err != nil { logger.Error(appCtx, "could not update app", logger.E(errors.WithStack(err))) continue } } } func (c *Controller) updateAppRepository(ctx context.Context, specs *spec.Spec) { bundles := make([]string, 0, len(specs.Apps)) for appKey, app := range specs.Apps { path := c.getAppBundlePath(appKey, app.Format) bundles = append(bundles, path) } getURL := createGetAppURL(specs) c.appRepository.Update(getURL, bundles) } func (c *Controller) updateApp(ctx context.Context, specs *spec.Spec, appKey string) (err error) { appEntry := specs.Apps[appKey] newAppSpecHash, err := hashstructure.Hash(appEntry, hashstructure.FormatV2, nil) if err != nil { return errors.WithStack(err) } bundle, sha256sum, err := c.ensureAppBundle(ctx, appKey, appEntry) if err != nil { return errors.Wrap(err, "could not download app bundle") } server, exists := c.servers[appKey] if !exists { logger.Info(ctx, "app currently not running") } else if sha256sum != appEntry.SHA256Sum { logger.Info( ctx, "bundle hash mismatch, stopping app", logger.F("currentHash", sha256sum), logger.F("specHash", appEntry.SHA256Sum), ) if err := server.Server.Stop(); err != nil { return errors.Wrap(err, "could not stop app") } server = nil } if server == nil { options, err := c.getHandlerOptions(ctx, appKey, specs) if err != nil { return errors.Wrap(err, "could not create handler options") } var auth *spec.Auth if specs.Config != nil { auth = specs.Config.Auth } server = &serverEntry{ Server: NewServer(bundle, auth, options...), SpecHash: 0, } c.servers[appKey] = server } specChanged := newAppSpecHash != server.SpecHash if server.Server.Running() && !specChanged { return nil } if specChanged && server.SpecHash != 0 { logger.Info( ctx, "restarting app", logger.F("address", appEntry.Address), ) } else { logger.Info( ctx, "starting app", logger.F("address", appEntry.Address), ) } if err := server.Server.Start(ctx, appEntry.Address); err != nil { delete(c.servers, appKey) return errors.Wrap(err, "could not start app") } server.SpecHash = newAppSpecHash return nil } func (c *Controller) ensureAppBundle(ctx context.Context, appID string, spec spec.AppEntry) (bundle.Bundle, string, error) { if err := os.MkdirAll(c.downloadDir, os.ModePerm); err != nil { return nil, "", errors.WithStack(err) } bundlePath := c.getAppBundlePath(appID, spec.Format) _, err := os.Stat(bundlePath) if err != nil && !errors.Is(err, os.ErrNotExist) { return nil, "", errors.WithStack(err) } if errors.Is(err, os.ErrNotExist) { if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil { return nil, "", errors.WithStack(err) } } sha256sum, err := hash(bundlePath) if err != nil { return nil, "", errors.WithStack(err) } if sha256sum == spec.SHA256Sum { bdle, err := bundle.FromPath(bundlePath) if err != nil { return nil, "", errors.WithStack(err) } return bdle, sha256sum, nil } logger.Info(ctx, "bundle hash mismatch, downloading app", logger.F("url", spec.URL)) if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil { return nil, "", errors.WithStack(err) } bdle, err := bundle.FromPath(bundlePath) if err != nil { return nil, "", errors.WithStack(err) } return bdle, "", nil } func (c *Controller) downloadFile(url string, sha256sum string, dest string) error { res, err := c.client.Get(url) if err != nil { return errors.WithStack(err) } defer func() { if err := res.Body.Close(); err != nil && !errors.Is(err, os.ErrClosed) { panic(errors.WithStack(err)) } }() tmp, err := os.CreateTemp(filepath.Dir(dest), "download_") if err != nil { return errors.WithStack(err) } defer func() { if err := os.Remove(tmp.Name()); err != nil && !os.IsNotExist(err) { panic(errors.WithStack(err)) } }() if _, err := io.Copy(tmp, res.Body); err != nil { return errors.WithStack(err) } tmpFileHash, err := hash(tmp.Name()) if err != nil { return errors.WithStack(err) } if tmpFileHash != sha256sum { return errors.Errorf("sha256 sum mismatch: expected '%s', got '%s'", sha256sum, tmpFileHash) } if err := os.Rename(tmp.Name(), dest); err != nil { return errors.WithStack(err) } return nil } func (c *Controller) ensureAppDataDir(ctx context.Context, appID string) (string, error) { dataDir := filepath.Join(c.dataDir, appID) if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { return "", errors.WithStack(err) } return dataDir, nil } func (c *Controller) getAppBundlePath(appKey string, format string) string { return filepath.Join(c.downloadDir, appKey+"."+format) } func NewController(funcs ...OptionFunc) *Controller { opts := defaultOptions() for _, fn := range funcs { fn(opts) } return &Controller{ client: opts.Client, downloadDir: opts.DownloadDir, dataDir: opts.DataDir, servers: make(map[string]*serverEntry), appRepository: NewAppRepository(), } } var _ agent.Controller = &Controller{}