Skip a task if similar task already active
This commit is contained in:
parent
bd27932783
commit
d220e9326c
|
@ -47,6 +47,19 @@ def make_celery(app):
|
||||||
celery = make_celery(flask_app)
|
celery = make_celery(flask_app)
|
||||||
|
|
||||||
|
|
||||||
|
def is_task_active(fun: str, task_id: str, args: str) -> bool:
|
||||||
|
from celery.task.control import inspect
|
||||||
|
i = inspect()
|
||||||
|
active_tasks: dict = i.active()
|
||||||
|
for _, tasks in active_tasks.items():
|
||||||
|
for task in tasks:
|
||||||
|
if task.get("id") == task_id:
|
||||||
|
continue
|
||||||
|
if task.get("name") == fun and task.get("args") == str(args):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task()
|
||||||
def fetch_acme_cert(id):
|
def fetch_acme_cert(id):
|
||||||
"""
|
"""
|
||||||
|
@ -224,5 +237,21 @@ def sync_source(source):
|
||||||
:param source:
|
:param source:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
current_app.logger.debug("Syncing source {}".format(source))
|
|
||||||
|
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||||
|
task_id = celery.current_task.request.id
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"message": "Syncing source",
|
||||||
|
"source": source,
|
||||||
|
"task_id": task_id,
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
|
||||||
|
if is_task_active(function, task_id, (source,)):
|
||||||
|
log_data["message"] = "Skipping task: Task is already active"
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
return
|
||||||
sync([source])
|
sync([source])
|
||||||
|
log_data["message"] = "Done syncing source"
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
|
Loading…
Reference in New Issue