Merge pull request #2689 from castrapel/skip_duplicate_tasks
Skip a task if similar task already active
This commit is contained in:
commit
cfe0595918
|
@ -47,6 +47,19 @@ def make_celery(app):
|
|||
celery = make_celery(flask_app)
|
||||
|
||||
|
||||
def is_task_active(fun, task_id, args):
|
||||
from celery.task.control import inspect
|
||||
i = inspect()
|
||||
active_tasks = i.active()
|
||||
for _, tasks in active_tasks.items():
|
||||
for task in tasks:
|
||||
if task.get("id") == task_id:
|
||||
continue
|
||||
if task.get("name") == fun and task.get("args") == str(args):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@celery.task()
|
||||
def fetch_acme_cert(id):
|
||||
"""
|
||||
|
@ -224,5 +237,21 @@ def sync_source(source):
|
|||
:param source:
|
||||
:return:
|
||||
"""
|
||||
current_app.logger.debug("Syncing source {}".format(source))
|
||||
|
||||
function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
|
||||
task_id = celery.current_task.request.id
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Syncing source",
|
||||
"source": source,
|
||||
"task_id": task_id,
|
||||
}
|
||||
current_app.logger.debug(log_data)
|
||||
|
||||
if is_task_active(function, task_id, (source,)):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
sync([source])
|
||||
log_data["message"] = "Done syncing source"
|
||||
current_app.logger.debug(log_data)
|
||||
|
|
Loading…
Reference in New Issue