preventing celery duplicate tasks

This commit is contained in:
Hossein Shafagh 2019-08-12 13:52:01 -07:00
parent 95086e08dc
commit bf47f87c21
1 changed files with 91 additions and 0 deletions

View File

@ -248,6 +248,15 @@ def remove_old_acme_certs():
} }
pending_certs = pending_certificate_service.get_pending_certs("all") pending_certs = pending_certificate_service.get_pending_certs("all")
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
# Delete pending certs more than a week old # Delete pending certs more than a week old
for cert in pending_certs: for cert in pending_certs:
if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7):
@ -311,6 +320,17 @@ def sync_all_sources():
"function": function, "function": function,
"message": "creating celery task to sync source", "message": "creating celery task to sync source",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
sources = validate_sources("all") sources = validate_sources("all")
for source in sources: for source in sources:
log_data["source"] = source.label log_data["source"] = source.label
@ -340,6 +360,17 @@ def sync_source(source):
"source": source, "source": source,
"task_id": task_id, "task_id": task_id,
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
if task_id and is_task_active(function, task_id, (source,)): if task_id and is_task_active(function, task_id, (source,)):
@ -378,6 +409,16 @@ def sync_source_destination():
"function": function, "function": function,
"message": "syncing AWS destinations and sources", "message": "syncing AWS destinations and sources",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
for dst in destinations_service.get_all(): for dst in destinations_service.get_all():
if add_aws_destination_to_sources(dst): if add_aws_destination_to_sources(dst):
@ -402,6 +443,16 @@ def certificate_reissue():
"function": function, "function": function,
"message": "reissuing certificates", "message": "reissuing certificates",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_certificate.reissue(None, True) cli_certificate.reissue(None, True)
log_data["message"] = "reissuance completed" log_data["message"] = "reissuance completed"
@ -421,6 +472,16 @@ def certificate_rotate():
"function": function, "function": function,
"message": "rotating certificates", "message": "rotating certificates",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_certificate.rotate(None, None, None, None, True) cli_certificate.rotate(None, None, None, None, True)
log_data["message"] = "rotation completed" log_data["message"] = "rotation completed"
@ -440,6 +501,16 @@ def endpoints_expire():
"function": function, "function": function,
"message": "endpoints expire", "message": "endpoints expire",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_endpoints.expire(2) # Time in hours cli_endpoints.expire(2) # Time in hours
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
@ -457,6 +528,16 @@ def get_all_zones():
"function": function, "function": function,
"message": "refresh all zones from available DNS providers", "message": "refresh all zones from available DNS providers",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_dns_providers.get_all_zones() cli_dns_providers.get_all_zones()
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
@ -491,6 +572,16 @@ def notify_expirations():
"function": function, "function": function,
"message": "notify for cert expiration", "message": "notify for cert expiration",
} }
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
if task_id and is_task_active(function, task_id, (id,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))