package proxy

import (
	"context"
	"net/url"

	"forge.cadoles.com/Cadoles/emissary/internal/agent"
	"forge.cadoles.com/Cadoles/emissary/internal/spec/proxy"
	edgeProxy "forge.cadoles.com/arcad/edge/pkg/proxy"
	"github.com/mitchellh/hashstructure/v2"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// proxyEntry pairs a reverse proxy with the hash of the spec it was last
// successfully started with.
type proxyEntry struct {
	// SpecHash is the hash of the spec used for the last successful start;
	// it stays 0 until the proxy has been started once.
	SpecHash uint64
	Proxy    *ReverseProxy
}

// Controller starts, restarts and stops reverse proxies so that they match
// the proxy spec found in the agent state.
type Controller struct {
	// proxies holds the proxies currently managed by the controller,
	// indexed by their identifier in the spec.
	proxies map[proxy.ID]*proxyEntry
}

// Name implements agent.Controller.
func (c *Controller) Name() string {
	return "proxy-controller"
}

// Reconcile implements agent.Controller.
//
// It loads the proxy spec from the state and updates the managed proxies
// accordingly. When the state holds no proxy spec, every managed proxy is
// stopped and nil is returned. Any other error while reading the spec is
// returned to the caller.
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
	proxySpec := proxy.NewSpec()

	if err := state.GetSpec(proxy.NameProxy, proxySpec); err != nil {
		if errors.Is(err, agent.ErrSpecNotFound) {
			logger.Info(ctx, "could not find proxy spec")

			c.stopAllProxies(ctx)

			return nil
		}

		return errors.WithStack(err)
	}

	logger.Info(
		ctx, "retrieved spec",
		logger.F("spec", proxySpec.SpecName()),
		logger.F("revision", proxySpec.SpecRevision()),
	)

	c.updateProxies(ctx, proxySpec)

	return nil
}

// stopAllProxies stops every managed proxy and removes it from the controller.
func (c *Controller) stopAllProxies(ctx context.Context) {
	if len(c.proxies) > 0 {
		logger.Info(ctx, "stopping all proxies")
	}

	for proxyID, entry := range c.proxies {
		c.stopProxy(ctx, proxyID, entry)
	}
}

// stopProxy stops the given proxy and removes its entry from the controller.
// A failure to stop is only logged; the entry is removed in both cases so
// that later reconciliations do not try to stop the same proxy again.
func (c *Controller) stopProxy(ctx context.Context, proxyID proxy.ID, entry *proxyEntry) {
	logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))

	if err := entry.Proxy.Stop(); err != nil {
		logger.Error(
			ctx, "error while stopping proxy",
			logger.F("proxyID", proxyID),
			logger.E(errors.WithStack(err)),
		)
	}

	// Removing the entry being visited while ranging over a map is safe in Go.
	delete(c.proxies, proxyID)
}

// updateProxies stops the managed proxies that are no longer declared in the
// spec, then starts or restarts every proxy declared in it. A failure on one
// proxy is logged and does not prevent the others from being updated.
func (c *Controller) updateProxies(ctx context.Context, spec *proxy.Spec) {
	// Stop and remove obsolete proxies.
	for proxyID, entry := range c.proxies {
		if _, exists := spec.Proxies[proxyID]; exists {
			continue
		}

		c.stopProxy(ctx, proxyID, entry)
	}

	// (Re)start proxies.
	for proxyID, proxySpec := range spec.Proxies {
		// Attach the proxy identifier to every log emitted for this proxy.
		proxyCtx := logger.With(ctx, logger.F("proxyID", proxyID))

		if err := c.updateProxy(proxyCtx, proxyID, proxySpec); err != nil {
			logger.Error(proxyCtx, "could not update proxy", logger.E(errors.WithStack(err)))
		}
	}
}

// updateProxy makes sure the proxy identified by proxyID is started with the
// given spec. Nothing is done when the proxy is already running and the hash
// of its spec has not changed. It returns an error when the spec cannot be
// hashed, when a mapping target is not a valid URL or when the proxy cannot
// be started; in the last case the entry is removed from the controller.
func (c *Controller) updateProxy(ctx context.Context, proxyID proxy.ID, proxySpec proxy.ProxyEntry) error {
	newProxySpecHash, err := hashstructure.Hash(proxySpec, hashstructure.FormatV2, nil)
	if err != nil {
		return errors.WithStack(err)
	}

	// Allow a zero-value Controller to be used: writing to a nil map panics.
	if c.proxies == nil {
		c.proxies = make(map[proxy.ID]*proxyEntry)
	}

	entry, exists := c.proxies[proxyID]
	if !exists {
		logger.Info(ctx, "proxy currently not running")
	}

	if entry == nil {
		entry = &proxyEntry{
			Proxy:    NewReverseProxy(),
			SpecHash: 0,
		}
		c.proxies[proxyID] = entry
	}

	specChanged := newProxySpecHash != entry.SpecHash

	if entry.Proxy.Running() && !specChanged {
		return nil
	}

	// A non-zero hash means the proxy has already been started once with
	// another spec, so this is reported as a restart.
	if specChanged && entry.SpecHash != 0 {
		logger.Info(ctx, "restarting proxy", logger.F("address", proxySpec.Address))
	} else {
		logger.Info(ctx, "starting proxy", logger.F("address", proxySpec.Address))
	}

	options, err := proxyOptions(proxySpec)
	if err != nil {
		return err
	}

	if err := entry.Proxy.Start(ctx, proxySpec.Address, options...); err != nil {
		delete(c.proxies, proxyID)

		return errors.Wrap(err, "could not start proxy")
	}

	// Only remember the hash once the proxy has actually been started.
	entry.SpecHash = newProxySpecHash

	return nil
}

// proxyOptions converts the mappings of a proxy spec into the edge proxy
// options: the list of allowed hosts and the host pattern to target URL
// rewrite table. It returns an error when a mapping target cannot be parsed
// as a URL.
func proxyOptions(proxySpec proxy.ProxyEntry) ([]edgeProxy.OptionFunc, error) {
	// Zero length with preallocated capacity: the slice is filled with
	// append, so a non-zero length would leave leading empty host names.
	allowedHosts := make([]string, 0, len(proxySpec.Mappings))
	mappings := make(map[string]*url.URL, len(proxySpec.Mappings))

	for _, m := range proxySpec.Mappings {
		target, err := url.Parse(m.Target)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		mappings[m.HostPattern] = target
		allowedHosts = append(allowedHosts, m.HostPattern)
	}

	return []edgeProxy.OptionFunc{
		edgeProxy.WithAllowedHosts(allowedHosts...),
		edgeProxy.WithRewriteHosts(mappings),
	}, nil
}

// NewController returns a Controller that does not manage any proxy yet.
func NewController() *Controller {
	return &Controller{
		proxies: make(map[proxy.ID]*proxyEntry),
	}
}

// Compile-time check that Controller implements agent.Controller.
var _ agent.Controller = (*Controller)(nil)