emissary/internal/agent/controller/gateway/controller.go

125 lines
3.0 KiB
Go

package gateway
import (
"context"
"forge.cadoles.com/Cadoles/emissary/internal/agent"
"forge.cadoles.com/Cadoles/emissary/internal/spec/gateway"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
// Controller keeps a set of reverse proxies, indexed by gateway ID, in line
// with the gateway spec found in the agent state (see Reconcile).
type Controller struct {
// proxies holds the reverse proxy associated with each gateway ID.
proxies map[gateway.ID]*ReverseProxy
// currentSpecRevision is the revision of the last spec applied by
// Reconcile. NewController initializes it to -1.
currentSpecRevision int
}
// Name returns the fixed identifier of the gateway controller.
// It is part of the agent.Controller interface.
func (c *Controller) Name() string {
	const controllerName = "gateway-controller"

	return controllerName
}
// Reconcile implements agent.Controller.
//
// It reads the gateway spec from the given agent state and aligns the
// controller's reverse proxies with it:
//   - when the spec is absent, every remaining proxy is stopped and the
//     remembered revision is reset, so that a spec showing up again later is
//     applied even if it carries the same revision number as the removed one;
//   - when the spec revision equals the last applied one, nothing is done;
//   - otherwise the proxies are updated and the new revision is recorded.
//
// Any error other than agent.ErrSpecNotFound raised while reading the spec is
// returned wrapped with a stack trace.
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
	gatewaySpec := gateway.NewSpec()

	if err := state.GetSpec(gateway.NameGateway, gatewaySpec); err != nil {
		if errors.Is(err, agent.ErrSpecNotFound) {
			logger.Info(ctx, "could not find gateway spec, stopping all remaining proxies")

			c.stopAllProxies(ctx)

			// Forget the applied revision (same initial value as in
			// NewController). Without this, a spec that is removed and then
			// recreated with an identical revision would be treated as
			// unchanged and its proxies would stay stopped.
			c.currentSpecRevision = -1

			return nil
		}

		return errors.WithStack(err)
	}

	revision := gatewaySpec.SpecRevision()

	logger.Info(ctx, "retrieved spec", logger.F("spec", gatewaySpec.SpecName()), logger.F("revision", revision))

	if c.currentSpecRevision == revision {
		logger.Info(ctx, "spec revision did not change, doing nothing")

		return nil
	}

	c.updateProxies(ctx, gatewaySpec)

	c.currentSpecRevision = revision
	logger.Info(ctx, "updating current spec revision", logger.F("revision", c.currentSpecRevision))

	return nil
}
// stopAllProxies stops every proxy tracked by the controller and removes it
// from the proxies map, whether or not stopping succeeded. Errors returned by
// Stop are logged and do not interrupt the loop.
func (c *Controller) stopAllProxies(ctx context.Context) {
	for gatewayID, proxy := range c.proxies {
		logger.Info(ctx, "stopping proxy", logger.F("gatewayID", gatewayID))

		if err := proxy.Stop(); err != nil {
			logger.Error(
				ctx, "error while stopping proxy",
				logger.F("gatewayID", gatewayID),
				logger.E(errors.WithStack(err)),
			)
		}

		// Always drop the entry: previously it was only removed when Stop
		// failed, leaving successfully stopped proxies in the map forever.
		// Deleting the current key while ranging over a map is safe in Go.
		delete(c.proxies, gatewayID)
	}
}
// updateProxies aligns the controller's proxies with the given spec.
//
// Proxies whose gateway ID is no longer listed in spec.Gateways are stopped
// and removed. Then, for every gateway in the spec, the existing proxy (or a
// newly created one) is started with the gateway's address and target.
// Stop and start errors are logged, not returned; a proxy that fails to start
// is removed from the proxies map.
func (c *Controller) updateProxies(ctx context.Context, spec *gateway.Spec) {
	// Stop and remove obsolete gateways
	for gatewayID, proxy := range c.proxies {
		if _, exists := spec.Gateways[gatewayID]; exists {
			continue
		}

		logger.Info(ctx, "stopping proxy", logger.F("gatewayID", gatewayID))

		if err := proxy.Stop(); err != nil {
			logger.Error(
				ctx, "error while stopping proxy",
				logger.F("gatewayID", gatewayID),
				logger.E(errors.WithStack(err)),
			)
		}

		// Remove the obsolete entry regardless of the Stop outcome:
		// previously it was only removed on failure, so cleanly stopped
		// proxies stayed tracked. Deleting during range is safe in Go.
		delete(c.proxies, gatewayID)
	}

	// (Re)start gateways
	for gatewayID, gatewaySpec := range spec.Gateways {
		proxy, exists := c.proxies[gatewayID]
		if !exists {
			proxy = NewReverseProxy()
			c.proxies[gatewayID] = proxy
		}

		logger.Info(
			ctx, "starting proxy",
			logger.F("gatewayID", gatewayID),
			logger.F("addr", gatewaySpec.Address),
			logger.F("target", gatewaySpec.Target),
		)

		if err := proxy.Start(ctx, gatewaySpec.Address, gatewaySpec.Target); err != nil {
			logger.Error(
				ctx, "error while starting proxy",
				logger.F("gatewayID", gatewayID),
				logger.E(errors.WithStack(err)),
			)

			// Do not keep track of a proxy that could not be started.
			delete(c.proxies, gatewayID)
		}
	}
}
// NewController returns a Controller with an empty proxy map and a current
// spec revision of -1.
func NewController() *Controller {
	ctrl := new(Controller)
	ctrl.proxies = map[gateway.ID]*ReverseProxy{}
	ctrl.currentSpecRevision = -1

	return ctrl
}
// Compile-time assertion that *Controller satisfies agent.Controller.
var _ agent.Controller = (*Controller)(nil)