From d220e9326c0c2a66106a9f3ec74832091c3c7ff2 Mon Sep 17 00:00:00 2001 From: Curtis Castrapel Date: Tue, 12 Mar 2019 14:45:43 -0700 Subject: [PATCH 1/4] Skip a task if similar task already active --- lemur/common/celery.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index f2a2f826..56837cba 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -47,6 +47,19 @@ def make_celery(app): celery = make_celery(flask_app) +def is_task_active(fun: str, task_id: str, args: str) -> bool: + from celery.task.control import inspect + i = inspect() + active_tasks: dict = i.active() + for _, tasks in active_tasks.items(): + for task in tasks: + if task.get("id") == task_id: + continue + if task.get("name") == fun and task.get("args") == str(args): + return True + return False + + @celery.task() def fetch_acme_cert(id): """ @@ -224,5 +237,21 @@ def sync_source(source): :param source: :return: """ - current_app.logger.debug("Syncing source {}".format(source)) + + function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = celery.current_task.request.id + log_data = { + "function": function, + "message": "Syncing source", + "source": source, + "task_id": task_id, + } + current_app.logger.debug(log_data) + + if is_task_active(function, task_id, (source,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return sync([source]) + log_data["message"] = "Done syncing source" + current_app.logger.debug(log_data) From 1a5a91ccc72ec869ed1ea5f940f33ea92d77c50e Mon Sep 17 00:00:00 2001 From: Curtis Date: Tue, 12 Mar 2019 15:11:13 -0700 Subject: [PATCH 2/4] Update celery.py --- lemur/common/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index 56837cba..b7f23c32 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -50,7 +50,7 @@ celery = make_celery(flask_app) def is_task_active(fun: str, task_id: str, args: str) -> bool: from celery.task.control import inspect i = inspect() - active_tasks: dict = i.active() + active_tasks = i.active() for _, tasks in active_tasks.items(): for task in tasks: if task.get("id") == task_id: From f38e5b0879225255fb9ba514cb1d81f86fdb8d3c Mon Sep 17 00:00:00 2001 From: Curtis Date: Tue, 12 Mar 2019 15:29:04 -0700 Subject: [PATCH 3/4] Update celery.py --- lemur/common/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index b7f23c32..90b6f9a2 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -47,7 +47,7 @@ def make_celery(app): celery = make_celery(flask_app) -def is_task_active(fun: str, task_id: str, args: str) -> bool: +def is_task_active(fun, task_id, args): from celery.task.control import inspect i = inspect() active_tasks = i.active() From c445297357e3ab47116beed3d9fe0a4b50582595 Mon Sep 17 00:00:00 2001 From: Curtis Date: Tue, 12 Mar 2019 15:41:24 -0700 Subject: [PATCH 4/4] Update celery.py --- lemur/common/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index 90b6f9a2..991dac2c 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -238,7 +238,7 @@ def sync_source(source): :return: """ - function = f"{__name__}.{sys._getframe().f_code.co_name}" + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) task_id = celery.current_task.request.id log_data = { "function": function,