125 lines
2.9 KiB
Go
125 lines
2.9 KiB
Go
|
package proxy
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
|
||
|
"forge.cadoles.com/Cadoles/emissary/internal/agent"
|
||
|
"forge.cadoles.com/Cadoles/emissary/internal/spec/proxy"
|
||
|
"github.com/pkg/errors"
|
||
|
"gitlab.com/wpetit/goweb/logger"
|
||
|
)
|
||
|
|
||
|
type Controller struct {
|
||
|
proxies map[proxy.ID]*ReverseProxy
|
||
|
currentSpecRevision int
|
||
|
}
|
||
|
|
||
|
// Name implements node.Controller.
|
||
|
func (c *Controller) Name() string {
|
||
|
return "proxy-controller"
|
||
|
}
|
||
|
|
||
|
// Reconcile implements node.Controller.
|
||
|
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
|
||
|
proxySpec := proxy.NewSpec()
|
||
|
|
||
|
if err := state.GetSpec(proxy.NameProxy, proxySpec); err != nil {
|
||
|
if errors.Is(err, agent.ErrSpecNotFound) {
|
||
|
logger.Info(ctx, "could not find proxy spec, stopping all remaining proxies")
|
||
|
|
||
|
c.stopAllProxies(ctx)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
return errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
logger.Info(ctx, "retrieved spec", logger.F("spec", proxySpec.SpecName()), logger.F("revision", proxySpec.SpecRevision()))
|
||
|
|
||
|
if c.currentSpecRevision == proxySpec.SpecRevision() {
|
||
|
logger.Info(ctx, "spec revision did not change, doing nothing")
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
c.updateProxies(ctx, proxySpec)
|
||
|
|
||
|
c.currentSpecRevision = proxySpec.SpecRevision()
|
||
|
logger.Info(ctx, "updating current spec revision", logger.F("revision", c.currentSpecRevision))
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *Controller) stopAllProxies(ctx context.Context) {
|
||
|
for proxyID, proxy := range c.proxies {
|
||
|
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
|
||
|
|
||
|
if err := proxy.Stop(); err != nil {
|
||
|
logger.Error(
|
||
|
ctx, "error while stopping proxy",
|
||
|
logger.F("proxyID", proxyID),
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
)
|
||
|
|
||
|
delete(c.proxies, proxyID)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Controller) updateProxies(ctx context.Context, spec *proxy.Spec) {
|
||
|
// Stop and remove obsolete proxys
|
||
|
for proxyID, proxy := range c.proxies {
|
||
|
if _, exists := spec.Proxies[proxyID]; exists {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
|
||
|
|
||
|
if err := proxy.Stop(); err != nil {
|
||
|
logger.Error(
|
||
|
ctx, "error while stopping proxy",
|
||
|
logger.F("proxyID", proxyID),
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
)
|
||
|
|
||
|
delete(c.proxies, proxyID)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// (Re)start proxys
|
||
|
for proxyID, proxySpec := range spec.Proxies {
|
||
|
proxy, exists := c.proxies[proxyID]
|
||
|
if !exists {
|
||
|
proxy = NewReverseProxy()
|
||
|
c.proxies[proxyID] = proxy
|
||
|
}
|
||
|
|
||
|
logger.Info(
|
||
|
ctx, "starting proxy",
|
||
|
logger.F("proxyID", proxyID),
|
||
|
logger.F("addr", proxySpec.Address),
|
||
|
logger.F("target", proxySpec.Target),
|
||
|
)
|
||
|
|
||
|
if err := proxy.Start(ctx, proxySpec.Address, proxySpec.Target); err != nil {
|
||
|
logger.Error(
|
||
|
ctx, "error while starting proxy",
|
||
|
logger.F("proxyID", proxyID),
|
||
|
logger.E(errors.WithStack(err)),
|
||
|
)
|
||
|
|
||
|
delete(c.proxies, proxyID)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func NewController() *Controller {
|
||
|
return &Controller{
|
||
|
proxies: make(map[proxy.ID]*ReverseProxy),
|
||
|
currentSpecRevision: -1,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Compile-time check that *Controller implements agent.Controller.
var _ agent.Controller = (*Controller)(nil)
|