diff --git a/lemur/common/celery.py b/lemur/common/celery.py index f5edb9ab..e868585a 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -282,7 +282,7 @@ def clean_all_sources(): metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=600) def clean_source(source): """ This celery task will clean the specified source. This is a destructive operation that will delete unused @@ -298,7 +298,13 @@ def clean_source(source): "source": source, } current_app.logger.debug(log_data) - clean([source], True) + try: + clean([source], True) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("clean_source_timeout", "counter", 1) @celery.task() @@ -391,7 +397,7 @@ def sync_source_destination(): metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def certificate_reissue(): """ This celery task reissues certificates which are pending reissue @@ -403,14 +409,21 @@ def certificate_reissue(): "message": "reissuing certificates", } current_app.logger.debug(log_data) - cli_certificate.reissue(None, True) + try: + cli_certificate.reissue(None, True) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("certificate_reissue_timeout", "counter", 1) + return log_data["message"] = "reissuance completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def certificate_rotate(): """ This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert @@ -422,14 +435,21 @@ def certificate_rotate(): "message": "rotating certificates", } current_app.logger.debug(log_data) - cli_certificate.rotate(None, None, None, None, True) + try: + cli_certificate.rotate(None, None, None, None, True) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("certificate_rotate_timeout", "counter", 1) + return log_data["message"] = "rotation completed" current_app.logger.debug(log_data) red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def endpoints_expire(): """ This celery task removes all endpoints that have not been recently updated @@ -441,12 +461,19 @@ def endpoints_expire(): "message": "endpoints expire", } current_app.logger.debug(log_data) - cli_endpoints.expire(2) # Time in hours + try: + cli_endpoints.expire(2) # Time in hours + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("endpoints_expire_timeout", "counter", 1) + return red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=600) def get_all_zones(): """ This celery syncs all zones from the available dns providers @@ -458,29 +485,58 @@ def get_all_zones(): "message": "refresh all zones from available DNS providers", } current_app.logger.debug(log_data) - cli_dns_providers.get_all_zones() + try: + cli_dns_providers.get_all_zones() + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("get_all_zones_timeout", "counter", 1) + return red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +@celery.task(soft_time_limit=3600) def check_revoked(): """ This celery task attempts to check if any certs are expired :return: """ + + task_id = None + if celery.current_task: + task_id = celery.current_task.request.id + function = f"{__name__}.{sys._getframe().f_code.co_name}" log_data = { "function": function, "message": "check if any certificates are revoked revoked", } + + if task_id and is_task_active(function, task_id, (id,)): + log_data["message"] = "Skipping task: Task is already active" + current_app.logger.debug(log_data) + return + current_app.logger.debug(log_data) - cli_certificate.check_revoked() + try: + cli_certificate.check_revoked() + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("check_revoked_timeout", "counter", 1) + return + red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1) -@celery.task() +check_revoked() + + +@celery.task(soft_time_limit=3600) def notify_expirations(): """ This celery task notifies about expiring certs @@ -492,6 +548,13 @@ def notify_expirations(): "message": "notify for cert expiration", } current_app.logger.debug(log_data) - cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + try: + cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + except SoftTimeLimitExceeded: + log_data["message"] = "Checking revoked: Time limit exceeded." + current_app.logger.error(log_data) + sentry.captureException() + metrics.send("notify_expirations_timeout", "counter", 1) + return red.set(f'{function}.last_success', int(time.time())) metrics.send(f"{function}.success", 'counter', 1)