diff --git a/lemur/common/celery.py b/lemur/common/celery.py index fa739029..a37f96e5 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -91,6 +91,21 @@ def report_celery_last_success_metrics(): """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + log_data = { + "function": function, + "message": "recurrent task", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_time = int(time.time()) schedule = current_app.config.get('CELERYBEAT_SCHEDULE') for _, t in schedule.items(): @@ -215,15 +230,25 @@ def fetch_acme_cert(id): @celery.task() def fetch_all_pending_acme_certs(): """Instantiate celery workers to resolve all pending Acme certificates""" - pending_certs = pending_certificate_service.get_unresolved_pending_certs() function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Starting job.", + "task_id": task_id, } + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) + pending_certs = pending_certificate_service.get_unresolved_pending_certs() # We only care about certs using the acme-issuer plugin for cert in pending_certs: @@ -244,21 +269,23 @@ def fetch_all_pending_acme_certs(): def remove_old_acme_certs(): """Prune old pending acme certificates from the database""" function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "Starting job.", - } - pending_certs = pending_certificate_service.get_pending_certs("all") - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "Starting job.", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return + pending_certs = pending_certificate_service.get_pending_certs("all") + # Delete pending certs more than a week old for cert in pending_certs: if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): @@ -279,10 +306,21 @@ def clean_all_sources(): be ran periodically. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Creating celery task to clean source", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + sources = validate_sources("all") for source in sources: log_data["source"] = source.label @@ -303,11 +341,22 @@ def clean_source(source): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Cleaning source", "source": source, + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, (source,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) clean([source], True) @@ -318,16 +367,17 @@ def sync_all_sources(): This function will sync certificates from all sources. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "creating celery task to sync source", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "creating celery task to sync source", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -355,6 +405,7 @@ def sync_source(source): task_id = None if celery.current_task: task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Syncing source", @@ -362,21 +413,12 @@ def sync_source(source): "task_id": task_id, } - task_id = None - if celery.current_task: - task_id = celery.current_task.request.id - - if task_id and is_task_active(function, task_id, (id,)): + if task_id and is_task_active(function, task_id, (source,)): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) - - if task_id and is_task_active(function, task_id, (source,)): - log_data["message"] = "Skipping task: Task is already active" - current_app.logger.debug(log_data) - return try: sync([source]) metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) @@ -405,16 +447,17 @@ def sync_source_destination(): We rely on account numbers to avoid duplicates. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "syncing AWS destinations and sources", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "syncing AWS destinations and sources", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -439,16 +482,17 @@ def certificate_reissue(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "reissuing certificates", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "reissuing certificates", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -468,16 +512,18 @@ def certificate_rotate(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "rotating certificates", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "rotating certificates", + "task_id": task_id, + + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -497,16 +543,17 @@ def endpoints_expire(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "endpoints expire", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "endpoints expire", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -524,16 +571,17 @@ def get_all_zones(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "refresh all zones from available DNS providers", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "refresh all zones from available DNS providers", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -568,16 +616,17 @@ def notify_expirations(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "notify for cert expiration", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "notify for cert expiration", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return