package queue

import (
	"bytes"
	"context"
	"fmt"
	"html/template"
	"io"
	"math/rand"
	"net/http"
	"path/filepath"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"forge.cadoles.com/Cadoles/go-proxy"
	"forge.cadoles.com/Cadoles/go-proxy/wildcard"
	"forge.cadoles.com/cadoles/bouncer/internal/proxy/director"
	"forge.cadoles.com/cadoles/bouncer/internal/store"
	"github.com/Masterminds/sprig/v3"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/wpetit/goweb/logger"
)

// LayerType is the layer type identifier of the queue layer.
const LayerType store.LayerType = "queue"

// Queue is a middleware layer that assigns each visitor a session (tracked by
// a cookie), asks the adapter for that session's rank and serves a waiting
// page instead of the upstream handler when the rank reaches the layer's
// configured capacity.
type Queue struct {
	// adapter is the queue backend (session ranks, status, refresh).
	adapter Adapter

	// defaultKeepAlive is handed to fromStoreOptions when parsing layer options.
	defaultKeepAlive time.Duration

	// templateDir is the directory globbed for "*.gohtml" queue page templates.
	templateDir string
	// loadOnce guards the single attempt at loading tmpl.
	loadOnce sync.Once
	// tmpl stays nil if the one-time template load failed.
	tmpl *template.Template

	// refreshJobRunning is a 0/1 flag preventing overlapping refreshQueue runs.
	refreshJobRunning uint32
	// updateMetricsJobRunning is a 0/1 flag preventing overlapping
	// updateSessionsMetric runs.
	updateMetricsJobRunning uint32

	// postKeepAliveDebouncer schedules the delayed refresh job, keyed by
	// "<proxy>/<layer>".
	postKeepAliveDebouncer *DebouncerMap
}

// LayerType implements director.MiddlewareLayer
func (q *Queue) LayerType() store.LayerType {
	return LayerType
}

// Middleware implements director.MiddlewareLayer
//
// Requests whose URL does not match any of the layer's MatchURLs patterns are
// forwarded untouched. Matching requests get a session cookie (created when
// absent), have their session touched in the queue, and are either forwarded
// (rank below capacity) or answered with the queue page.
func (q *Queue) Middleware(layer *store.Layer) proxy.Middleware {
	return func(h http.Handler) http.Handler {
		fn := func(w http.ResponseWriter, r *http.Request) {
			ctx := r.Context()

			options, err := fromStoreOptions(layer.Options, q.defaultKeepAlive)
			if err != nil {
				director.HandleError(ctx, w, r, http.StatusInternalServerError, errors.Wrap(err, "could not parse layer options"))
				return
			}

			matches := wildcard.MatchAny(r.URL.String(), options.MatchURLs...)
			if !matches {
				h.ServeHTTP(w, r)
				return
			}

			// Metrics are refreshed once the request has been handled,
			// whatever the outcome below.
			defer q.updateMetrics(layer.Proxy, layer.Name, options)

			cookieName := q.getCookieName(layer.Name)

			cookie, err := r.Cookie(cookieName)
			if err != nil && !errors.Is(err, http.ErrNoCookie) {
				logger.Error(ctx, "could not retrieve cookie", logger.CapturedE(errors.WithStack(err)))
			}

			if cookie == nil {
				// First visit: mint a new session identifier.
				cookie = &http.Cookie{
					Name:  cookieName,
					Value: uuid.NewString(),
					Path:  "/",
				}

				http.SetCookie(w, cookie)
			}

			sessionID := cookie.Value
			queueName := string(layer.Name)

			rank, err := q.adapter.Touch(ctx, queueName, sessionID)
			if err != nil {
				director.HandleError(ctx, w, r, http.StatusInternalServerError, errors.Wrap(err, "could not update queue session rank"))
				return
			}

			if rank >= options.Capacity {
				q.renderQueuePage(w, r, queueName, options, rank)
				return
			}

			ctx = logger.With(ctx,
				logger.F("queueSessionId", sessionID),
				logger.F("queueName", queueName),
				logger.F("queueSessionRank", rank),
			)
			r = r.WithContext(ctx)

			h.ServeHTTP(w, r)
		}

		return http.HandlerFunc(fn)
	}
}

// updateSessionsMetric publishes the adapter-reported session count of the
// layer's queue to the sessions gauge. The call is a no-op when another
// update is already in flight.
func (q *Queue) updateSessionsMetric(ctx context.Context, proxyName store.ProxyName, layerName store.LayerName) {
	if !atomic.CompareAndSwapUint32(&q.updateMetricsJobRunning, 0, 1) {
		return
	}

	defer atomic.StoreUint32(&q.updateMetricsJobRunning, 0)

	queueName := string(layerName)

	status, err := q.adapter.Status(ctx, queueName)
	if err != nil {
		logger.Error(ctx, "could not retrieve queue status", logger.CapturedE(errors.WithStack(err)))
		return
	}

	metricQueueSessions.With(
		prometheus.Labels{
			metricLabelLayer: string(layerName),
			metricLabelProxy: string(proxyName),
		},
	).Set(float64(status.Sessions))
}

// renderQueuePage answers the request with the "queue" template and a
// 503 Service Unavailable status. Templates are loaded from templateDir on
// first use; if loading or rendering fails, a 500 error is reported instead.
func (q *Queue) renderQueuePage(w http.ResponseWriter, r *http.Request, queueName string, options *LayerOptions, rank int64) {
	ctx := r.Context()

	status, err := q.adapter.Status(ctx, queueName)
	if err != nil {
		director.HandleError(ctx, w, r, http.StatusInternalServerError, errors.Wrap(err, "could not retrieve queue status"))
		return
	}

	q.loadOnce.Do(func() {
		pattern := filepath.Join(q.templateDir, "*.gohtml")

		logger.Info(ctx, "loading queue page templates", logger.F("pattern", pattern))

		tmpl, err := template.New("").Funcs(sprig.FuncMap()).ParseGlob(pattern)
		if err != nil {
			logger.Error(ctx, "could not load queue templates", logger.CapturedE(errors.WithStack(err)))
			return
		}

		q.tmpl = tmpl
	})

	if q.tmpl == nil {
		director.HandleError(ctx, w, r, http.StatusInternalServerError, errors.New("queue page templates not loaded"))
		return
	}

	// Half the keep-alive, truncated to whole seconds.
	refreshRate := time.Duration(int64(options.KeepAlive.Seconds()/2)) * time.Second

	templateData := struct {
		QueueName       string
		LayerOptions    *LayerOptions
		Rank            int64
		CurrentSessions int64
		MaxSessions     int64
		RefreshRate     time.Duration
	}{
		QueueName:       queueName,
		LayerOptions:    options,
		Rank:            rank + 1, // shown to the user as a 1-based position
		CurrentSessions: status.Sessions,
		MaxSessions:     options.Capacity,
		RefreshRate:     refreshRate,
	}

	// Render before touching the response: once WriteHeader has been called
	// the status code can no longer be changed, so a rendering failure must
	// be detected first for the 500 error response to actually be sent.
	var buf bytes.Buffer

	if err := q.tmpl.ExecuteTemplate(&buf, "queue", templateData); err != nil {
		director.HandleError(ctx, w, r, http.StatusInternalServerError, errors.Wrap(err, "could not render queue page"))
		return
	}

	w.Header().Add("Cache-Control", "no-cache")
	w.Header().Add("Retry-After", strconv.FormatInt(int64(refreshRate.Seconds()), 10))

	w.WriteHeader(http.StatusServiceUnavailable)

	if _, err := io.Copy(w, &buf); err != nil {
		logger.Error(ctx, "could not write queue page", logger.CapturedE(errors.WithStack(err)))
	}
}

// refreshQueue asks the adapter to refresh the layer's queue with the given
// keep-alive. The call is a no-op when another refresh is already in flight;
// adapter errors are logged, not returned.
func (q *Queue) refreshQueue(ctx context.Context, layerName store.LayerName, keepAlive time.Duration) {
	if !atomic.CompareAndSwapUint32(&q.refreshJobRunning, 0, 1) {
		return
	}

	defer atomic.StoreUint32(&q.refreshJobRunning, 0)

	if err := q.adapter.Refresh(ctx, string(layerName), keepAlive); err != nil {
		logger.Error(ctx, "could not refresh queue",
			logger.CapturedE(errors.WithStack(err)),
			logger.F("queue", layerName),
		)
	}
}

// updateMetrics publishes the layer's capacity, refreshes the queue and its
// sessions gauge, then (re)schedules a debounced job that repeats the refresh
// after the keep-alive has elapsed.
func (q *Queue) updateMetrics(proxyName store.ProxyName, layerName store.LayerName, options *LayerOptions) {
	// Deliberately not the request context: this runs in a defer after the
	// request has been served.
	ctx := context.Background()

	// Update queue capacity metric
	metricQueueCapacity.With(
		prometheus.Labels{
			metricLabelLayer: string(layerName),
			metricLabelProxy: string(proxyName),
		},
	).Set(float64(options.Capacity))

	// Refresh queue data and metrics
	q.refreshQueue(ctx, layerName, options.KeepAlive)
	q.updateSessionsMetric(ctx, proxyName, layerName)

	// (Re)schedule an update job after session ttl + semi-random time padding
	// to update metrics after last session expiration
	//
	// rand.Int63n panics when its argument is <= 0, so a non-positive
	// keep-alive simply gets no random padding.
	var randDuration int64
	if options.KeepAlive > 0 {
		randDuration = rand.Int63n(int64(options.KeepAlive))
	}

	timePadding := options.KeepAlive/2 + time.Duration(randDuration)
	after := options.KeepAlive + timePadding

	debouncingKey := fmt.Sprintf("%s/%s", proxyName, layerName)

	q.postKeepAliveDebouncer.Do(debouncingKey, after, func() {
		ctx := logger.With(
			context.Background(),
			logger.F("proxy", proxyName),
			logger.F("layer", layerName),
			logger.F("after", after),
		)

		logger.Info(ctx, "running post keep alive refresh job")

		q.refreshQueue(ctx, layerName, options.KeepAlive)
		q.updateSessionsMetric(ctx, proxyName, layerName)
	})
}

// getCookieName returns the name of the session cookie used for the given
// layer: "_bouncer_<layer type>_<layer name>".
func (q *Queue) getCookieName(layerName store.LayerName) string {
	return fmt.Sprintf("_bouncer_%s_%s", LayerType, layerName)
}

// New creates a Queue layer backed by the given adapter. The option
// functions are applied on top of defaultOptions().
func New(adapter Adapter, funcs ...OptionFunc) *Queue {
	opts := defaultOptions()
	for _, fn := range funcs {
		fn(opts)
	}

	return &Queue{
		adapter:                adapter,
		templateDir:            opts.TemplateDir,
		defaultKeepAlive:       opts.DefaultKeepAlive,
		postKeepAliveDebouncer: NewDebouncerMap(),
	}
}

var _ director.MiddlewareLayer = &Queue{}