feat: add limited retry mechanism to prevent startup error if redis is not ready
Cadoles/bouncer/pipeline/head This commit looks good
Details
Cadoles/bouncer/pipeline/head This commit looks good
Details
This commit is contained in:
parent
ad907576dc
commit
83fcb9a39d
|
@ -22,7 +22,7 @@ func (s *Server) bootstrapProxies(ctx context.Context) error {
|
||||||
layerRepo := s.layerRepository
|
layerRepo := s.layerRepository
|
||||||
|
|
||||||
lockTimeout := time.Duration(s.bootstrapConfig.LockTimeout)
|
lockTimeout := time.Duration(s.bootstrapConfig.LockTimeout)
|
||||||
locker := redis.NewLocker(s.redisClient)
|
locker := redis.NewLocker(s.redisClient, int(s.bootstrapConfig.MaxConnectionRetries))
|
||||||
|
|
||||||
err := locker.WithLock(ctx, "bouncer-admin-bootstrap", lockTimeout, func(ctx context.Context) error {
|
err := locker.WithLock(ctx, "bouncer-admin-bootstrap", lockTimeout, func(ctx context.Context) error {
|
||||||
logger.Info(ctx, "bootstrapping proxies")
|
logger.Info(ctx, "bootstrapping proxies")
|
||||||
|
|
|
@ -15,6 +15,7 @@ type BootstrapConfig struct {
|
||||||
Proxies map[store.ProxyName]BootstrapProxyConfig `yaml:"proxies"`
|
Proxies map[store.ProxyName]BootstrapProxyConfig `yaml:"proxies"`
|
||||||
Dir InterpolatedString `yaml:"dir"`
|
Dir InterpolatedString `yaml:"dir"`
|
||||||
LockTimeout InterpolatedDuration `yaml:"lockTimeout"`
|
LockTimeout InterpolatedDuration `yaml:"lockTimeout"`
|
||||||
|
MaxConnectionRetries InterpolatedInt `yaml:"maxRetries"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *BootstrapConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
func (c *BootstrapConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||||
|
@ -64,6 +65,7 @@ func NewDefaultBootstrapConfig() BootstrapConfig {
|
||||||
return BootstrapConfig{
|
return BootstrapConfig{
|
||||||
Dir: "",
|
Dir: "",
|
||||||
LockTimeout: *NewInterpolatedDuration(30 * time.Second),
|
LockTimeout: *NewInterpolatedDuration(30 * time.Second),
|
||||||
|
MaxConnectionRetries: 10,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ type RedisConfig struct {
|
||||||
ReadTimeout InterpolatedDuration `yaml:"readTimeout"`
|
ReadTimeout InterpolatedDuration `yaml:"readTimeout"`
|
||||||
WriteTimeout InterpolatedDuration `yaml:"writeTimeout"`
|
WriteTimeout InterpolatedDuration `yaml:"writeTimeout"`
|
||||||
DialTimeout InterpolatedDuration `yaml:"dialTimeout"`
|
DialTimeout InterpolatedDuration `yaml:"dialTimeout"`
|
||||||
|
LockMaxRetries InterpolatedInt `yaml:"lockMaxRetries"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDefaultRedisConfig() RedisConfig {
|
func NewDefaultRedisConfig() RedisConfig {
|
||||||
|
@ -23,5 +24,6 @@ func NewDefaultRedisConfig() RedisConfig {
|
||||||
ReadTimeout: InterpolatedDuration(30 * time.Second),
|
ReadTimeout: InterpolatedDuration(30 * time.Second),
|
||||||
WriteTimeout: InterpolatedDuration(30 * time.Second),
|
WriteTimeout: InterpolatedDuration(30 * time.Second),
|
||||||
DialTimeout: InterpolatedDuration(30 * time.Second),
|
DialTimeout: InterpolatedDuration(30 * time.Second),
|
||||||
|
LockMaxRetries: 10,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
|
|
||||||
type Locker struct {
|
type Locker struct {
|
||||||
client redis.UniversalClient
|
client redis.UniversalClient
|
||||||
timeout time.Duration
|
maxRetries int
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLock implements lock.Locker.
|
// WithLock implements lock.Locker.
|
||||||
|
@ -26,6 +26,7 @@ func (l *Locker) WithLock(ctx context.Context, key string, timeout time.Duration
|
||||||
|
|
||||||
logger.Debug(ctx, "acquiring lock")
|
logger.Debug(ctx, "acquiring lock")
|
||||||
|
|
||||||
|
err := retryWithBackoff(ctx, l.maxRetries, func(ctx context.Context) error {
|
||||||
lock, err := locker.Obtain(ctx, key, timeout, &redislock.Options{
|
lock, err := locker.Obtain(ctx, key, timeout, &redislock.Options{
|
||||||
RetryStrategy: backoff,
|
RetryStrategy: backoff,
|
||||||
})
|
})
|
||||||
|
@ -48,11 +49,18 @@ func (l *Locker) WithLock(ctx context.Context, key string, timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocker(client redis.UniversalClient) *Locker {
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLocker(client redis.UniversalClient, maxRetries int) *Locker {
|
||||||
return &Locker{
|
return &Locker{
|
||||||
client: client,
|
client: client,
|
||||||
|
maxRetries: maxRetries,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlab.com/wpetit/goweb/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
baseWatchBackoffDelay = time.Millisecond * 500
|
||||||
|
maxDelay = time.Minute * 10
|
||||||
|
)
|
||||||
|
|
||||||
|
func retryWithBackoff(ctx context.Context, attempts int, fn func(ctx context.Context) error) error {
|
||||||
|
backoffDelay := baseWatchBackoffDelay
|
||||||
|
count := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
err := fn(ctx)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = errors.WithStack(err)
|
||||||
|
|
||||||
|
count++
|
||||||
|
if count >= attempts {
|
||||||
|
return errors.Wrapf(err, "execution failed after %d attempts", attempts)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Error(ctx, "error while executing func, retrying with backoff", logger.E(err), logger.F("backoffDelay", backoffDelay), logger.F("remainingAttempts", attempts-count))
|
||||||
|
|
||||||
|
time.Sleep(backoffDelay)
|
||||||
|
|
||||||
|
backoffDelay *= 2
|
||||||
|
if backoffDelay > maxDelay {
|
||||||
|
backoffDelay = maxDelay
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,7 +28,7 @@ func SetupIntegrations(ctx context.Context, conf *config.Config) ([]integration.
|
||||||
|
|
||||||
func setupKubernetesIntegration(ctx context.Context, conf *config.Config) (*kubernetes.Integration, error) {
|
func setupKubernetesIntegration(ctx context.Context, conf *config.Config) (*kubernetes.Integration, error) {
|
||||||
client := newRedisClient(conf.Redis)
|
client := newRedisClient(conf.Redis)
|
||||||
locker := redis.NewLocker(client)
|
locker := redis.NewLocker(client, 10)
|
||||||
|
|
||||||
integration := kubernetes.NewIntegration(
|
integration := kubernetes.NewIntegration(
|
||||||
kubernetes.WithReaderTokenSecret(string(conf.Integrations.Kubernetes.ReaderTokenSecret)),
|
kubernetes.WithReaderTokenSecret(string(conf.Integrations.Kubernetes.ReaderTokenSecret)),
|
||||||
|
|
|
@ -10,6 +10,6 @@ import (
|
||||||
|
|
||||||
func SetupLocker(ctx context.Context, conf *config.Config) (lock.Locker, error) {
|
func SetupLocker(ctx context.Context, conf *config.Config) (lock.Locker, error) {
|
||||||
client := newRedisClient(conf.Redis)
|
client := newRedisClient(conf.Redis)
|
||||||
locker := redis.NewLocker(client)
|
locker := redis.NewLocker(client, int(conf.Redis.LockMaxRetries))
|
||||||
return locker, nil
|
return locker, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue