package queue import ( "context" "fmt" "html/template" "net/http" "path/filepath" "sync" "sync/atomic" "time" "forge.cadoles.com/Cadoles/go-proxy" "forge.cadoles.com/cadoles/bouncer/internal/proxy/director" "forge.cadoles.com/cadoles/bouncer/internal/store" "github.com/Masterminds/sprig/v3" "github.com/google/uuid" "github.com/pkg/errors" "gitlab.com/wpetit/goweb/logger" ) const LayerType store.LayerType = "queue" type Queue struct { adapter Adapter defaultKeepAlive time.Duration templateDir string loadOnce sync.Once tmpl *template.Template refreshJobRunning uint32 } // LayerType implements director.MiddlewareLayer func (q *Queue) LayerType() store.LayerType { return LayerType } // Middleware implements director.MiddlewareLayer func (q *Queue) Middleware(layer *store.Layer) proxy.Middleware { return func(h http.Handler) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() options, err := fromStoreOptions(layer.Options, q.defaultKeepAlive) if err != nil { logger.Error(ctx, "could not parse layer options", logger.E(errors.WithStack(err))) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } cookieName := q.getCookieName(layer.Name) cookie, err := r.Cookie(cookieName) if err != nil && !errors.Is(err, http.ErrNoCookie) { logger.Error(ctx, "could not retrieve cookie", logger.E(errors.WithStack(err))) } if cookie == nil { cookie = &http.Cookie{ Name: cookieName, Value: uuid.NewString(), Path: "/", } w.Header().Add("Set-Cookie", cookie.String()) } sessionID := cookie.Value queueName := string(layer.Name) q.refreshQueue(queueName, options.KeepAlive) rank, err := q.adapter.Touch(ctx, queueName, sessionID) if err != nil { logger.Error(ctx, "could not retrieve session rank", logger.E(errors.WithStack(err))) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } if rank >= options.Capacity { q.renderQueuePage(w, r, queueName, options, rank) return } ctx = logger.With(ctx, logger.F("queueSessionId", sessionID), logger.F("queueName", queueName), logger.F("queueSessionRank", rank), ) r = r.WithContext(ctx) h.ServeHTTP(w, r) } return http.HandlerFunc(fn) } } func (q *Queue) renderQueuePage(w http.ResponseWriter, r *http.Request, queueName string, options *LayerOptions, rank int64) { ctx := r.Context() status, err := q.adapter.Status(ctx, queueName) if err != nil { logger.Error(ctx, "could not retrieve queue status", logger.E(errors.WithStack(err))) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } q.loadOnce.Do(func() { pattern := filepath.Join(q.templateDir, "*.gohtml") logger.Info(ctx, "loading queue page templates", logger.F("pattern", pattern)) tmpl, err := template.New("").Funcs(sprig.FuncMap()).ParseGlob(pattern) if err != nil { logger.Error(ctx, "could not load queue templates", logger.E(errors.WithStack(err))) return } q.tmpl = tmpl }) if q.tmpl == nil { logger.Error(ctx, "queue page templates not loaded", logger.E(errors.WithStack(err))) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } templateData := struct { QueueName string LayerOptions *LayerOptions Rank int64 CurrentSessions int64 MaxSessions int64 RefreshRate int64 }{ QueueName: queueName, LayerOptions: options, Rank: rank + 1, CurrentSessions: status.Sessions, MaxSessions: options.Capacity, RefreshRate: 5, } w.WriteHeader(http.StatusServiceUnavailable) if err := q.tmpl.ExecuteTemplate(w, "queue", templateData); err != nil { logger.Error(ctx, "could not render queue page", logger.E(errors.WithStack(err))) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } } func (q *Queue) refreshQueue(queueName string, keepAlive time.Duration) { if !atomic.CompareAndSwapUint32(&q.refreshJobRunning, 0, 1) { return } go func() { defer atomic.StoreUint32(&q.refreshJobRunning, 0) ctx, cancel := context.WithTimeout(context.Background(), keepAlive*2) defer cancel() if err := q.adapter.Refresh(ctx, queueName, keepAlive); err != nil { logger.Error(ctx, "could not refresh queue", logger.E(errors.WithStack(err)), logger.F("queue", queueName), ) } }() } func (q *Queue) getCookieName(layerName store.LayerName) string { return fmt.Sprintf("_%s_%s", LayerType, layerName) } func New(adapter Adapter, funcs ...OptionFunc) *Queue { opts := defaultOptions() for _, fn := range funcs { fn(opts) } return &Queue{ adapter: adapter, templateDir: opts.TemplateDir, defaultKeepAlive: opts.DefaultKeepAlive, } } var _ director.MiddlewareLayer = &Queue{}