181 lines
4.0 KiB
Go
181 lines
4.0 KiB
Go
package proxy
|
|
|
|
import (
|
|
"context"
|
|
"net/url"
|
|
|
|
"forge.cadoles.com/Cadoles/emissary/internal/agent"
|
|
"forge.cadoles.com/Cadoles/emissary/internal/proxy"
|
|
spec "forge.cadoles.com/Cadoles/emissary/internal/spec/proxy"
|
|
"github.com/mitchellh/hashstructure/v2"
|
|
"github.com/pkg/errors"
|
|
"gitlab.com/wpetit/goweb/logger"
|
|
)
|
|
|
|
type proxyEntry struct {
|
|
SpecHash uint64
|
|
Proxy *ReverseProxy
|
|
}
|
|
|
|
type Controller struct {
|
|
proxies map[spec.ID]*proxyEntry
|
|
}
|
|
|
|
// Name implements node.Controller.
|
|
func (c *Controller) Name() string {
|
|
return "proxy-controller"
|
|
}
|
|
|
|
// Reconcile implements node.Controller.
|
|
func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
|
|
proxySpec := spec.NewSpec()
|
|
|
|
if err := state.GetSpec(spec.NameProxy, proxySpec); err != nil {
|
|
if errors.Is(err, agent.ErrSpecNotFound) {
|
|
logger.Info(ctx, "could not find proxy spec")
|
|
|
|
c.stopAllProxies(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
logger.Info(ctx, "retrieved spec", logger.F("spec", proxySpec.SpecName()), logger.F("revision", proxySpec.SpecRevision()))
|
|
|
|
c.updateProxies(ctx, proxySpec)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Controller) stopAllProxies(ctx context.Context) {
|
|
if len(c.proxies) > 0 {
|
|
logger.Info(ctx, "stopping all proxies")
|
|
}
|
|
|
|
for proxyID, entry := range c.proxies {
|
|
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
|
|
|
|
if err := entry.Proxy.Stop(); err != nil {
|
|
logger.Error(
|
|
ctx, "error while stopping proxy",
|
|
logger.F("proxyID", proxyID),
|
|
logger.E(errors.WithStack(err)),
|
|
)
|
|
|
|
delete(c.proxies, proxyID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Controller) updateProxies(ctx context.Context, spec *spec.Spec) {
|
|
// Stop and remove obsolete proxys
|
|
for proxyID, entry := range c.proxies {
|
|
if _, exists := spec.Proxies[proxyID]; exists {
|
|
continue
|
|
}
|
|
|
|
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
|
|
|
|
if err := entry.Proxy.Stop(); err != nil {
|
|
logger.Error(
|
|
ctx, "error while stopping proxy",
|
|
logger.F("proxyID", proxyID),
|
|
logger.E(errors.WithStack(err)),
|
|
)
|
|
|
|
delete(c.proxies, proxyID)
|
|
}
|
|
}
|
|
|
|
// (Re)start proxys
|
|
for proxyID, proxySpec := range spec.Proxies {
|
|
proxyCtx := logger.With(ctx, logger.F("proxyID", proxyID))
|
|
|
|
if err := c.updateProxy(ctx, proxyID, proxySpec); err != nil {
|
|
logger.Error(proxyCtx, "could not update proxy", logger.E(errors.WithStack(err)))
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Controller) updateProxy(ctx context.Context, proxyID spec.ID, proxySpec spec.ProxyEntry) (err error) {
|
|
newProxySpecHash, err := hashstructure.Hash(proxySpec, hashstructure.FormatV2, nil)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
var entry *proxyEntry
|
|
|
|
entry, exists := c.proxies[proxyID]
|
|
if !exists {
|
|
logger.Info(ctx, "proxy currently not running")
|
|
}
|
|
|
|
if entry == nil {
|
|
entry = &proxyEntry{
|
|
Proxy: NewReverseProxy(),
|
|
SpecHash: 0,
|
|
}
|
|
|
|
c.proxies[proxyID] = entry
|
|
}
|
|
|
|
specChanged := newProxySpecHash != entry.SpecHash
|
|
|
|
if entry.Proxy.Running() && !specChanged {
|
|
return nil
|
|
}
|
|
|
|
if specChanged && entry.SpecHash != 0 {
|
|
logger.Info(
|
|
ctx, "restarting proxy",
|
|
logger.F("address", proxySpec.Address),
|
|
)
|
|
} else {
|
|
logger.Info(
|
|
ctx, "starting proxy",
|
|
logger.F("address", proxySpec.Address),
|
|
)
|
|
}
|
|
|
|
options := make([]proxy.OptionFunc, 0)
|
|
allowedHosts := make([]string, len(proxySpec.Mappings))
|
|
mappings := make(map[string]*url.URL, len(proxySpec.Mappings))
|
|
|
|
for _, m := range proxySpec.Mappings {
|
|
target, err := url.Parse(m.Target)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
mappings[m.HostPattern] = target
|
|
allowedHosts = append(allowedHosts, m.HostPattern)
|
|
}
|
|
|
|
options = append(
|
|
options,
|
|
proxy.WithAllowedHosts(allowedHosts...),
|
|
proxy.WithRewriteHosts(mappings),
|
|
)
|
|
|
|
if err := entry.Proxy.Start(ctx, proxySpec.Address, options...); err != nil {
|
|
delete(c.proxies, proxyID)
|
|
|
|
return errors.Wrap(err, "could not start app")
|
|
}
|
|
|
|
entry.SpecHash = newProxySpecHash
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewController() *Controller {
|
|
return &Controller{
|
|
proxies: make(map[spec.ID]*proxyEntry),
|
|
}
|
|
}
|
|
|
|
// Compile-time check that *Controller satisfies agent.Controller.
var _ agent.Controller = (*Controller)(nil)
|