Merge pull request #2845 from hosseinsh/prevent-duplicate-celery-tasks

preventing celery duplicate tasks
This commit is contained in:
Hossein Shafagh 2019-08-13 19:42:11 -07:00 committed by GitHub
commit b6486d85ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 143 additions and 2 deletions

View File

@ -66,6 +66,9 @@ celery = make_celery(flask_app)
def is_task_active(fun, task_id, args): def is_task_active(fun, task_id, args):
from celery.task.control import inspect from celery.task.control import inspect
if not args:
args = '()' # empty args
i = inspect() i = inspect()
active_tasks = i.active() active_tasks = i.active()
for _, tasks in active_tasks.items(): for _, tasks in active_tasks.items():
@ -89,6 +92,21 @@ def report_celery_last_success_metrics():
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "recurrent task",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_time = int(time.time()) current_time = int(time.time())
schedule = current_app.config.get('CELERYBEAT_SCHEDULE') schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
for _, t in schedule.items(): for _, t in schedule.items():
@ -213,15 +231,25 @@ def fetch_acme_cert(id):
@celery.task() @celery.task()
def fetch_all_pending_acme_certs(): def fetch_all_pending_acme_certs():
"""Instantiate celery workers to resolve all pending Acme certificates""" """Instantiate celery workers to resolve all pending Acme certificates"""
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Starting job.", "message": "Starting job.",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
# We only care about certs using the acme-issuer plugin # We only care about certs using the acme-issuer plugin
for cert in pending_certs: for cert in pending_certs:
@ -242,10 +270,21 @@ def fetch_all_pending_acme_certs():
def remove_old_acme_certs(): def remove_old_acme_certs():
"""Prune old pending acme certificates from the database""" """Prune old pending acme certificates from the database"""
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Starting job.", "message": "Starting job.",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
pending_certs = pending_certificate_service.get_pending_certs("all") pending_certs = pending_certificate_service.get_pending_certs("all")
# Delete pending certs more than a week old # Delete pending certs more than a week old
@ -268,10 +307,21 @@ def clean_all_sources():
be ran periodically. This function triggers one celery task per source. be ran periodically. This function triggers one celery task per source.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Creating celery task to clean source", "message": "Creating celery task to clean source",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
sources = validate_sources("all") sources = validate_sources("all")
for source in sources: for source in sources:
log_data["source"] = source.label log_data["source"] = source.label
@ -292,11 +342,22 @@ def clean_source(source):
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Cleaning source", "message": "Cleaning source",
"source": source, "source": source,
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, (source,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
clean([source], True) clean([source], True)
@ -307,10 +368,21 @@ def sync_all_sources():
This function will sync certificates from all sources. This function triggers one celery task per source. This function will sync certificates from all sources. This function triggers one celery task per source.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "creating celery task to sync source", "message": "creating celery task to sync source",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
sources = validate_sources("all") sources = validate_sources("all")
for source in sources: for source in sources:
log_data["source"] = source.label log_data["source"] = source.label
@ -334,18 +406,20 @@ def sync_source(source):
task_id = None task_id = None
if celery.current_task: if celery.current_task:
task_id = celery.current_task.request.id task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "Syncing source", "message": "Syncing source",
"source": source, "source": source,
"task_id": task_id, "task_id": task_id,
} }
current_app.logger.debug(log_data)
if task_id and is_task_active(function, task_id, (source,)): if task_id and is_task_active(function, task_id, (source,)):
log_data["message"] = "Skipping task: Task is already active" log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
return return
current_app.logger.debug(log_data)
try: try:
sync([source]) sync([source])
metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
@ -374,10 +448,21 @@ def sync_source_destination():
We rely on account numbers to avoid duplicates. We rely on account numbers to avoid duplicates.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "syncing AWS destinations and sources", "message": "syncing AWS destinations and sources",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
for dst in destinations_service.get_all(): for dst in destinations_service.get_all():
if add_aws_destination_to_sources(dst): if add_aws_destination_to_sources(dst):
@ -398,10 +483,21 @@ def certificate_reissue():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "reissuing certificates", "message": "reissuing certificates",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_certificate.reissue(None, True) cli_certificate.reissue(None, True)
log_data["message"] = "reissuance completed" log_data["message"] = "reissuance completed"
@ -417,10 +513,22 @@ def certificate_rotate():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "rotating certificates", "message": "rotating certificates",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_certificate.rotate(None, None, None, None, True) cli_certificate.rotate(None, None, None, None, True)
log_data["message"] = "rotation completed" log_data["message"] = "rotation completed"
@ -436,10 +544,21 @@ def endpoints_expire():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "endpoints expire", "message": "endpoints expire",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_endpoints.expire(2) # Time in hours cli_endpoints.expire(2) # Time in hours
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
@ -453,10 +572,21 @@ def get_all_zones():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "refresh all zones from available DNS providers", "message": "refresh all zones from available DNS providers",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_dns_providers.get_all_zones() cli_dns_providers.get_all_zones()
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
@ -487,10 +617,21 @@ def notify_expirations():
:return: :return:
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = { log_data = {
"function": function, "function": function,
"message": "notify for cert expiration", "message": "notify for cert expiration",
"task_id": task_id,
} }
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))