diff --git a/lemur/common/celery.py b/lemur/common/celery.py index f2a2f826..991dac2c 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -47,6 +47,19 @@ def make_celery(app): celery = make_celery(flask_app) +def is_task_active(fun, task_id, args): + from celery.task.control import inspect + i = inspect() + active_tasks = i.active() + for _, tasks in active_tasks.items(): + for task in tasks: + if task.get("id") == task_id: + continue + if task.get("name") == fun and task.get("args") == str(args): + return True + return False + + @celery.task() def fetch_acme_cert(id): """ @@ -224,5 +237,21 @@ def sync_source(source): :param source: :return: """ - current_app.logger.debug("Syncing source {}".format(source)) + + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + task_id = celery.current_task.request.id + log_data = { + "function": function, + "message": "Syncing source", + "source": source, + "task_id": task_id, + } + current_app.logger.debug(log_data) + + if is_task_active(function, task_id, (source,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return sync([source]) + log_data["message"] = "Done syncing source" + current_app.logger.debug(log_data)