package queue

import (
	"context"
	"fmt"
	"net/http"
	"sync/atomic"
	"time"

	"forge.cadoles.com/Cadoles/go-proxy"
	"forge.cadoles.com/cadoles/bouncer/internal/proxy/director"
	"forge.cadoles.com/cadoles/bouncer/internal/store"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	"gitlab.com/wpetit/goweb/logger"
)

// LayerType is the store layer type handled by Queue.
const LayerType store.LayerType = "queue"

// Queue is a middleware layer that only lets through the sessions whose rank,
// as reported by the adapter, is below the capacity configured on the layer.
// Other sessions receive a "service unavailable" response describing their
// position in the queue.
type Queue struct {
	adapter Adapter

	// refreshJobRunning is 1 while a background refresh goroutine started by
	// refreshQueue is in flight and 0 otherwise. It is only accessed through
	// sync/atomic.
	refreshJobRunning uint32
}

// LayerType implements director.MiddlewareLayer
func (q *Queue) LayerType() store.LayerType {
	return LayerType
}

// Middleware implements director.MiddlewareLayer
//
// For each request the returned middleware:
//  1. parses the layer options (a parse failure yields a 500 response);
//  2. identifies the client through a per-layer session cookie, issuing a new
//     random one when the cookie is missing or carries an empty value;
//  3. triggers a best-effort background refresh of the queue;
//  4. asks the adapter for the session rank and either forwards the request
//     to the next handler (rank < capacity) or renders the queue page.
func (q *Queue) Middleware(layer *store.Layer) proxy.Middleware {
	return func(h http.Handler) http.Handler {
		fn := func(w http.ResponseWriter, r *http.Request) {
			ctx := r.Context()

			options, err := fromStoreOptions(layer.Options)
			if err != nil {
				logger.Error(ctx, "could not parse layer options", logger.E(errors.WithStack(err)))
				http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)

				return
			}

			cookieName := q.getCookieName(layer.Name)

			cookie, err := r.Cookie(cookieName)
			if err != nil && !errors.Is(err, http.ErrNoCookie) {
				// Not fatal: fall through and issue a fresh session below.
				logger.Error(ctx, "could not retrieve cookie", logger.E(errors.WithStack(err)))
			}

			// An empty value is treated like a missing cookie, otherwise every
			// client sending "<name>=" would share the same "" session
			// identifier.
			if cookie == nil || cookie.Value == "" {
				cookie = &http.Cookie{
					Name:  cookieName,
					Value: uuid.NewString(),
					Path:  "/",
					// The identifier is only consumed server-side, so keep it
					// out of reach of scripts running in the page.
					HttpOnly: true,
				}

				http.SetCookie(w, cookie)
			}

			sessionID := cookie.Value
			queueName := string(layer.Name)

			q.refreshQueue(queueName, options.KeepAlive)

			rank, err := q.adapter.Touch(ctx, queueName, sessionID)
			if err != nil {
				logger.Error(ctx, "could not retrieve session rank", logger.E(errors.WithStack(err)))
				http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)

				return
			}

			if rank >= options.Capacity {
				q.renderQueuePage(w, r, queueName, options, rank)

				return
			}

			// Expose the queue state to downstream loggers.
			ctx = logger.With(ctx,
				logger.F("queueSessionId", sessionID),
				logger.F("queueName", queueName),
				logger.F("queueSessionRank", rank),
			)

			r = r.WithContext(ctx)

			h.ServeHTTP(w, r)
		}

		return http.HandlerFunc(fn)
	}
}

// renderQueuePage answers a queued request with a 503 plain-text message
// giving the session rank (displayed as rank+1) and the number of sessions
// reported by the adapter against the configured capacity. A failure to
// retrieve the queue status yields a 500 response instead.
func (q *Queue) renderQueuePage(w http.ResponseWriter, r *http.Request, queueName string, options *LayerOptions, rank int64) {
	ctx := r.Context()

	status, err := q.adapter.Status(ctx, queueName)
	if err != nil {
		logger.Error(ctx, "could not retrieve queue status", logger.E(errors.WithStack(err)))
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)

		return
	}

	http.Error(w, fmt.Sprintf("queued (rank: %d, status: %d/%d)", rank+1, status.Sessions, options.Capacity), http.StatusServiceUnavailable)
}

// refreshQueue asks the adapter, in a background goroutine, to refresh the
// given queue with the given keep-alive duration. The call returns
// immediately; when a refresh goroutine is already running the request is
// dropped. The refresh runs detached from any request context and is bounded
// by a timeout of twice the keep-alive duration. Errors are only logged.
//
// NOTE(review): refreshJobRunning is a single flag on the Queue, not keyed by
// queueName, so a refresh in flight for one queue also suppresses refreshes
// requested for other queues in the meantime - confirm this is intended.
func (q *Queue) refreshQueue(queueName string, keepAlive time.Duration) {
	if !atomic.CompareAndSwapUint32(&q.refreshJobRunning, 0, 1) {
		return
	}

	go func() {
		defer atomic.StoreUint32(&q.refreshJobRunning, 0)

		ctx, cancel := context.WithTimeout(context.Background(), keepAlive*2)
		defer cancel()

		if err := q.adapter.Refresh(ctx, queueName, keepAlive); err != nil {
			logger.Error(ctx, "could not refresh queue",
				logger.E(errors.WithStack(err)),
				logger.F("queue", queueName),
			)
		}
	}()
}

// getCookieName returns the name of the session cookie used for the given
// layer, in the form "_<layer type>_<layer name>".
func (q *Queue) getCookieName(layerName store.LayerName) string {
	return fmt.Sprintf("_%s_%s", LayerType, layerName)
}

// New returns a Queue backed by the given adapter.
//
// NOTE(review): the option funcs are applied to a default options value, but
// nothing from that value is stored on the returned Queue in this file -
// confirm whether the options are expected to have an effect.
func New(adapter Adapter, funcs ...OptionFunc) *Queue {
	opts := defaultOptions()
	for _, fn := range funcs {
		fn(opts)
	}

	return &Queue{
		adapter: adapter,
	}
}

// Compile-time check that Queue satisfies director.MiddlewareLayer.
var _ director.MiddlewareLayer = &Queue{}