feat: refactor layers registration
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
All checks were successful
Cadoles/bouncer/pipeline/head This commit looks good
This commit is contained in:
167
internal/proxy/director/layer/queue/redis/adapter.go
Normal file
167
internal/proxy/director/layer/queue/redis/adapter.go
Normal file
@ -0,0 +1,167 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"forge.cadoles.com/cadoles/bouncer/internal/proxy/director/layer/queue"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const (
|
||||
keyPrefixQueue = "queue"
|
||||
)
|
||||
|
||||
type Adapter struct {
|
||||
client redis.UniversalClient
|
||||
txMaxRetry int
|
||||
}
|
||||
|
||||
// Refresh implements queue.Adapter
|
||||
func (a *Adapter) Refresh(ctx context.Context, queueName string, keepAlive time.Duration) error {
|
||||
lastSeenKey := lastSeenKey(queueName)
|
||||
rankKey := rankKey(queueName)
|
||||
|
||||
err := withTx(ctx, a.client, func(ctx context.Context, tx *redis.Tx) error {
|
||||
expires := time.Now().UTC().Add(-keepAlive)
|
||||
|
||||
cmd := tx.ZRangeByScore(ctx, lastSeenKey, &redis.ZRangeBy{
|
||||
Min: "0",
|
||||
Max: strconv.FormatInt(expires.Unix(), 10),
|
||||
})
|
||||
|
||||
members, err := cmd.Result()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if len(members) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
anyMembers := make([]any, len(members))
|
||||
for i, m := range members {
|
||||
anyMembers[i] = m
|
||||
}
|
||||
|
||||
if err := tx.ZRem(ctx, rankKey, anyMembers...).Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := tx.ZRem(ctx, lastSeenKey, anyMembers...).Err(); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, rankKey, lastSeenKey)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Touch implements queue.Adapter
|
||||
func (a *Adapter) Touch(ctx context.Context, queueName string, sessionId string) (int64, error) {
|
||||
lastSeenKey := lastSeenKey(queueName)
|
||||
rankKey := rankKey(queueName)
|
||||
|
||||
var rank int64
|
||||
|
||||
retry := a.txMaxRetry
|
||||
|
||||
for retry > 0 {
|
||||
err := withTx(ctx, a.client, func(ctx context.Context, tx *redis.Tx) error {
|
||||
now := time.Now().UTC().Unix()
|
||||
|
||||
err := tx.ZAddNX(ctx, rankKey, redis.Z{Score: float64(now), Member: sessionId}).Err()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = tx.ZAdd(ctx, lastSeenKey, redis.Z{Score: float64(now), Member: sessionId}).Err()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
val, err := tx.ZRank(ctx, rankKey, sessionId).Result()
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
rank = val
|
||||
|
||||
return nil
|
||||
}, rankKey, lastSeenKey)
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) && retry > 0 {
|
||||
retry--
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return rank, nil
|
||||
}
|
||||
|
||||
// Status implements queue.Adapter
|
||||
func (a *Adapter) Status(ctx context.Context, queueName string) (*queue.Status, error) {
|
||||
rankKey := rankKey(queueName)
|
||||
|
||||
status := &queue.Status{}
|
||||
|
||||
cmd := a.client.ZCard(ctx, rankKey)
|
||||
if err := cmd.Err(); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
status.Sessions = cmd.Val()
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func NewAdapter(client redis.UniversalClient, txMaxRetry int) *Adapter {
|
||||
return &Adapter{
|
||||
client: client,
|
||||
txMaxRetry: txMaxRetry,
|
||||
}
|
||||
}
|
||||
|
||||
var _ queue.Adapter = &Adapter{}
|
||||
|
||||
func key(parts ...string) string {
|
||||
return strings.Join(parts, ":")
|
||||
}
|
||||
|
||||
func rankKey(queueName string) string {
|
||||
return key(keyPrefixQueue, queueName, "rank")
|
||||
}
|
||||
|
||||
func lastSeenKey(queueName string) string {
|
||||
return key(keyPrefixQueue, queueName, "last_seen")
|
||||
}
|
||||
|
||||
func withTx(ctx context.Context, client redis.UniversalClient, fn func(ctx context.Context, tx *redis.Tx) error, keys ...string) error {
|
||||
txf := func(tx *redis.Tx) error {
|
||||
if err := fn(ctx, tx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := client.Watch(ctx, txf, keys...)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user