diff --git a/internal/admin/bootstrap.go b/internal/admin/bootstrap.go index 3b19921..63b1636 100644 --- a/internal/admin/bootstrap.go +++ b/internal/admin/bootstrap.go @@ -5,7 +5,6 @@ import ( "time" "forge.cadoles.com/cadoles/bouncer/internal/config" - "forge.cadoles.com/cadoles/bouncer/internal/lock/redis" "forge.cadoles.com/cadoles/bouncer/internal/schema" "forge.cadoles.com/cadoles/bouncer/internal/setup" "forge.cadoles.com/cadoles/bouncer/internal/store" @@ -21,10 +20,9 @@ func (s *Server) bootstrapProxies(ctx context.Context) error { proxyRepo := s.proxyRepository layerRepo := s.layerRepository - lockTimeout := time.Duration(s.bootstrapConfig.LockTimeout) - locker := redis.NewLocker(s.redisClient, int(s.bootstrapConfig.MaxConnectionRetries)) + lockTimeout := time.Duration(*s.bootstrapConfig.LockTimeout) - err := locker.WithLock(ctx, "bouncer-admin-bootstrap", lockTimeout, func(ctx context.Context) error { + err := s.locker.WithLock(ctx, "bouncer-admin-bootstrap", lockTimeout, func(ctx context.Context) error { logger.Info(ctx, "bootstrapping proxies") for proxyName, proxyConfig := range s.bootstrapConfig.Proxies { diff --git a/internal/admin/init.go b/internal/admin/init.go index e63479a..c58c498 100644 --- a/internal/admin/init.go +++ b/internal/admin/init.go @@ -5,57 +5,10 @@ import ( "forge.cadoles.com/cadoles/bouncer/internal/integration" "forge.cadoles.com/cadoles/bouncer/internal/jwk" - "forge.cadoles.com/cadoles/bouncer/internal/setup" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) -func (s *Server) initRepositories(ctx context.Context) error { - if err := s.initRedisClient(ctx); err != nil { - return errors.WithStack(err) - } - - if err := s.initLayerRepository(ctx); err != nil { - return errors.WithStack(err) - } - - if err := s.initProxyRepository(ctx); err != nil { - return errors.WithStack(err) - } - - return nil -} - -func (s *Server) initRedisClient(ctx context.Context) error { - client := setup.NewSharedClient(s.redisConfig) - - s.redisClient = client - - return nil -} - -func (s *Server) initLayerRepository(ctx context.Context) error { - layerRepository, err := setup.NewLayerRepository(ctx, s.redisClient) - if err != nil { - return errors.WithStack(err) - } - - s.layerRepository = layerRepository - - return nil -} - -func (s *Server) initProxyRepository(ctx context.Context) error { - proxyRepository, err := setup.NewProxyRepository(ctx, s.redisClient) - if err != nil { - return errors.WithStack(err) - } - - s.proxyRepository = proxyRepository - - return nil -} - func (s *Server) initPrivateKey(ctx context.Context) error { localKey, err := jwk.LoadOrGenerate(string(s.serverConfig.Auth.PrivateKey), jwk.DefaultKeySize) if err != nil { diff --git a/internal/admin/option.go b/internal/admin/option.go index 758a96c..3a1fe85 100644 --- a/internal/admin/option.go +++ b/internal/admin/option.go @@ -3,13 +3,19 @@ package admin import ( "forge.cadoles.com/cadoles/bouncer/internal/config" "forge.cadoles.com/cadoles/bouncer/internal/integration" + "forge.cadoles.com/cadoles/bouncer/internal/lock" + "forge.cadoles.com/cadoles/bouncer/internal/store" ) type Option struct { BootstrapConfig config.BootstrapConfig ServerConfig config.AdminServerConfig - RedisConfig config.RedisConfig Integrations []integration.Integration + + ProxyRepository store.ProxyRepository + LayerRepository store.LayerRepository + + Locker lock.Locker } type OptionFunc func(*Option) @@ -17,7 +23,6 @@ type OptionFunc func(*Option) func defaultOption() *Option { return &Option{ ServerConfig: config.NewDefaultAdminServerConfig(), - RedisConfig: config.NewDefaultRedisConfig(), Integrations: make([]integration.Integration, 0), } } @@ -28,12 +33,6 @@ func WithServerConfig(conf config.AdminServerConfig) OptionFunc { } } -func WithRedisConfig(conf config.RedisConfig) OptionFunc { - return func(opt *Option) { - opt.RedisConfig = conf - } -} - func WithBootstrapConfig(conf config.BootstrapConfig) OptionFunc { return func(opt *Option) { opt.BootstrapConfig = conf @@ -45,3 +44,21 @@ func WithIntegrations(integrations ...integration.Integration) OptionFunc { opt.Integrations = integrations } } + +func WithLayerRepository(layerRepository store.LayerRepository) OptionFunc { + return func(opt *Option) { + opt.LayerRepository = layerRepository + } +} + +func WithProxyRepository(proxyRepository store.ProxyRepository) OptionFunc { + return func(opt *Option) { + opt.ProxyRepository = proxyRepository + } +} + +func WithLocker(locker lock.Locker) OptionFunc { + return func(opt *Option) { + opt.Locker = locker + } +} diff --git a/internal/admin/server.go b/internal/admin/server.go index 3efd340..5922997 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -15,6 +15,7 @@ import ( "forge.cadoles.com/cadoles/bouncer/internal/config" "forge.cadoles.com/cadoles/bouncer/internal/integration" "forge.cadoles.com/cadoles/bouncer/internal/jwk" + "forge.cadoles.com/cadoles/bouncer/internal/lock" "forge.cadoles.com/cadoles/bouncer/internal/store" sentryhttp "github.com/getsentry/sentry-go/http" "github.com/go-chi/chi/v5" @@ -22,15 +23,13 @@ import ( "github.com/go-chi/cors" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/redis/go-redis/v9" "gitlab.com/wpetit/goweb/logger" ) type Server struct { serverConfig config.AdminServerConfig - redisConfig config.RedisConfig - redisClient redis.UniversalClient + locker lock.Locker integrations []integration.Integration @@ -60,12 +59,6 @@ func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan e ctx, cancel := context.WithCancel(parentCtx) defer cancel() - if err := s.initRepositories(ctx); err != nil { - errs <- errors.WithStack(err) - - return - } - if err := s.bootstrapProxies(ctx); err != nil { errs <- errors.WithStack(err) @@ -231,8 +224,10 @@ func NewServer(funcs ...OptionFunc) *Server { return &Server{ serverConfig: opt.ServerConfig, - redisConfig: opt.RedisConfig, bootstrapConfig: opt.BootstrapConfig, integrations: opt.Integrations, + proxyRepository: opt.ProxyRepository, + layerRepository: opt.LayerRepository, + locker: opt.Locker, } } diff --git a/internal/command/server/admin/run.go b/internal/command/server/admin/run.go index 334ac5a..a6e141a 100644 --- a/internal/command/server/admin/run.go +++ b/internal/command/server/admin/run.go @@ -6,6 +6,7 @@ import ( "forge.cadoles.com/cadoles/bouncer/internal/admin" "forge.cadoles.com/cadoles/bouncer/internal/command/common" + "forge.cadoles.com/cadoles/bouncer/internal/lock/redis" "forge.cadoles.com/cadoles/bouncer/internal/setup" "github.com/pkg/errors" "github.com/urfave/cli/v2" @@ -46,10 +47,26 @@ func RunCommand() *cli.Command { return errors.Wrap(err, "could not setup integrations") } + redisClient := setup.NewSharedClient(conf.Redis) + + proxyRepository, err := setup.NewProxyRepository(ctx.Context, redisClient) + if err != nil { + return errors.Wrap(err, "could not initialize proxy repository") + } + + layerRepository, err := setup.NewLayerRepository(ctx.Context, redisClient) + if err != nil { + return errors.Wrap(err, "could not initialize layer repository") + } + + locker := redis.NewLocker(redisClient, int(conf.Bootstrap.MaxConnectionRetries)) + srv := admin.NewServer( admin.WithServerConfig(conf.Admin), - admin.WithRedisConfig(conf.Redis), admin.WithBootstrapConfig(conf.Bootstrap), + admin.WithProxyRepository(proxyRepository), + admin.WithLayerRepository(layerRepository), + admin.WithLocker(locker), admin.WithIntegrations(integrations...), ) diff --git a/internal/command/server/proxy/run.go b/internal/command/server/proxy/run.go index 08e614f..63e3e4c 100644 --- a/internal/command/server/proxy/run.go +++ b/internal/command/server/proxy/run.go @@ -2,12 +2,16 @@ package proxy import ( "fmt" + "os" + "os/signal" "strings" + "syscall" "time" "forge.cadoles.com/cadoles/bouncer/internal/command/common" "forge.cadoles.com/cadoles/bouncer/internal/proxy" "forge.cadoles.com/cadoles/bouncer/internal/setup" + "forge.cadoles.com/cadoles/bouncer/internal/store" "github.com/pkg/errors" "github.com/urfave/cli/v2" "gitlab.com/wpetit/goweb/logger" @@ -47,15 +51,61 @@ func RunCommand() *cli.Command { return errors.Wrap(err, "could not initialize director layers") } + redisClient := setup.NewSharedClient(conf.Redis) + + proxyRepository, err := setup.NewProxyRepository(ctx.Context, redisClient) + if err != nil { + return errors.Wrap(err, "could not initialize proxy repository") + } + + layerRepository, err := setup.NewLayerRepository(ctx.Context, redisClient) + if err != nil { + return errors.Wrap(err, "could not initialize layer repository") + } + srv := proxy.NewServer( proxy.WithServerConfig(conf.Proxy), - proxy.WithRedisConfig(conf.Redis), + proxy.WithProxyRepository(proxyRepository), + proxy.WithLayerRepository(layerRepository), proxy.WithDirectorLayers(layers...), proxy.WithDirectorCacheTTL(time.Duration(*conf.Proxy.Cache.TTL)), ) addrs, srvErrs := srv.Start(ctx.Context) + if observableProxyRepository, ok := proxyRepository.(store.Observable); ok { + logger.Info(ctx.Context, "observing proxy repository changes") + observableProxyRepository.Changes(ctx.Context, func(c store.Change) { + logger.Info(ctx.Context, "proxy change detected, clearing cache") + srv.ClearProxyCache() + }) + } + + if observableLayerRepository, ok := layerRepository.(store.Observable); ok { + logger.Info(ctx.Context, "observing layer repository changes") + observableLayerRepository.Changes(ctx.Context, func(c store.Change) { + logger.Info(ctx.Context, "layer change detected, clearing cache") + srv.ClearLayerCache() + }) + } + + // Clear director's cache on SIGUSR2 + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGUSR2) + + go func() { + for { + select { + case <-sig: + logger.Info(ctx.Context, "received sigusr2, clearing whole cache") + srv.ClearProxyCache() + srv.ClearLayerCache() + case <-ctx.Context.Done(): + return + } + } + }() + select { case addr := <-addrs: url := fmt.Sprintf("http://%s", addr.String()) diff --git a/internal/config/bootstrap.go b/internal/config/bootstrap.go index 97bc97a..ae447e6 100644 --- a/internal/config/bootstrap.go +++ b/internal/config/bootstrap.go @@ -14,7 +14,7 @@ import ( type BootstrapConfig struct { Proxies map[store.ProxyName]BootstrapProxyConfig `yaml:"proxies"` Dir InterpolatedString `yaml:"dir"` - LockTimeout InterpolatedDuration `yaml:"lockTimeout"` + LockTimeout *InterpolatedDuration `yaml:"lockTimeout"` MaxConnectionRetries InterpolatedInt `yaml:"maxRetries"` } @@ -65,7 +65,7 @@ type BootstrapLayerConfig struct { func NewDefaultBootstrapConfig() BootstrapConfig { return BootstrapConfig{ Dir: "", - LockTimeout: *NewInterpolatedDuration(30 * time.Second), + LockTimeout: NewInterpolatedDuration(30 * time.Second), MaxConnectionRetries: 10, } } diff --git a/internal/config/environment.go b/internal/config/environment.go index 9bfca66..f9b6a71 100644 --- a/internal/config/environment.go +++ b/internal/config/environment.go @@ -220,3 +220,11 @@ func NewInterpolatedDuration(d time.Duration) *InterpolatedDuration { id := InterpolatedDuration(d) return &id } + +func Default[T any](value *T, defaultValue *T) *T { + if value == nil { + return defaultValue + } + + return value +} diff --git a/internal/config/integrations.go b/internal/config/integrations.go index 6a6c687..170921f 100644 --- a/internal/config/integrations.go +++ b/internal/config/integrations.go @@ -16,18 +16,18 @@ func NewDefaultIntegrationsConfig() IntegrationsConfig { PrivateKeySecret: "", PrivateKeySecretNamespace: "", ReaderTokenSecret: "", - LockTimeout: *NewInterpolatedDuration(30 * time.Second), + LockTimeout: NewInterpolatedDuration(30 * time.Second), }, } } type KubernetesConfig struct { - Enabled InterpolatedBool `yaml:"enabled"` - WriterTokenSecret InterpolatedString `yaml:"writerTokenSecret"` - WriterTokenSecretNamespace InterpolatedString `yaml:"writerTokenSecretNamespace"` - ReaderTokenSecret InterpolatedString `yaml:"readerTokenSecret"` - ReaderTokenSecretNamespace InterpolatedString `yaml:"readerTokenSecretNamespace"` - PrivateKeySecret InterpolatedString `yaml:"privateKeySecret"` - PrivateKeySecretNamespace InterpolatedString `yaml:"privateKeySecretNamespace"` - LockTimeout InterpolatedDuration `yaml:"lockTimeout"` + Enabled InterpolatedBool `yaml:"enabled"` + WriterTokenSecret InterpolatedString `yaml:"writerTokenSecret"` + WriterTokenSecretNamespace InterpolatedString `yaml:"writerTokenSecretNamespace"` + ReaderTokenSecret InterpolatedString `yaml:"readerTokenSecret"` + ReaderTokenSecretNamespace InterpolatedString `yaml:"readerTokenSecretNamespace"` + PrivateKeySecret InterpolatedString `yaml:"privateKeySecret"` + PrivateKeySecretNamespace InterpolatedString `yaml:"privateKeySecretNamespace"` + LockTimeout *InterpolatedDuration `yaml:"lockTimeout"` } diff --git a/internal/config/redis.go b/internal/config/redis.go index 7146154..2664ee8 100644 --- a/internal/config/redis.go +++ b/internal/config/redis.go @@ -11,33 +11,36 @@ const ( type RedisConfig struct { Adresses InterpolatedStringSlice `yaml:"addresses"` Master InterpolatedString `yaml:"master"` - ReadTimeout InterpolatedDuration `yaml:"readTimeout"` - WriteTimeout InterpolatedDuration `yaml:"writeTimeout"` - DialTimeout InterpolatedDuration `yaml:"dialTimeout"` + ReadTimeout *InterpolatedDuration `yaml:"readTimeout"` + WriteTimeout *InterpolatedDuration `yaml:"writeTimeout"` + DialTimeout *InterpolatedDuration `yaml:"dialTimeout"` LockMaxRetries InterpolatedInt `yaml:"lockMaxRetries"` RouteByLatency InterpolatedBool `yaml:"routeByLatency"` ContextTimeoutEnabled InterpolatedBool `yaml:"contextTimeoutEnabled"` MaxRetries InterpolatedInt `yaml:"maxRetries"` - PingInterval InterpolatedDuration `yaml:"pingInterval"` + PingInterval *InterpolatedDuration `yaml:"pingInterval"` PoolSize InterpolatedInt `yaml:"poolSize"` - PoolTimeout InterpolatedDuration `yaml:"poolTimeout"` + PoolTimeout *InterpolatedDuration `yaml:"poolTimeout"` MinIdleConns InterpolatedInt `yaml:"minIdleConns"` MaxIdleConns InterpolatedInt `yaml:"maxIdleConns"` - ConnMaxIdleTime InterpolatedDuration `yaml:"connMaxIdleTime"` - ConnMaxLifetime InterpolatedDuration `yaml:"connMaxLifeTime"` + ConnMaxIdleTime *InterpolatedDuration `yaml:"connMaxIdleTime"` + ConnMaxLifetime *InterpolatedDuration `yaml:"connMaxLifeTime"` } func NewDefaultRedisConfig() RedisConfig { return RedisConfig{ Adresses: InterpolatedStringSlice{"localhost:6379"}, Master: "", - ReadTimeout: InterpolatedDuration(30 * time.Second), - WriteTimeout: InterpolatedDuration(30 * time.Second), - DialTimeout: InterpolatedDuration(30 * time.Second), + ReadTimeout: NewInterpolatedDuration(30 * time.Second), + WriteTimeout: NewInterpolatedDuration(30 * time.Second), + DialTimeout: NewInterpolatedDuration(30 * time.Second), + PoolTimeout: NewInterpolatedDuration(31 * time.Second), LockMaxRetries: 10, MaxRetries: 3, - PingInterval: InterpolatedDuration(30 * time.Second), + PingInterval: NewInterpolatedDuration(30 * time.Second), ContextTimeoutEnabled: true, RouteByLatency: true, + ConnMaxIdleTime: NewInterpolatedDuration(1 * time.Minute), + ConnMaxLifetime: NewInterpolatedDuration(5 * time.Minute), } } diff --git a/internal/proxy/init.go b/internal/proxy/init.go deleted file mode 100644 index dd54ecd..0000000 --- a/internal/proxy/init.go +++ /dev/null @@ -1,45 +0,0 @@ -package proxy - -import ( - "context" - - "forge.cadoles.com/cadoles/bouncer/internal/setup" - "github.com/pkg/errors" - "github.com/redis/go-redis/v9" -) - -func (s *Server) initRepositories(ctx context.Context) error { - client := setup.NewSharedClient(s.redisConfig) - - if err := s.initProxyRepository(ctx, client); err != nil { - return errors.WithStack(err) - } - - if err := s.initLayerRepository(ctx, client); err != nil { - return errors.WithStack(err) - } - - return nil -} - -func (s *Server) initProxyRepository(ctx context.Context, client redis.UniversalClient) error { - proxyRepository, err := setup.NewProxyRepository(ctx, client) - if err != nil { - return errors.WithStack(err) - } - - s.proxyRepository = proxyRepository - - return nil -} - -func (s *Server) initLayerRepository(ctx context.Context, client redis.UniversalClient) error { - layerRepository, err := setup.NewLayerRepository(ctx, client) - if err != nil { - return errors.WithStack(err) - } - - s.layerRepository = layerRepository - - return nil -} diff --git a/internal/proxy/option.go b/internal/proxy/option.go index aa1e2a4..fdc91aa 100644 --- a/internal/proxy/option.go +++ b/internal/proxy/option.go @@ -5,13 +5,16 @@ import ( "forge.cadoles.com/cadoles/bouncer/internal/config" "forge.cadoles.com/cadoles/bouncer/internal/proxy/director" + "forge.cadoles.com/cadoles/bouncer/internal/store" ) type Option struct { ServerConfig config.ProxyServerConfig - RedisConfig config.RedisConfig DirectorLayers []director.Layer DirectorCacheTTL time.Duration + + ProxyRepository store.ProxyRepository + LayerRepository store.LayerRepository } type OptionFunc func(*Option) @@ -19,7 +22,6 @@ type OptionFunc func(*Option) func defaultOption() *Option { return &Option{ ServerConfig: config.NewDefaultProxyServerConfig(), - RedisConfig: config.NewDefaultRedisConfig(), DirectorLayers: make([]director.Layer, 0), DirectorCacheTTL: 30 * time.Second, } @@ -31,12 +33,6 @@ func WithServerConfig(conf config.ProxyServerConfig) OptionFunc { } } -func WithRedisConfig(conf config.RedisConfig) OptionFunc { - return func(opt *Option) { - opt.RedisConfig = conf - } -} - func WithDirectorLayers(layers ...director.Layer) OptionFunc { return func(opt *Option) { opt.DirectorLayers = layers @@ -48,3 +44,15 @@ func WithDirectorCacheTTL(ttl time.Duration) OptionFunc { opt.DirectorCacheTTL = ttl } } + +func WithLayerRepository(layerRepository store.LayerRepository) OptionFunc { + return func(opt *Option) { + opt.LayerRepository = layerRepository + } +} + +func WithProxyRepository(proxyRepository store.ProxyRepository) OptionFunc { + return func(opt *Option) { + opt.ProxyRepository = proxyRepository + } +} diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 8759c57..995b585 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -13,14 +13,12 @@ import ( "net/http/httputil" "net/http/pprof" "net/url" - "os" - "os/signal" "path/filepath" "strconv" - "syscall" "time" "forge.cadoles.com/Cadoles/go-proxy" + "forge.cadoles.com/cadoles/bouncer/internal/cache" "forge.cadoles.com/cadoles/bouncer/internal/cache/memory" "forge.cadoles.com/cadoles/bouncer/internal/cache/ttl" bouncerChi "forge.cadoles.com/cadoles/bouncer/internal/chi" @@ -38,12 +36,14 @@ import ( ) type Server struct { - serverConfig config.ProxyServerConfig - redisConfig config.RedisConfig - directorLayers []director.Layer - directorCacheTTL time.Duration - proxyRepository store.ProxyRepository - layerRepository store.LayerRepository + serverConfig config.ProxyServerConfig + directorLayers []director.Layer + + proxyRepository store.ProxyRepository + layerRepository store.LayerRepository + + layerCache cache.Cache[string, []*store.Layer] + proxyCache cache.Cache[string, []*store.Proxy] } func (s *Server) Start(ctx context.Context) (<-chan net.Addr, <-chan error) { @@ -55,6 +55,14 @@ func (s *Server) Start(ctx context.Context) (<-chan net.Addr, <-chan error) { return addrs, errs } +func (s *Server) ClearProxyCache() { + s.proxyCache.Clear() +} + +func (s *Server) ClearLayerCache() { + s.layerCache.Clear() +} + func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan error) { defer func() { close(errs) @@ -64,12 +72,6 @@ func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan e ctx, cancel := context.WithCancel(parentCtx) defer cancel() - if err := s.initRepositories(ctx); err != nil { - errs <- errors.WithStack(err) - - return - } - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.serverConfig.HTTP.Host, s.serverConfig.HTTP.Port)) if err != nil { errs <- errors.WithStack(err) @@ -97,15 +99,12 @@ func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan e logger.Info(ctx, "http server listening") - layerCache, proxyCache, cancel := s.createDirectorCaches(ctx) - defer cancel() - director := director.New( s.proxyRepository, s.layerRepository, director.WithLayers(s.directorLayers...), - director.WithLayerCache(layerCache), - director.WithProxyCache(proxyCache), + director.WithLayerCache(s.layerCache), + director.WithProxyCache(s.proxyCache), director.WithHandleErrorFunc(s.handleError), ) @@ -199,44 +198,6 @@ func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan e logger.Info(ctx, "http server exiting") } -func (s *Server) createDirectorCaches(ctx context.Context) (*ttl.Cache[string, []*store.Layer], *ttl.Cache[string, []*store.Proxy], func()) { - layerCache := ttl.NewCache( - memory.NewCache[string, []*store.Layer](), - memory.NewCache[string, time.Time](), - s.directorCacheTTL, - ) - - proxyCache := ttl.NewCache( - memory.NewCache[string, []*store.Proxy](), - memory.NewCache[string, time.Time](), - s.directorCacheTTL, - ) - - sig := make(chan os.Signal, 1) - - signal.Notify(sig, syscall.SIGUSR2) - - go func() { - for { - _, ok := <-sig - if !ok { - return - } - - logger.Info(ctx, "received sigusr2 signal, clearing proxies and layers cache") - - layerCache.Clear() - proxyCache.Clear() - } - }() - - cancel := func() { - close(sig) - } - - return layerCache, proxyCache, cancel -} - func (s *Server) createReverseProxy(ctx context.Context, target *url.URL) *httputil.ReverseProxy { reverseProxy := httputil.NewSingleHostReverseProxy(target) @@ -345,9 +306,21 @@ func NewServer(funcs ...OptionFunc) *Server { } return &Server{ - serverConfig: opt.ServerConfig, - redisConfig: opt.RedisConfig, - directorLayers: opt.DirectorLayers, - directorCacheTTL: opt.DirectorCacheTTL, + serverConfig: opt.ServerConfig, + directorLayers: opt.DirectorLayers, + + proxyRepository: opt.ProxyRepository, + layerRepository: opt.LayerRepository, + + layerCache: ttl.NewCache( + memory.NewCache[string, []*store.Layer](), + memory.NewCache[string, time.Time](), + opt.DirectorCacheTTL, + ), + proxyCache: ttl.NewCache( + memory.NewCache[string, []*store.Proxy](), + memory.NewCache[string, time.Time](), + opt.DirectorCacheTTL, + ), } } diff --git a/internal/setup/integrations.go b/internal/setup/integrations.go index 23675d9..5e43115 100644 --- a/internal/setup/integrations.go +++ b/internal/setup/integrations.go @@ -39,7 +39,7 @@ func setupKubernetesIntegration(ctx context.Context, conf *config.Config) (*kube kubernetes.WithPrivateKeySecretNamespace(string(conf.Integrations.Kubernetes.PrivateKeySecretNamespace)), kubernetes.WithIssuer(string(conf.Admin.Auth.Issuer)), kubernetes.WithLocker(locker), - kubernetes.WithLockTimeout(time.Duration(conf.Integrations.Kubernetes.LockTimeout)), + kubernetes.WithLockTimeout(time.Duration(*conf.Integrations.Kubernetes.LockTimeout)), ) return integration, nil diff --git a/internal/setup/redis.go b/internal/setup/redis.go index 486d1a7..e62a148 100644 --- a/internal/setup/redis.go +++ b/internal/setup/redis.go @@ -35,44 +35,52 @@ func newRedisClient(conf config.RedisConfig) redis.UniversalClient { client := redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: conf.Adresses, MasterName: string(conf.Master), - ReadTimeout: time.Duration(conf.ReadTimeout), - WriteTimeout: time.Duration(conf.WriteTimeout), - DialTimeout: time.Duration(conf.DialTimeout), + ReadTimeout: time.Duration(*conf.ReadTimeout), + WriteTimeout: time.Duration(*conf.WriteTimeout), + DialTimeout: time.Duration(*conf.DialTimeout), RouteByLatency: bool(conf.RouteByLatency), ContextTimeoutEnabled: bool(conf.ContextTimeoutEnabled), MaxRetries: int(conf.MaxRetries), PoolSize: int(conf.PoolSize), - PoolTimeout: time.Duration(conf.PoolTimeout), + PoolTimeout: time.Duration(*conf.PoolTimeout), MinIdleConns: int(conf.MinIdleConns), MaxIdleConns: int(conf.MaxIdleConns), - ConnMaxIdleTime: time.Duration(conf.ConnMaxIdleTime), - ConnMaxLifetime: time.Duration(conf.ConnMaxLifetime), + ConnMaxIdleTime: time.Duration(*conf.ConnMaxIdleTime), + ConnMaxLifetime: time.Duration(*conf.ConnMaxLifetime), }) - go func() { - ctx := logger.With(context.Background(), - logger.F("adresses", conf.Adresses), - logger.F("master", conf.Master), - ) + ctx := context.Background() - timer := time.NewTicker(time.Duration(conf.PingInterval)) - defer timer.Stop() + pingInterval := time.Duration(*conf.PingInterval) - connected := true + if pingInterval > 0 { + go func() { + ctx := logger.With(ctx, + logger.F("adresses", conf.Adresses), + logger.F("master", conf.Master), + ) - for range timer.C { - if _, err := client.Ping(ctx).Result(); err != nil { - logger.Error(ctx, "redis disconnected", logger.CapturedE(errors.WithStack(err))) - connected = false - continue + timer := time.NewTicker(pingInterval) + defer timer.Stop() + + connected := true + + for range timer.C { + if _, err := client.Ping(ctx).Result(); err != nil { + logger.Error(ctx, "redis disconnected", logger.CapturedE(errors.WithStack(err))) + connected = false + continue + } + + if !connected { + logger.Info(ctx, "redis reconnected") + connected = true + } } - - if !connected { - logger.Info(ctx, "redis reconnected") - connected = true - } - } - }() + }() + } else { + logger.Warn(ctx, "redis ping interval at 0, disabling") + } return client } diff --git a/internal/setup/proxy_repository.go b/internal/setup/repository.go similarity index 100% rename from internal/setup/proxy_repository.go rename to internal/setup/repository.go diff --git a/internal/store/observable.go b/internal/store/observable.go new file mode 100644 index 0000000..4950a48 --- /dev/null +++ b/internal/store/observable.go @@ -0,0 +1,11 @@ +package store + +import "context" + +type Change interface { + Change() +} + +type Observable interface { + Changes(ctx context.Context, fn func(Change)) +} diff --git a/internal/store/redis/layer_observable.go b/internal/store/redis/layer_observable.go new file mode 100644 index 0000000..baf52aa --- /dev/null +++ b/internal/store/redis/layer_observable.go @@ -0,0 +1,87 @@ +package redis + +import ( + "bytes" + "context" + "encoding/gob" + + "forge.cadoles.com/cadoles/bouncer/internal/store" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func init() { + gob.Register(&LayerChange{}) +} + +type LayerChange struct { + Operation ChangeOperation + Proxy store.ProxyName + Layer store.LayerName +} + +// Change implements store.Change. +func (p *LayerChange) Change() {} + +func NewLayerChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) *LayerChange { + return &LayerChange{ + Operation: op, + Proxy: proxyName, + Layer: layerName, + } +} + +var _ store.Change = &ProxyChange{} + +const layerChangeChannel string = "layer-changes" + +// Changes implements store.Observable. +func (r *LayerRepository) Changes(ctx context.Context, fn func(store.Change)) { + go func() { + sub := r.client.Subscribe(ctx, layerChangeChannel) + defer sub.Close() + + for { + var buff bytes.Buffer + decoder := gob.NewDecoder(&buff) + + msg, err := sub.ReceiveMessage(ctx) + if err != nil { + logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err))) + return + } + + buff.Reset() + buff.WriteString(msg.Payload) + + change := &ProxyChange{} + if err := decoder.Decode(change); err != nil { + logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err))) + continue + } + + fn(change) + } + }() +} + +func (r *LayerRepository) notifyChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) { + change := NewLayerChange(op, proxyName, layerName) + + var buff bytes.Buffer + encoder := gob.NewEncoder(&buff) + + ctx := context.Background() + + if err := encoder.Encode(change); err != nil { + logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err))) + return + } + + if err := r.client.Publish(ctx, layerChangeChannel, buff.Bytes()).Err(); err != nil { + logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err))) + return + } +} + +var _ store.Observable = &LayerRepository{} diff --git a/internal/store/redis/layer_repository.go b/internal/store/redis/layer_repository.go index 20bd705..ba16497 100644 --- a/internal/store/redis/layer_repository.go +++ b/internal/store/redis/layer_repository.go @@ -73,6 +73,8 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy return nil, errors.WithStack(err) } + go r.notifyChange(CreateOperation, proxyName, layerName) + return &store.Layer{ LayerHeader: store.LayerHeader{ Name: store.LayerName(layerItem.Name), @@ -96,6 +98,8 @@ func (r *LayerRepository) DeleteLayer(ctx context.Context, proxyName store.Proxy return errors.WithStack(cmd.Err()) } + go r.notifyChange(DeleteOperation, proxyName, layerName) + return nil } @@ -249,6 +253,8 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy return nil, errors.WithStack(err) } + go r.notifyChange(UpdateOperation, proxyName, layerName) + return layer, nil } diff --git a/internal/store/redis/observable.go b/internal/store/redis/observable.go new file mode 100644 index 0000000..469a191 --- /dev/null +++ b/internal/store/redis/observable.go @@ -0,0 +1,9 @@ +package redis + +type ChangeOperation int + +const ( + CreateOperation ChangeOperation = iota + UpdateOperation + DeleteOperation +) diff --git a/internal/store/redis/proxy_observable.go b/internal/store/redis/proxy_observable.go new file mode 100644 index 0000000..acb8986 --- /dev/null +++ b/internal/store/redis/proxy_observable.go @@ -0,0 +1,85 @@ +package redis + +import ( + "bytes" + "context" + "encoding/gob" + + "forge.cadoles.com/cadoles/bouncer/internal/store" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func init() { + gob.Register(&ProxyChange{}) +} + +type ProxyChange struct { + Operation ChangeOperation + Proxy store.ProxyName +} + +// Change implements store.Change. +func (p *ProxyChange) Change() {} + +func NewProxyChange(op ChangeOperation, name store.ProxyName) *ProxyChange { + return &ProxyChange{ + Operation: op, + Proxy: name, + } +} + +var _ store.Change = &ProxyChange{} + +const proxyChangeChannel string = "proxy-changes" + +// Changes implements store.Observable. +func (r *ProxyRepository) Changes(ctx context.Context, fn func(store.Change)) { + go func() { + sub := r.client.Subscribe(ctx, proxyChangeChannel) + defer sub.Close() + + for { + var buff bytes.Buffer + decoder := gob.NewDecoder(&buff) + + msg, err := sub.ReceiveMessage(ctx) + if err != nil { + logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err))) + return + } + + buff.Reset() + buff.WriteString(msg.Payload) + + change := &ProxyChange{} + if err := decoder.Decode(change); err != nil { + logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err))) + continue + } + + fn(change) + } + }() +} + +func (r *ProxyRepository) notifyChange(op ChangeOperation, name store.ProxyName) { + change := NewProxyChange(op, name) + + var buff bytes.Buffer + encoder := gob.NewEncoder(&buff) + + ctx := context.Background() + + if err := encoder.Encode(change); err != nil { + logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err))) + return + } + + if err := r.client.Publish(ctx, proxyChangeChannel, buff.Bytes()).Err(); err != nil { + logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err))) + return + } +} + +var _ store.Observable = &ProxyRepository{} diff --git a/internal/store/redis/proxy_repository.go b/internal/store/redis/proxy_repository.go index 9eefd75..2998a72 100644 --- a/internal/store/redis/proxy_repository.go +++ b/internal/store/redis/proxy_repository.go @@ -117,6 +117,8 @@ func (r *ProxyRepository) CreateProxy(ctx context.Context, name store.ProxyName, return nil, errors.WithStack(err) } + go r.notifyChange(CreateOperation, name) + return &store.Proxy{ ProxyHeader: store.ProxyHeader{ Name: name, @@ -139,6 +141,8 @@ func (r *ProxyRepository) DeleteProxy(ctx context.Context, name store.ProxyName) return errors.WithStack(cmd.Err()) } + go r.notifyChange(DeleteOperation, name) + return nil } @@ -242,6 +246,8 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName, return nil, errors.WithStack(err) } + go r.notifyChange(UpdateOperation, name) + return proxy, nil }