diff --git a/lemur/common/celery.py b/lemur/common/celery.py index b19a9607..4af33d86 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -66,6 +66,9 @@ celery = make_celery(flask_app) def is_task_active(fun, task_id, args): from celery.task.control import inspect + if not args: + args = '()' # empty args + i = inspect() active_tasks = i.active() for _, tasks in active_tasks.items(): @@ -89,6 +92,21 @@ def report_celery_last_success_metrics(): """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + log_data = { + "function": function, + "message": "recurrent task", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_time = int(time.time()) schedule = current_app.config.get('CELERYBEAT_SCHEDULE') for _, t in schedule.items(): @@ -213,15 +231,25 @@ def fetch_acme_cert(id): @celery.task() def fetch_all_pending_acme_certs(): """Instantiate celery workers to resolve all pending Acme certificates""" - pending_certs = pending_certificate_service.get_unresolved_pending_certs() function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Starting job.", + "task_id": task_id, } + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) + pending_certs = pending_certificate_service.get_unresolved_pending_certs() # We only care about certs using the acme-issuer plugin for cert in pending_certs: @@ -242,10 +270,21 @@ def fetch_all_pending_acme_certs(): def remove_old_acme_certs(): """Prune old pending acme certificates from the database""" function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Starting job.", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + pending_certs = pending_certificate_service.get_pending_certs("all") # Delete pending certs more than a week old @@ -268,10 +307,21 @@ def clean_all_sources(): be ran periodically. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Creating celery task to clean source", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + sources = validate_sources("all") for source in sources: log_data["source"] = source.label @@ -282,7 +332,7 @@ def clean_all_sources(): metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=600) def clean_source(source): """ This celery task will clean the specified source. This is a destructive operation that will delete unused @@ -292,13 +342,30 @@ def clean_source(source): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Cleaning source", "source": source, + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, (source,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - clean([source], True) + try: + clean([source], True) + except SoftTimeLimitExceeded: + log_data["message"] = "Clean source: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) @celery.task() @@ -307,10 +374,21 @@ def sync_all_sources(): This function will sync certificates from all sources. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "creating celery task to sync source", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + sources = validate_sources("all") for source in sources: log_data["source"] = source.label @@ -334,18 +412,20 @@ def sync_source(source): task_id = None if celery.current_task: task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Syncing source", "source": source, "task_id": task_id, } - current_app.logger.debug(log_data) if task_id and is_task_active(function, task_id, (source,)): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return + + current_app.logger.debug(log_data) try: sync([source]) metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) @@ -353,9 +433,8 @@ def sync_source(source): log_data["message"] = "Error syncing source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send( - "sync_source_timeout", "counter", 1, metric_tags={"source": source} - ) + metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source}) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data["message"] = "Done syncing source" @@ -374,10 +453,21 @@ def sync_source_destination(): We rely on account numbers to avoid duplicates. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "syncing AWS destinations and sources", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) for dst in destinations_service.get_all(): if add_aws_destination_to_sources(dst): @@ -391,107 +481,222 @@ def sync_source_destination(): metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def certificate_reissue(): """ This celery task reissues certificates which are pending reissue :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "reissuing certificates", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_certificate.reissue(None, True) + try: + cli_certificate.reissue(None, True) + except SoftTimeLimitExceeded: + log_data["message"] = "Certificate reissue: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return + log_data["message"] = "reissuance completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def certificate_rotate(): """ This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "rotating certificates", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_certificate.rotate(None, None, None, None, True) + try: + cli_certificate.rotate(None, None, None, None, True) + except SoftTimeLimitExceeded: + log_data["message"] = "Certificate rotate: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return + log_data["message"] = "rotation completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def endpoints_expire(): """ This celery task removes all endpoints that have not been recently updated :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "endpoints expire", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_endpoints.expire(2) # Time in hours + try: + cli_endpoints.expire(2) # Time in hours + except SoftTimeLimitExceeded: + log_data["message"] = "endpoint expire: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=600) def get_all_zones(): """ This celery syncs all zones from the available dns providers :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "refresh all zones from available DNS providers", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_dns_providers.get_all_zones() + try: + cli_dns_providers.get_all_zones() + except SoftTimeLimitExceeded: + log_data["message"] = "get all zones: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def check_revoked(): """ This celery task attempts to check if any certs are expired :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "check if any certificates are revoked revoked", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_certificate.check_revoked() + try: + cli_certificate.check_revoked() + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def notify_expirations(): """ This celery task notifies about expiring certs :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "notify for cert expiration", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + try: + cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + except SoftTimeLimitExceeded: + log_data["message"] = "Notify expiring Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1)