improved the flow for checking if the task is active

This commit is contained in:
Hossein Shafagh 2019-08-13 11:52:56 -07:00
parent 4d728738ee
commit c29f282560
1 changed files with 109 additions and 60 deletions

View File

@ -91,6 +91,21 @@ def report_celery_last_success_metrics():
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "recurrent task",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_time = int(time.time()) current_time = int(time.time())
schedule = current_app.config.get('CELERYBEAT_SCHEDULE') schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
for _, t in schedule.items(): for _, t in schedule.items():
@ -215,15 +230,25 @@ def fetch_acme_cert(id):
@celery.task() @celery.task()
def fetch_all_pending_acme_certs(): def fetch_all_pending_acme_certs():
"""Instantiate celery workers to resolve all pending Acme certificates""" """Instantiate celery workers to resolve all pending Acme certificates"""
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Starting job.", "message": "Starting job.",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
# We only care about certs using the acme-issuer plugin # We only care about certs using the acme-issuer plugin
for cert in pending_certs: for cert in pending_certs:
@ -244,21 +269,23 @@ def fetch_all_pending_acme_certs():
def remove_old_acme_certs(): def remove_old_acme_certs():
"""Prune old pending acme certificates from the database""" """Prune old pending acme certificates from the database"""
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "Starting job.",
}
pending_certs = pending_certificate_service.get_pending_certs("all")
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "Starting job.",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
pending_certs = pending_certificate_service.get_pending_certs("all")
# Delete pending certs more than a week old # Delete pending certs more than a week old
for cert in pending_certs: for cert in pending_certs:
if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7):
@ -279,10 +306,21 @@ def clean_all_sources():
be ran periodically. This function triggers one celery task per source. be ran periodically. This function triggers one celery task per source.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Creating celery task to clean source", "message": "Creating celery task to clean source",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
sources = validate_sources("all") sources = validate_sources("all")
for source in sources: for source in sources:
log_data["source"] = source.label log_data["source"] = source.label
@ -303,11 +341,22 @@ def clean_source(source):
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Cleaning source", "message": "Cleaning source",
"source": source, "source": source,
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, (source,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
clean([source], True) clean([source], True)
@ -318,16 +367,17 @@ def sync_all_sources():
This function will sync certificates from all sources. This function triggers one celery task per source. This function will sync certificates from all sources. This function triggers one celery task per source.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "creating celery task to sync source",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "creating celery task to sync source",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
@ -355,6 +405,7 @@ def sync_source(source):
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Syncing source", "message": "Syncing source",
@ -362,21 +413,12 @@ def sync_source(source):
"task_id": task_id, "task_id": task_id,
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
if task_id and is_task_active(function, task_id, (source,)): if task_id and is_task_active(function, task_id, (source,)):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
current_app.logger.debug(log_data)
try: try:
sync([source]) sync([source])
metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
@ -405,16 +447,17 @@ def sync_source_destination():
We rely on account numbers to avoid duplicates. We rely on account numbers to avoid duplicates.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "syncing AWS destinations and sources",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "syncing AWS destinations and sources",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
@ -439,16 +482,17 @@ def certificate_reissue():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "reissuing certificates",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "reissuing certificates",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
@ -468,16 +512,18 @@ def certificate_rotate():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "rotating certificates",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "rotating certificates",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
@ -497,16 +543,17 @@ def endpoints_expire():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "endpoints expire",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "endpoints expire",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
@ -524,16 +571,17 @@ def get_all_zones():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "refresh all zones from available DNS providers",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "refresh all zones from available DNS providers",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
@ -568,16 +616,17 @@ def notify_expirations():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "notify for cert expiration",
}
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)): log_data = {
"function": function,
"message": "notify for cert expiration",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return