From 9a02230d63419285ef4836784462de3538ce4847 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Wed, 7 Aug 2019 17:48:06 -0700 Subject: [PATCH 01/10] adding soft time outs for celery --- lemur/common/celery.py | 91 +++++++++++++++++++++++++++++++++++------- 1 file changed, 77 insertions(+), 14 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index f5edb9ab..e868585a 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -282,7 +282,7 @@ def clean_all_sources(): metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=600) def clean_source(source): """ This celery task will clean the specified source. This is a destructive operation that will delete unused @@ -298,7 +298,13 @@ def clean_source(source): "source": source, } current_app.logger.debug(log_data) - clean([source], True) + try: + clean([source], True) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("clean_source_timeout", "counter", 1) @celery.task() @@ -391,7 +397,7 @@ def sync_source_destination(): metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def certificate_reissue(): """ This celery task reissues certificates which are pending reissue @@ -403,14 +409,21 @@ def certificate_reissue(): "message": "reissuing certificates", } current_app.logger.debug(log_data) - cli_certificate.reissue(None, True) + try: + cli_certificate.reissue(None, True) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("certificate_reissue_timeout", "counter", 1) + return log_data["message"] = "reissuance completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def certificate_rotate(): """ This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert @@ -422,14 +435,21 @@ def certificate_rotate(): "message": "rotating certificates", } current_app.logger.debug(log_data) - cli_certificate.rotate(None, None, None, None, True) + try: + cli_certificate.rotate(None, None, None, None, True) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("certificate_rotate_timeout", "counter", 1) + return log_data["message"] = "rotation completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def endpoints_expire(): """ This celery task removes all endpoints that have not been recently updated @@ -441,12 +461,19 @@ def endpoints_expire(): "message": "endpoints expire", } current_app.logger.debug(log_data) - cli_endpoints.expire(2) # Time in hours + try: + cli_endpoints.expire(2) # Time in hours + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("endpoints_expire_timeout", "counter", 1) + return red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=600) def get_all_zones(): """ This celery syncs all zones from the available dns providers @@ -458,29 +485,58 @@ def get_all_zones(): "message": "refresh all zones from available DNS providers", } current_app.logger.debug(log_data) - cli_dns_providers.get_all_zones() + try: + cli_dns_providers.get_all_zones() + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("get_all_zones_timeout", "counter", 1) + return red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def check_revoked(): """ This celery task attempts to check if any certs are expired :return: """ + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + function = f"{__name__}.{sys._getframe().f_code.co_name}" log_data = { "function": function, "message": "check if any certificates are revoked revoked", } + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_certificate.check_revoked() + try: + cli_certificate.check_revoked() + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("check_revoked_timeout", "counter", 1) + return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +check_revoked() + + +@celery.task(soft_time_limit=3600) def notify_expirations(): """ This celery task notifies about expiring certs @@ -492,6 +548,13 @@ def notify_expirations(): "message": "notify for cert expiration", } current_app.logger.debug(log_data) - cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + try: + cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("notify_expirations_timeout", "counter", 1) + return red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) From 3b9b94623fc0571e3777232906049b4c857e058f Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Wed, 7 Aug 2019 18:06:59 -0700 Subject: [PATCH 02/10] cleaning up --- lemur/common/celery.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index e868585a..dfeb7017 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -533,9 +533,6 @@ def check_revoked(): metrics.send(f"{function}.success", 'counter', 1) -check_revoked() - - @celery.task(soft_time_limit=3600) def notify_expirations(): """ From bf47f87c215f9c6042374ddf3a43f5f4bbc24d43 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Mon, 12 Aug 2019 13:52:01 -0700 Subject: [PATCH 03/10] preventing celery duplicate tasks --- lemur/common/celery.py | 91 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index b19a9607..a79ec838 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -248,6 +248,15 @@ def remove_old_acme_certs(): } pending_certs = pending_certificate_service.get_pending_certs("all") + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + # Delete pending certs more than a week old for cert in pending_certs: if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): @@ -311,6 +320,17 @@ def sync_all_sources(): "function": function, "message": "creating celery task to sync source", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + + sources = validate_sources("all") for source in sources: log_data["source"] = source.label @@ -340,6 +360,17 @@ def sync_source(source): "source": source, "task_id": task_id, } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + + current_app.logger.debug(log_data) if task_id and is_task_active(function, task_id, (source,)): @@ -378,6 +409,16 @@ def sync_source_destination(): "function": function, "message": "syncing AWS destinations and sources", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) for dst in destinations_service.get_all(): if add_aws_destination_to_sources(dst): @@ -402,6 +443,16 @@ def certificate_reissue(): "function": function, "message": "reissuing certificates", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) cli_certificate.reissue(None, True) log_data["message"] = "reissuance completed" @@ -421,6 +472,16 @@ def certificate_rotate(): "function": function, "message": "rotating certificates", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) cli_certificate.rotate(None, None, None, None, True) log_data["message"] = "rotation completed" @@ -440,6 +501,16 @@ def endpoints_expire(): "function": function, "message": "endpoints expire", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) cli_endpoints.expire(2) # Time in hours red.set(f'{function}.last_success', int(time.time())) @@ -457,6 +528,16 @@ def get_all_zones(): "function": function, "message": "refresh all zones from available DNS providers", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) cli_dns_providers.get_all_zones() red.set(f'{function}.last_success', int(time.time())) @@ -491,6 +572,16 @@ def notify_expirations(): "function": function, "message": "notify for cert expiration", } + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) red.set(f'{function}.last_success', int(time.time())) From 07a9c56fb86b97c07a04b67fe576540d961cfb7b Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 09:35:57 -0700 Subject: [PATCH 04/10] making lint happy --- lemur/common/celery.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index a79ec838..bcd7b580 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -31,7 +31,6 @@ from lemur.dns_providers import cli as cli_dns_providers from lemur.notifications import cli as cli_notification from lemur.endpoints import cli as cli_endpoints - if current_app: flask_app = current_app else: @@ -256,7 +255,7 @@ def remove_old_acme_certs(): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return - + # Delete pending certs more than a week old for cert in pending_certs: if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): @@ -330,7 +329,6 @@ def sync_all_sources(): current_app.logger.debug(log_data) return - sources = validate_sources("all") for source in sources: log_data["source"] = source.label @@ -370,7 +368,6 @@ def sync_source(source): current_app.logger.debug(log_data) return - current_app.logger.debug(log_data) if task_id and is_task_active(function, task_id, (source,)): From 4d728738eeb2bcbc7afa4a30d15f6d7656674873 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 11:42:43 -0700 Subject: [PATCH 05/10] handling celery tasks without any arguments --- lemur/common/celery.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index bcd7b580..fa739029 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -65,6 +65,9 @@ celery = make_celery(flask_app) def is_task_active(fun, task_id, args): from celery.task.control import inspect + if not args: + args = '()' # empty args + i = inspect() active_tasks = i.active() for _, tasks in active_tasks.items(): From c29f2825607df669ff67d3713fcfc4feb3fa96bc Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 11:52:56 -0700 Subject: [PATCH 06/10] improved the flow for checking if the task is active --- lemur/common/celery.py | 169 ++++++++++++++++++++++++++--------------- 1 file changed, 109 insertions(+), 60 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index fa739029..a37f96e5 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -91,6 +91,21 @@ def report_celery_last_success_metrics(): """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + + log_data = { + "function": function, + "message": "recurrent task", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_time = int(time.time()) schedule = current_app.config.get('CELERYBEAT_SCHEDULE') for _, t in schedule.items(): @@ -215,15 +230,25 @@ def fetch_acme_cert(id): @celery.task() def fetch_all_pending_acme_certs(): """Instantiate celery workers to resolve all pending Acme certificates""" - pending_certs = pending_certificate_service.get_unresolved_pending_certs() function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Starting job.", + "task_id": task_id, } + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) + pending_certs = pending_certificate_service.get_unresolved_pending_certs() # We only care about certs using the acme-issuer plugin for cert in pending_certs: @@ -244,21 +269,23 @@ def fetch_all_pending_acme_certs(): def remove_old_acme_certs(): """Prune old pending acme certificates from the database""" function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "Starting job.", - } - pending_certs = pending_certificate_service.get_pending_certs("all") - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "Starting job.", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return + pending_certs = pending_certificate_service.get_pending_certs("all") + # Delete pending certs more than a week old for cert in pending_certs: if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): @@ -279,10 +306,21 @@ def clean_all_sources(): be ran periodically. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Creating celery task to clean source", + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, None): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + sources = validate_sources("all") for source in sources: log_data["source"] = source.label @@ -303,11 +341,22 @@ def clean_source(source): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Cleaning source", "source": source, + "task_id": task_id, } + + if task_id and is_task_active(function, task_id, (source,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) clean([source], True) @@ -318,16 +367,17 @@ def sync_all_sources(): This function will sync certificates from all sources. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "creating celery task to sync source", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "creating celery task to sync source", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -355,6 +405,7 @@ def sync_source(source): task_id = None if celery.current_task: task_id = celery.current_task.request.id + log_data = { "function": function, "message": "Syncing source", @@ -362,21 +413,12 @@ def sync_source(source): "task_id": task_id, } - task_id = None - if celery.current_task: - task_id = celery.current_task.request.id - - if task_id and is_task_active(function, task_id, (id,)): + if task_id and is_task_active(function, task_id, (source,)): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) - - if task_id and is_task_active(function, task_id, (source,)): - log_data["message"] = "Skipping task: Task is already active" - current_app.logger.debug(log_data) - return try: sync([source]) metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) @@ -405,16 +447,17 @@ def sync_source_destination(): We rely on account numbers to avoid duplicates. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "syncing AWS destinations and sources", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "syncing AWS destinations and sources", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -439,16 +482,17 @@ def certificate_reissue(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "reissuing certificates", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "reissuing certificates", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -468,16 +512,18 @@ def certificate_rotate(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "rotating certificates", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "rotating certificates", + "task_id": task_id, + + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -497,16 +543,17 @@ def endpoints_expire(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "endpoints expire", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "endpoints expire", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -524,16 +571,17 @@ def get_all_zones(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "refresh all zones from available DNS providers", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "refresh all zones from available DNS providers", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return @@ -568,16 +616,17 @@ def notify_expirations(): :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" - log_data = { - "function": function, - "message": "notify for cert expiration", - } - task_id = None if celery.current_task: task_id = celery.current_task.request.id - if task_id and is_task_active(function, task_id, (id,)): + log_data = { + "function": function, + "message": "notify for cert expiration", + "task_id": task_id, + } + + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return From a3dfc3ef0ad9c43fab1862a6c5b4361096b452ff Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 11:58:58 -0700 Subject: [PATCH 07/10] consistency --- lemur/common/celery.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index dfeb7017..acb20081 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -503,18 +503,18 @@ def check_revoked(): This celery task attempts to check if any certs are expired :return: """ - + function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id - function = f"{__name__}.{sys._getframe().f_code.co_name}" log_data = { "function": function, "message": "check if any certificates are revoked revoked", + "task_id": task_id, } - if task_id and is_task_active(function, task_id, (id,)): + if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return From 22c60fedad9047edf22d4126442e5ed48a338a70 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 12:11:04 -0700 Subject: [PATCH 08/10] cosmetics --- lemur/common/celery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index a37f96e5..06b93a31 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -31,6 +31,7 @@ from lemur.dns_providers import cli as cli_dns_providers from lemur.notifications import cli as cli_notification from lemur.endpoints import cli as cli_endpoints + if current_app: flask_app = current_app else: From 6e17d36d76b228d878ecab1c2f983de25237080f Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 12:16:23 -0700 Subject: [PATCH 09/10] typos --- lemur/common/celery.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index acb20081..e36d8b35 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -301,7 +301,7 @@ def clean_source(source): try: clean([source], True) except SoftTimeLimitExceeded: - log_data["message"] = "Checking revoked: Time limit exceeded." + log_data["message"] = "Clean source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("clean_source_timeout", "counter", 1) @@ -412,7 +412,7 @@ def certificate_reissue(): try: cli_certificate.reissue(None, True) except SoftTimeLimitExceeded: - log_data["message"] = "Checking revoked: Time limit exceeded." + log_data["message"] = "Certificate reissue: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("certificate_reissue_timeout", "counter", 1) @@ -438,7 +438,7 @@ def certificate_rotate(): try: cli_certificate.rotate(None, None, None, None, True) except SoftTimeLimitExceeded: - log_data["message"] = "Checking revoked: Time limit exceeded." + log_data["message"] = "Certificate rotate: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("certificate_rotate_timeout", "counter", 1) @@ -464,7 +464,7 @@ def endpoints_expire(): try: cli_endpoints.expire(2) # Time in hours except SoftTimeLimitExceeded: - log_data["message"] = "Checking revoked: Time limit exceeded." + log_data["message"] = "endpoint expire: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("endpoints_expire_timeout", "counter", 1) @@ -488,7 +488,7 @@ def get_all_zones(): try: cli_dns_providers.get_all_zones() except SoftTimeLimitExceeded: - log_data["message"] = "Checking revoked: Time limit exceeded." + log_data["message"] = "get all zones: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("get_all_zones_timeout", "counter", 1) @@ -548,7 +548,7 @@ def notify_expirations(): try: cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) except SoftTimeLimitExceeded: - log_data["message"] = "Checking revoked: Time limit exceeded." + log_data["message"] = "Notify expiring Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("notify_expirations_timeout", "counter", 1) From 2de3f287ab3ace80e6fb750e12d6f4b990bb02c5 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 13 Aug 2019 12:21:27 -0700 Subject: [PATCH 10/10] standardizing the timeouts to easier monitor any timeouts --- lemur/common/celery.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index e36d8b35..a3f9cc5f 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -304,7 +304,7 @@ def clean_source(source): log_data["message"] = "Clean source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("clean_source_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) @celery.task() @@ -359,9 +359,8 @@ def sync_source(source): log_data["message"] = "Error syncing source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send( - "sync_source_timeout", "counter", 1, metric_tags={"source": source} - ) + metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source}) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data["message"] = "Done syncing source" @@ -415,8 +414,9 @@ def certificate_reissue(): log_data["message"] = "Certificate reissue: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("certificate_reissue_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return + log_data["message"] = "reissuance completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) @@ -441,8 +441,9 @@ def certificate_rotate(): log_data["message"] = "Certificate rotate: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("certificate_rotate_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return + log_data["message"] = "rotation completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) @@ -467,8 +468,9 @@ def endpoints_expire(): log_data["message"] = "endpoint expire: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("endpoints_expire_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) @@ -491,8 +493,9 @@ def get_all_zones(): log_data["message"] = "get all zones: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("get_all_zones_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) @@ -526,7 +529,7 @@ def check_revoked(): log_data["message"] = "Checking revoked: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("check_revoked_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return red.set(f'{function}.last_success', int(time.time())) @@ -551,7 +554,8 @@ def notify_expirations(): log_data["message"] = "Notify expiring Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("notify_expirations_timeout", "counter", 1) + metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1)