Add soft timeouts to celery jobs; Check for PEM in LE order
This commit is contained in:
@ -12,9 +12,11 @@ import sys
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
from celery import Celery
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from flask import current_app
|
||||
|
||||
from lemur.authorities.service import get as get_authority
|
||||
from lemur.extensions import metrics, sentry
|
||||
from lemur.factory import create_app
|
||||
from lemur.notifications.messaging import send_pending_failure_notification
|
||||
from lemur.pending_certificates import service as pending_certificate_service
|
||||
@ -62,7 +64,7 @@ def is_task_active(fun, task_id, args):
|
||||
return False
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=600)
|
||||
def fetch_acme_cert(id):
|
||||
"""
|
||||
Attempt to get the full certificate for the pending certificate listed.
|
||||
@ -70,11 +72,24 @@ def fetch_acme_cert(id):
|
||||
Args:
|
||||
id: an id of a PendingCertificate
|
||||
"""
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
|
||||
"message": "Resolving pending certificate {}".format(id)
|
||||
"message": "Resolving pending certificate {}".format(id),
|
||||
"task_id": task_id,
|
||||
"id": id,
|
||||
}
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
|
||||
if task_id and is_task_active(log_data["function"], task_id, (id,)):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
pending_certs = pending_certificate_service.get_pending_certs([id])
|
||||
new = 0
|
||||
failed = 0
|
||||
@ -192,7 +207,7 @@ def remove_old_acme_certs():
|
||||
log_data['pending_cert_name'] = cert.name
|
||||
log_data['message'] = "Deleting pending certificate"
|
||||
current_app.logger.debug(log_data)
|
||||
pending_certificate_service.delete(cert.id)
|
||||
pending_certificate_service.delete(cert)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@ -231,7 +246,7 @@ def sync_all_sources():
|
||||
sync_source.delay(source.label)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=3600)
|
||||
def sync_source(source):
|
||||
"""
|
||||
This celery task will sync the specified source.
|
||||
@ -241,7 +256,9 @@ def sync_source(source):
|
||||
"""
|
||||
|
||||
function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
|
||||
task_id = celery.current_task.request.id
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Syncing source",
|
||||
@ -250,11 +267,18 @@ def sync_source(source):
|
||||
}
|
||||
current_app.logger.debug(log_data)
|
||||
|
||||
if is_task_active(function, task_id, (source,)):
|
||||
if task_id and is_task_active(function, task_id, (source,)):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
sync([source])
|
||||
try:
|
||||
sync([source])
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "Error syncing source: Time limit exceeded."
|
||||
sentry.captureException()
|
||||
metrics.send('sync_source_timeout', 'counter', 1, metric_tags={'source': source})
|
||||
return
|
||||
|
||||
log_data["message"] = "Done syncing source"
|
||||
current_app.logger.debug(log_data)
|
||||
|
||||
|
Reference in New Issue
Block a user