feat: add revision number to proxy and layers to identify changes
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
This commit is contained in:
@ -3,10 +3,18 @@ package redis
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"gitlab.com/wpetit/goweb/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultTxMaxAttempts = 20
|
||||
DefaultTxBaseDelay = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
type jsonWrapper[T any] struct {
|
||||
@ -65,6 +73,33 @@ func key(parts ...string) string {
|
||||
return strings.Join(parts, ":")
|
||||
}
|
||||
|
||||
func WithRetry(ctx context.Context, client redis.UniversalClient, key string, fn func(ctx context.Context, tx *redis.Tx) error, maxAttempts int, baseDelay time.Duration) error {
|
||||
var err error
|
||||
|
||||
delay := baseDelay
|
||||
|
||||
for attempt := 0; attempt < maxAttempts; attempt++ {
|
||||
if err = WithTx(ctx, client, key, fn); err != nil {
|
||||
err = errors.WithStack(err)
|
||||
logger.Debug(ctx, "redis transaction failed", logger.E(err))
|
||||
|
||||
if errors.Is(err, redis.TxFailedErr) {
|
||||
logger.Debug(ctx, "retrying redis transaction", logger.F("attempts", attempt), logger.F("delay", delay))
|
||||
time.Sleep(delay)
|
||||
delay = delay*2 + time.Duration(rand.Int63n(int64(baseDelay)))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.WithStack(redis.TxFailedErr)
|
||||
}
|
||||
|
||||
func WithTx(ctx context.Context, client redis.UniversalClient, key string, fn func(ctx context.Context, tx *redis.Tx) error) error {
|
||||
txf := func(tx *redis.Tx) error {
|
||||
if err := fn(ctx, tx); err != nil {
|
||||
|
@ -8,9 +8,10 @@ import (
|
||||
)
|
||||
|
||||
type layerHeaderItem struct {
|
||||
Proxy string `redis:"proxy"`
|
||||
Name string `redis:"name"`
|
||||
Type string `redis:"type"`
|
||||
Proxy string `redis:"proxy"`
|
||||
Name string `redis:"name"`
|
||||
Revision int `redis:"revision"`
|
||||
Type string `redis:"type"`
|
||||
|
||||
Weight int `redis:"weight"`
|
||||
Enabled bool `redis:"enabled"`
|
||||
@ -18,11 +19,12 @@ type layerHeaderItem struct {
|
||||
|
||||
func (i *layerHeaderItem) ToLayerHeader() (*store.LayerHeader, error) {
|
||||
layerHeader := &store.LayerHeader{
|
||||
Proxy: store.ProxyName(i.Proxy),
|
||||
Name: store.LayerName(i.Name),
|
||||
Type: store.LayerType(i.Type),
|
||||
Weight: i.Weight,
|
||||
Enabled: i.Enabled,
|
||||
Proxy: store.ProxyName(i.Proxy),
|
||||
Name: store.LayerName(i.Name),
|
||||
Revision: i.Revision,
|
||||
Type: store.LayerType(i.Type),
|
||||
Weight: i.Weight,
|
||||
Enabled: i.Enabled,
|
||||
}
|
||||
|
||||
return layerHeader, nil
|
||||
|
@ -14,7 +14,9 @@ const (
|
||||
)
|
||||
|
||||
type LayerRepository struct {
|
||||
client redis.UniversalClient
|
||||
client redis.UniversalClient
|
||||
txMaxAttempts int
|
||||
txRetryBaseDelay time.Duration
|
||||
}
|
||||
|
||||
// CreateLayer implements store.LayerRepository
|
||||
@ -24,11 +26,12 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy
|
||||
|
||||
layerItem := &layerItem{
|
||||
layerHeaderItem: layerHeaderItem{
|
||||
Proxy: string(proxyName),
|
||||
Name: string(layerName),
|
||||
Type: string(layerType),
|
||||
Weight: 0,
|
||||
Enabled: false,
|
||||
Proxy: string(proxyName),
|
||||
Name: string(layerName),
|
||||
Type: string(layerType),
|
||||
Weight: 0,
|
||||
Revision: 0,
|
||||
Enabled: false,
|
||||
},
|
||||
|
||||
CreatedAt: wrap(now),
|
||||
@ -96,7 +99,7 @@ func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyNam
|
||||
key := layerKey(proxyName, layerName)
|
||||
var layerItem *layerItem
|
||||
|
||||
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
pItem, err := r.txGetLayerItem(ctx, tx, proxyName, layerName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -105,7 +108,7 @@ func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyNam
|
||||
layerItem = pItem
|
||||
|
||||
return nil
|
||||
})
|
||||
}, r.txMaxAttempts, r.txRetryBaseDelay)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
@ -197,7 +200,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
|
||||
key := layerKey(proxyName, layerName)
|
||||
var layerItem layerItem
|
||||
|
||||
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
item, err := r.txGetLayerItem(ctx, tx, proxyName, layerName)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -216,6 +219,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
|
||||
}
|
||||
|
||||
item.UpdatedAt = wrap(time.Now().UTC())
|
||||
item.Revision = item.Revision + 1
|
||||
|
||||
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
|
||||
p.HMSet(ctx, key, item.layerHeaderItem)
|
||||
@ -230,7 +234,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
|
||||
layerItem = *item
|
||||
|
||||
return nil
|
||||
})
|
||||
}, r.txMaxAttempts, r.txRetryBaseDelay)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
@ -243,9 +247,11 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
|
||||
return layer, nil
|
||||
}
|
||||
|
||||
func NewLayerRepository(client redis.UniversalClient) *LayerRepository {
|
||||
func NewLayerRepository(client redis.UniversalClient, txMaxAttempts int, txRetryBaseDelay time.Duration) *LayerRepository {
|
||||
return &LayerRepository{
|
||||
client: client,
|
||||
client: client,
|
||||
txMaxAttempts: txMaxAttempts,
|
||||
txRetryBaseDelay: txRetryBaseDelay,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,6 @@ import (
|
||||
)
|
||||
|
||||
func TestLayerRepository(t *testing.T) {
|
||||
repository := NewLayerRepository(client)
|
||||
repository := NewLayerRepository(client, DefaultTxMaxAttempts, DefaultTxBaseDelay)
|
||||
testsuite.TestLayerRepository(t, repository)
|
||||
}
|
||||
|
@ -8,7 +8,8 @@ import (
|
||||
)
|
||||
|
||||
type proxyHeaderItem struct {
|
||||
Name string `redis:"name"`
|
||||
Name string `redis:"name"`
|
||||
Revision int `redis:"revision"`
|
||||
|
||||
Weight int `redis:"weight"`
|
||||
Enabled bool `redis:"enabled"`
|
||||
@ -19,9 +20,10 @@ type proxyHeaderItem struct {
|
||||
|
||||
func (i *proxyHeaderItem) ToProxyHeader() (*store.ProxyHeader, error) {
|
||||
proxyHeader := &store.ProxyHeader{
|
||||
Name: store.ProxyName(i.Name),
|
||||
Weight: i.Weight,
|
||||
Enabled: i.Enabled,
|
||||
Name: store.ProxyName(i.Name),
|
||||
Revision: i.Revision,
|
||||
Weight: i.Weight,
|
||||
Enabled: i.Enabled,
|
||||
}
|
||||
|
||||
return proxyHeader, nil
|
||||
|
@ -14,7 +14,9 @@ const (
|
||||
)
|
||||
|
||||
type ProxyRepository struct {
|
||||
client redis.UniversalClient
|
||||
client redis.UniversalClient
|
||||
txMaxAttempts int
|
||||
txRetryBaseDelay time.Duration
|
||||
}
|
||||
|
||||
// GetProxy implements store.ProxyRepository
|
||||
@ -22,7 +24,7 @@ func (r *ProxyRepository) GetProxy(ctx context.Context, name store.ProxyName) (*
|
||||
key := proxyKey(name)
|
||||
var proxyItem *proxyItem
|
||||
|
||||
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
pItem, err := r.txGetProxyItem(ctx, tx, name)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -31,7 +33,7 @@ func (r *ProxyRepository) GetProxy(ctx context.Context, name store.ProxyName) (*
|
||||
proxyItem = pItem
|
||||
|
||||
return nil
|
||||
})
|
||||
}, r.txMaxAttempts, r.txRetryBaseDelay)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
@ -89,6 +91,7 @@ func (r *ProxyRepository) CreateProxy(ctx context.Context, name store.ProxyName,
|
||||
CreatedAt: wrap(now),
|
||||
UpdatedAt: wrap(now),
|
||||
Weight: 0,
|
||||
Revision: 0,
|
||||
Enabled: false,
|
||||
},
|
||||
To: to,
|
||||
@ -191,7 +194,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
|
||||
key := proxyKey(name)
|
||||
var proxyItem proxyItem
|
||||
|
||||
err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error {
|
||||
item, err := r.txGetProxyItem(ctx, tx, name)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
@ -214,6 +217,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
|
||||
}
|
||||
|
||||
item.UpdatedAt = wrap(time.Now().UTC())
|
||||
item.Revision = item.Revision + 1
|
||||
|
||||
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
|
||||
p.HMSet(ctx, key, item.proxyHeaderItem)
|
||||
@ -228,7 +232,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
|
||||
proxyItem = *item
|
||||
|
||||
return nil
|
||||
})
|
||||
}, r.txMaxAttempts, r.txRetryBaseDelay)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
@ -241,9 +245,11 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
|
||||
return proxy, nil
|
||||
}
|
||||
|
||||
func NewProxyRepository(client redis.UniversalClient) *ProxyRepository {
|
||||
func NewProxyRepository(client redis.UniversalClient, txMaxAttempts int, txRetryBaseDelay time.Duration) *ProxyRepository {
|
||||
return &ProxyRepository{
|
||||
client: client,
|
||||
client: client,
|
||||
txMaxAttempts: 20,
|
||||
txRetryBaseDelay: txRetryBaseDelay,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,6 @@ import (
|
||||
)
|
||||
|
||||
func TestProxyRepository(t *testing.T) {
|
||||
repository := NewProxyRepository(client)
|
||||
repository := NewProxyRepository(client, DefaultTxMaxAttempts, DefaultTxBaseDelay)
|
||||
testsuite.TestProxyRepository(t, repository)
|
||||
}
|
||||
|
Reference in New Issue
Block a user