package redis

import (
	"bytes"
	"context"
	"encoding/gob"

	"forge.cadoles.com/cadoles/bouncer/internal/store"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// init registers *LayerChange with gob.
func init() {
	gob.Register(&LayerChange{})
}

// LayerChange describes a change applied to a layer of a proxy. It is the
// message gob-encoded and published on the layer change channel.
type LayerChange struct {
	// Operation is the kind of change that was applied.
	Operation ChangeOperation
	// Proxy is the name of the proxy owning the layer.
	Proxy store.ProxyName
	// Layer is the name of the layer that changed.
	Layer store.LayerName
}

// Change implements store.Change.
func (p *LayerChange) Change() {}

// NewLayerChange returns a LayerChange for the given operation, proxy and
// layer names.
func NewLayerChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) *LayerChange {
	return &LayerChange{
		Operation: op,
		Proxy:     proxyName,
		Layer:     layerName,
	}
}

// Compile-time check that *LayerChange satisfies store.Change.
var _ store.Change = &LayerChange{}

// layerChangeChannel is the pub/sub channel on which layer changes are
// published and received.
const layerChangeChannel string = "layer-changes"

// Changes implements store.Observable.
//
// It starts a goroutine subscribed to the layer change channel. Every message
// received is decoded as a *LayerChange and handed to fn. The goroutine stops
// as soon as receiving a message fails; messages that cannot be decoded are
// logged and skipped.
func (r *LayerRepository) Changes(ctx context.Context, fn func(store.Change)) {
	go func() {
		sub := r.client.Subscribe(ctx, layerChangeChannel)
		defer sub.Close()

		for {
			msg, err := sub.ReceiveMessage(ctx)
			if err != nil {
				// A failure observed after ctx is done is treated as an
				// expected shutdown and is not reported as an error.
				if ctx.Err() != nil {
					return
				}

				logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err)))

				return
			}

			// Each message is a self-contained gob stream (see notifyChange),
			// so a fresh decoder is needed for every payload.
			decoder := gob.NewDecoder(bytes.NewBufferString(msg.Payload))

			// Decode into the same type that notifyChange encodes.
			change := &LayerChange{}
			if err := decoder.Decode(change); err != nil {
				logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err)))

				continue
			}

			fn(change)
		}
	}()
}

// notifyChange gob-encodes a LayerChange built from the given arguments and
// publishes it on the layer change channel. Encoding and publishing errors are
// logged and otherwise ignored.
func (r *LayerRepository) notifyChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) {
	ctx := context.Background()
	change := NewLayerChange(op, proxyName, layerName)

	var buff bytes.Buffer

	encoder := gob.NewEncoder(&buff)
	if err := encoder.Encode(change); err != nil {
		logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err)))

		return
	}

	if err := r.client.Publish(ctx, layerChangeChannel, buff.Bytes()).Err(); err != nil {
		logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err)))

		return
	}
}

// Compile-time check that *LayerRepository satisfies store.Observable.
var _ store.Observable = &LayerRepository{}