emissary/internal/agent/controller/proxy/controller.go

181 lines
4.0 KiB
Go
Raw Permalink Normal View History

2023-03-21 13:28:41 +01:00
package proxy
import (
"context"
2023-03-22 18:15:22 +01:00
"net/url"
2023-03-21 13:28:41 +01:00
"forge.cadoles.com/Cadoles/emissary/internal/agent"
"forge.cadoles.com/Cadoles/emissary/internal/spec/proxy"
2023-03-22 18:15:22 +01:00
edgeProxy "forge.cadoles.com/arcad/edge/pkg/proxy"
"github.com/mitchellh/hashstructure/v2"
2023-03-21 13:28:41 +01:00
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
2023-03-22 18:15:22 +01:00
// proxyEntry pairs a reverse proxy instance with the hash of the spec
// it was last successfully started with.
type proxyEntry struct {
	// Proxy is the reverse proxy instance managed for this entry.
	Proxy *ReverseProxy
	// SpecHash is the hash of the last spec applied to Proxy; the
	// controller uses 0 for an entry that has not been started yet.
	SpecHash uint64
}
2023-03-21 13:28:41 +01:00
type Controller struct {
2023-03-22 18:15:22 +01:00
proxies map[proxy.ID]*proxyEntry
2023-03-21 13:28:41 +01:00
}
// Name implements agent.Controller and returns the fixed identifier
// of this controller.
func (c *Controller) Name() string {
	const name = "proxy-controller"

	return name
}
// Reconcile implements node.Controller.
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
proxySpec := proxy.NewSpec()
if err := state.GetSpec(proxy.NameProxy, proxySpec); err != nil {
if errors.Is(err, agent.ErrSpecNotFound) {
2023-03-22 18:15:22 +01:00
logger.Info(ctx, "could not find proxy spec")
2023-03-21 13:28:41 +01:00
c.stopAllProxies(ctx)
return nil
}
return errors.WithStack(err)
}
logger.Info(ctx, "retrieved spec", logger.F("spec", proxySpec.SpecName()), logger.F("revision", proxySpec.SpecRevision()))
c.updateProxies(ctx, proxySpec)
return nil
}
func (c *Controller) stopAllProxies(ctx context.Context) {
2023-03-22 18:15:22 +01:00
if len(c.proxies) > 0 {
logger.Info(ctx, "stopping all proxies")
}
for proxyID, entry := range c.proxies {
2023-03-21 13:28:41 +01:00
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
2023-03-22 18:15:22 +01:00
if err := entry.Proxy.Stop(); err != nil {
2023-03-21 13:28:41 +01:00
logger.Error(
ctx, "error while stopping proxy",
logger.F("proxyID", proxyID),
logger.E(errors.WithStack(err)),
)
delete(c.proxies, proxyID)
}
}
}
func (c *Controller) updateProxies(ctx context.Context, spec *proxy.Spec) {
// Stop and remove obsolete proxys
2023-03-22 18:15:22 +01:00
for proxyID, entry := range c.proxies {
2023-03-21 13:28:41 +01:00
if _, exists := spec.Proxies[proxyID]; exists {
continue
}
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
2023-03-22 18:15:22 +01:00
if err := entry.Proxy.Stop(); err != nil {
2023-03-21 13:28:41 +01:00
logger.Error(
ctx, "error while stopping proxy",
logger.F("proxyID", proxyID),
logger.E(errors.WithStack(err)),
)
delete(c.proxies, proxyID)
}
}
// (Re)start proxys
for proxyID, proxySpec := range spec.Proxies {
2023-03-22 18:15:22 +01:00
proxyCtx := logger.With(ctx, logger.F("proxyID", proxyID))
if err := c.updateProxy(ctx, proxyID, proxySpec); err != nil {
logger.Error(proxyCtx, "could not update proxy", logger.E(errors.WithStack(err)))
continue
2023-03-21 13:28:41 +01:00
}
2023-03-22 18:15:22 +01:00
}
}
func (c *Controller) updateProxy(ctx context.Context, proxyID proxy.ID, proxySpec proxy.ProxyEntry) (err error) {
newProxySpecHash, err := hashstructure.Hash(proxySpec, hashstructure.FormatV2, nil)
if err != nil {
return errors.WithStack(err)
}
var entry *proxyEntry
2023-03-21 13:28:41 +01:00
2023-03-22 18:15:22 +01:00
entry, exists := c.proxies[proxyID]
if !exists {
logger.Info(ctx, "proxy currently not running")
}
if entry == nil {
entry = &proxyEntry{
Proxy: NewReverseProxy(),
SpecHash: 0,
}
c.proxies[proxyID] = entry
}
specChanged := newProxySpecHash != entry.SpecHash
if entry.Proxy.Running() && !specChanged {
return nil
}
if specChanged && entry.SpecHash != 0 {
logger.Info(
ctx, "restarting proxy",
logger.F("address", proxySpec.Address),
)
} else {
2023-03-21 13:28:41 +01:00
logger.Info(
ctx, "starting proxy",
2023-03-22 18:15:22 +01:00
logger.F("address", proxySpec.Address),
2023-03-21 13:28:41 +01:00
)
2023-03-22 18:15:22 +01:00
}
2023-03-21 13:28:41 +01:00
2023-03-22 18:15:22 +01:00
options := make([]edgeProxy.OptionFunc, 0)
allowedHosts := make([]string, len(proxySpec.Mappings))
mappings := make(map[string]*url.URL, len(proxySpec.Mappings))
2023-03-21 13:28:41 +01:00
2023-03-22 18:15:22 +01:00
for _, m := range proxySpec.Mappings {
target, err := url.Parse(m.Target)
if err != nil {
return errors.WithStack(err)
2023-03-21 13:28:41 +01:00
}
2023-03-22 18:15:22 +01:00
mappings[m.HostPattern] = target
allowedHosts = append(allowedHosts, m.HostPattern)
}
options = append(
options,
edgeProxy.WithAllowedHosts(allowedHosts...),
edgeProxy.WithRewriteHosts(mappings),
)
if err := entry.Proxy.Start(ctx, proxySpec.Address, options...); err != nil {
delete(c.proxies, proxyID)
return errors.Wrap(err, "could not start app")
2023-03-21 13:28:41 +01:00
}
2023-03-22 18:15:22 +01:00
entry.SpecHash = newProxySpecHash
return nil
2023-03-21 13:28:41 +01:00
}
func NewController() *Controller {
return &Controller{
2023-03-22 18:15:22 +01:00
proxies: make(map[proxy.ID]*proxyEntry),
2023-03-21 13:28:41 +01:00
}
}
// Compile-time check that *Controller satisfies agent.Controller.
var _ agent.Controller = (*Controller)(nil)