diff --git a/internal/admin/bootstrap.go b/internal/admin/bootstrap.go index 4a2f11c..0e49f1e 100644 --- a/internal/admin/bootstrap.go +++ b/internal/admin/bootstrap.go @@ -22,7 +22,7 @@ func (s *Server) bootstrapProxies(ctx context.Context) error { layerRepo := s.layerRepository lockTimeout := time.Duration(s.bootstrapConfig.LockTimeout) - locker := redis.NewLocker(s.redisClient) + locker := redis.NewLocker(s.redisClient, int(s.bootstrapConfig.MaxConnectionRetries)) err := locker.WithLock(ctx, "bouncer-admin-bootstrap", lockTimeout, func(ctx context.Context) error { logger.Info(ctx, "bootstrapping proxies") diff --git a/internal/config/bootstrap.go b/internal/config/bootstrap.go index 750c0a2..c113250 100644 --- a/internal/config/bootstrap.go +++ b/internal/config/bootstrap.go @@ -12,9 +12,10 @@ import ( ) type BootstrapConfig struct { - Proxies map[store.ProxyName]BootstrapProxyConfig `yaml:"proxies"` - Dir InterpolatedString `yaml:"dir"` - LockTimeout InterpolatedDuration `yaml:"lockTimeout"` + Proxies map[store.ProxyName]BootstrapProxyConfig `yaml:"proxies"` + Dir InterpolatedString `yaml:"dir"` + LockTimeout InterpolatedDuration `yaml:"lockTimeout"` + MaxConnectionRetries InterpolatedInt `yaml:"maxRetries"` } func (c *BootstrapConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -62,8 +63,9 @@ type BootstrapLayerConfig struct { func NewDefaultBootstrapConfig() BootstrapConfig { return BootstrapConfig{ - Dir: "", - LockTimeout: *NewInterpolatedDuration(30 * time.Second), + Dir: "", + LockTimeout: *NewInterpolatedDuration(30 * time.Second), + MaxConnectionRetries: 10, } } diff --git a/internal/config/redis.go b/internal/config/redis.go index 9fbaddc..8d71422 100644 --- a/internal/config/redis.go +++ b/internal/config/redis.go @@ -9,19 +9,21 @@ const ( ) type RedisConfig struct { - Adresses InterpolatedStringSlice `yaml:"addresses"` - Master InterpolatedString `yaml:"master"` - ReadTimeout InterpolatedDuration `yaml:"readTimeout"` - WriteTimeout InterpolatedDuration `yaml:"writeTimeout"` - DialTimeout InterpolatedDuration `yaml:"dialTimeout"` + Adresses InterpolatedStringSlice `yaml:"addresses"` + Master InterpolatedString `yaml:"master"` + ReadTimeout InterpolatedDuration `yaml:"readTimeout"` + WriteTimeout InterpolatedDuration `yaml:"writeTimeout"` + DialTimeout InterpolatedDuration `yaml:"dialTimeout"` + LockMaxRetries InterpolatedInt `yaml:"lockMaxRetries"` } func NewDefaultRedisConfig() RedisConfig { return RedisConfig{ - Adresses: InterpolatedStringSlice{"localhost:6379"}, - Master: "", - ReadTimeout: InterpolatedDuration(30 * time.Second), - WriteTimeout: InterpolatedDuration(30 * time.Second), - DialTimeout: InterpolatedDuration(30 * time.Second), + Adresses: InterpolatedStringSlice{"localhost:6379"}, + Master: "", + ReadTimeout: InterpolatedDuration(30 * time.Second), + WriteTimeout: InterpolatedDuration(30 * time.Second), + DialTimeout: InterpolatedDuration(30 * time.Second), + LockMaxRetries: 10, } } diff --git a/internal/lock/redis/locker.go b/internal/lock/redis/locker.go index 46e1d72..337685c 100644 --- a/internal/lock/redis/locker.go +++ b/internal/lock/redis/locker.go @@ -12,8 +12,8 @@ import ( ) type Locker struct { - client redis.UniversalClient - timeout time.Duration + client redis.UniversalClient + maxRetries int } // WithLock implements lock.Locker. @@ -26,33 +26,41 @@ func (l *Locker) WithLock(ctx context.Context, key string, timeout time.Duration logger.Debug(ctx, "acquiring lock") - lock, err := locker.Obtain(ctx, key, timeout, &redislock.Options{ - RetryStrategy: backoff, - }) - if err != nil { - return errors.WithStack(err) - } - - logger.Debug(ctx, "lock obtained") - - defer func() { - if err := lock.Release(ctx); err != nil { - logger.Error(ctx, "could not release lock", logger.E(errors.WithStack(err))) + err := retryWithBackoff(ctx, l.maxRetries, func(ctx context.Context) error { + lock, err := locker.Obtain(ctx, key, timeout, &redislock.Options{ + RetryStrategy: backoff, + }) + if err != nil { + return errors.WithStack(err) } - logger.Debug(ctx, "lock released") - }() + logger.Debug(ctx, "lock obtained") - if err := fn(ctx); err != nil { + defer func() { + if err := lock.Release(ctx); err != nil { + logger.Error(ctx, "could not release lock", logger.E(errors.WithStack(err))) + } + + logger.Debug(ctx, "lock released") + }() + + if err := fn(ctx); err != nil { + return errors.WithStack(err) + } + + return nil + }) + if err != nil { return errors.WithStack(err) } return nil } -func NewLocker(client redis.UniversalClient) *Locker { +func NewLocker(client redis.UniversalClient, maxRetries int) *Locker { return &Locker{ - client: client, + client: client, + maxRetries: maxRetries, } } diff --git a/internal/lock/redis/retry.go b/internal/lock/redis/retry.go new file mode 100644 index 0000000..0b09329 --- /dev/null +++ b/internal/lock/redis/retry.go @@ -0,0 +1,42 @@ +package redis + +import ( + "context" + "time" + + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +const ( + baseWatchBackoffDelay = time.Millisecond * 500 + maxDelay = time.Minute * 10 +) + +func retryWithBackoff(ctx context.Context, attempts int, fn func(ctx context.Context) error) error { + backoffDelay := baseWatchBackoffDelay + count := 0 + + for { + err := fn(ctx) + if err == nil { + return nil + } + + err = errors.WithStack(err) + + count++ + if count >= attempts { + return errors.Wrapf(err, "execution failed after %d attempts", attempts) + } + + logger.Error(ctx, "error while executing func, retrying with backoff", logger.E(err), logger.F("backoffDelay", backoffDelay), logger.F("remainingAttempts", attempts-count)) + + time.Sleep(backoffDelay) + + backoffDelay *= 2 + if backoffDelay > maxDelay { + backoffDelay = maxDelay + } + } +} diff --git a/internal/setup/integrations.go b/internal/setup/integrations.go index c69f0bc..580b5af 100644 --- a/internal/setup/integrations.go +++ b/internal/setup/integrations.go @@ -28,7 +28,7 @@ func SetupIntegrations(ctx context.Context, conf *config.Config) ([]integration. func setupKubernetesIntegration(ctx context.Context, conf *config.Config) (*kubernetes.Integration, error) { client := newRedisClient(conf.Redis) - locker := redis.NewLocker(client) + locker := redis.NewLocker(client, 10) integration := kubernetes.NewIntegration( kubernetes.WithReaderTokenSecret(string(conf.Integrations.Kubernetes.ReaderTokenSecret)), diff --git a/internal/setup/lock.go b/internal/setup/lock.go index 6946a16..d560c70 100644 --- a/internal/setup/lock.go +++ b/internal/setup/lock.go @@ -10,6 +10,6 @@ import ( func SetupLocker(ctx context.Context, conf *config.Config) (lock.Locker, error) { client := newRedisClient(conf.Redis) - locker := redis.NewLocker(client) + locker := redis.NewLocker(client, int(conf.Redis.LockMaxRetries)) return locker, nil }