From 3c1939f41819f852b4f24aa8e8313456b003ebe4 Mon Sep 17 00:00:00 2001 From: William Petit Date: Thu, 27 Jun 2024 17:03:50 +0200 Subject: [PATCH] feat: add revision number to proxy and layers to identify changes --- internal/command/admin/layer/util.go | 2 + internal/command/admin/proxy/util.go | 2 + internal/proxy/proxy_test.go | 4 +- internal/setup/proxy_repository.go | 4 +- internal/store/layer.go | 7 +- internal/store/proxy.go | 8 +- internal/store/redis/helper.go | 35 ++++++++ internal/store/redis/layer_item.go | 18 ++-- internal/store/redis/layer_repository.go | 30 ++++--- internal/store/redis/layer_repository_test.go | 2 +- internal/store/redis/proxy_item.go | 10 ++- internal/store/redis/proxy_repository.go | 20 +++-- internal/store/redis/proxy_repository_test.go | 2 +- internal/store/testsuite/layer_repository.go | 85 ++++++++++++++++++ internal/store/testsuite/proxy_repository.go | 87 +++++++++++++++++++ 15 files changed, 272 insertions(+), 44 deletions(-) diff --git a/internal/command/admin/layer/util.go b/internal/command/admin/layer/util.go index eeabbcd..1073f85 100644 --- a/internal/command/admin/layer/util.go +++ b/internal/command/admin/layer/util.go @@ -13,6 +13,7 @@ func layerHeaderHints(outputMode format.OutputMode) format.Hints { format.NewProp("Type", "Type"), format.NewProp("Enabled", "Enabled"), format.NewProp("Weight", "Weight"), + format.NewProp("Revision", "Revision"), }, } } @@ -25,6 +26,7 @@ func layerHints(outputMode format.OutputMode) format.Hints { format.NewProp("Type", "Type"), format.NewProp("Enabled", "Enabled"), format.NewProp("Weight", "Weight"), + format.NewProp("Revision", "Revision"), format.NewProp("Options", "Options"), format.NewProp("CreatedAt", "CreatedAt", table.WithCompactModeMaxColumnWidth(20)), format.NewProp("UpdatedAt", "UpdatedAt", table.WithCompactModeMaxColumnWidth(20)), diff --git a/internal/command/admin/proxy/util.go b/internal/command/admin/proxy/util.go index 0b5a58a..8be3200 100644 --- a/internal/command/admin/proxy/util.go +++ b/internal/command/admin/proxy/util.go @@ -12,6 +12,7 @@ func proxyHeaderHints(outputMode format.OutputMode) format.Hints { format.NewProp("Name", "Name"), format.NewProp("Enabled", "Enabled"), format.NewProp("Weight", "Weight"), + format.NewProp("Revision", "Revision"), }, } } @@ -25,6 +26,7 @@ func proxyHints(outputMode format.OutputMode) format.Hints { format.NewProp("To", "To"), format.NewProp("Enabled", "Enabled"), format.NewProp("Weight", "Weight"), + format.NewProp("Revision", "Revision"), format.NewProp("CreatedAt", "CreatedAt", table.WithCompactModeMaxColumnWidth(20)), format.NewProp("UpdatedAt", "UpdatedAt", table.WithCompactModeMaxColumnWidth(20)), }, diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 93f38f7..c1eab39 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -31,8 +31,8 @@ func BenchmarkProxy(b *testing.B) { Addrs: []string{redisEndpoint}, }) - proxyRepository := redisStore.NewProxyRepository(client) - layerRepository := redisStore.NewLayerRepository(client) + proxyRepository := redisStore.NewProxyRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay) + layerRepository := redisStore.NewLayerRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay) backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) diff --git a/internal/setup/proxy_repository.go b/internal/setup/proxy_repository.go index 118b22a..352d009 100644 --- a/internal/setup/proxy_repository.go +++ b/internal/setup/proxy_repository.go @@ -17,9 +17,9 @@ func NewRedisClient(ctx context.Context, conf config.RedisConfig) redis.Universa } func NewProxyRepository(ctx context.Context, client redis.UniversalClient) (store.ProxyRepository, error) { - return redisStore.NewProxyRepository(client), nil + return redisStore.NewProxyRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay), nil } func NewLayerRepository(ctx context.Context, client redis.UniversalClient) (store.LayerRepository, error) { - return redisStore.NewLayerRepository(client), nil + return redisStore.NewLayerRepository(client, redisStore.DefaultTxMaxAttempts, redisStore.DefaultTxBaseDelay), nil } diff --git a/internal/store/layer.go b/internal/store/layer.go index 99e2669..060e8af 100644 --- a/internal/store/layer.go +++ b/internal/store/layer.go @@ -11,9 +11,10 @@ type ( ) type LayerHeader struct { - Proxy ProxyName `json:"proxy"` - Name LayerName `json:"name"` - Type LayerType `json:"type"` + Proxy ProxyName `json:"proxy"` + Name LayerName `json:"name"` + Revision int `json:"revision"` + Type LayerType `json:"type"` Weight int `json:"weight"` Enabled bool `json:"enabled"` diff --git a/internal/store/proxy.go b/internal/store/proxy.go index 3ed85f9..287c236 100644 --- a/internal/store/proxy.go +++ b/internal/store/proxy.go @@ -7,10 +7,10 @@ import ( type ProxyName Name type ProxyHeader struct { - Name ProxyName `json:"name"` - - Weight int `json:"weight"` - Enabled bool `json:"enabled"` + Name ProxyName `json:"name"` + Revision int `json:"revision"` + Weight int `json:"weight"` + Enabled bool `json:"enabled"` } type Proxy struct { diff --git a/internal/store/redis/helper.go b/internal/store/redis/helper.go index 627a767..60db019 100644 --- a/internal/store/redis/helper.go +++ b/internal/store/redis/helper.go @@ -3,10 +3,18 @@ package redis import ( "context" "encoding/json" + "math/rand" "strings" + "time" "github.com/pkg/errors" "github.com/redis/go-redis/v9" + "gitlab.com/wpetit/goweb/logger" +) + +var ( + DefaultTxMaxAttempts = 20 + DefaultTxBaseDelay = 100 * time.Millisecond ) type jsonWrapper[T any] struct { @@ -65,6 +73,33 @@ func key(parts ...string) string { return strings.Join(parts, ":") } +func WithRetry(ctx context.Context, client redis.UniversalClient, key string, fn func(ctx context.Context, tx *redis.Tx) error, maxAttempts int, baseDelay time.Duration) error { + var err error + + delay := baseDelay + + for attempt := 0; attempt < maxAttempts; attempt++ { + if err = WithTx(ctx, client, key, fn); err != nil { + err = errors.WithStack(err) + logger.Debug(ctx, "redis transaction failed", logger.E(err)) + + if errors.Is(err, redis.TxFailedErr) { + logger.Debug(ctx, "retrying redis transaction", logger.F("attempts", attempt), logger.F("delay", delay)) + time.Sleep(delay) + delay = delay*2 + time.Duration(rand.Int63n(int64(baseDelay))) + + continue + } + + return err + } + + return nil + } + + return errors.WithStack(redis.TxFailedErr) +} + func WithTx(ctx context.Context, client redis.UniversalClient, key string, fn func(ctx context.Context, tx *redis.Tx) error) error { txf := func(tx *redis.Tx) error { if err := fn(ctx, tx); err != nil { diff --git a/internal/store/redis/layer_item.go b/internal/store/redis/layer_item.go index 68a5269..2480b43 100644 --- a/internal/store/redis/layer_item.go +++ b/internal/store/redis/layer_item.go @@ -8,9 +8,10 @@ import ( ) type layerHeaderItem struct { - Proxy string `redis:"proxy"` - Name string `redis:"name"` - Type string `redis:"type"` + Proxy string `redis:"proxy"` + Name string `redis:"name"` + Revision int `redis:"revision"` + Type string `redis:"type"` Weight int `redis:"weight"` Enabled bool `redis:"enabled"` @@ -18,11 +19,12 @@ type layerHeaderItem struct { func (i *layerHeaderItem) ToLayerHeader() (*store.LayerHeader, error) { layerHeader := &store.LayerHeader{ - Proxy: store.ProxyName(i.Proxy), - Name: store.LayerName(i.Name), - Type: store.LayerType(i.Type), - Weight: i.Weight, - Enabled: i.Enabled, + Proxy: store.ProxyName(i.Proxy), + Name: store.LayerName(i.Name), + Revision: i.Revision, + Type: store.LayerType(i.Type), + Weight: i.Weight, + Enabled: i.Enabled, } return layerHeader, nil diff --git a/internal/store/redis/layer_repository.go b/internal/store/redis/layer_repository.go index 1b94db7..4899c23 100644 --- a/internal/store/redis/layer_repository.go +++ b/internal/store/redis/layer_repository.go @@ -14,7 +14,9 @@ const ( ) type LayerRepository struct { - client redis.UniversalClient + client redis.UniversalClient + txMaxAttempts int + txRetryBaseDelay time.Duration } // CreateLayer implements store.LayerRepository @@ -24,11 +26,12 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy layerItem := &layerItem{ layerHeaderItem: layerHeaderItem{ - Proxy: string(proxyName), - Name: string(layerName), - Type: string(layerType), - Weight: 0, - Enabled: false, + Proxy: string(proxyName), + Name: string(layerName), + Type: string(layerType), + Weight: 0, + Revision: 0, + Enabled: false, }, CreatedAt: wrap(now), @@ -96,7 +99,7 @@ func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyNam key := layerKey(proxyName, layerName) var layerItem *layerItem - err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { + err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { pItem, err := r.txGetLayerItem(ctx, tx, proxyName, layerName) if err != nil { return errors.WithStack(err) @@ -105,7 +108,7 @@ func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyNam layerItem = pItem return nil - }) + }, r.txMaxAttempts, r.txRetryBaseDelay) if err != nil { return nil, errors.WithStack(err) } @@ -197,7 +200,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy key := layerKey(proxyName, layerName) var layerItem layerItem - err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { + err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { item, err := r.txGetLayerItem(ctx, tx, proxyName, layerName) if err != nil { return errors.WithStack(err) @@ -216,6 +219,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy } item.UpdatedAt = wrap(time.Now().UTC()) + item.Revision = item.Revision + 1 _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { p.HMSet(ctx, key, item.layerHeaderItem) @@ -230,7 +234,7 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy layerItem = *item return nil - }) + }, r.txMaxAttempts, r.txRetryBaseDelay) if err != nil { return nil, errors.WithStack(err) } @@ -243,9 +247,11 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy return layer, nil } -func NewLayerRepository(client redis.UniversalClient) *LayerRepository { +func NewLayerRepository(client redis.UniversalClient, txMaxAttempts int, txRetryBaseDelay time.Duration) *LayerRepository { return &LayerRepository{ - client: client, + client: client, + txMaxAttempts: txMaxAttempts, + txRetryBaseDelay: txRetryBaseDelay, } } diff --git a/internal/store/redis/layer_repository_test.go b/internal/store/redis/layer_repository_test.go index 939a45f..a84a28f 100644 --- a/internal/store/redis/layer_repository_test.go +++ b/internal/store/redis/layer_repository_test.go @@ -7,6 +7,6 @@ import ( ) func TestLayerRepository(t *testing.T) { - repository := NewLayerRepository(client) + repository := NewLayerRepository(client, DefaultTxMaxAttempts, DefaultTxBaseDelay) testsuite.TestLayerRepository(t, repository) } diff --git a/internal/store/redis/proxy_item.go b/internal/store/redis/proxy_item.go index 5d21654..fa3a175 100644 --- a/internal/store/redis/proxy_item.go +++ b/internal/store/redis/proxy_item.go @@ -8,7 +8,8 @@ import ( ) type proxyHeaderItem struct { - Name string `redis:"name"` + Name string `redis:"name"` + Revision int `redis:"revision"` Weight int `redis:"weight"` Enabled bool `redis:"enabled"` @@ -19,9 +20,10 @@ type proxyHeaderItem struct { func (i *proxyHeaderItem) ToProxyHeader() (*store.ProxyHeader, error) { proxyHeader := &store.ProxyHeader{ - Name: store.ProxyName(i.Name), - Weight: i.Weight, - Enabled: i.Enabled, + Name: store.ProxyName(i.Name), + Revision: i.Revision, + Weight: i.Weight, + Enabled: i.Enabled, } return proxyHeader, nil diff --git a/internal/store/redis/proxy_repository.go b/internal/store/redis/proxy_repository.go index aa5f6b4..9eefd75 100644 --- a/internal/store/redis/proxy_repository.go +++ b/internal/store/redis/proxy_repository.go @@ -14,7 +14,9 @@ const ( ) type ProxyRepository struct { - client redis.UniversalClient + client redis.UniversalClient + txMaxAttempts int + txRetryBaseDelay time.Duration } // GetProxy implements store.ProxyRepository @@ -22,7 +24,7 @@ func (r *ProxyRepository) GetProxy(ctx context.Context, name store.ProxyName) (* key := proxyKey(name) var proxyItem *proxyItem - err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { + err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { pItem, err := r.txGetProxyItem(ctx, tx, name) if err != nil { return errors.WithStack(err) @@ -31,7 +33,7 @@ func (r *ProxyRepository) GetProxy(ctx context.Context, name store.ProxyName) (* proxyItem = pItem return nil - }) + }, r.txMaxAttempts, r.txRetryBaseDelay) if err != nil { return nil, errors.WithStack(err) } @@ -89,6 +91,7 @@ func (r *ProxyRepository) CreateProxy(ctx context.Context, name store.ProxyName, CreatedAt: wrap(now), UpdatedAt: wrap(now), Weight: 0, + Revision: 0, Enabled: false, }, To: to, @@ -191,7 +194,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName, key := proxyKey(name) var proxyItem proxyItem - err := WithTx(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { + err := WithRetry(ctx, r.client, key, func(ctx context.Context, tx *redis.Tx) error { item, err := r.txGetProxyItem(ctx, tx, name) if err != nil { return errors.WithStack(err) @@ -214,6 +217,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName, } item.UpdatedAt = wrap(time.Now().UTC()) + item.Revision = item.Revision + 1 _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { p.HMSet(ctx, key, item.proxyHeaderItem) @@ -228,7 +232,7 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName, proxyItem = *item return nil - }) + }, r.txMaxAttempts, r.txRetryBaseDelay) if err != nil { return nil, errors.WithStack(err) } @@ -241,9 +245,11 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName, return proxy, nil } -func NewProxyRepository(client redis.UniversalClient) *ProxyRepository { +func NewProxyRepository(client redis.UniversalClient, txMaxAttempts int, txRetryBaseDelay time.Duration) *ProxyRepository { return &ProxyRepository{ - client: client, + client: client, + txMaxAttempts: 20, + txRetryBaseDelay: txRetryBaseDelay, } } diff --git a/internal/store/redis/proxy_repository_test.go b/internal/store/redis/proxy_repository_test.go index 73701f0..3f1505d 100644 --- a/internal/store/redis/proxy_repository_test.go +++ b/internal/store/redis/proxy_repository_test.go @@ -7,6 +7,6 @@ import ( ) func TestProxyRepository(t *testing.T) { - repository := NewProxyRepository(client) + repository := NewProxyRepository(client, DefaultTxMaxAttempts, DefaultTxBaseDelay) testsuite.TestProxyRepository(t, repository) } diff --git a/internal/store/testsuite/layer_repository.go b/internal/store/testsuite/layer_repository.go index a326c0d..de63dea 100644 --- a/internal/store/testsuite/layer_repository.go +++ b/internal/store/testsuite/layer_repository.go @@ -3,6 +3,7 @@ package testsuite import ( "context" "reflect" + "sync" "testing" "forge.cadoles.com/cadoles/bouncer/internal/store" @@ -49,6 +50,10 @@ var layerRepositoryTestCases = []layerRepositoryTestCase{ return errors.Errorf("layer.UpdatedAt should not be zero value") } + if layer.Revision != 0 { + return errors.Errorf("layer.Revision should be zero") + } + return nil }, }, @@ -230,6 +235,86 @@ var layerRepositoryTestCases = []layerRepositoryTestCase{ return errors.New("could not find created layer in query results") } + return nil + }, + }, + { + Name: "Create then update layer", + Do: func(repo store.LayerRepository) error { + ctx := context.Background() + + var layerName store.LayerName = "create_then_update_layer" + var proxyName store.ProxyName = store.ProxyName(string(layerName) + "_proxy") + var layerType store.LayerType = "dummy" + var layerOptions store.LayerOptions = store.LayerOptions{} + + createdLayer, err := repo.CreateLayer(ctx, proxyName, layerName, layerType, layerOptions) + if err != nil { + return errors.WithStack(err) + } + + if e, g := 0, createdLayer.Revision; e != g { + return errors.Errorf("createdLayer.Revision: expected '%v', got '%v'", e, g) + } + + updatedLayer, err := repo.UpdateLayer(ctx, proxyName, layerName) + if err != nil { + return errors.Wrap(err, "err should be nil") + } + + if e, g := 1, updatedLayer.Revision; e != g { + return errors.Errorf("updatedLayer.Revision: expected '%v', got '%v'", e, g) + } + + return nil + }, + }, + { + Name: "Update layer concurrently", + Do: func(repo store.LayerRepository) error { + ctx := context.Background() + + var layerName store.LayerName = "update_layer_concurrently" + var proxyName store.ProxyName = store.ProxyName(string(layerName) + "_proxy") + var layerType store.LayerType = "dummy" + var layerOptions store.LayerOptions = store.LayerOptions{} + + createdLayer, err := repo.CreateLayer(ctx, proxyName, layerName, layerType, layerOptions) + if err != nil { + return errors.WithStack(err) + } + + if createdLayer.Revision != 0 { + return errors.Errorf("createdLayer.Revision should be zero") + } + + var wg sync.WaitGroup + + total := 100 + + wg.Add(total) + + for i := 0; i < total; i++ { + go func(i int) { + defer wg.Done() + + if _, err := repo.UpdateLayer(ctx, createdLayer.Proxy, createdLayer.Name); err != nil { + panic(errors.Wrap(err, "err should be nil")) + } + }(i) + } + + wg.Wait() + + layer, err := repo.GetLayer(ctx, createdLayer.Proxy, createdLayer.Name) + if err != nil { + return errors.Wrap(err, "err should be nil") + } + + if e, g := total, layer.Revision; e != g { + return errors.Errorf("layer.Revision: expected '%v', got '%v'", e, g) + } + return nil }, }, diff --git a/internal/store/testsuite/proxy_repository.go b/internal/store/testsuite/proxy_repository.go index ed55183..98f0955 100644 --- a/internal/store/testsuite/proxy_repository.go +++ b/internal/store/testsuite/proxy_repository.go @@ -3,6 +3,7 @@ package testsuite import ( "context" "reflect" + "sync" "testing" "forge.cadoles.com/cadoles/bouncer/internal/store" @@ -51,6 +52,10 @@ var proxyRepositoryTestCases = []proxyRepositoryTestCase{ return errors.Errorf("proxy.UpdatedAt should not be zero value") } + if proxy.Revision != 0 { + return errors.Errorf("proxy.Revision should be zero") + } + return nil }, }, @@ -99,6 +104,10 @@ var proxyRepositoryTestCases = []proxyRepositoryTestCase{ return errors.Errorf("foundProxy.UpdatedAt: expected '%v', got '%v'", createdProxy.UpdatedAt, foundProxy.UpdatedAt) } + if foundProxy.Revision != 0 { + return errors.Errorf("foundProxy.Revision should be zero") + } + return nil }, }, @@ -194,6 +203,84 @@ var proxyRepositoryTestCases = []proxyRepositoryTestCase{ return errors.Errorf("err: expected store.ErrAlreadyExists, got '%+v'", err) } + return nil + }, + }, + { + Name: "Create then update proxy", + Do: func(repo store.ProxyRepository) error { + ctx := context.Background() + + to := "http://example.com" + + var name store.ProxyName = "create_then_update_proxy" + + createdProxy, err := repo.CreateProxy(ctx, name, to, "127.0.0.1:*", "localhost:*") + if err != nil { + return errors.Wrap(err, "err should be nil") + } + + if createdProxy.Revision != 0 { + return errors.Errorf("createdProxy.Revision should be zero") + } + + updatedProxy, err := repo.UpdateProxy(ctx, name) + if err != nil { + return errors.Wrap(err, "err should be nil") + } + + if e, g := 1, updatedProxy.Revision; e != g { + return errors.Errorf("updatedProxy.Revision: expected '%v', got '%v'", e, g) + } + + return nil + }, + }, + { + Name: "Update proxy concurrently", + Do: func(repo store.ProxyRepository) error { + ctx := context.Background() + + to := "http://example.com" + + var name store.ProxyName = "update_proxy_concurrently" + + createdProxy, err := repo.CreateProxy(ctx, name, to, "127.0.0.1:*", "localhost:*") + if err != nil { + return errors.Wrap(err, "err should be nil") + } + + if createdProxy.Revision != 0 { + return errors.Errorf("createdProxy.Revision should be zero") + } + + var wg sync.WaitGroup + + total := 100 + + wg.Add(total) + + for i := 0; i < total; i++ { + go func(i int) { + defer wg.Done() + + if _, err := repo.UpdateProxy(ctx, name); err != nil { + panic(errors.Wrap(err, "err should be nil")) + } + }(i) + } + + wg.Wait() + + proxy, err := repo.GetProxy(ctx, name) + if err != nil { + return errors.Wrap(err, "err should be nil") + } + + if e, g := total, proxy.Revision; e != g { + return errors.Errorf("proxy.Revision: expected '%v', got '%v'", e, g) + } + return nil }, },