301 lines
7.5 KiB
Go
301 lines
7.5 KiB
Go
|
package proxy_test
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"io"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"net/http/httptest"
|
||
|
"net/http/httputil"
|
||
|
"net/url"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"runtime/pprof"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"forge.cadoles.com/Cadoles/go-proxy"
|
||
|
"forge.cadoles.com/cadoles/bouncer/internal/cache/memory"
|
||
|
"forge.cadoles.com/cadoles/bouncer/internal/cache/ttl"
|
||
|
"forge.cadoles.com/cadoles/bouncer/internal/config"
|
||
|
"forge.cadoles.com/cadoles/bouncer/internal/proxy/director"
|
||
|
"forge.cadoles.com/cadoles/bouncer/internal/store"
|
||
|
redisStore "forge.cadoles.com/cadoles/bouncer/internal/store/redis"
|
||
|
"github.com/pkg/errors"
|
||
|
"github.com/redis/go-redis/v9"
|
||
|
"gopkg.in/yaml.v3"
|
||
|
|
||
|
"forge.cadoles.com/cadoles/bouncer/internal/setup"
|
||
|
)
|
||
|
|
||
|
func BenchmarkProxies(b *testing.B) {
|
||
|
proxyFiles, err := filepath.Glob("testdata/proxies/*.yml")
|
||
|
if err != nil {
|
||
|
b.Fatalf("%+v", errors.WithStack(err))
|
||
|
}
|
||
|
|
||
|
for _, f := range proxyFiles {
|
||
|
name := strings.TrimSuffix(filepath.Base(f), filepath.Ext(f))
|
||
|
|
||
|
b.Run(name, func(b *testing.B) {
|
||
|
conf, err := loadProxyBenchConfig(f)
|
||
|
if err != nil {
|
||
|
b.Fatalf("%+v", errors.Wrapf(err, "could notre load bench config"))
|
||
|
}
|
||
|
|
||
|
proxy, backend, err := createProxy(name, conf, b.Logf)
|
||
|
if err != nil {
|
||
|
b.Fatalf("%+v", errors.Wrapf(err, "could not create proxy"))
|
||
|
}
|
||
|
|
||
|
defer proxy.Close()
|
||
|
|
||
|
if backend != nil {
|
||
|
defer backend.Close()
|
||
|
}
|
||
|
|
||
|
client := proxy.Client()
|
||
|
|
||
|
proxyURL, err := url.Parse(proxy.URL)
|
||
|
if err != nil {
|
||
|
b.Fatalf("%+v", errors.Wrapf(err, "could not parse proxy url"))
|
||
|
}
|
||
|
|
||
|
if conf.Fetch.URL.Path != "" {
|
||
|
proxyURL.Path = conf.Fetch.URL.Path
|
||
|
}
|
||
|
|
||
|
if conf.Fetch.URL.RawQuery != "" {
|
||
|
proxyURL.RawQuery = conf.Fetch.URL.RawQuery
|
||
|
}
|
||
|
|
||
|
if conf.Fetch.URL.User.Username != "" || conf.Fetch.URL.User.Password != "" {
|
||
|
proxyURL.User = url.UserPassword(conf.Fetch.URL.User.Username, conf.Fetch.URL.User.Password)
|
||
|
}
|
||
|
|
||
|
rawProxyURL := proxyURL.String()
|
||
|
|
||
|
b.Logf("fetching url '%s'", rawProxyURL)
|
||
|
|
||
|
profile, err := os.Create(filepath.Join("testdata", "proxies", name+".prof"))
|
||
|
if err != nil {
|
||
|
b.Fatalf("%+v", errors.Wrapf(err, "could not create cpu profile"))
|
||
|
}
|
||
|
|
||
|
defer profile.Close()
|
||
|
|
||
|
if err := pprof.StartCPUProfile(profile); err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
defer pprof.StopCPUProfile()
|
||
|
|
||
|
b.ResetTimer()
|
||
|
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
res, err := client.Get(rawProxyURL)
|
||
|
if err != nil {
|
||
|
b.Errorf("could not fetch proxy url: %+v", errors.WithStack(err))
|
||
|
}
|
||
|
|
||
|
body, err := io.ReadAll(res.Body)
|
||
|
if err != nil {
|
||
|
b.Errorf("could not read response body: %+v", errors.WithStack(err))
|
||
|
}
|
||
|
|
||
|
b.Logf("%s \n %v", res.Status, string(body))
|
||
|
|
||
|
if err := res.Body.Close(); err != nil {
|
||
|
b.Errorf("could not close response body: %+v", errors.WithStack(err))
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type proxyBenchConfig struct {
|
||
|
Proxy config.BootstrapProxyConfig `yaml:"proxy"`
|
||
|
Fetch fetchBenchConfig `yaml:"fetch"`
|
||
|
}
|
||
|
|
||
|
type fetchBenchConfig struct {
|
||
|
URL fetchURLBenchConfig `yaml:"url"`
|
||
|
}
|
||
|
|
||
|
type fetchURLBenchConfig struct {
|
||
|
Path string `yaml:"path"`
|
||
|
RawQuery string `yaml:"rawQuery"`
|
||
|
User fetchURLUserBenchConfig `yaml:"user"`
|
||
|
}
|
||
|
|
||
|
type fetchURLUserBenchConfig struct {
|
||
|
Username string `yaml:"username"`
|
||
|
Password string `yaml:"password"`
|
||
|
}
|
||
|
|
||
|
func loadProxyBenchConfig(filename string) (*proxyBenchConfig, error) {
|
||
|
data, err := os.ReadFile(filename)
|
||
|
if err != nil {
|
||
|
return nil, errors.Wrapf(err, "could not read file '%s'", filename)
|
||
|
}
|
||
|
|
||
|
conf := proxyBenchConfig{}
|
||
|
|
||
|
if err := yaml.Unmarshal(data, &conf); err != nil {
|
||
|
return nil, errors.Wrapf(err, "could not unmarshal config")
|
||
|
}
|
||
|
|
||
|
return &conf, nil
|
||
|
}
|
||
|
|
||
|
func createProxy(name string, conf *proxyBenchConfig, logf func(format string, a ...any)) (*httptest.Server, *httptest.Server, error) {
|
||
|
redisEndpoint := os.Getenv("BOUNCER_BENCH_REDIS_ADDR")
|
||
|
if redisEndpoint == "" {
|
||
|
redisEndpoint = "127.0.0.1:6379"
|
||
|
}
|
||
|
|
||
|
client := redis.NewUniversalClient(&redis.UniversalOptions{
|
||
|
Addrs: []string{redisEndpoint},
|
||
|
})
|
||
|
|
||
|
proxyRepository := redisStore.NewProxyRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay)
|
||
|
layerRepository := redisStore.NewLayerRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay)
|
||
|
|
||
|
var backend *httptest.Server
|
||
|
|
||
|
if conf.Proxy.To == "" {
|
||
|
backend = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
|
w.WriteHeader(http.StatusOK)
|
||
|
if _, err := w.Write([]byte("Hello, world.")); err != nil {
|
||
|
logf("[ERROR] %+v", errors.WithStack(err))
|
||
|
}
|
||
|
}))
|
||
|
|
||
|
if err := waitFor(backend.URL, 5*time.Second); err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
logf("started backend '%s'", backend.URL)
|
||
|
}
|
||
|
|
||
|
ctx := context.Background()
|
||
|
|
||
|
proxyName := store.ProxyName("bench-" + name)
|
||
|
|
||
|
proxies, err := proxyRepository.QueryProxy(ctx)
|
||
|
if err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
// Cleanup existing proxies
|
||
|
for _, p := range proxies {
|
||
|
if err := proxyRepository.DeleteProxy(ctx, p.Name); err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
logf("creating proxy '%s'", proxyName)
|
||
|
|
||
|
to := string(conf.Proxy.To)
|
||
|
if to == "" {
|
||
|
to = backend.URL
|
||
|
}
|
||
|
|
||
|
if _, err := proxyRepository.CreateProxy(ctx, proxyName, to, conf.Proxy.From...); err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
if _, err := proxyRepository.UpdateProxy(ctx, proxyName, store.WithProxyUpdateEnabled(true)); err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
for layerName, layerConf := range conf.Proxy.Layers {
|
||
|
if err := layerRepository.DeleteLayer(ctx, proxyName, store.LayerName(layerName)); err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
_, err := layerRepository.CreateLayer(ctx, proxyName, store.LayerName(layerName), store.LayerType(layerConf.Type), layerConf.Options.Data)
|
||
|
if err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
_, err = layerRepository.UpdateLayer(ctx, proxyName, store.LayerName(layerName), store.WithLayerUpdateEnabled(bool(layerConf.Enabled)))
|
||
|
if err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
layers, err := setup.GetLayers(context.Background(), config.NewDefault())
|
||
|
if err != nil {
|
||
|
return nil, nil, errors.WithStack(err)
|
||
|
}
|
||
|
|
||
|
director := director.New(
|
||
|
proxyRepository, layerRepository,
|
||
|
director.WithLayerCache(
|
||
|
ttl.NewCache(
|
||
|
memory.NewCache[string, []*store.Layer](),
|
||
|
memory.NewCache[string, time.Time](),
|
||
|
30*time.Second,
|
||
|
),
|
||
|
),
|
||
|
director.WithProxyCache(
|
||
|
ttl.NewCache(
|
||
|
memory.NewCache[string, []*store.Proxy](),
|
||
|
memory.NewCache[string, time.Time](),
|
||
|
30*time.Second,
|
||
|
),
|
||
|
),
|
||
|
director.WithLayers(layers...),
|
||
|
)
|
||
|
|
||
|
directorMiddleware := director.Middleware()
|
||
|
|
||
|
handler := proxy.New(
|
||
|
proxy.WithRequestTransformers(
|
||
|
director.RequestTransformer(),
|
||
|
),
|
||
|
proxy.WithResponseTransformers(
|
||
|
director.ResponseTransformer(),
|
||
|
),
|
||
|
proxy.WithReverseProxyFactory(func(ctx context.Context, target *url.URL) *httputil.ReverseProxy {
|
||
|
reverse := httputil.NewSingleHostReverseProxy(target)
|
||
|
reverse.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
|
||
|
logf("[ERROR] %s", errors.WithStack(err))
|
||
|
}
|
||
|
return reverse
|
||
|
}),
|
||
|
)
|
||
|
|
||
|
server := httptest.NewServer(directorMiddleware(handler))
|
||
|
|
||
|
return server, backend, nil
|
||
|
}
|
||
|
|
||
|
func waitFor(url string, ttl time.Duration) error {
|
||
|
var lastErr error
|
||
|
timeout := time.After(ttl)
|
||
|
for {
|
||
|
select {
|
||
|
case <-timeout:
|
||
|
if lastErr != nil {
|
||
|
return lastErr
|
||
|
}
|
||
|
|
||
|
return errors.New("wait timed out")
|
||
|
default:
|
||
|
res, err := http.Get(url)
|
||
|
if err != nil {
|
||
|
lastErr = errors.WithStack(err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if res.StatusCode >= 200 && res.StatusCode < 400 {
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|