Merge pull request #2843 from hosseinsh/soft_time_outs
adding soft time outs for celery
This commit is contained in:
commit
0e31e708e3
|
@ -332,7 +332,7 @@ def clean_all_sources():
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=600)
|
||||||
def clean_source(source):
|
def clean_source(source):
|
||||||
"""
|
"""
|
||||||
This celery task will clean the specified source. This is a destructive operation that will delete unused
|
This celery task will clean the specified source. This is a destructive operation that will delete unused
|
||||||
|
@ -359,7 +359,13 @@ def clean_source(source):
|
||||||
return
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
clean([source], True)
|
try:
|
||||||
|
clean([source], True)
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "Clean source: Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task()
|
||||||
|
@ -427,9 +433,8 @@ def sync_source(source):
|
||||||
log_data["message"] = "Error syncing source: Time limit exceeded."
|
log_data["message"] = "Error syncing source: Time limit exceeded."
|
||||||
current_app.logger.error(log_data)
|
current_app.logger.error(log_data)
|
||||||
sentry.captureException()
|
sentry.captureException()
|
||||||
metrics.send(
|
metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source})
|
||||||
"sync_source_timeout", "counter", 1, metric_tags={"source": source}
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
log_data["message"] = "Done syncing source"
|
log_data["message"] = "Done syncing source"
|
||||||
|
@ -476,7 +481,7 @@ def sync_source_destination():
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=3600)
|
||||||
def certificate_reissue():
|
def certificate_reissue():
|
||||||
"""
|
"""
|
||||||
This celery task reissues certificates which are pending reissue
|
This celery task reissues certificates which are pending reissue
|
||||||
|
@ -499,14 +504,22 @@ def certificate_reissue():
|
||||||
return
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
cli_certificate.reissue(None, True)
|
try:
|
||||||
|
cli_certificate.reissue(None, True)
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "Certificate reissue: Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
return
|
||||||
|
|
||||||
log_data["message"] = "reissuance completed"
|
log_data["message"] = "reissuance completed"
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
red.set(f'{function}.last_success', int(time.time()))
|
red.set(f'{function}.last_success', int(time.time()))
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=3600)
|
||||||
def certificate_rotate():
|
def certificate_rotate():
|
||||||
"""
|
"""
|
||||||
This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert
|
This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert
|
||||||
|
@ -530,14 +543,22 @@ def certificate_rotate():
|
||||||
return
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
cli_certificate.rotate(None, None, None, None, True)
|
try:
|
||||||
|
cli_certificate.rotate(None, None, None, None, True)
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "Certificate rotate: Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
return
|
||||||
|
|
||||||
log_data["message"] = "rotation completed"
|
log_data["message"] = "rotation completed"
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
red.set(f'{function}.last_success', int(time.time()))
|
red.set(f'{function}.last_success', int(time.time()))
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=3600)
|
||||||
def endpoints_expire():
|
def endpoints_expire():
|
||||||
"""
|
"""
|
||||||
This celery task removes all endpoints that have not been recently updated
|
This celery task removes all endpoints that have not been recently updated
|
||||||
|
@ -560,12 +581,20 @@ def endpoints_expire():
|
||||||
return
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
cli_endpoints.expire(2) # Time in hours
|
try:
|
||||||
|
cli_endpoints.expire(2) # Time in hours
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "endpoint expire: Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
return
|
||||||
|
|
||||||
red.set(f'{function}.last_success', int(time.time()))
|
red.set(f'{function}.last_success', int(time.time()))
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=600)
|
||||||
def get_all_zones():
|
def get_all_zones():
|
||||||
"""
|
"""
|
||||||
This celery syncs all zones from the available dns providers
|
This celery syncs all zones from the available dns providers
|
||||||
|
@ -588,29 +617,56 @@ def get_all_zones():
|
||||||
return
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
cli_dns_providers.get_all_zones()
|
try:
|
||||||
|
cli_dns_providers.get_all_zones()
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "get all zones: Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
return
|
||||||
|
|
||||||
red.set(f'{function}.last_success', int(time.time()))
|
red.set(f'{function}.last_success', int(time.time()))
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=3600)
|
||||||
def check_revoked():
|
def check_revoked():
|
||||||
"""
|
"""
|
||||||
This celery task attempts to check if any certs are expired
|
This celery task attempts to check if any certs are expired
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||||
|
task_id = None
|
||||||
|
if celery.current_task:
|
||||||
|
task_id = celery.current_task.request.id
|
||||||
|
|
||||||
log_data = {
|
log_data = {
|
||||||
"function": function,
|
"function": function,
|
||||||
"message": "check if any certificates are revoked revoked",
|
"message": "check if any certificates are revoked revoked",
|
||||||
|
"task_id": task_id,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if task_id and is_task_active(function, task_id, None):
|
||||||
|
log_data["message"] = "Skipping task: Task is already active"
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
cli_certificate.check_revoked()
|
try:
|
||||||
|
cli_certificate.check_revoked()
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "Checking revoked: Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
return
|
||||||
|
|
||||||
red.set(f'{function}.last_success', int(time.time()))
|
red.set(f'{function}.last_success', int(time.time()))
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
||||||
|
|
||||||
@celery.task()
|
@celery.task(soft_time_limit=3600)
|
||||||
def notify_expirations():
|
def notify_expirations():
|
||||||
"""
|
"""
|
||||||
This celery task notifies about expiring certs
|
This celery task notifies about expiring certs
|
||||||
|
@ -633,6 +689,14 @@ def notify_expirations():
|
||||||
return
|
return
|
||||||
|
|
||||||
current_app.logger.debug(log_data)
|
current_app.logger.debug(log_data)
|
||||||
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
|
try:
|
||||||
|
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
|
||||||
|
except SoftTimeLimitExceeded:
|
||||||
|
log_data["message"] = "Notify expiring Time limit exceeded."
|
||||||
|
current_app.logger.error(log_data)
|
||||||
|
sentry.captureException()
|
||||||
|
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||||
|
return
|
||||||
|
|
||||||
red.set(f'{function}.last_success', int(time.time()))
|
red.set(f'{function}.last_success', int(time.time()))
|
||||||
metrics.send(f"{function}.success", 'counter', 1)
|
metrics.send(f"{function}.success", 'counter', 1)
|
||||||
|
|
Loading…
Reference in New Issue