feat: add queue adapter tests
Cadoles/bouncer/pipeline/head This commit looks good
Details
Cadoles/bouncer/pipeline/head This commit looks good
Details
This commit is contained in:
parent
7b04eb2418
commit
a176b754cd
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
||||||
forge.cadoles.com/Cadoles/go-proxy v0.0.0-20230701194111-c6b3d482cca6
|
forge.cadoles.com/Cadoles/go-proxy v0.0.0-20230701194111-c6b3d482cca6
|
||||||
github.com/Masterminds/sprig/v3 v3.2.3
|
github.com/Masterminds/sprig/v3 v3.2.3
|
||||||
github.com/btcsuite/btcd/btcutil v1.1.3
|
github.com/btcsuite/btcd/btcutil v1.1.3
|
||||||
|
github.com/davecgh/go-spew v1.1.1
|
||||||
github.com/go-chi/chi/v5 v5.0.8
|
github.com/go-chi/chi/v5 v5.0.8
|
||||||
github.com/jedib0t/go-pretty/v6 v6.4.6
|
github.com/jedib0t/go-pretty/v6 v6.4.6
|
||||||
github.com/mitchellh/mapstructure v1.4.1
|
github.com/mitchellh/mapstructure v1.4.1
|
||||||
|
|
|
@ -30,7 +30,7 @@ func (a *Adapter) Refresh(ctx context.Context, queueName string, keepAlive time.
|
||||||
|
|
||||||
cmd := tx.ZRangeByScore(ctx, lastSeenKey, &redis.ZRangeBy{
|
cmd := tx.ZRangeByScore(ctx, lastSeenKey, &redis.ZRangeBy{
|
||||||
Min: "0",
|
Min: "0",
|
||||||
Max: strconv.FormatInt(expires.Unix(), 10),
|
Max: strconv.FormatInt(expires.UnixNano(), 10),
|
||||||
})
|
})
|
||||||
|
|
||||||
members, err := cmd.Result()
|
members, err := cmd.Result()
|
||||||
|
@ -75,7 +75,7 @@ func (a *Adapter) Touch(ctx context.Context, queueName string, sessionId string)
|
||||||
|
|
||||||
for retry > 0 {
|
for retry > 0 {
|
||||||
err := withTx(ctx, a.client, func(ctx context.Context, tx *redis.Tx) error {
|
err := withTx(ctx, a.client, func(ctx context.Context, tx *redis.Tx) error {
|
||||||
now := time.Now().UTC().Unix()
|
now := time.Now().UTC().UnixNano()
|
||||||
|
|
||||||
err := tx.ZAddNX(ctx, rankKey, redis.Z{Score: float64(now), Member: sessionId}).Err()
|
err := tx.ZAddNX(ctx, rankKey, redis.Z{Score: float64(now), Member: sessionId}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"forge.cadoles.com/cadoles/bouncer/internal/proxy/director/layer/queue/testsuite"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAdapter(t *testing.T) {
|
||||||
|
adapter := NewAdapter(client, 3)
|
||||||
|
testsuite.TestAdapter(t, adapter)
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ory/dockertest/v3"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
var client redis.UniversalClient
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
|
||||||
|
pool, err := dockertest.NewPool("")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// uses pool to try to connect to Docker
|
||||||
|
err = pool.Client.Ping()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// pulls an image, creates a container based on it and runs it
|
||||||
|
resource, err := pool.Run("redis", "alpine3.17", []string{})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pool.Retry(func() error {
|
||||||
|
client = redis.NewUniversalClient(&redis.UniversalOptions{
|
||||||
|
Addrs: []string{resource.GetHostPort("6379/tcp")},
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if cmd := client.Ping(ctx); cmd.Err() != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
log.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
code := m.Run()
|
||||||
|
|
||||||
|
if err := pool.Purge(resource); err != nil {
|
||||||
|
log.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
|
@ -0,0 +1,99 @@
|
||||||
|
package testsuite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"forge.cadoles.com/cadoles/bouncer/internal/proxy/director/layer/queue"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type adapterTestCase struct {
|
||||||
|
Name string
|
||||||
|
Do func(adapter queue.Adapter) error
|
||||||
|
}
|
||||||
|
|
||||||
|
var adapterTestCases = []adapterTestCase{
|
||||||
|
{
|
||||||
|
Name: "Test queue ranking",
|
||||||
|
Do: func(adapter queue.Adapter) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
queueName := "test_queue_ranking"
|
||||||
|
|
||||||
|
sessionIdPattern := "session-%d"
|
||||||
|
totalSessions := int64(100)
|
||||||
|
|
||||||
|
for idx := int64(0); idx < totalSessions; idx++ {
|
||||||
|
sessionId := fmt.Sprintf(sessionIdPattern, idx)
|
||||||
|
rank, err := adapter.Touch(ctx, queueName, sessionId)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "could not touch session '%s' (index: %d, rank: %d)", sessionId, idx, rank)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, g := int64(idx), rank; e != g {
|
||||||
|
return errors.Errorf("rank('%s'): expected '%v', got '%v'", sessionId, e, g)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := adapter.Status(ctx, queueName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "could not retrieve queue status")
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, g := totalSessions, status.Sessions; e != g {
|
||||||
|
return errors.Errorf("status.Sessions: expected '%v', got '%v'", e, g)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Test session expiration",
|
||||||
|
Do: func(adapter queue.Adapter) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
queueName := "test_session_expiration"
|
||||||
|
|
||||||
|
sessionId := "session-1"
|
||||||
|
|
||||||
|
rank, err := adapter.Touch(ctx, queueName, sessionId)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "could not touch session '%s'", sessionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, g := int64(0), rank; e != g {
|
||||||
|
return errors.Errorf("rank('%s'): expected '%v', got '%v'", sessionId, e, g)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-time.After(time.Second)
|
||||||
|
|
||||||
|
if err := adapter.Refresh(ctx, queueName, time.Second); err != nil {
|
||||||
|
return errors.Wrap(err, "could not refresh queue")
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := adapter.Status(ctx, queueName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "could not retrieve queue status")
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, g := int64(0), status.Sessions; e != g {
|
||||||
|
return errors.Errorf("status.Sessions: expected '%v', got '%v'", e, g)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAdapter(t *testing.T, adapter queue.Adapter) {
|
||||||
|
for _, tc := range adapterTestCases {
|
||||||
|
func(tc adapterTestCase) {
|
||||||
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
|
if err := tc.Do(adapter); err != nil {
|
||||||
|
t.Fatalf("%+v", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}(tc)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue