Merge branch 'master' into check-revoke-revised
This commit is contained in:
commit
68abf11be8
|
@ -66,6 +66,9 @@ celery = make_celery(flask_app)
|
|||
def is_task_active(fun, task_id, args):
|
||||
from celery.task.control import inspect
|
||||
|
||||
if not args:
|
||||
args = '()' # empty args
|
||||
|
||||
i = inspect()
|
||||
active_tasks = i.active()
|
||||
for _, tasks in active_tasks.items():
|
||||
|
@ -89,6 +92,21 @@ def report_celery_last_success_metrics():
|
|||
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "recurrent task",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_time = int(time.time())
|
||||
schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
|
||||
for _, t in schedule.items():
|
||||
|
@ -213,15 +231,25 @@ def fetch_acme_cert(id):
|
|||
@celery.task()
|
||||
def fetch_all_pending_acme_certs():
|
||||
"""Instantiate celery workers to resolve all pending Acme certificates"""
|
||||
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
|
||||
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Starting job.",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
|
||||
|
||||
# We only care about certs using the acme-issuer plugin
|
||||
for cert in pending_certs:
|
||||
|
@ -242,10 +270,21 @@ def fetch_all_pending_acme_certs():
|
|||
def remove_old_acme_certs():
|
||||
"""Prune old pending acme certificates from the database"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Starting job.",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
pending_certs = pending_certificate_service.get_pending_certs("all")
|
||||
|
||||
# Delete pending certs more than a week old
|
||||
|
@ -268,10 +307,21 @@ def clean_all_sources():
|
|||
be ran periodically. This function triggers one celery task per source.
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Creating celery task to clean source",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
sources = validate_sources("all")
|
||||
for source in sources:
|
||||
log_data["source"] = source.label
|
||||
|
@ -282,7 +332,7 @@ def clean_all_sources():
|
|||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=600)
|
||||
def clean_source(source):
|
||||
"""
|
||||
This celery task will clean the specified source. This is a destructive operation that will delete unused
|
||||
|
@ -292,13 +342,30 @@ def clean_source(source):
|
|||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Cleaning source",
|
||||
"source": source,
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, (source,)):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
clean([source], True)
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "Clean source: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
|
||||
|
||||
@celery.task()
|
||||
|
@ -307,10 +374,21 @@ def sync_all_sources():
|
|||
This function will sync certificates from all sources. This function triggers one celery task per source.
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "creating celery task to sync source",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
sources = validate_sources("all")
|
||||
for source in sources:
|
||||
log_data["source"] = source.label
|
||||
|
@ -334,18 +412,20 @@ def sync_source(source):
|
|||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "Syncing source",
|
||||
"source": source,
|
||||
"task_id": task_id,
|
||||
}
|
||||
current_app.logger.debug(log_data)
|
||||
|
||||
if task_id and is_task_active(function, task_id, (source,)):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
sync([source])
|
||||
metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
|
||||
|
@ -353,9 +433,8 @@ def sync_source(source):
|
|||
log_data["message"] = "Error syncing source: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send(
|
||||
"sync_source_timeout", "counter", 1, metric_tags={"source": source}
|
||||
)
|
||||
metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source})
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
log_data["message"] = "Done syncing source"
|
||||
|
@ -374,10 +453,21 @@ def sync_source_destination():
|
|||
We rely on account numbers to avoid duplicates.
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "syncing AWS destinations and sources",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
for dst in destinations_service.get_all():
|
||||
if add_aws_destination_to_sources(dst):
|
||||
|
@ -391,107 +481,222 @@ def sync_source_destination():
|
|||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=3600)
|
||||
def certificate_reissue():
|
||||
"""
|
||||
This celery task reissues certificates which are pending reissue
|
||||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "reissuing certificates",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
cli_certificate.reissue(None, True)
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "Certificate reissue: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
log_data["message"] = "reissuance completed"
|
||||
current_app.logger.debug(log_data)
|
||||
red.set(f'{function}.last_success', int(time.time()))
|
||||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=3600)
|
||||
def certificate_rotate():
|
||||
"""
|
||||
This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert
|
||||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "rotating certificates",
|
||||
"task_id": task_id,
|
||||
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
cli_certificate.rotate(None, None, None, None, True)
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "Certificate rotate: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
log_data["message"] = "rotation completed"
|
||||
current_app.logger.debug(log_data)
|
||||
red.set(f'{function}.last_success', int(time.time()))
|
||||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=3600)
|
||||
def endpoints_expire():
|
||||
"""
|
||||
This celery task removes all endpoints that have not been recently updated
|
||||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "endpoints expire",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
cli_endpoints.expire(2) # Time in hours
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "endpoint expire: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
red.set(f'{function}.last_success', int(time.time()))
|
||||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=600)
|
||||
def get_all_zones():
|
||||
"""
|
||||
This celery syncs all zones from the available dns providers
|
||||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "refresh all zones from available DNS providers",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
cli_dns_providers.get_all_zones()
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "get all zones: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
red.set(f'{function}.last_success', int(time.time()))
|
||||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=3600)
|
||||
def check_revoked():
|
||||
"""
|
||||
This celery task attempts to check if any certs are expired
|
||||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "check if any certificates are revoked revoked",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
cli_certificate.check_revoked()
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "Checking revoked: Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
red.set(f'{function}.last_success', int(time.time()))
|
||||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
||||
|
||||
@celery.task()
|
||||
@celery.task(soft_time_limit=3600)
|
||||
def notify_expirations():
|
||||
"""
|
||||
This celery task notifies about expiring certs
|
||||
:return:
|
||||
"""
|
||||
function = f"{__name__}.{sys._getframe().f_code.co_name}"
|
||||
task_id = None
|
||||
if celery.current_task:
|
||||
task_id = celery.current_task.request.id
|
||||
|
||||
log_data = {
|
||||
"function": function,
|
||||
"message": "notify for cert expiration",
|
||||
"task_id": task_id,
|
||||
}
|
||||
|
||||
if task_id and is_task_active(function, task_id, None):
|
||||
log_data["message"] = "Skipping task: Task is already active"
|
||||
current_app.logger.debug(log_data)
|
||||
return
|
||||
|
||||
current_app.logger.debug(log_data)
|
||||
try:
|
||||
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
|
||||
except SoftTimeLimitExceeded:
|
||||
log_data["message"] = "Notify expiring Time limit exceeded."
|
||||
current_app.logger.error(log_data)
|
||||
sentry.captureException()
|
||||
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
|
||||
return
|
||||
|
||||
red.set(f'{function}.last_success', int(time.time()))
|
||||
metrics.send(f"{function}.success", 'counter', 1)
|
||||
|
|
Loading…
Reference in New Issue