Compare commits

...

3 Commits

Author SHA1 Message Date
f35384c0f3 feat: create profiling package + rewrite profiling tutorial
Some checks reported warnings
Cadoles/bouncer/pipeline/head This commit was not built
2024-06-28 17:44:51 +02:00
c73fe8cca5 feat(rewriter): pass structured url to ease request rewriting
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
2024-06-28 10:46:38 +02:00
3c1939f418 feat: add revision number to proxy and layers to identify changes
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
2024-06-27 17:03:50 +02:00
28 changed files with 799 additions and 255 deletions

2
.gitignore vendored
View File

@ -10,4 +10,4 @@
/out
.dockerconfigjson
*.prof
proxy.test
*.test

View File

@ -131,7 +131,7 @@ tools/grafterm/bin/grafterm:
GOBIN=$(PWD)/tools/grafterm/bin go install github.com/slok/grafterm/cmd/grafterm@v0.2.0
bench:
go test -bench=. -run '^$$' -count=10 ./...
go test -bench=. -run '^$$' ./internal/bench
tools/benchstat/bin/benchstat:
mkdir -p tools/benchstat/bin

View File

@ -66,7 +66,21 @@ La requête en cours de traitement.
{
method: "string", // Méthode HTTP
host: "string", // Nom d'hôte (`Host`) associé à la requête
url: "string", // URL associée à la requête
url: { // URL associée à la requête sous sa forme structurée
"scheme": "string", // Schéma HTTP de l'URL
"opaque": "string", // Données opaque de l'URL
"user": { // Identifiants d'URL (Basic Auth)
"username": "",
"password": ""
},
"host": "string", // Nom d'hôte (<domaine>:<port>) de l'URL
"path": "string", // Chemin de l'URL (format assaini)
"rawPath": "string", // Chemin de l'URL (format brut)
"rawQuery": "string", // Variables d'URL (format brut)
"fragment" : "string", // Fragment d'URL (format assaini)
"rawFragment" : "string" // Fragment d'URL (format brut)
},
rawUrl: "string", // URL associée à la requête (format assaini)
proto: "string", // Numéro de version du protocole utilisé
protoMajor: "int", // Numéro de version majeure du protocole utilisé
protoMinor: "int", // Numéro de version mineur du protocole utilisé

View File

@ -1,31 +1,54 @@
# Étudier les performances de Bouncer
1. Lancer un benchmark du proxy
Le package `./internal` est dédié à l'étude des performances de Bouncer. Il contient une suite de benchmarks simulant de proxies avec différentes configurations de layers afin d'évaluer les points d'engorgement sur le traitement des requêtes.
```shell
go test -bench=. -run '^$' -count=5 -cpuprofile bench_proxy.prof ./internal/proxy
```
Voir le répertoire `./internal/bench/testdata/proxies` pour voir les différentes configurations de cas.
2. Visualiser les temps d'exécution
## Lancer les benchmarks
```shell
go tool pprof -web bench_proxy.prof
```
Le plus simple est d'utiliser la commande `make bench` qui exécutera séquentiellement tous les benchmarks. Il est également possible de lancer un benchmark spécifique via la commande suivante:
3. Comparer les performances d'une exécution à l'autre
```bash
go test -bench="BenchmarkProxies/$BENCH_CASE" -run='^$' ./internal/bench
```
```shell
# Lancer un premier benchmark
go test -bench=. -run '^$' -count=10 ./internal/proxy > bench_before.txt
Par exemple:
# Faire des modifications sur les sources
```bash
# Pour exécuter ./internal/bench/testdata/proxies/basic-auth.yml
go test -bench='BenchmarkProxies/basic-auth' -run='^$' ./internal/bench
```
# Lancer un second benchmark
go test -bench=. -run '^$' -count=10 ./internal/proxy > bench_after.txt
## Visualiser les profils d'exécution
# Installer l'outil benchstat
make tools/benchstat/bin/benchstat
Vous pouvez visualiser les profils d'exécution via la commande suivante:
# Comparer les rapports
tools/benchstat/bin/benchstat bench_before.txt bench_after.txt
```
```shell
go tool pprof -web path/to/file.prof
```
Par défaut l'exécution des benchmarks créera automatiquement des fichiers de profil dans le répertoire `./internal/bench/testdata/proxies`.
Par exemple:
```shell
go tool pprof -web ./internal/bench/testdata/proxies/basic-auth.prof
```
## Comparer les évolutions
```bash
# Lancer un premier benchmark
go test -bench="BenchmarkProxies/$BENCH_CASE" -run='^$' ./internal/bench
# Faire une sauvegarde du fichier de profil
cp ./internal/bench/testdata/proxies/$BENCH_CASE.prof ./internal/bench/testdata/proxies/$BENCH_CASE-prev.prof
# Faire des modifications sur les sources
# Lancer un second benchmark
go test -bench="BenchmarkProxies/$BENCH_CASE" -run='^$' ./internal/bench
# Visualiser la différence entre les deux profils
go tool pprof -web -base=./internal/bench/testdata/proxies/$BENCH_CASE-prev.prof ./internal/bench/testdata/proxies/$BENCH_CASE.prof
```

View File

@ -0,0 +1,300 @@
package proxy_test
import (
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"os"
"path/filepath"
"runtime/pprof"
"strings"
"testing"
"time"
"forge.cadoles.com/Cadoles/go-proxy"
"forge.cadoles.com/cadoles/bouncer/internal/cache/memory"
"forge.cadoles.com/cadoles/bouncer/internal/cache/ttl"
"forge.cadoles.com/cadoles/bouncer/internal/config"
"forge.cadoles.com/cadoles/bouncer/internal/proxy/director"
"forge.cadoles.com/cadoles/bouncer/internal/store"
redisStore "forge.cadoles.com/cadoles/bouncer/internal/store/redis"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"gopkg.in/yaml.v3"
"forge.cadoles.com/cadoles/bouncer/internal/setup"
)
func BenchmarkProxies(b *testing.B) {
proxyFiles, err := filepath.Glob("testdata/proxies/*.yml")
if err != nil {
b.Fatalf("%+v", errors.WithStack(err))
}
for _, f := range proxyFiles {
name := strings.TrimSuffix(filepath.Base(f), filepath.Ext(f))
b.Run(name, func(b *testing.B) {
conf, err := loadProxyBenchConfig(f)
if err != nil {
b.Fatalf("%+v", errors.Wrapf(err, "could notre load bench config"))
}
proxy, backend, err := createProxy(name, conf, b.Logf)
if err != nil {
b.Fatalf("%+v", errors.Wrapf(err, "could not create proxy"))
}
defer proxy.Close()
if backend != nil {
defer backend.Close()
}
client := proxy.Client()
proxyURL, err := url.Parse(proxy.URL)
if err != nil {
b.Fatalf("%+v", errors.Wrapf(err, "could not parse proxy url"))
}
if conf.Fetch.URL.Path != "" {
proxyURL.Path = conf.Fetch.URL.Path
}
if conf.Fetch.URL.RawQuery != "" {
proxyURL.RawQuery = conf.Fetch.URL.RawQuery
}
if conf.Fetch.URL.User.Username != "" || conf.Fetch.URL.User.Password != "" {
proxyURL.User = url.UserPassword(conf.Fetch.URL.User.Username, conf.Fetch.URL.User.Password)
}
rawProxyURL := proxyURL.String()
b.Logf("fetching url '%s'", rawProxyURL)
profile, err := os.Create(filepath.Join("testdata", "proxies", name+".prof"))
if err != nil {
b.Fatalf("%+v", errors.Wrapf(err, "could not create cpu profile"))
}
defer profile.Close()
if err := pprof.StartCPUProfile(profile); err != nil {
log.Fatal(err)
}
defer pprof.StopCPUProfile()
b.ResetTimer()
for i := 0; i < b.N; i++ {
res, err := client.Get(rawProxyURL)
if err != nil {
b.Errorf("could not fetch proxy url: %+v", errors.WithStack(err))
}
body, err := io.ReadAll(res.Body)
if err != nil {
b.Errorf("could not read response body: %+v", errors.WithStack(err))
}
b.Logf("%s \n %v", res.Status, string(body))
if err := res.Body.Close(); err != nil {
b.Errorf("could not close response body: %+v", errors.WithStack(err))
}
}
})
}
}
type proxyBenchConfig struct {
Proxy config.BootstrapProxyConfig `yaml:"proxy"`
Fetch fetchBenchConfig `yaml:"fetch"`
}
type fetchBenchConfig struct {
URL fetchURLBenchConfig `yaml:"url"`
}
type fetchURLBenchConfig struct {
Path string `yaml:"path"`
RawQuery string `yaml:"rawQuery"`
User fetchURLUserBenchConfig `yaml:"user"`
}
type fetchURLUserBenchConfig struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
}
func loadProxyBenchConfig(filename string) (*proxyBenchConfig, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, errors.Wrapf(err, "could not read file '%s'", filename)
}
conf := proxyBenchConfig{}
if err := yaml.Unmarshal(data, &conf); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal config")
}
return &conf, nil
}
func createProxy(name string, conf *proxyBenchConfig, logf func(format string, a ...any)) (*httptest.Server, *httptest.Server, error) {
redisEndpoint := os.Getenv("BOUNCER_BENCH_REDIS_ADDR")
if redisEndpoint == "" {
redisEndpoint = "127.0.0.1:6379"
}
client := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{redisEndpoint},
})
proxyRepository := redisStore.NewProxyRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay)
layerRepository := redisStore.NewLayerRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay)
var backend *httptest.Server
if conf.Proxy.To == "" {
backend = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte("Hello, world.")); err != nil {
logf("[ERROR] %+v", errors.WithStack(err))
}
}))
if err := waitFor(backend.URL, 5*time.Second); err != nil {
return nil, nil, errors.WithStack(err)
}
logf("started backend '%s'", backend.URL)
}
ctx := context.Background()
proxyName := store.ProxyName("bench-" + name)
proxies, err := proxyRepository.QueryProxy(ctx)
if err != nil {
return nil, nil, errors.WithStack(err)
}
// Cleanup existing proxies
for _, p := range proxies {
if err := proxyRepository.DeleteProxy(ctx, p.Name); err != nil {
return nil, nil, errors.WithStack(err)
}
}
logf("creating proxy '%s'", proxyName)
to := string(conf.Proxy.To)
if to == "" {
to = backend.URL
}
if _, err := proxyRepository.CreateProxy(ctx, proxyName, to, conf.Proxy.From...); err != nil {
return nil, nil, errors.WithStack(err)
}
if _, err := proxyRepository.UpdateProxy(ctx, proxyName, store.WithProxyUpdateEnabled(true)); err != nil {
return nil, nil, errors.WithStack(err)
}
for layerName, layerConf := range conf.Proxy.Layers {
if err := layerRepository.DeleteLayer(ctx, proxyName, store.LayerName(layerName)); err != nil {
return nil, nil, errors.WithStack(err)
}
_, err := layerRepository.CreateLayer(ctx, proxyName, store.LayerName(layerName), store.LayerType(layerConf.Type), layerConf.Options.Data)
if err != nil {
return nil, nil, errors.WithStack(err)
}
_, err = layerRepository.UpdateLayer(ctx, proxyName, store.LayerName(layerName), store.WithLayerUpdateEnabled(bool(layerConf.Enabled)))
if err != nil {
return nil, nil, errors.WithStack(err)
}
}
layers, err := setup.GetLayers(context.Background(), config.NewDefault())
if err != nil {
return nil, nil, errors.WithStack(err)
}
director := director.New(
proxyRepository, layerRepository,
director.WithLayerCache(
ttl.NewCache(
memory.NewCache[string, []*store.Layer](),
memory.NewCache[string, time.Time](),
30*time.Second,
),
),
director.WithProxyCache(
ttl.NewCache(
memory.NewCache[string, []*store.Proxy](),
memory.NewCache[string, time.Time](),
30*time.Second,
),
),
director.WithLayers(layers...),
)
directorMiddleware := director.Middleware()
handler := proxy.New(
proxy.WithRequestTransformers(
director.RequestTransformer(),
),
proxy.WithResponseTransformers(
director.ResponseTransformer(),
),
proxy.WithReverseProxyFactory(func(ctx context.Context, target *url.URL) *httputil.ReverseProxy {
reverse := httputil.NewSingleHostReverseProxy(target)
reverse.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
logf("[ERROR] %s", errors.WithStack(err))
}
return reverse
}),
)
server := httptest.NewServer(directorMiddleware(handler))
return server, backend, nil
}
func waitFor(url string, ttl time.Duration) error {
var lastErr error
timeout := time.After(ttl)
for {
select {
case <-timeout:
if lastErr != nil {
return lastErr
}
return errors.New("wait timed out")
default:
res, err := http.Get(url)
if err != nil {
lastErr = errors.WithStack(err)
continue
}
if res.StatusCode >= 200 && res.StatusCode < 400 {
return nil
}
}
}
}

View File

@ -0,0 +1,20 @@
proxy:
from: ["*"]
to: ""
layers:
basic-auth:
type: authn-basic
enabled: true
options:
users:
- username: foo
passwordHash: "$2y$10$ShTc856wMB8PCxyr46qJRO8z06MpV4UejAVRDJ/bixhu0XTGn7Giy"
attributes:
email: foo@bar.com
rules:
- set_header("Remote-User-Attr-Email", user.attrs.email)
fetch:
url:
user:
username: foo
password: bar

View File

@ -0,0 +1,3 @@
proxy:
from: ["*"]
to: ""

View File

@ -0,0 +1,12 @@
proxy:
from: ["*"]
to: ""
layers:
host-rewriter:
type: rewriter
enabled: true
options:
rules:
request:
- set_host(request.url.host)
- set_header("X-Proxied-With", "bouncer")

View File

@ -13,6 +13,7 @@ func layerHeaderHints(outputMode format.OutputMode) format.Hints {
format.NewProp("Type", "Type"),
format.NewProp("Enabled", "Enabled"),
format.NewProp("Weight", "Weight"),
format.NewProp("Revision", "Revision"),
},
}
}
@ -25,6 +26,7 @@ func layerHints(outputMode format.OutputMode) format.Hints {
format.NewProp("Type", "Type"),
format.NewProp("Enabled", "Enabled"),
format.NewProp("Weight", "Weight"),
format.NewProp("Revision", "Revision"),
format.NewProp("Options", "Options"),
format.NewProp("CreatedAt", "CreatedAt", table.WithCompactModeMaxColumnWidth(20)),
format.NewProp("UpdatedAt", "UpdatedAt", table.WithCompactModeMaxColumnWidth(20)),

View File

@ -12,6 +12,7 @@ func proxyHeaderHints(outputMode format.OutputMode) format.Hints {
format.NewProp("Name", "Name"),
format.NewProp("Enabled", "Enabled"),
format.NewProp("Weight", "Weight"),
format.NewProp("Revision", "Revision"),
},
}
}
@ -25,6 +26,7 @@ func proxyHints(outputMode format.OutputMode) format.Hints {
format.NewProp("To", "To"),
format.NewProp("Enabled", "Enabled"),
format.NewProp("Weight", "Weight"),
format.NewProp("Revision", "Revision"),
format.NewProp("CreatedAt", "CreatedAt", table.WithCompactModeMaxColumnWidth(20)),
format.NewProp("UpdatedAt", "UpdatedAt", table.WithCompactModeMaxColumnWidth(20)),
},

View File

@ -80,9 +80,22 @@ func loadBootstrapDir(dir string) (map[store.ProxyName]BootstrapProxyConfig, err
proxies := make(map[store.ProxyName]BootstrapProxyConfig)
for _, f := range files {
data, err := os.ReadFile(f)
proxy, err := loadBootstrappedProxyConfig(f)
if err != nil {
return nil, errors.Wrapf(err, "could not read file '%s'", f)
return nil, errors.Wrapf(err, "could not load proxy bootstrap file '%s'", f)
}
name := store.ProxyName(strings.TrimSuffix(filepath.Base(f), filepath.Ext(f)))
proxies[name] = *proxy
}
return proxies, nil
}
func loadBootstrappedProxyConfig(filename string) (*BootstrapProxyConfig, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, errors.Wrapf(err, "could not read file '%s'", filename)
}
proxy := BootstrapProxyConfig{}
@ -91,11 +104,7 @@ func loadBootstrapDir(dir string) (map[store.ProxyName]BootstrapProxyConfig, err
return nil, errors.Wrapf(err, "could not unmarshal proxy")
}
name := store.ProxyName(strings.TrimSuffix(filepath.Base(f), filepath.Ext(f)))
proxies[name] = proxy
}
return proxies, nil
return &proxy, nil
}
func overrideProxies(base map[store.ProxyName]BootstrapProxyConfig, proxies map[store.ProxyName]BootstrapProxyConfig) map[store.ProxyName]BootstrapProxyConfig {

View File

@ -127,7 +127,7 @@ func (im *InterpolatedMap) UnmarshalYAML(value *yaml.Node) error {
return nil
}
func (im *InterpolatedMap) interpolateRecursive(data any) (any, error) {
func (im InterpolatedMap) interpolateRecursive(data any) (any, error) {
switch typ := data.(type) {
case map[string]any:
for key, value := range typ {

View File

@ -74,7 +74,7 @@ func (l *Layer) ResponseTransformer(layer *store.Layer) proxy.ResponseTransforme
}
}
func New() *Layer {
func New(funcs ...OptionFunc) *Layer {
return &Layer{}
}

View File

@ -0,0 +1,16 @@
package rewriter
type Options struct {
}
type OptionFunc func(opts *Options)
func NewOptions(funcs ...OptionFunc) *Options {
opts := &Options{}
for _, fn := range funcs {
fn(opts)
}
return opts
}

View File

@ -12,9 +12,27 @@ type RequestEnv struct {
Request RequestInfo `expr:"request"`
}
type URLEnv struct {
Scheme string `expr:"scheme"`
Opaque string `expr:"opaque"`
User UserInfoEnv `expr:"user"`
Host string `expr:"host"`
Path string `expr:"path"`
RawPath string `expr:"rawPath"`
RawQuery string `expr:"rawQuery"`
Fragment string `expr:"fragment"`
RawFragment string `expr:"rawFragment"`
}
type UserInfoEnv struct {
Username string `expr:"username"`
Password string `expr:"password"`
}
type RequestInfo struct {
Method string `expr:"method"`
URL string `expr:"url"`
URL URLEnv `expr:"url"`
RawURL string `expr:"rawUrl"`
Proto string `expr:"proto"`
ProtoMajor int `expr:"protoMajor"`
ProtoMinor int `expr:"protoMinor"`
@ -33,10 +51,7 @@ func (l *Layer) applyRequestRules(r *http.Request, options *LayerOptions) error
return nil
}
engine, err := rule.NewEngine[*RequestEnv](
ruleHTTP.WithRequestFuncs(r),
rule.WithRules(options.Rules.Request...),
)
engine, err := l.getRequestRuleEngine(r, options)
if err != nil {
return errors.WithStack(err)
}
@ -44,7 +59,24 @@ func (l *Layer) applyRequestRules(r *http.Request, options *LayerOptions) error
env := &RequestEnv{
Request: RequestInfo{
Method: r.Method,
URL: r.URL.String(),
URL: URLEnv{
Scheme: r.URL.Scheme,
Opaque: r.URL.Opaque,
User: UserInfoEnv{
Username: r.URL.User.Username(),
Password: func() string {
passwd, _ := r.URL.User.Password()
return passwd
}(),
},
Host: r.URL.Host,
Path: r.URL.Path,
RawPath: r.URL.RawPath,
RawQuery: r.URL.RawQuery,
Fragment: r.URL.Fragment,
RawFragment: r.URL.RawFragment,
},
RawURL: r.URL.String(),
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
@ -65,6 +97,18 @@ func (l *Layer) applyRequestRules(r *http.Request, options *LayerOptions) error
return nil
}
func (l *Layer) getRequestRuleEngine(r *http.Request, options *LayerOptions) (*rule.Engine[*RequestEnv], error) {
engine, err := rule.NewEngine[*RequestEnv](
rule.WithRules(options.Rules.Request...),
ruleHTTP.WithRequestFuncs(r),
)
if err != nil {
return nil, errors.WithStack(err)
}
return engine, nil
}
type ResponseEnv struct {
Request RequestInfo `expr:"request"`
Response ResponseInfo `expr:"response"`
@ -84,15 +128,12 @@ type ResponseInfo struct {
}
func (l *Layer) applyResponseRules(r *http.Response, options *LayerOptions) error {
rules := options.Rules.Request
rules := options.Rules.Response
if len(rules) == 0 {
return nil
}
engine, err := rule.NewEngine[*ResponseEnv](
rule.WithRules(options.Rules.Response...),
ruleHTTP.WithResponseFuncs(r),
)
engine, err := l.getResponseRuleEngine(r, options)
if err != nil {
return errors.WithStack(err)
}
@ -100,7 +141,24 @@ func (l *Layer) applyResponseRules(r *http.Response, options *LayerOptions) erro
env := &ResponseEnv{
Request: RequestInfo{
Method: r.Request.Method,
URL: r.Request.URL.String(),
URL: URLEnv{
Scheme: r.Request.URL.Scheme,
Opaque: r.Request.URL.Opaque,
User: UserInfoEnv{
Username: r.Request.URL.User.Username(),
Password: func() string {
passwd, _ := r.Request.URL.User.Password()
return passwd
}(),
},
Host: r.Request.URL.Host,
Path: r.Request.URL.Path,
RawPath: r.Request.URL.RawPath,
RawQuery: r.Request.URL.RawQuery,
Fragment: r.Request.URL.Fragment,
RawFragment: r.Request.URL.RawFragment,
},
RawURL: r.Request.URL.String(),
Proto: r.Request.Proto,
ProtoMajor: r.Request.ProtoMajor,
ProtoMinor: r.Request.ProtoMinor,
@ -131,3 +189,15 @@ func (l *Layer) applyResponseRules(r *http.Response, options *LayerOptions) erro
return nil
}
func (l *Layer) getResponseRuleEngine(r *http.Response, options *LayerOptions) (*rule.Engine[*ResponseEnv], error) {
engine, err := rule.NewEngine[*ResponseEnv](
rule.WithRules(options.Rules.Response...),
ruleHTTP.WithResponseFuncs(r),
)
if err != nil {
return nil, errors.WithStack(err)
}
return engine, nil
}

View File

@ -1,156 +0,0 @@
package proxy_test
import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"os"
"testing"
"time"
"forge.cadoles.com/Cadoles/go-proxy"
"forge.cadoles.com/cadoles/bouncer/internal/cache/memory"
"forge.cadoles.com/cadoles/bouncer/internal/cache/ttl"
"forge.cadoles.com/cadoles/bouncer/internal/proxy/director"
"forge.cadoles.com/cadoles/bouncer/internal/store"
redisStore "forge.cadoles.com/cadoles/bouncer/internal/store/redis"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
)
func BenchmarkProxy(b *testing.B) {
redisEndpoint := os.Getenv("BOUNCER_BENCH_REDIS_ADDR")
if redisEndpoint == "" {
redisEndpoint = "127.0.0.1:6379"
}
client := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{redisEndpoint},
})
proxyRepository := redisStore.NewProxyRepository(client)
layerRepository := redisStore.NewLayerRepository(client)
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte("Hello, world.")); err != nil {
b.Logf("[ERROR] %+v", errors.WithStack(err))
}
}))
defer backend.Close()
if err := waitFor(backend.URL, 5*time.Second); err != nil {
b.Fatalf("[FATAL] %+v", errors.WithStack(err))
}
b.Logf("started backend '%s'", backend.URL)
ctx := context.Background()
proxyName := store.ProxyName(b.Name())
b.Logf("creating proxy '%s'", proxyName)
if err := proxyRepository.DeleteProxy(ctx, proxyName); err != nil {
b.Fatalf("[FATAL] %+v", errors.WithStack(err))
}
if _, err := proxyRepository.CreateProxy(ctx, proxyName, backend.URL, "*"); err != nil {
b.Fatalf("[FATAL] %+v", errors.WithStack(err))
}
if _, err := proxyRepository.UpdateProxy(ctx, proxyName, store.WithProxyUpdateEnabled(true)); err != nil {
b.Fatalf("[FATAL] %+v", errors.WithStack(err))
}
director := director.New(
proxyRepository, layerRepository,
director.WithLayerCache(
ttl.NewCache(
memory.NewCache[string, []*store.Layer](),
memory.NewCache[string, time.Time](),
30*time.Second,
),
),
director.WithProxyCache(
ttl.NewCache(
memory.NewCache[string, []*store.Proxy](),
memory.NewCache[string, time.Time](),
30*time.Second,
),
),
)
directorMiddleware := director.Middleware()
handler := proxy.New(
proxy.WithRequestTransformers(
director.RequestTransformer(),
),
proxy.WithResponseTransformers(
director.ResponseTransformer(),
),
proxy.WithReverseProxyFactory(func(ctx context.Context, target *url.URL) *httputil.ReverseProxy {
reverse := httputil.NewSingleHostReverseProxy(target)
reverse.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
b.Logf("[ERROR] %s", errors.WithStack(err))
}
return reverse
}),
)
server := httptest.NewServer(directorMiddleware(handler))
defer server.Close()
b.Logf("started proxy '%s'", server.URL)
httpClient := server.Client()
b.ResetTimer()
for i := 0; i < b.N; i++ {
res, err := httpClient.Get(server.URL)
if err != nil {
b.Errorf("could not fetch server url: %+v", errors.WithStack(err))
}
body, err := io.ReadAll(res.Body)
if err != nil {
b.Errorf("could not read response body: %+v", errors.WithStack(err))
}
b.Logf("%s - %v", res.Status, string(body))
if err := res.Body.Close(); err != nil {
b.Errorf("could not close response body: %+v", errors.WithStack(err))
}
}
}
func waitFor(url string, ttl time.Duration) error {
var lastErr error
timeout := time.After(ttl)
for {
select {
case <-timeout:
if lastErr != nil {
return lastErr
}
return errors.New("wait timed out")
default:
res, err := http.Get(url)
if err != nil {
lastErr = errors.WithStack(err)
continue
}
if res.StatusCode >= 200 && res.StatusCode < 400 {
return nil
}
}
}
}

View File

@ -17,9 +17,9 @@ func NewRedisClient(ctx context.Context, conf config.RedisConfig) redis.Universa
}
func NewProxyRepository(ctx context.Context, client redis.UniversalClient) (store.ProxyRepository, error) {
return redisStore.NewProxyRepository(client), nil
return redisStore.NewProxyRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay), nil
}
func NewLayerRepository(ctx context.Context, client redis.UniversalClient) (store.LayerRepository, error) {
return redisStore.NewLayerRepository(client), nil
return redisStore.NewLayerRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay), nil
}

View File

@ -13,6 +13,7 @@ type (
type LayerHeader struct {
Proxy ProxyName `json:"proxy"`
Name LayerName `json:"name"`
Revision int `json:"revision"`
Type LayerType `json:"type"`
Weight int `json:"weight"`

View File

@ -8,7 +8,7 @@ type ProxyName Name
type ProxyHeader struct {
Name ProxyName `json:"name"`
Revision int `json:"revision"`
Weight int `json:"weight"`
Enabled bool `json:"enabled"`
}

View File

@ -3,10 +3,18 @@ package redis
import (
"context"
"encoding/json"
"math/rand"
"strings"
"time"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"gitlab.com/wpetit/goweb/logger"
)
var (
DefaultTxMaxAttempts = 20
DefaultTxBaseDelay = 100 * time.Millisecond
)
type jsonWrapper[T any] struct {
@ -65,6 +73,33 @@ func key(parts ...string) string {
return strings.Join(parts, ":")
}
func WithRetry(ctx context.Context, client redis.UniversalClient, key string, fn func(ctx context.Context, tx *redis.Tx) error, maxAttempts int, baseDelay time.Duration) error {
var err error
delay := baseDelay
for attempt := 0; attempt < maxAttempts; attempt++ {
if err = WithTx(ctx, client, key, fn); err != nil {
err = errors.WithStack(err)
logger.Debug(ctx, "redis transaction failed", logger.E(err))
if errors.Is(err, redis.TxFailedErr) {
logger.Debug(ctx, "retrying redis transaction", logger.F("attempts", attempt), logger.F("delay", delay))
time.Sleep(delay)
delay = delay*2 + time.Duration(rand.Int63n(int64(baseDelay)))
continue
}
return err
}
return nil
}
return errors.WithStack(redis.TxFailedErr)
}
func WithTx(ctx context.Context, client redis.UniversalClient, key string, fn func(ctx context.Context, tx *redis.Tx) error) error {
txf := func(tx *redis.Tx) error {
if err := fn(ctx, tx); err != nil {

View File

@ -10,6 +10,7 @@ import (
type layerHeaderItem struct {
Proxy string `redis:"proxy"`
Name string `redis:"name"`
Revision int `redis:"revision"`
Type string `redis:"type"`
Weight int `redis:"weight"`
@ -20,6 +21,7 @@ func (i *layerHeaderItem) ToLayerHeader() (*store.LayerHeader, error) {
layerHeader := &store.LayerHeader{
Proxy: store.ProxyName(i.Proxy),
Name: store.LayerName(i.Name),
Revision: i.Revision,
Type: store.LayerType(i.Type),
Weight: i.Weight,
Enabled: i.Enabled,

View File

@ -15,6 +15,8 @@ const (
type LayerRepository struct {
client redis.UniversalClient
txMaxAttempts int
txRetryBaseDelay time.Duration
}
// CreateLayer implements store.LayerRepository
@ -28,12 +30,13 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy
Name: string(layerName),
Type: string(layerType),
Weight: 0,
Revision: 0,
Enabled: false,
},
CreatedAt: wrap(now),
UpdatedAt: wrap(now),
Options: wrap(store.LayerOptions{}),
Options: wrap(options),
}
txf := func(tx *redis.Tx) error {
@ -57,6 +60,11 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy
return errors.WithStack(err)
}
layerItem, err = r.txGetLayerItem(ctx, tx, proxyName, layerName)
if err != nil {
return errors.WithStack(err)
}
return nil
}
@ -67,16 +75,16 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy
return &store.Layer{
LayerHeader: store.LayerHeader{
Name: layerName,
Proxy: proxyName,
Type: layerType,
Weight: 0,
Enabled: false,
Name: store.LayerName(layerItem.Name),
Proxy: store.ProxyName(layerItem.Proxy),
Type: store.LayerType(layerItem.Type),
Weight: layerItem.Weight,
Enabled: layerItem.Enabled,
},
CreatedAt: now,
UpdatedAt: now,
Options: store.LayerOptions{},
CreatedAt: layerItem.CreatedAt.Value(),
UpdatedAt: layerItem.UpdatedAt.Value(),
Options: layerItem.Options.Value(),
}, nil
}
@ -96,7 +104,7 @@ func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyNam
key := layerKey(proxyName, layerName)
var layerItem *layerItem
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
pItem, err := r.txGetLayerItem(ctx, tx, proxyName, layerName)
if err != nil {
return errors.WithStack(err)
@ -105,7 +113,7 @@ func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyNam
layerItem = pItem
return nil
})
}, r.txMaxAttempts, r.txRetryBaseDelay)
if err != nil {
return nil, errors.WithStack(err)
}
@ -197,7 +205,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
key := layerKey(proxyName, layerName)
var layerItem layerItem
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
item, err := r.txGetLayerItem(ctx, tx, proxyName, layerName)
if err != nil {
return errors.WithStack(err)
@ -216,6 +224,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
}
item.UpdatedAt = wrap(time.Now().UTC())
item.Revision = item.Revision + 1
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
p.HMSet(ctx, key, item.layerHeaderItem)
@ -230,7 +239,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
layerItem = *item
return nil
})
}, r.txMaxAttempts, r.txRetryBaseDelay)
if err != nil {
return nil, errors.WithStack(err)
}
@ -243,9 +252,11 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
return layer, nil
}
func NewLayerRepository(client redis.UniversalClient) *LayerRepository {
func NewLayerRepository(client redis.UniversalClient, txMaxAttempts int, txRetryBaseDelay time.Duration) *LayerRepository {
return &LayerRepository{
client: client,
txMaxAttempts: txMaxAttempts,
txRetryBaseDelay: txRetryBaseDelay,
}
}

View File

@ -7,6 +7,6 @@ import (
)
func TestLayerRepository(t *testing.T) {
repository := NewLayerRepository(client)
repository := NewLayerRepository(client, DefaultTxMaxAttempts, DefaultTxBaseDelay)
testsuite.TestLayerRepository(t, repository)
}

View File

@ -9,6 +9,7 @@ import (
type proxyHeaderItem struct {
Name string `redis:"name"`
Revision int `redis:"revision"`
Weight int `redis:"weight"`
Enabled bool `redis:"enabled"`
@ -20,6 +21,7 @@ type proxyHeaderItem struct {
func (i *proxyHeaderItem) ToProxyHeader() (*store.ProxyHeader, error) {
proxyHeader := &store.ProxyHeader{
Name: store.ProxyName(i.Name),
Revision: i.Revision,
Weight: i.Weight,
Enabled: i.Enabled,
}

View File

@ -15,6 +15,8 @@ const (
type ProxyRepository struct {
client redis.UniversalClient
txMaxAttempts int
txRetryBaseDelay time.Duration
}
// GetProxy implements store.ProxyRepository
@ -22,7 +24,7 @@ func (r *ProxyRepository) GetProxy(ctx context.Context, name store.ProxyName) (*
key := proxyKey(name)
var proxyItem *proxyItem
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
pItem, err := r.txGetProxyItem(ctx, tx, name)
if err != nil {
return errors.WithStack(err)
@ -31,7 +33,7 @@ func (r *ProxyRepository) GetProxy(ctx context.Context, name store.ProxyName) (*
proxyItem = pItem
return nil
})
}, r.txMaxAttempts, r.txRetryBaseDelay)
if err != nil {
return nil, errors.WithStack(err)
}
@ -89,6 +91,7 @@ func (r *ProxyRepository) CreateProxy(ctx context.Context, name store.ProxyName,
CreatedAt: wrap(now),
UpdatedAt: wrap(now),
Weight: 0,
Revision: 0,
Enabled: false,
},
To: to,
@ -191,7 +194,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
key := proxyKey(name)
var proxyItem proxyItem
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
item, err := r.txGetProxyItem(ctx, tx, name)
if err != nil {
return errors.WithStack(err)
@ -214,6 +217,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
}
item.UpdatedAt = wrap(time.Now().UTC())
item.Revision = item.Revision + 1
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
p.HMSet(ctx, key, item.proxyHeaderItem)
@ -228,7 +232,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
proxyItem = *item
return nil
})
}, r.txMaxAttempts, r.txRetryBaseDelay)
if err != nil {
return nil, errors.WithStack(err)
}
@ -241,9 +245,11 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
return proxy, nil
}
func NewProxyRepository(client redis.UniversalClient) *ProxyRepository {
func NewProxyRepository(client redis.UniversalClient, txMaxAttempts int, txRetryBaseDelay time.Duration) *ProxyRepository {
return &ProxyRepository{
client: client,
txMaxAttempts: 20,
txRetryBaseDelay: txRetryBaseDelay,
}
}

View File

@ -7,6 +7,6 @@ import (
)
func TestProxyRepository(t *testing.T) {
repository := NewProxyRepository(client)
repository := NewProxyRepository(client, DefaultTxMaxAttempts, DefaultTxBaseDelay)
testsuite.TestProxyRepository(t, repository)
}

View File

@ -3,6 +3,7 @@ package testsuite
import (
"context"
"reflect"
"sync"
"testing"
"forge.cadoles.com/cadoles/bouncer/internal/store"
@ -49,6 +50,10 @@ var layerRepositoryTestCases = []layerRepositoryTestCase{
return errors.Errorf("layer.UpdatedAt should not be zero value")
}
if layer.Revision != 0 {
return errors.Errorf("layer.Revision should be zero")
}
return nil
},
},
@ -230,6 +235,86 @@ var layerRepositoryTestCases = []layerRepositoryTestCase{
return errors.New("could not find created layer in query results")
}
return nil
},
},
{
Name: "Create then update layer",
Do: func(repo store.LayerRepository) error {
ctx := context.Background()
var layerName store.LayerName = "create_then_update_layer"
var proxyName store.ProxyName = store.ProxyName(string(layerName) + "_proxy")
var layerType store.LayerType = "dummy"
var layerOptions store.LayerOptions = store.LayerOptions{}
createdLayer, err := repo.CreateLayer(ctx, proxyName, layerName, layerType, layerOptions)
if err != nil {
return errors.WithStack(err)
}
if e, g := 0, createdLayer.Revision; e != g {
return errors.Errorf("createdLayer.Revision: expected '%v', got '%v'", e, g)
}
updatedLayer, err := repo.UpdateLayer(ctx, proxyName, layerName)
if err != nil {
return errors.Wrap(err, "err should be nil")
}
if e, g := 1, updatedLayer.Revision; e != g {
return errors.Errorf("updatedLayer.Revision: expected '%v', got '%v'", e, g)
}
return nil
},
},
{
Name: "Update layer concurrently",
Do: func(repo store.LayerRepository) error {
ctx := context.Background()
var layerName store.LayerName = "update_layer_concurrently"
var proxyName store.ProxyName = store.ProxyName(string(layerName) + "_proxy")
var layerType store.LayerType = "dummy"
var layerOptions store.LayerOptions = store.LayerOptions{}
createdLayer, err := repo.CreateLayer(ctx, proxyName, layerName, layerType, layerOptions)
if err != nil {
return errors.WithStack(err)
}
if createdLayer.Revision != 0 {
return errors.Errorf("createdLayer.Revision should be zero")
}
var wg sync.WaitGroup
total := 100
wg.Add(total)
for i := 0; i < total; i++ {
go func(i int) {
defer wg.Done()
if _, err := repo.UpdateLayer(ctx, createdLayer.Proxy, createdLayer.Name); err != nil {
panic(errors.Wrap(err, "err should be nil"))
}
}(i)
}
wg.Wait()
layer, err := repo.GetLayer(ctx, createdLayer.Proxy, createdLayer.Name)
if err != nil {
return errors.Wrap(err, "err should be nil")
}
if e, g := total, layer.Revision; e != g {
return errors.Errorf("layer.Revision: expected '%v', got '%v'", e, g)
}
return nil
},
},

View File

@ -3,6 +3,7 @@ package testsuite
import (
"context"
"reflect"
"sync"
"testing"
"forge.cadoles.com/cadoles/bouncer/internal/store"
@ -51,6 +52,10 @@ var proxyRepositoryTestCases = []proxyRepositoryTestCase{
return errors.Errorf("proxy.UpdatedAt should not be zero value")
}
if proxy.Revision != 0 {
return errors.Errorf("proxy.Revision should be zero")
}
return nil
},
},
@ -99,6 +104,10 @@ var proxyRepositoryTestCases = []proxyRepositoryTestCase{
return errors.Errorf("foundProxy.UpdatedAt: expected '%v', got '%v'", createdProxy.UpdatedAt, foundProxy.UpdatedAt)
}
if foundProxy.Revision != 0 {
return errors.Errorf("foundProxy.Revision should be zero")
}
return nil
},
},
@ -194,6 +203,84 @@ var proxyRepositoryTestCases = []proxyRepositoryTestCase{
return errors.Errorf("err: expected store.ErrAlreadyExists, got '%+v'", err)
}
return nil
},
},
{
Name: "Create then update proxy",
Do: func(repo store.ProxyRepository) error {
ctx := context.Background()
to := "http://example.com"
var name store.ProxyName = "create_then_update_proxy"
createdProxy, err := repo.CreateProxy(ctx, name, to, "127.0.0.1:*", "localhost:*")
if err != nil {
return errors.Wrap(err, "err should be nil")
}
if createdProxy.Revision != 0 {
return errors.Errorf("createdProxy.Revision should be zero")
}
updatedProxy, err := repo.UpdateProxy(ctx, name)
if err != nil {
return errors.Wrap(err, "err should be nil")
}
if e, g := 1, updatedProxy.Revision; e != g {
return errors.Errorf("updatedProxy.Revision: expected '%v', got '%v'", e, g)
}
return nil
},
},
{
Name: "Update proxy concurrently",
Do: func(repo store.ProxyRepository) error {
ctx := context.Background()
to := "http://example.com"
var name store.ProxyName = "update_proxy_concurrently"
createdProxy, err := repo.CreateProxy(ctx, name, to, "127.0.0.1:*", "localhost:*")
if err != nil {
return errors.Wrap(err, "err should be nil")
}
if createdProxy.Revision != 0 {
return errors.Errorf("createdProxy.Revision should be zero")
}
var wg sync.WaitGroup
total := 100
wg.Add(total)
for i := 0; i < total; i++ {
go func(i int) {
defer wg.Done()
if _, err := repo.UpdateProxy(ctx, name); err != nil {
panic(errors.Wrap(err, "err should be nil"))
}
}(i)
}
wg.Wait()
proxy, err := repo.GetProxy(ctx, name)
if err != nil {
return errors.Wrap(err, "err should be nil")
}
if e, g := total, proxy.Revision; e != g {
return errors.Errorf("proxy.Revision: expected '%v', got '%v'", e, g)
}
return nil
},
},