feat: host-based routing in proxy
This commit is contained in:
parent
8ada7e3b84
commit
cd796fff89
2
go.mod
2
go.mod
@ -3,7 +3,7 @@ module forge.cadoles.com/Cadoles/emissary
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20230320204239-1f4f795d43ff
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20230322170544-cf8a3f8ac077
|
||||
github.com/alecthomas/participle/v2 v2.0.0-beta.5
|
||||
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
|
||||
github.com/btcsuite/btcd/btcutil v1.1.3
|
||||
|
4
go.sum
4
go.sum
@ -54,8 +54,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
||||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20230320204239-1f4f795d43ff h1:dT7cZQWzSYfW3dVDOSe6OACBGBQNvn0wo32BIvDEowE=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20230320204239-1f4f795d43ff/go.mod h1:ONd6vyQ0IM0vHi1i+bmZBRc1Fd0BoXMuDdY/+0sZefw=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20230322170544-cf8a3f8ac077 h1:vsYcNHZevZrs0VeOTasvJoqvPynb8OvH+MMpIUvNT6Q=
|
||||
forge.cadoles.com/arcad/edge v0.0.0-20230322170544-cf8a3f8ac077/go.mod h1:ONd6vyQ0IM0vHi1i+bmZBRc1Fd0BoXMuDdY/+0sZefw=
|
||||
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
|
@ -173,7 +173,15 @@ func (s *Server) getAppModules(bus bus.Bus, ds storage.DocumentStore, bs storage
|
||||
auth.WithJWT(s.getJWTKeySet),
|
||||
),
|
||||
func(o *goja.Object) {
|
||||
if err := o.Set("CLAIM_ROLE", "role"); err != nil {
|
||||
if err := o.Set("CLAIM_TENANT", "arcad_tenant"); err != nil {
|
||||
panic(errors.New("could not set 'CLAIM_TENANT' property"))
|
||||
}
|
||||
|
||||
if err := o.Set("CLAIM_ENTRYPOINT", "arcad_entrypoint"); err != nil {
|
||||
panic(errors.New("could not set 'CLAIM_ENTRYPOINT' property"))
|
||||
}
|
||||
|
||||
if err := o.Set("CLAIM_ROLE", "arcad_role"); err != nil {
|
||||
panic(errors.New("could not set 'CLAIM_ROLE' property"))
|
||||
}
|
||||
|
||||
|
@ -2,16 +2,23 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/agent"
|
||||
"forge.cadoles.com/Cadoles/emissary/internal/spec/proxy"
|
||||
edgeProxy "forge.cadoles.com/arcad/edge/pkg/proxy"
|
||||
"github.com/mitchellh/hashstructure/v2"
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
type proxyEntry struct {
|
||||
SpecHash uint64
|
||||
Proxy *ReverseProxy
|
||||
}
|
||||
|
||||
type Controller struct {
|
||||
proxies map[proxy.ID]*ReverseProxy
|
||||
currentSpecRevision int
|
||||
proxies map[proxy.ID]*proxyEntry
|
||||
}
|
||||
|
||||
// Name implements node.Controller.
|
||||
@ -25,7 +32,7 @@ func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
|
||||
|
||||
if err := state.GetSpec(proxy.NameProxy, proxySpec); err != nil {
|
||||
if errors.Is(err, agent.ErrSpecNotFound) {
|
||||
logger.Info(ctx, "could not find proxy spec, stopping all remaining proxies")
|
||||
logger.Info(ctx, "could not find proxy spec")
|
||||
|
||||
c.stopAllProxies(ctx)
|
||||
|
||||
@ -37,25 +44,20 @@ func (c *Controller) Reconcile(ctx context.Context, state *agent.State) error {
|
||||
|
||||
logger.Info(ctx, "retrieved spec", logger.F("spec", proxySpec.SpecName()), logger.F("revision", proxySpec.SpecRevision()))
|
||||
|
||||
if c.currentSpecRevision == proxySpec.SpecRevision() {
|
||||
logger.Info(ctx, "spec revision did not change, doing nothing")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
c.updateProxies(ctx, proxySpec)
|
||||
|
||||
c.currentSpecRevision = proxySpec.SpecRevision()
|
||||
logger.Info(ctx, "updating current spec revision", logger.F("revision", c.currentSpecRevision))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) stopAllProxies(ctx context.Context) {
|
||||
for proxyID, proxy := range c.proxies {
|
||||
if len(c.proxies) > 0 {
|
||||
logger.Info(ctx, "stopping all proxies")
|
||||
}
|
||||
|
||||
for proxyID, entry := range c.proxies {
|
||||
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
|
||||
|
||||
if err := proxy.Stop(); err != nil {
|
||||
if err := entry.Proxy.Stop(); err != nil {
|
||||
logger.Error(
|
||||
ctx, "error while stopping proxy",
|
||||
logger.F("proxyID", proxyID),
|
||||
@ -69,14 +71,14 @@ func (c *Controller) stopAllProxies(ctx context.Context) {
|
||||
|
||||
func (c *Controller) updateProxies(ctx context.Context, spec *proxy.Spec) {
|
||||
// Stop and remove obsolete proxys
|
||||
for proxyID, proxy := range c.proxies {
|
||||
for proxyID, entry := range c.proxies {
|
||||
if _, exists := spec.Proxies[proxyID]; exists {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info(ctx, "stopping proxy", logger.F("proxyID", proxyID))
|
||||
|
||||
if err := proxy.Stop(); err != nil {
|
||||
if err := entry.Proxy.Stop(); err != nil {
|
||||
logger.Error(
|
||||
ctx, "error while stopping proxy",
|
||||
logger.F("proxyID", proxyID),
|
||||
@ -89,35 +91,89 @@ func (c *Controller) updateProxies(ctx context.Context, spec *proxy.Spec) {
|
||||
|
||||
// (Re)start proxys
|
||||
for proxyID, proxySpec := range spec.Proxies {
|
||||
proxy, exists := c.proxies[proxyID]
|
||||
if !exists {
|
||||
proxy = NewReverseProxy()
|
||||
c.proxies[proxyID] = proxy
|
||||
}
|
||||
proxyCtx := logger.With(ctx, logger.F("proxyID", proxyID))
|
||||
|
||||
logger.Info(
|
||||
ctx, "starting proxy",
|
||||
logger.F("proxyID", proxyID),
|
||||
logger.F("addr", proxySpec.Address),
|
||||
logger.F("target", proxySpec.Target),
|
||||
)
|
||||
|
||||
if err := proxy.Start(ctx, proxySpec.Address, proxySpec.Target); err != nil {
|
||||
logger.Error(
|
||||
ctx, "error while starting proxy",
|
||||
logger.F("proxyID", proxyID),
|
||||
logger.E(errors.WithStack(err)),
|
||||
)
|
||||
|
||||
delete(c.proxies, proxyID)
|
||||
if err := c.updateProxy(ctx, proxyID, proxySpec); err != nil {
|
||||
logger.Error(proxyCtx, "could not update proxy", logger.E(errors.WithStack(err)))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) updateProxy(ctx context.Context, proxyID proxy.ID, proxySpec proxy.ProxyEntry) (err error) {
|
||||
newProxySpecHash, err := hashstructure.Hash(proxySpec, hashstructure.FormatV2, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
var entry *proxyEntry
|
||||
|
||||
entry, exists := c.proxies[proxyID]
|
||||
if !exists {
|
||||
logger.Info(ctx, "proxy currently not running")
|
||||
}
|
||||
|
||||
if entry == nil {
|
||||
entry = &proxyEntry{
|
||||
Proxy: NewReverseProxy(),
|
||||
SpecHash: 0,
|
||||
}
|
||||
|
||||
c.proxies[proxyID] = entry
|
||||
}
|
||||
|
||||
specChanged := newProxySpecHash != entry.SpecHash
|
||||
|
||||
if entry.Proxy.Running() && !specChanged {
|
||||
return nil
|
||||
}
|
||||
|
||||
if specChanged && entry.SpecHash != 0 {
|
||||
logger.Info(
|
||||
ctx, "restarting proxy",
|
||||
logger.F("address", proxySpec.Address),
|
||||
)
|
||||
} else {
|
||||
logger.Info(
|
||||
ctx, "starting proxy",
|
||||
logger.F("address", proxySpec.Address),
|
||||
)
|
||||
}
|
||||
|
||||
options := make([]edgeProxy.OptionFunc, 0)
|
||||
allowedHosts := make([]string, len(proxySpec.Mappings))
|
||||
mappings := make(map[string]*url.URL, len(proxySpec.Mappings))
|
||||
|
||||
for _, m := range proxySpec.Mappings {
|
||||
target, err := url.Parse(m.Target)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
mappings[m.HostPattern] = target
|
||||
allowedHosts = append(allowedHosts, m.HostPattern)
|
||||
}
|
||||
|
||||
options = append(
|
||||
options,
|
||||
edgeProxy.WithAllowedHosts(allowedHosts...),
|
||||
edgeProxy.WithRewriteHosts(mappings),
|
||||
)
|
||||
|
||||
if err := entry.Proxy.Start(ctx, proxySpec.Address, options...); err != nil {
|
||||
delete(c.proxies, proxyID)
|
||||
|
||||
return errors.Wrap(err, "could not start app")
|
||||
}
|
||||
|
||||
entry.SpecHash = newProxySpecHash
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewController() *Controller {
|
||||
return &Controller{
|
||||
proxies: make(map[proxy.ID]*ReverseProxy),
|
||||
currentSpecRevision: -1,
|
||||
proxies: make(map[proxy.ID]*proxyEntry),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,26 +3,21 @@ package proxy
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
|
||||
"forge.cadoles.com/arcad/edge/pkg/proxy"
|
||||
)
|
||||
|
||||
type ReverseProxy struct {
|
||||
addr string
|
||||
target string
|
||||
server *http.Server
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
func (p *ReverseProxy) Start(ctx context.Context, addr, target string) error {
|
||||
alreadyRunning := p.server != nil && target == p.target && addr == p.target
|
||||
|
||||
if alreadyRunning {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ReverseProxy) Start(ctx context.Context, addr string, funcs ...proxy.OptionFunc) error {
|
||||
if p.server != nil {
|
||||
if err := p.Stop(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -33,33 +28,40 @@ func (p *ReverseProxy) Start(ctx context.Context, addr, target string) error {
|
||||
Addr: addr,
|
||||
}
|
||||
|
||||
url, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
proxy := httputil.NewSingleHostReverseProxy(url)
|
||||
|
||||
proxy := proxy.New(funcs...)
|
||||
server.Handler = proxy
|
||||
|
||||
p.mutex.Lock()
|
||||
p.server = server
|
||||
p.addr = addr
|
||||
p.target = target
|
||||
p.mutex.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := p.Stop(); err != nil {
|
||||
logger.Error(ctx, "error while stopping gateway", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
logger.Error(ctx, "error while listening", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
|
||||
if err := p.Stop(); err != nil {
|
||||
logger.Error(ctx, "error while stopping gateway", logger.E(errors.WithStack(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ReverseProxy) Running() bool {
|
||||
p.mutex.RLock()
|
||||
defer p.mutex.RUnlock()
|
||||
|
||||
return p.server != nil
|
||||
}
|
||||
|
||||
func (p *ReverseProxy) Stop() error {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
|
||||
if p.server == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
{
|
||||
"$schema": "https://json-schema.org/draft/2020-12/schema",
|
||||
"$id": "https://app.emissary.cadoles.com/spec.json",
|
||||
"$id": "https://app.edge.emissary.cadoles.com/spec.json",
|
||||
"title": "AppSpec",
|
||||
"description": "Emissary 'App' specification",
|
||||
"type": "object",
|
||||
|
@ -14,11 +14,23 @@
|
||||
"address": {
|
||||
"type": "string"
|
||||
},
|
||||
"target": {
|
||||
"type": "string"
|
||||
"mappings": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"hostPattern": {
|
||||
"type": "string"
|
||||
},
|
||||
"target": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["hostPattern", "target"]
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["address", "target"],
|
||||
"required": ["address", "mappings"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
}
|
||||
|
@ -12,8 +12,13 @@ type Spec struct {
|
||||
}
|
||||
|
||||
type ProxyEntry struct {
|
||||
Address string `json:"address"`
|
||||
Target string `json:"target"`
|
||||
Address string `json:"address"`
|
||||
Mappings []ProxyMapping `json:"mappings"`
|
||||
}
|
||||
|
||||
type ProxyMapping struct {
|
||||
HostPattern string `json:"hostPattern"`
|
||||
Target string `json:"target"`
|
||||
}
|
||||
|
||||
func (s *Spec) SpecName() spec.Name {
|
||||
|
@ -4,7 +4,12 @@
|
||||
"proxies": {
|
||||
"cadoles.com": {
|
||||
"address": ":3003",
|
||||
"target": "https://www.cadoles.com",
|
||||
"mappings": [
|
||||
{
|
||||
"hostPattern": "localhost:3003",
|
||||
"target": "https://www.cadoles.com"
|
||||
}
|
||||
],
|
||||
"foo": "bar"
|
||||
}
|
||||
}
|
||||
|
7
internal/spec/proxy/testdata/spec-ok.json
vendored
7
internal/spec/proxy/testdata/spec-ok.json
vendored
@ -4,7 +4,12 @@
|
||||
"proxies": {
|
||||
"cadoles.com": {
|
||||
"address": ":3003",
|
||||
"target": "https://www.cadoles.com"
|
||||
"mappings": [
|
||||
{
|
||||
"hostPattern": "localhost:3003",
|
||||
"target": "https://www.cadoles.com"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user