feat: use shared redis client to maximize pooling usage (#39)
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
This commit is contained in:
@ -27,7 +27,7 @@ func (s *Server) initRepositories(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *Server) initRedisClient(ctx context.Context) error {
|
||||
client := setup.NewRedisClient(ctx, s.redisConfig)
|
||||
client := setup.NewSharedClient(s.redisConfig)
|
||||
|
||||
s.redisClient = client
|
||||
|
||||
|
@ -15,6 +15,8 @@ type RedisConfig struct {
|
||||
WriteTimeout InterpolatedDuration `yaml:"writeTimeout"`
|
||||
DialTimeout InterpolatedDuration `yaml:"dialTimeout"`
|
||||
LockMaxRetries InterpolatedInt `yaml:"lockMaxRetries"`
|
||||
MaxRetries InterpolatedInt `yaml:"maxRetries"`
|
||||
PingInterval InterpolatedDuration `yaml:"pingInterval"`
|
||||
}
|
||||
|
||||
func NewDefaultRedisConfig() RedisConfig {
|
||||
@ -25,5 +27,7 @@ func NewDefaultRedisConfig() RedisConfig {
|
||||
WriteTimeout: InterpolatedDuration(30 * time.Second),
|
||||
DialTimeout: InterpolatedDuration(30 * time.Second),
|
||||
LockMaxRetries: 10,
|
||||
MaxRetries: 3,
|
||||
PingInterval: InterpolatedDuration(30 * time.Second),
|
||||
}
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func (s *Server) initRepositories(ctx context.Context) error {
|
||||
client := setup.NewRedisClient(ctx, s.redisConfig)
|
||||
client := setup.NewSharedClient(s.redisConfig)
|
||||
|
||||
if err := s.initProxyRepository(ctx, client); err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -23,7 +23,7 @@ func init() {
|
||||
}
|
||||
|
||||
func setupAuthnOIDCLayer(conf *config.Config) (director.Layer, error) {
|
||||
rdb := newRedisClient(conf.Redis)
|
||||
rdb := NewSharedClient(conf.Redis)
|
||||
adapter := redis.NewStoreAdapter(rdb)
|
||||
store := session.NewStore(adapter)
|
||||
|
||||
|
@ -27,7 +27,7 @@ func SetupIntegrations(ctx context.Context, conf *config.Config) ([]integration.
|
||||
}
|
||||
|
||||
func setupKubernetesIntegration(ctx context.Context, conf *config.Config) (*kubernetes.Integration, error) {
|
||||
client := newRedisClient(conf.Redis)
|
||||
client := NewSharedClient(conf.Redis)
|
||||
locker := redis.NewLocker(client, 10)
|
||||
|
||||
integration := kubernetes.NewIntegration(
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func SetupLocker(ctx context.Context, conf *config.Config) (lock.Locker, error) {
|
||||
client := newRedisClient(conf.Redis)
|
||||
client := NewSharedClient(conf.Redis)
|
||||
locker := redis.NewLocker(client, int(conf.Redis.LockMaxRetries))
|
||||
return locker, nil
|
||||
}
|
||||
|
@ -3,19 +3,11 @@ package setup
|
||||
import (
|
||||
"context"
|
||||
|
||||
"forge.cadoles.com/cadoles/bouncer/internal/config"
|
||||
"forge.cadoles.com/cadoles/bouncer/internal/store"
|
||||
redisStore "forge.cadoles.com/cadoles/bouncer/internal/store/redis"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func NewRedisClient(ctx context.Context, conf config.RedisConfig) redis.UniversalClient {
|
||||
return redis.NewUniversalClient(&redis.UniversalOptions{
|
||||
Addrs: conf.Adresses,
|
||||
MasterName: string(conf.Master),
|
||||
})
|
||||
}
|
||||
|
||||
func NewProxyRepository(ctx context.Context, client redis.UniversalClient) (store.ProxyRepository, error) {
|
||||
return redisStore.NewProxyRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay), nil
|
||||
}
|
||||
|
@ -35,6 +35,6 @@ func setupQueueLayer(conf *config.Config) (director.Layer, error) {
|
||||
}
|
||||
|
||||
func newQueueAdapter(redisConf config.RedisConfig) (queue.Adapter, error) {
|
||||
rdb := newRedisClient(redisConf)
|
||||
rdb := NewSharedClient(redisConf)
|
||||
return queueRedis.NewAdapter(rdb, 2), nil
|
||||
}
|
||||
|
@ -1,14 +1,38 @@
|
||||
package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/cadoles/bouncer/internal/config"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
var clients sync.Map
|
||||
|
||||
func NewSharedClient(conf config.RedisConfig) redis.UniversalClient {
|
||||
key := strings.Join(conf.Adresses, "|") + "|" + string(conf.Master)
|
||||
|
||||
value, exists := clients.Load(key)
|
||||
if exists {
|
||||
if client, ok := (value).(redis.UniversalClient); ok {
|
||||
return client
|
||||
}
|
||||
}
|
||||
|
||||
client := newRedisClient(conf)
|
||||
|
||||
clients.Store(key, client)
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
func newRedisClient(conf config.RedisConfig) redis.UniversalClient {
|
||||
return redis.NewUniversalClient(&redis.UniversalOptions{
|
||||
client := redis.NewUniversalClient(&redis.UniversalOptions{
|
||||
Addrs: conf.Adresses,
|
||||
MasterName: string(conf.Master),
|
||||
ReadTimeout: time.Duration(conf.ReadTimeout),
|
||||
@ -16,5 +40,33 @@ func newRedisClient(conf config.RedisConfig) redis.UniversalClient {
|
||||
DialTimeout: time.Duration(conf.DialTimeout),
|
||||
RouteByLatency: true,
|
||||
ContextTimeoutEnabled: true,
|
||||
MaxRetries: int(conf.MaxRetries),
|
||||
})
|
||||
|
||||
go func() {
|
||||
ctx := logger.With(context.Background(),
|
||||
logger.F("adresses", conf.Adresses),
|
||||
logger.F("master", conf.Master),
|
||||
)
|
||||
|
||||
timer := time.NewTicker(time.Duration(conf.PingInterval))
|
||||
defer timer.Stop()
|
||||
|
||||
connected := true
|
||||
|
||||
for range timer.C {
|
||||
if _, err := client.Ping(ctx).Result(); err != nil {
|
||||
logger.Error(ctx, "redis disconnected", logger.E(errors.WithStack(err)))
|
||||
connected = false
|
||||
continue
|
||||
}
|
||||
|
||||
if !connected {
|
||||
logger.Info(ctx, "redis reconnected")
|
||||
connected = true
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return client
|
||||
}
|
||||
|
@ -91,12 +91,14 @@ func WithRetry(ctx context.Context, client redis.UniversalClient, key string, fn
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Error(ctx, "redis error", logger.E(errors.WithStack(err)))
|
||||
|
||||
return errors.WithStack(redis.TxFailedErr)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user