diff --git a/go.mod b/go.mod index 2873747..d82a25b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( forge.cadoles.com/Cadoles/go-proxy v0.0.0-20230701194111-c6b3d482cca6 github.com/Masterminds/sprig/v3 v3.2.3 github.com/btcsuite/btcd/btcutil v1.1.3 + github.com/davecgh/go-spew v1.1.1 github.com/go-chi/chi/v5 v5.0.8 github.com/jedib0t/go-pretty/v6 v6.4.6 github.com/mitchellh/mapstructure v1.4.1 diff --git a/internal/proxy/director/layer/queue/redis/adapter.go b/internal/proxy/director/layer/queue/redis/adapter.go index 515dae7..d65fcf0 100644 --- a/internal/proxy/director/layer/queue/redis/adapter.go +++ b/internal/proxy/director/layer/queue/redis/adapter.go @@ -30,7 +30,7 @@ func (a *Adapter) Refresh(ctx context.Context, queueName string, keepAlive time. cmd := tx.ZRangeByScore(ctx, lastSeenKey, &redis.ZRangeBy{ Min: "0", - Max: strconv.FormatInt(expires.Unix(), 10), + Max: strconv.FormatInt(expires.UnixNano(), 10), }) members, err := cmd.Result() @@ -75,7 +75,7 @@ func (a *Adapter) Touch(ctx context.Context, queueName string, sessionId string) for retry > 0 { err := withTx(ctx, a.client, func(ctx context.Context, tx *redis.Tx) error { - now := time.Now().UTC().Unix() + now := time.Now().UTC().UnixNano() err := tx.ZAddNX(ctx, rankKey, redis.Z{Score: float64(now), Member: sessionId}).Err() if err != nil { diff --git a/internal/proxy/director/layer/queue/redis/adapter_test.go b/internal/proxy/director/layer/queue/redis/adapter_test.go new file mode 100644 index 0000000..ccb02a6 --- /dev/null +++ b/internal/proxy/director/layer/queue/redis/adapter_test.go @@ -0,0 +1,12 @@ +package redis + +import ( + "testing" + + "forge.cadoles.com/cadoles/bouncer/internal/proxy/director/layer/queue/testsuite" +) + +func TestAdapter(t *testing.T) { + adapter := NewAdapter(client, 3) + testsuite.TestAdapter(t, adapter) +} diff --git a/internal/proxy/director/layer/queue/redis/main_test.go b/internal/proxy/director/layer/queue/redis/main_test.go new file mode 100644 index 0000000..0939b93 --- /dev/null +++ b/internal/proxy/director/layer/queue/redis/main_test.go @@ -0,0 +1,58 @@ +package redis + +import ( + "context" + "log" + "os" + "testing" + + "github.com/ory/dockertest/v3" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" +) + +var client redis.UniversalClient + +func TestMain(m *testing.M) { + // uses a sensible default on windows (tcp/http) and linux/osx (socket) + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("%+v", errors.WithStack(err)) + } + + // uses pool to try to connect to Docker + err = pool.Client.Ping() + if err != nil { + log.Fatalf("%+v", errors.WithStack(err)) + } + + // pulls an image, creates a container based on it and runs it + resource, err := pool.Run("redis", "alpine3.17", []string{}) + if err != nil { + log.Fatalf("%+v", errors.WithStack(err)) + } + + if err := pool.Retry(func() error { + client = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: []string{resource.GetHostPort("6379/tcp")}, + }) + + ctx := context.Background() + + if cmd := client.Ping(ctx); cmd.Err() != nil { + return errors.WithStack(err) + } + + return nil + }); err != nil { + log.Fatalf("%+v", errors.WithStack(err)) + } + + code := m.Run() + + if err := pool.Purge(resource); err != nil { + log.Fatalf("%+v", errors.WithStack(err)) + } + + os.Exit(code) +} diff --git a/internal/proxy/director/layer/queue/testsuite/adapter.go b/internal/proxy/director/layer/queue/testsuite/adapter.go new file mode 100644 index 0000000..5849170 --- /dev/null +++ b/internal/proxy/director/layer/queue/testsuite/adapter.go @@ -0,0 +1,99 @@ +package testsuite + +import ( + "context" + "fmt" + "testing" + "time" + + "forge.cadoles.com/cadoles/bouncer/internal/proxy/director/layer/queue" + "github.com/pkg/errors" +) + +type adapterTestCase struct { + Name string + Do func(adapter queue.Adapter) error +} + +var adapterTestCases = []adapterTestCase{ + { + Name: "Test queue ranking", + Do: func(adapter queue.Adapter) error { + ctx := context.Background() + queueName := "test_queue_ranking" + + sessionIdPattern := "session-%d" + totalSessions := int64(100) + + for idx := int64(0); idx < totalSessions; idx++ { + sessionId := fmt.Sprintf(sessionIdPattern, idx) + rank, err := adapter.Touch(ctx, queueName, sessionId) + if err != nil { + return errors.Wrapf(err, "could not touch session '%s' (index: %d, rank: %d)", sessionId, idx, rank) + } + + if e, g := int64(idx), rank; e != g { + return errors.Errorf("rank('%s'): expected '%v', got '%v'", sessionId, e, g) + } + } + + status, err := adapter.Status(ctx, queueName) + if err != nil { + return errors.Wrap(err, "could not retrieve queue status") + } + + if e, g := totalSessions, status.Sessions; e != g { + return errors.Errorf("status.Sessions: expected '%v', got '%v'", e, g) + } + + return nil + }, + }, + { + Name: "Test session expiration", + Do: func(adapter queue.Adapter) error { + ctx := context.Background() + queueName := "test_session_expiration" + + sessionId := "session-1" + + rank, err := adapter.Touch(ctx, queueName, sessionId) + if err != nil { + return errors.Wrapf(err, "could not touch session '%s'", sessionId) + } + + if e, g := int64(0), rank; e != g { + return errors.Errorf("rank('%s'): expected '%v', got '%v'", sessionId, e, g) + } + + <-time.After(time.Second) + + if err := adapter.Refresh(ctx, queueName, time.Second); err != nil { + return errors.Wrap(err, "could not refresh queue") + } + + status, err := adapter.Status(ctx, queueName) + if err != nil { + return errors.Wrap(err, "could not retrieve queue status") + } + + if e, g := int64(0), status.Sessions; e != g { + return errors.Errorf("status.Sessions: expected '%v', got '%v'", e, g) + } + + return nil + }, + }, +} + +func TestAdapter(t *testing.T, adapter queue.Adapter) { + for _, tc := range adapterTestCases { + func(tc adapterTestCase) { + t.Run(tc.Name, func(t *testing.T) { + if err := tc.Do(adapter); err != nil { + t.Fatalf("%+v", errors.WithStack(err)) + } + }) + }(tc) + } +}