bouncer/internal/store/redis/proxy_repository.go

141 lines
3.1 KiB
Go

package redis
import (
"context"
"encoding/json"
"net/url"
"time"
"forge.cadoles.com/cadoles/bouncer/internal/store"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
)
const (
keyID = "id"
keyFrom = "from"
keyTo = "to"
keyUpdatedAt = "updated_at"
keyCreatedAt = "created_at"
keyPrefixProxy = "proxy:"
)
type ProxyRepository struct {
client redis.UniversalClient
}
// GetProxy implements store.ProxyRepository
func (r *ProxyRepository) GetProxy(ctx context.Context, id store.ProxyID) (*store.Proxy, error) {
panic("unimplemented")
}
// CreateProxy implements store.ProxyRepository
func (r *ProxyRepository) CreateProxy(ctx context.Context, to *url.URL, from ...string) (*store.Proxy, error) {
id := store.NewProxyID()
now := time.Now().UTC()
_, err := r.client.Pipelined(ctx, func(p redis.Pipeliner) error {
key := proxyKey(id)
nowStr := now.Format(time.RFC3339)
rawFrom, err := json.Marshal(from)
if err != nil {
return errors.WithStack(err)
}
p.HMSet(ctx, key, keyID, string(id))
p.HMSet(ctx, key, keyFrom, rawFrom)
p.HMSet(ctx, key, keyTo, to.String())
p.HMSet(ctx, key, keyCreatedAt, nowStr)
p.HMSet(ctx, key, keyUpdatedAt, nowStr)
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
return &store.Proxy{
ProxyHeader: store.ProxyHeader{
ID: id,
CreatedAt: now,
UpdatedAt: now,
},
To: to,
From: from,
}, nil
}
// DeleteProxy implements store.ProxyRepository
func (r *ProxyRepository) DeleteProxy(ctx context.Context, id store.ProxyID) error {
key := proxyKey(id)
if cmd := r.client.Del(ctx, key); cmd.Err() != nil {
return errors.WithStack(cmd.Err())
}
return nil
}
// QueryProxies implements store.ProxyRepository
func (r *ProxyRepository) QueryProxies(ctx context.Context, funcs ...store.QueryProxyOptionFunc) ([]*store.ProxyHeader, error) {
iter := r.client.Scan(ctx, 0, keyPrefixProxy+"*", 0).Iterator()
headers := make([]*store.ProxyHeader, 0)
for iter.Next(ctx) {
var rawID string
var rawCreatedAt string
var rawUpdatedAt string
_, err := r.client.Pipelined(ctx, func(p redis.Pipeliner) error {
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
createdAt, err := time.Parse(rawCreatedAt, time.RFC3339)
if err != nil {
return nil, errors.WithStack(err)
}
updatedAt, err := time.Parse(rawUpdatedAt, time.RFC3339)
if err != nil {
return nil, errors.WithStack(err)
}
h := &store.ProxyHeader{
ID: store.ProxyID(rawID),
CreatedAt: createdAt,
UpdatedAt: updatedAt,
}
headers = append(headers, h)
}
if err := iter.Err(); err != nil {
return nil, errors.WithStack(err)
}
return headers, nil
}
// UpdateProxy implements store.ProxyRepository
func (*ProxyRepository) UpdateProxy(ctx context.Context, id store.ProxyID, funcs ...store.UpdateProxyOptionFunc) (*store.Proxy, error) {
panic("unimplemented")
}
func NewProxyRepository(client redis.UniversalClient) *ProxyRepository {
return &ProxyRepository{
client: client,
}
}
var _ store.ProxyRepository = &ProxyRepository{}
func proxyKey(id store.ProxyID) string {
return keyPrefixProxy + string(id)
}