package gateway

import (
	"context"

	"forge.cadoles.com/Cadoles/emissary/internal/agent"
	"forge.cadoles.com/Cadoles/emissary/internal/spec"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// unknownSpecRevision is the sentinel stored in Controller.currentSpecRevision
// while no gateway spec has been applied.
const unknownSpecRevision = -1

// Controller reconciles the set of running reverse proxies with the gateway
// spec found in the agent state.
type Controller struct {
	// proxies holds the reverse proxies currently tracked, keyed by gateway ID.
	proxies map[spec.GatewayID]*ReverseProxy
	// currentSpecRevision is the revision of the last applied gateway spec,
	// or unknownSpecRevision when none is applied.
	currentSpecRevision int
}

// Name implements agent.Controller.
func (c *Controller) Name() string {
	return "gateway-controller"
}

// Reconcile implements agent.Controller.
//
// It reads the gateway spec from state. When the spec is missing, every
// tracked proxy is stopped and nil is returned. When the spec revision equals
// the last applied one, nothing is done. Otherwise the proxies are updated to
// match the spec and the revision is recorded. Any error from state.GetSpec
// other than agent.ErrSpecNotFound is returned with a stack attached.
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
	gatewaySpec := spec.NewGatewaySpec()

	if err := state.GetSpec(spec.NameGateway, gatewaySpec); err != nil {
		if errors.Is(err, agent.ErrSpecNotFound) {
			logger.Info(ctx, "could not find gateway spec, stopping all remaining proxies")

			c.stopAllProxies(ctx)

			// Forget the applied revision: if the same revision shows up
			// again later, the proxies must be started again instead of
			// being skipped by the "did not change" check below.
			c.currentSpecRevision = unknownSpecRevision

			return nil
		}

		return errors.WithStack(err)
	}

	logger.Info(
		ctx, "retrieved spec",
		logger.F("spec", gatewaySpec.SpecName()),
		logger.F("revision", gatewaySpec.SpecRevision()),
	)

	if c.currentSpecRevision == gatewaySpec.SpecRevision() {
		logger.Info(ctx, "spec revision did not change, doing nothing")

		return nil
	}

	c.updateProxies(ctx, gatewaySpec)

	// NOTE(review): the revision is recorded even when some proxies failed to
	// start, so those are not retried until the spec revision changes.
	c.currentSpecRevision = gatewaySpec.SpecRevision()
	logger.Info(ctx, "updating current spec revision", logger.F("revision", c.currentSpecRevision))

	return nil
}

// stopProxy stops the given proxy and removes it from the tracked proxies.
// A failure to stop is logged; the proxy is untracked either way.
func (c *Controller) stopProxy(ctx context.Context, gatewayID spec.GatewayID, proxy *ReverseProxy) {
	logger.Info(ctx, "stopping proxy", logger.F("gatewayID", gatewayID))

	if err := proxy.Stop(); err != nil {
		logger.Error(
			ctx, "error while stopping proxy",
			logger.F("gatewayID", gatewayID),
			logger.E(errors.WithStack(err)),
		)
	}

	delete(c.proxies, gatewayID)
}

// stopAllProxies stops and untracks every proxy currently held by the
// controller.
func (c *Controller) stopAllProxies(ctx context.Context) {
	// Deleting entries from a map while ranging over it is allowed in Go.
	for gatewayID, proxy := range c.proxies {
		c.stopProxy(ctx, gatewayID, proxy)
	}
}

// updateProxies brings the tracked proxies in line with gatewaySpec: proxies
// whose gateway ID is no longer in the spec are stopped and removed, then
// every gateway in the spec is (re)started with its address and target.
func (c *Controller) updateProxies(ctx context.Context, gatewaySpec *spec.Gateway) {
	// A zero-value Controller has a nil map; writing to it would panic.
	if c.proxies == nil {
		c.proxies = make(map[spec.GatewayID]*ReverseProxy)
	}

	// Stop and remove obsolete gateways
	for gatewayID, proxy := range c.proxies {
		if _, exists := gatewaySpec.Gateways[gatewayID]; exists {
			continue
		}

		c.stopProxy(ctx, gatewayID, proxy)
	}

	// (Re)start gateways
	for gatewayID, gateway := range gatewaySpec.Gateways {
		proxy, exists := c.proxies[gatewayID]
		if !exists {
			proxy = NewReverseProxy()
			c.proxies[gatewayID] = proxy
		}

		logger.Info(
			ctx, "starting proxy",
			logger.F("gatewayID", gatewayID),
			logger.F("addr", gateway.Address),
			logger.F("target", gateway.Target),
		)

		if err := proxy.Start(ctx, gateway.Address, gateway.Target); err != nil {
			logger.Error(
				ctx, "error while starting proxy",
				logger.F("gatewayID", gatewayID),
				logger.E(errors.WithStack(err)),
			)

			// Drop the proxy that failed to start so it is not tracked.
			delete(c.proxies, gatewayID)
		}
	}
}

// NewController returns a Controller with no tracked proxies and no applied
// spec revision.
func NewController() *Controller {
	return &Controller{
		proxies:             make(map[spec.GatewayID]*ReverseProxy),
		currentSpecRevision: unknownSpecRevision,
	}
}

var _ agent.Controller = &Controller{}