diff --git a/internal/command/server/proxy/run.go b/internal/command/server/proxy/run.go index 263087b..63e3e4c 100644 --- a/internal/command/server/proxy/run.go +++ b/internal/command/server/proxy/run.go @@ -11,6 +11,7 @@ import ( "forge.cadoles.com/cadoles/bouncer/internal/command/common" "forge.cadoles.com/cadoles/bouncer/internal/proxy" "forge.cadoles.com/cadoles/bouncer/internal/setup" + "forge.cadoles.com/cadoles/bouncer/internal/store" "github.com/pkg/errors" "github.com/urfave/cli/v2" "gitlab.com/wpetit/goweb/logger" @@ -72,6 +73,22 @@ func RunCommand() *cli.Command { addrs, srvErrs := srv.Start(ctx.Context) + if observableProxyRepository, ok := proxyRepository.(store.Observable); ok { + logger.Info(ctx.Context, "observing proxy repository changes") + observableProxyRepository.Changes(ctx.Context, func(c store.Change) { + logger.Info(ctx.Context, "proxy change detected, clearing cache") + srv.ClearProxyCache() + }) + } + + if observableLayerRepository, ok := layerRepository.(store.Observable); ok { + logger.Info(ctx.Context, "observing layer repository changes") + observableLayerRepository.Changes(ctx.Context, func(c store.Change) { + logger.Info(ctx.Context, "layer change detected, clearing cache") + srv.ClearLayerCache() + }) + } + // Clear director's cache on SIGUSR2 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGUSR2) @@ -80,8 +97,9 @@ func RunCommand() *cli.Command { for { select { case <-sig: - logger.Info(ctx.Context, "received sigusr2, clearing cache") - srv.ClearCache() + logger.Info(ctx.Context, "received sigusr2, clearing whole cache") + srv.ClearProxyCache() + srv.ClearLayerCache() case <-ctx.Context.Done(): return } diff --git a/internal/proxy/server.go b/internal/proxy/server.go index fa5cd28..995b585 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -55,11 +55,14 @@ func (s *Server) Start(ctx context.Context) (<-chan net.Addr, <-chan error) { return addrs, errs } -func (s *Server) ClearCache() { - s.layerCache.Clear() +func (s *Server) ClearProxyCache() { s.proxyCache.Clear() } +func (s *Server) ClearLayerCache() { + s.layerCache.Clear() +} + func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan error) { defer func() { close(errs) diff --git a/internal/store/observable.go b/internal/store/observable.go new file mode 100644 index 0000000..4950a48 --- /dev/null +++ b/internal/store/observable.go @@ -0,0 +1,11 @@ +package store + +import "context" + +type Change interface { + Change() +} + +type Observable interface { + Changes(ctx context.Context, fn func(Change)) +} diff --git a/internal/store/redis/layer_observable.go b/internal/store/redis/layer_observable.go new file mode 100644 index 0000000..baf52aa --- /dev/null +++ b/internal/store/redis/layer_observable.go @@ -0,0 +1,87 @@ +package redis + +import ( + "bytes" + "context" + "encoding/gob" + + "forge.cadoles.com/cadoles/bouncer/internal/store" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func init() { + gob.Register(&LayerChange{}) +} + +type LayerChange struct { + Operation ChangeOperation + Proxy store.ProxyName + Layer store.LayerName +} + +// Change implements store.Change. +func (p *LayerChange) Change() {} + +func NewLayerChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) *LayerChange { + return &LayerChange{ + Operation: op, + Proxy: proxyName, + Layer: layerName, + } +} + +var _ store.Change = &ProxyChange{} + +const layerChangeChannel string = "layer-changes" + +// Changes implements store.Observable. +func (r *LayerRepository) Changes(ctx context.Context, fn func(store.Change)) { + go func() { + sub := r.client.Subscribe(ctx, layerChangeChannel) + defer sub.Close() + + for { + var buff bytes.Buffer + decoder := gob.NewDecoder(&buff) + + msg, err := sub.ReceiveMessage(ctx) + if err != nil { + logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err))) + return + } + + buff.Reset() + buff.WriteString(msg.Payload) + + change := &ProxyChange{} + if err := decoder.Decode(change); err != nil { + logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err))) + continue + } + + fn(change) + } + }() +} + +func (r *LayerRepository) notifyChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) { + change := NewLayerChange(op, proxyName, layerName) + + var buff bytes.Buffer + encoder := gob.NewEncoder(&buff) + + ctx := context.Background() + + if err := encoder.Encode(change); err != nil { + logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err))) + return + } + + if err := r.client.Publish(ctx, layerChangeChannel, buff.Bytes()).Err(); err != nil { + logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err))) + return + } +} + +var _ store.Observable = &LayerRepository{} diff --git a/internal/store/redis/layer_repository.go b/internal/store/redis/layer_repository.go index 20bd705..ba16497 100644 --- a/internal/store/redis/layer_repository.go +++ b/internal/store/redis/layer_repository.go @@ -73,6 +73,8 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy return nil, errors.WithStack(err) } + go r.notifyChange(CreateOperation, proxyName, layerName) + return &store.Layer{ LayerHeader: store.LayerHeader{ Name: store.LayerName(layerItem.Name), @@ -96,6 +98,8 @@ func (r *LayerRepository) DeleteLayer(ctx context.Context, proxyName store.Proxy return errors.WithStack(cmd.Err()) } + go r.notifyChange(DeleteOperation, proxyName, layerName) + return nil } @@ -249,6 +253,8 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy return nil, errors.WithStack(err) } + go r.notifyChange(UpdateOperation, proxyName, layerName) + return layer, nil } diff --git a/internal/store/redis/observable.go b/internal/store/redis/observable.go new file mode 100644 index 0000000..469a191 --- /dev/null +++ b/internal/store/redis/observable.go @@ -0,0 +1,9 @@ +package redis + +type ChangeOperation int + +const ( + CreateOperation ChangeOperation = iota + UpdateOperation + DeleteOperation +) diff --git a/internal/store/redis/proxy_observable.go b/internal/store/redis/proxy_observable.go new file mode 100644 index 0000000..acb8986 --- /dev/null +++ b/internal/store/redis/proxy_observable.go @@ -0,0 +1,85 @@ +package redis + +import ( + "bytes" + "context" + "encoding/gob" + + "forge.cadoles.com/cadoles/bouncer/internal/store" + "github.com/pkg/errors" + "gitlab.com/wpetit/goweb/logger" +) + +func init() { + gob.Register(&ProxyChange{}) +} + +type ProxyChange struct { + Operation ChangeOperation + Proxy store.ProxyName +} + +// Change implements store.Change. +func (p *ProxyChange) Change() {} + +func NewProxyChange(op ChangeOperation, name store.ProxyName) *ProxyChange { + return &ProxyChange{ + Operation: op, + Proxy: name, + } +} + +var _ store.Change = &ProxyChange{} + +const proxyChangeChannel string = "proxy-changes" + +// Changes implements store.Observable. +func (r *ProxyRepository) Changes(ctx context.Context, fn func(store.Change)) { + go func() { + sub := r.client.Subscribe(ctx, proxyChangeChannel) + defer sub.Close() + + for { + var buff bytes.Buffer + decoder := gob.NewDecoder(&buff) + + msg, err := sub.ReceiveMessage(ctx) + if err != nil { + logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err))) + return + } + + buff.Reset() + buff.WriteString(msg.Payload) + + change := &ProxyChange{} + if err := decoder.Decode(change); err != nil { + logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err))) + continue + } + + fn(change) + } + }() +} + +func (r *ProxyRepository) notifyChange(op ChangeOperation, name store.ProxyName) { + change := NewProxyChange(op, name) + + var buff bytes.Buffer + encoder := gob.NewEncoder(&buff) + + ctx := context.Background() + + if err := encoder.Encode(change); err != nil { + logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err))) + return + } + + if err := r.client.Publish(ctx, proxyChangeChannel, buff.Bytes()).Err(); err != nil { + logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err))) + return + } +} + +var _ store.Observable = &ProxyRepository{} diff --git a/internal/store/redis/proxy_repository.go b/internal/store/redis/proxy_repository.go index 9eefd75..2998a72 100644 --- a/internal/store/redis/proxy_repository.go +++ b/internal/store/redis/proxy_repository.go @@ -117,6 +117,8 @@ func (r *ProxyRepository) CreateProxy(ctx context.Context, name store.ProxyName, return nil, errors.WithStack(err) } + go r.notifyChange(CreateOperation, name) + return &store.Proxy{ ProxyHeader: store.ProxyHeader{ Name: name, @@ -139,6 +141,8 @@ func (r *ProxyRepository) DeleteProxy(ctx context.Context, name store.ProxyName) return errors.WithStack(cmd.Err()) } + go r.notifyChange(DeleteOperation, name) + return nil } @@ -242,6 +246,8 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName, return nil, errors.WithStack(err) } + go r.notifyChange(UpdateOperation, name) + return proxy, nil }