feat: observe changes in repository to automatically clear cache
All checks were successful
Cadoles/bouncer/pipeline/pr-master This commit looks good

This commit is contained in:
2025-08-13 18:20:58 +02:00
parent ad4f334bc2
commit 80a1b48966
8 changed files with 229 additions and 4 deletions

View File

@ -11,6 +11,7 @@ import (
"forge.cadoles.com/cadoles/bouncer/internal/command/common"
"forge.cadoles.com/cadoles/bouncer/internal/proxy"
"forge.cadoles.com/cadoles/bouncer/internal/setup"
"forge.cadoles.com/cadoles/bouncer/internal/store"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"gitlab.com/wpetit/goweb/logger"
@ -72,6 +73,22 @@ func RunCommand() *cli.Command {
addrs, srvErrs := srv.Start(ctx.Context)
if observableProxyRepository, ok := proxyRepository.(store.Observable); ok {
logger.Info(ctx.Context, "observing proxy repository changes")
observableProxyRepository.Changes(ctx.Context, func(c store.Change) {
logger.Info(ctx.Context, "proxy change detected, clearing cache")
srv.ClearProxyCache()
})
}
if observableLayerRepository, ok := layerRepository.(store.Observable); ok {
logger.Info(ctx.Context, "observing layer repository changes")
observableLayerRepository.Changes(ctx.Context, func(c store.Change) {
logger.Info(ctx.Context, "layer change detected, clearing cache")
srv.ClearLayerCache()
})
}
// Clear director's cache on SIGUSR2
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGUSR2)
@ -80,8 +97,9 @@ func RunCommand() *cli.Command {
for {
select {
case <-sig:
logger.Info(ctx.Context, "received sigusr2, clearing cache")
srv.ClearCache()
logger.Info(ctx.Context, "received sigusr2, clearing whole cache")
srv.ClearProxyCache()
srv.ClearLayerCache()
case <-ctx.Context.Done():
return
}

View File

@ -55,11 +55,14 @@ func (s *Server) Start(ctx context.Context) (<-chan net.Addr, <-chan error) {
return addrs, errs
}
func (s *Server) ClearCache() {
s.layerCache.Clear()
func (s *Server) ClearProxyCache() {
s.proxyCache.Clear()
}
func (s *Server) ClearLayerCache() {
s.layerCache.Clear()
}
func (s *Server) run(parentCtx context.Context, addrs chan net.Addr, errs chan error) {
defer func() {
close(errs)

View File

@ -0,0 +1,11 @@
package store
import "context"
type Change interface {
Change()
}
type Observable interface {
Changes(ctx context.Context, fn func(Change))
}

View File

@ -0,0 +1,87 @@
package redis
import (
"bytes"
"context"
"encoding/gob"
"forge.cadoles.com/cadoles/bouncer/internal/store"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func init() {
gob.Register(&LayerChange{})
}
type LayerChange struct {
Operation ChangeOperation
Proxy store.ProxyName
Layer store.LayerName
}
// Change implements store.Change.
func (p *LayerChange) Change() {}
func NewLayerChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) *LayerChange {
return &LayerChange{
Operation: op,
Proxy: proxyName,
Layer: layerName,
}
}
var _ store.Change = &ProxyChange{}
const layerChangeChannel string = "layer-changes"
// Changes implements store.Observable.
func (r *LayerRepository) Changes(ctx context.Context, fn func(store.Change)) {
go func() {
sub := r.client.Subscribe(ctx, layerChangeChannel)
defer sub.Close()
for {
var buff bytes.Buffer
decoder := gob.NewDecoder(&buff)
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err)))
return
}
buff.Reset()
buff.WriteString(msg.Payload)
change := &ProxyChange{}
if err := decoder.Decode(change); err != nil {
logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err)))
continue
}
fn(change)
}
}()
}
func (r *LayerRepository) notifyChange(op ChangeOperation, proxyName store.ProxyName, layerName store.LayerName) {
change := NewLayerChange(op, proxyName, layerName)
var buff bytes.Buffer
encoder := gob.NewEncoder(&buff)
ctx := context.Background()
if err := encoder.Encode(change); err != nil {
logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err)))
return
}
if err := r.client.Publish(ctx, layerChangeChannel, buff.Bytes()).Err(); err != nil {
logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err)))
return
}
}
var _ store.Observable = &LayerRepository{}

View File

@ -73,6 +73,8 @@ func (r *LayerRepository) CreateLayer(ctx context.Context, proxyName store.Proxy
return nil, errors.WithStack(err)
}
go r.notifyChange(CreateOperation, proxyName, layerName)
return &store.Layer{
LayerHeader: store.LayerHeader{
Name: store.LayerName(layerItem.Name),
@ -96,6 +98,8 @@ func (r *LayerRepository) DeleteLayer(ctx context.Context, proxyName store.Proxy
return errors.WithStack(cmd.Err())
}
go r.notifyChange(DeleteOperation, proxyName, layerName)
return nil
}
@ -249,6 +253,8 @@ func (r *LayerRepository) UpdateLayer(ctx context.Context, proxyName store.Proxy
return nil, errors.WithStack(err)
}
go r.notifyChange(UpdateOperation, proxyName, layerName)
return layer, nil
}

View File

@ -0,0 +1,9 @@
package redis
type ChangeOperation int
const (
CreateOperation ChangeOperation = iota
UpdateOperation
DeleteOperation
)

View File

@ -0,0 +1,85 @@
package redis
import (
"bytes"
"context"
"encoding/gob"
"forge.cadoles.com/cadoles/bouncer/internal/store"
"github.com/pkg/errors"
"gitlab.com/wpetit/goweb/logger"
)
func init() {
gob.Register(&ProxyChange{})
}
type ProxyChange struct {
Operation ChangeOperation
Proxy store.ProxyName
}
// Change implements store.Change.
func (p *ProxyChange) Change() {}
func NewProxyChange(op ChangeOperation, name store.ProxyName) *ProxyChange {
return &ProxyChange{
Operation: op,
Proxy: name,
}
}
var _ store.Change = &ProxyChange{}
const proxyChangeChannel string = "proxy-changes"
// Changes implements store.Observable.
func (r *ProxyRepository) Changes(ctx context.Context, fn func(store.Change)) {
go func() {
sub := r.client.Subscribe(ctx, proxyChangeChannel)
defer sub.Close()
for {
var buff bytes.Buffer
decoder := gob.NewDecoder(&buff)
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
logger.Error(ctx, "could not receive message", logger.E(errors.WithStack(err)))
return
}
buff.Reset()
buff.WriteString(msg.Payload)
change := &ProxyChange{}
if err := decoder.Decode(change); err != nil {
logger.Error(ctx, "could not decode message", logger.E(errors.WithStack(err)))
continue
}
fn(change)
}
}()
}
func (r *ProxyRepository) notifyChange(op ChangeOperation, name store.ProxyName) {
change := NewProxyChange(op, name)
var buff bytes.Buffer
encoder := gob.NewEncoder(&buff)
ctx := context.Background()
if err := encoder.Encode(change); err != nil {
logger.Error(ctx, "could not encode message", logger.E(errors.WithStack(err)))
return
}
if err := r.client.Publish(ctx, proxyChangeChannel, buff.Bytes()).Err(); err != nil {
logger.Error(ctx, "could not publish message", logger.E(errors.WithStack(err)))
return
}
}
var _ store.Observable = &ProxyRepository{}

View File

@ -117,6 +117,8 @@ func (r *ProxyRepository) CreateProxy(ctx context.Context, name store.ProxyName,
return nil, errors.WithStack(err)
}
go r.notifyChange(CreateOperation, name)
return &store.Proxy{
ProxyHeader: store.ProxyHeader{
Name: name,
@ -139,6 +141,8 @@ func (r *ProxyRepository) DeleteProxy(ctx context.Context, name store.ProxyName)
return errors.WithStack(cmd.Err())
}
go r.notifyChange(DeleteOperation, name)
return nil
}
@ -242,6 +246,8 @@ func (r *ProxyRepository) UpdateProxy(ctx context.Context, name store.ProxyName,
return nil, errors.WithStack(err)
}
go r.notifyChange(UpdateOperation, name)
return proxy, nil
}