package redis import ( "context" "time" "forge.cadoles.com/cadoles/bouncer/internal/store" "github.com/pkg/errors" "github.com/redis/go-redis/v9" ) const ( keyLayerName = "name" keyLayerProxy = "proxy" keyLayerType = "type" keyLayerOptions = "options" keyLayerUpdatedAt = "updated_at" keyLayerCreatedAt = "created_at" keyLayerWeight = "weight" keyPrefixLayer = "layer:" ) type LayerRepository struct { client redis.UniversalClient } // CreateLayer implements store.LayerRepository func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.ProxyName, layerName store.LayerName, layerType store.LayerType, options store.LayerOptions) (*store.Layer, error) { now := time.Now().UTC() key := layerKey(proxyName, layerName) txf := func(tx *redis.Tx) error { exists, err := tx.Exists(ctx, key).Uint64() if err != nil { return errors.WithStack(err) } if exists > 0 { return errors.WithStack(store.ErrAlreadyExist) } _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { p.HMSet(ctx, key, keyLayerName, string(layerName)) p.HMSet(ctx, key, keyLayerType, string(layerType)) p.HMSet(ctx, key, keyLayerProxy, string(proxyName)) p.HMSet(ctx, key, keyLayerOptions, wrap(options)) p.HMSet(ctx, key, keyLayerWeight, wrap(0)) p.HMSet(ctx, key, keyLayerCreatedAt, wrap(now)) p.HMSet(ctx, key, keyLayerUpdatedAt, wrap(now)) return nil }) return err } err := r.client.Watch(ctx, txf, key) if err != nil { return nil, errors.WithStack(err) } return &store.Layer{ LayerHeader: store.LayerHeader{ Name: layerName, Proxy: proxyName, Type: layerType, CreatedAt: now, UpdatedAt: now, }, }, nil } // DeleteLayer implements store.LayerRepository func (r *LayerRepository) DeleteLayer(ctx context.Context, proxyName store.ProxyName, layerName store.LayerName) error { key := layerKey(proxyName, layerName) if cmd := r.client.Del(ctx, key); cmd.Err() != nil { return errors.WithStack(cmd.Err()) } return nil } // GetLayer implements store.LayerRepository func (r *LayerRepository) GetLayer(ctx context.Context, proxyName store.ProxyName, layerName store.LayerName) (*store.Layer, error) { var layer store.Layer key := layerKey(proxyName, layerName) cmd := r.client.HMGet(ctx, key, keyLayerType, keyLayerWeight, keyLayerOptions, keyLayerCreatedAt, keyLayerUpdatedAt) values, err := cmd.Result() if err != nil { return nil, errors.WithStack(err) } if allNilValues(values) { return nil, errors.WithStack(store.ErrNotFound) } layer.Name = layerName layer.Proxy = proxyName layerType, ok := values[0].(string) if !ok { return nil, errors.Errorf("unexpected '%s' value of type '%T'", keyLayerType, values[0]) } layer.Type = store.LayerType(layerType) weight, err := unwrap[int](values[1]) if err != nil { return nil, errors.WithStack(err) } layer.Weight = weight options, err := unwrap[map[string]any](values[2]) if err != nil { return nil, errors.WithStack(err) } layer.Options = options createdAt, err := unwrap[time.Time](values[3]) if err != nil { return nil, errors.WithStack(err) } layer.CreatedAt = createdAt updatedAt, err := unwrap[time.Time](values[4]) if err != nil { return nil, errors.WithStack(err) } layer.UpdatedAt = updatedAt return &layer, nil } // QueryLayers implements store.LayerRepository func (r *LayerRepository) QueryLayers(ctx context.Context, funcs ...store.QueryLayerOptionFunc) ([]*store.LayerHeader, error) { opts := &store.QueryLayerOptions{} for _, fn := range funcs { fn(opts) } keyParts := []string{keyPrefixLayer} if opts.Proxy != nil { keyParts = append(keyParts, string(*opts.Proxy)) } else { keyParts = append(keyParts, "*") } if opts.Name != nil { keyParts = append(keyParts, string(*opts.Name)) } else { keyParts = append(keyParts, "*") } key := key(keyParts...) iter := r.client.Scan(ctx, uint64(*opts.Offset), key, int64(*opts.Limit)).Iterator() headers := make([]*store.LayerHeader, 0) for iter.Next(ctx) { key := iter.Val() cmd := r.client.HMGet(ctx, key, keyLayerName, keyLayerProxy, keyLayerType, keyLayerCreatedAt, keyLayerUpdatedAt) values, err := cmd.Result() if err != nil { return nil, errors.WithStack(err) } if allNilValues(values) { continue } layerName, ok := values[0].(string) if !ok { return nil, errors.Errorf("unexpected '%s' field value for key '%s': '%s'", keyLayerName, key, values[0]) } proxyName, ok := values[1].(string) if !ok { return nil, errors.Errorf("unexpected '%s' field value for key '%s': '%s'", keyProxyName, key, values[1]) } layerType, ok := values[2].(string) if !ok { return nil, errors.Errorf("unexpected '%s' field value for key '%s': '%s'", keyLayerType, key, values[1]) } createdAt, err := unwrap[time.Time](values[3]) if err != nil { return nil, errors.WithStack(err) } updatedAt, err := unwrap[time.Time](values[4]) if err != nil { return nil, errors.WithStack(err) } h := &store.LayerHeader{ Name: store.LayerName(layerName), Proxy: store.ProxyName(proxyName), Type: store.LayerType(layerType), CreatedAt: createdAt, UpdatedAt: updatedAt, } headers = append(headers, h) } if err := iter.Err(); err != nil { return nil, errors.WithStack(err) } return headers, nil } // UpdateLayer implements store.LayerRepository func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.ProxyName, layerName store.LayerName, options store.LayerOptions) error { panic("unimplemented") } func NewLayerRepository(client redis.UniversalClient) *LayerRepository { return &LayerRepository{ client: client, } } var _ store.LayerRepository = &LayerRepository{} func layerKey(proxyName store.ProxyName, layerName store.LayerName) string { return key(keyPrefixLayer, string(proxyName), string(layerName)) }