emissary/internal/agent/controller/app/controller.go

305 lines
6.9 KiB
Go
Raw Normal View History

package app
import (
"context"
"io"
"net/http"
"os"
"path/filepath"
"forge.cadoles.com/Cadoles/emissary/internal/agent"
2023-03-28 20:43:45 +02:00
"forge.cadoles.com/Cadoles/emissary/internal/agent/controller/app/spec"
"forge.cadoles.com/arcad/edge/pkg/bundle"
2023-03-13 10:44:58 +01:00
"github.com/mitchellh/hashstructure/v2"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
2023-03-13 10:44:58 +01:00
// serverEntry pairs a running (or startable) app server with the hash of the
// app spec entry it was last started with.
type serverEntry struct {
	// SpecHash is the hash of the app spec entry recorded after the last
	// successful start; it is 0 for an entry that has not been started yet.
	SpecHash uint64
	// Server is the app server managed by the controller for this entry.
	Server *Server
}
type Controller struct {
2023-03-13 10:44:58 +01:00
client *http.Client
downloadDir string
dataDir string
servers map[string]*serverEntry
}
// Name implements node.Controller.
//
// It returns the constant identifier of this controller.
func (c *Controller) Name() string {
	const controllerName = "app-controller"

	return controllerName
}
// Reconcile implements node.Controller.
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
2023-03-28 20:43:45 +02:00
appSpec := spec.NewSpec()
2023-03-28 20:43:45 +02:00
if err := state.GetSpec(spec.Name, appSpec); err != nil {
if errors.Is(err, agent.ErrSpecNotFound) {
2023-03-13 10:44:58 +01:00
logger.Info(ctx, "could not find app spec")
2023-03-13 10:44:58 +01:00
c.stopAllApps(ctx, appSpec)
return nil
}
return errors.WithStack(err)
}
logger.Info(ctx, "retrieved spec", logger.F("spec", appSpec.SpecName()), logger.F("revision", appSpec.SpecRevision()))
c.updateApps(ctx, appSpec)
return nil
}
2023-03-28 20:43:45 +02:00
func (c *Controller) stopAllApps(ctx context.Context, spec *spec.Spec) {
2023-03-13 10:44:58 +01:00
if len(c.servers) > 0 {
logger.Info(ctx, "stopping all apps")
}
for appID, entry := range c.servers {
logger.Info(ctx, "stopping app", logger.F("appID", appID))
2023-03-13 10:44:58 +01:00
if err := entry.Server.Stop(); err != nil {
logger.Error(
ctx, "error while stopping app",
logger.F("appID", appID),
logger.E(errors.WithStack(err)),
)
delete(c.servers, appID)
}
}
}
2023-03-28 20:43:45 +02:00
func (c *Controller) updateApps(ctx context.Context, specs *spec.Spec) {
// Stop and remove obsolete apps
2023-03-28 20:43:45 +02:00
for appKey, server := range c.servers {
if _, exists := specs.Apps[appKey]; exists {
continue
}
2023-03-28 20:43:45 +02:00
logger.Info(ctx, "stopping app", logger.F("appKey", appKey))
2023-03-28 20:43:45 +02:00
if err := server.Server.Stop(); err != nil {
logger.Error(
ctx, "error while stopping app",
2023-03-28 20:43:45 +02:00
logger.F("appKey", appKey),
logger.E(errors.WithStack(err)),
)
2023-03-28 20:43:45 +02:00
delete(c.servers, appKey)
}
}
// (Re)start apps
2023-03-28 20:43:45 +02:00
for appKey := range specs.Apps {
appCtx := logger.With(ctx, logger.F("appKey", appKey))
2023-03-28 20:43:45 +02:00
if err := c.updateApp(ctx, specs, appKey); err != nil {
logger.Error(appCtx, "could not update app", logger.E(errors.WithStack(err)))
continue
}
}
2023-03-13 10:44:58 +01:00
}
2023-03-28 20:43:45 +02:00
func (c *Controller) updateApp(ctx context.Context, specs *spec.Spec, appKey string) (err error) {
appEntry := specs.Apps[appKey]
newAppSpecHash, err := hashstructure.Hash(appEntry, hashstructure.FormatV2, nil)
2023-03-13 10:44:58 +01:00
if err != nil {
return errors.WithStack(err)
}
2023-03-28 20:43:45 +02:00
bundle, sha256sum, err := c.ensureAppBundle(ctx, appKey, appEntry)
if err != nil {
return errors.Wrap(err, "could not download app bundle")
}
2023-03-28 20:43:45 +02:00
server, exists := c.servers[appKey]
if !exists {
logger.Info(ctx, "app currently not running")
2023-03-28 20:43:45 +02:00
} else if sha256sum != appEntry.SHA256Sum {
logger.Info(
ctx, "bundle hash mismatch, stopping app",
logger.F("currentHash", sha256sum),
2023-03-28 20:43:45 +02:00
logger.F("specHash", appEntry.SHA256Sum),
)
2023-03-28 20:43:45 +02:00
if err := server.Server.Stop(); err != nil {
return errors.Wrap(err, "could not stop app")
}
2023-03-28 20:43:45 +02:00
server = nil
}
2023-03-28 20:43:45 +02:00
if server == nil {
options, err := c.getHandlerOptions(ctx, appKey, specs)
if err != nil {
2023-03-28 20:43:45 +02:00
return errors.Wrap(err, "could not create handler options")
}
2023-03-28 20:43:45 +02:00
var auth *spec.Auth
if specs.Config != nil {
auth = specs.Config.Auth
}
server = &serverEntry{
Server: NewServer(bundle, auth, options...),
2023-03-13 10:44:58 +01:00
SpecHash: 0,
}
2023-03-28 20:43:45 +02:00
c.servers[appKey] = server
2023-03-13 10:44:58 +01:00
}
2023-03-28 20:43:45 +02:00
specChanged := newAppSpecHash != server.SpecHash
2023-03-13 10:44:58 +01:00
2023-03-28 20:43:45 +02:00
if server.Server.Running() && !specChanged {
2023-03-13 10:44:58 +01:00
return nil
}
2023-03-28 20:43:45 +02:00
if specChanged && server.SpecHash != 0 {
2023-03-13 10:44:58 +01:00
logger.Info(
ctx, "restarting app",
2023-03-28 20:43:45 +02:00
logger.F("address", appEntry.Address),
2023-03-13 10:44:58 +01:00
)
} else {
logger.Info(
ctx, "starting app",
2023-03-28 20:43:45 +02:00
logger.F("address", appEntry.Address),
2023-03-13 10:44:58 +01:00
)
}
2023-03-28 20:43:45 +02:00
if err := server.Server.Start(ctx, appEntry.Address); err != nil {
delete(c.servers, appKey)
return errors.Wrap(err, "could not start app")
}
2023-03-28 20:43:45 +02:00
server.SpecHash = newAppSpecHash
2023-03-13 10:44:58 +01:00
return nil
}
2023-03-28 20:43:45 +02:00
func (c *Controller) ensureAppBundle(ctx context.Context, appID string, spec spec.AppEntry) (bundle.Bundle, string, error) {
if err := os.MkdirAll(c.downloadDir, os.ModePerm); err != nil {
return nil, "", errors.WithStack(err)
}
2023-03-28 20:43:45 +02:00
bundlePath := c.getAppBundlePath(appID, spec.Format)
_, err := os.Stat(bundlePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, "", errors.WithStack(err)
}
if errors.Is(err, os.ErrNotExist) {
if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil {
return nil, "", errors.WithStack(err)
}
}
sha256sum, err := hash(bundlePath)
if err != nil {
return nil, "", errors.WithStack(err)
}
if sha256sum == spec.SHA256Sum {
bdle, err := bundle.FromPath(bundlePath)
if err != nil {
return nil, "", errors.WithStack(err)
}
return bdle, sha256sum, nil
}
logger.Info(ctx, "bundle hash mismatch, downloading app", logger.F("url", spec.URL))
if err := c.downloadFile(spec.URL, spec.SHA256Sum, bundlePath); err != nil {
return nil, "", errors.WithStack(err)
}
bdle, err := bundle.FromPath(bundlePath)
if err != nil {
return nil, "", errors.WithStack(err)
}
return bdle, "", nil
}
// downloadFile fetches url with the controller's HTTP client and stores the
// response body at dest.
//
// The body is first written to a temporary file created in dest's directory
// (presumably so that the final rename stays on one filesystem). The digest
// that hash computes for the temporary file must equal sha256sum; otherwise
// an error is returned and dest is not written. On success the temporary
// file is renamed to dest.
//
// A response status outside the 2xx range is reported as an error before any
// file is created. The temporary file is always closed, and removed if it
// was not renamed. Failures while closing the response body or removing the
// temporary file are returned as errors (unless another error is already
// being returned) rather than raised as panics.
func (c *Controller) downloadFile(url string, sha256sum string, dest string) (err error) {
	res, err := c.client.Get(url)
	if err != nil {
		return errors.WithStack(err)
	}

	defer func() {
		closeErr := res.Body.Close()
		if closeErr != nil && !errors.Is(closeErr, os.ErrClosed) && err == nil {
			err = errors.WithStack(closeErr)
		}
	}()

	// The HTTP client does not turn non-2xx responses into errors; without
	// this check an error page would only surface as a checksum mismatch.
	if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusMultipleChoices {
		return errors.Errorf("unexpected http status '%d' while downloading '%s'", res.StatusCode, url)
	}

	tmp, err := os.CreateTemp(filepath.Dir(dest), "download_")
	if err != nil {
		return errors.WithStack(err)
	}

	tmpPath := tmp.Name()

	defer func() {
		// After a successful rename the temporary path no longer exists,
		// which is not an error.
		rmErr := os.Remove(tmpPath)
		if rmErr != nil && !os.IsNotExist(rmErr) && err == nil {
			err = errors.WithStack(rmErr)
		}
	}()

	// Close the file as soon as the copy is done, on success and on failure,
	// so that the descriptor is not leaked and the content is complete before
	// it is hashed and renamed.
	_, copyErr := io.Copy(tmp, res.Body)
	closeErr := tmp.Close()

	if copyErr != nil {
		return errors.WithStack(copyErr)
	}

	if closeErr != nil {
		return errors.WithStack(closeErr)
	}

	tmpFileHash, err := hash(tmpPath)
	if err != nil {
		return errors.WithStack(err)
	}

	if tmpFileHash != sha256sum {
		return errors.Errorf("sha256 sum mismatch: expected '%s', got '%s'", sha256sum, tmpFileHash)
	}

	if err := os.Rename(tmpPath, dest); err != nil {
		return errors.WithStack(err)
	}

	return nil
}
// ensureAppDataDir creates, if needed, the data directory of the app appID
// under the controller's data directory and returns its path.
//
// On failure it returns an empty path and the error with a stack trace
// attached. The ctx parameter is currently unused.
func (c *Controller) ensureAppDataDir(ctx context.Context, appID string) (string, error) {
	appDataDir := filepath.Join(c.dataDir, appID)

	mkdirErr := os.MkdirAll(appDataDir, os.ModePerm)
	if mkdirErr != nil {
		return "", errors.WithStack(mkdirErr)
	}

	return appDataDir, nil
}
2023-03-28 20:43:45 +02:00
// getAppBundlePath returns the location of an app's bundle inside the
// download directory: a file named "<appKey>.<format>".
func (c *Controller) getAppBundlePath(appKey string, format string) string {
	fileName := appKey + "." + format

	return filepath.Join(c.downloadDir, fileName)
}
func NewController(funcs ...OptionFunc) *Controller {
opts := defaultOptions()
for _, fn := range funcs {
fn(opts)
}
return &Controller{
2023-03-13 10:44:58 +01:00
client: opts.Client,
downloadDir: opts.DownloadDir,
dataDir: opts.DataDir,
servers: make(map[string]*serverEntry),
}
}
// Compile-time check that *Controller satisfies the agent.Controller interface.
var _ agent.Controller = &Controller{}