lemur/lemur/common/celery.py

229 lines
8.0 KiB
Python
Raw Normal View History

2018-09-13 19:35:54 +02:00
"""
This module controls defines celery tasks and their applicable schedules. The celery beat server and workers will start
when invoked.
When ran in development mode (LEMUR_CONFIG=<location of development configuration file. To run both the celery
beat scheduler and a worker simultaneously, and to have jobs kick off starting at the next minute, run the following
command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
"""
import copy
import sys
from datetime import datetime, timezone, timedelta
2018-09-13 19:35:54 +02:00
from celery import Celery
from flask import current_app
from lemur.authorities.service import get as get_authority
from lemur.factory import create_app
from lemur.notifications.messaging import send_pending_failure_notification
from lemur.pending_certificates import service as pending_certificate_service
from lemur.plugins.base import plugins
2018-10-29 17:10:43 +01:00
from lemur.sources.cli import clean, sync, validate_sources
2018-09-13 19:35:54 +02:00
2018-11-29 18:29:05 +01:00
if current_app:
flask_app = current_app
else:
flask_app = create_app()
2018-09-13 19:35:54 +02:00
def make_celery(app):
2018-11-28 23:27:03 +01:00
celery = Celery(app.import_name, backend=app.config.get('CELERY_RESULT_BACKEND'),
broker=app.config.get('CELERY_BROKER_URL'))
2018-09-13 19:35:54 +02:00
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(flask_app)
@celery.task()
def fetch_acme_cert(id):
"""
Attempt to get the full certificate for the pending certificate listed.
Args:
id: an id of a PendingCertificate
"""
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
"message": "Resolving pending certificate {}".format(id)
2018-09-13 19:35:54 +02:00
}
current_app.logger.debug(log_data)
2018-09-13 19:35:54 +02:00
pending_certs = pending_certificate_service.get_pending_certs([id])
new = 0
failed = 0
wrong_issuer = 0
acme_certs = []
# We only care about certs using the acme-issuer plugin
for cert in pending_certs:
cert_authority = get_authority(cert.authority_id)
if cert_authority.plugin_name == 'acme-issuer':
acme_certs.append(cert)
else:
wrong_issuer += 1
authority = plugins.get("acme-issuer")
resolved_certs = authority.get_ordered_certificates(acme_certs)
for cert in resolved_certs:
real_cert = cert.get("cert")
# It's necessary to reload the pending cert due to detached instance: http://sqlalche.me/e/bhk3
pending_cert = pending_certificate_service.get(cert.get("pending_cert").id)
2018-10-12 07:01:05 +02:00
if not pending_cert:
log_data["message"] = "Pending certificate doesn't exist anymore. Was it resolved by another process?"
current_app.logger.error(log_data)
continue
2018-09-13 19:35:54 +02:00
if real_cert:
2018-10-12 07:01:05 +02:00
# If a real certificate was returned from issuer, then create it in Lemur and mark
# the pending certificate as resolved
final_cert = pending_certificate_service.create_certificate(pending_cert, real_cert, pending_cert.user)
pending_certificate_service.update(
cert.get("pending_cert").id,
resolved=True
)
pending_certificate_service.update(
cert.get("pending_cert").id,
resolved_cert_id=final_cert.id
)
2018-09-13 19:35:54 +02:00
# add metrics to metrics extension
new += 1
else:
failed += 1
error_log = copy.deepcopy(log_data)
error_log["message"] = "Pending certificate creation failure"
error_log["pending_cert_id"] = pending_cert.id
error_log["last_error"] = cert.get("last_error")
error_log["cn"] = pending_cert.cn
if pending_cert.number_attempts > 4:
error_log["message"] = "Deleting pending certificate"
send_pending_failure_notification(pending_cert, notify_owner=pending_cert.notify)
2018-10-12 14:56:14 +02:00
# Mark the pending cert as resolved
2018-10-12 07:01:05 +02:00
pending_certificate_service.update(
cert.get("pending_cert").id,
resolved=True
)
2018-09-13 19:35:54 +02:00
else:
pending_certificate_service.increment_attempt(pending_cert)
pending_certificate_service.update(
cert.get("pending_cert").id,
status=str(cert.get("last_error"))
)
# Add failed pending cert task back to queue
fetch_acme_cert.delay(id)
current_app.logger.error(error_log)
log_data["message"] = "Complete"
log_data["new"] = new
log_data["failed"] = failed
log_data["wrong_issuer"] = wrong_issuer
current_app.logger.debug(log_data)
print(
"[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
new=new,
failed=failed,
wrong_issuer=wrong_issuer
)
)
@celery.task()
def fetch_all_pending_acme_certs():
"""Instantiate celery workers to resolve all pending Acme certificates"""
2018-10-12 07:01:05 +02:00
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
"message": "Starting job."
}
current_app.logger.debug(log_data)
# We only care about certs using the acme-issuer plugin
for cert in pending_certs:
cert_authority = get_authority(cert.authority_id)
if cert_authority.plugin_name == 'acme-issuer':
2018-11-09 19:31:27 +01:00
if datetime.now(timezone.utc) - cert.last_updated > timedelta(minutes=5):
log_data["message"] = "Triggering job for cert {}".format(cert.name)
log_data["cert_name"] = cert.name
log_data["cert_id"] = cert.id
current_app.logger.debug(log_data)
fetch_acme_cert.delay(cert.id)
@celery.task()
def remove_old_acme_certs():
"""Prune old pending acme certificates from the database"""
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name)
}
pending_certs = pending_certificate_service.get_pending_certs('all')
# Delete pending certs more than a week old
for cert in pending_certs:
2018-10-12 16:25:58 +02:00
if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7):
log_data['pending_cert_id'] = cert.id
log_data['pending_cert_name'] = cert.name
log_data['message'] = "Deleting pending certificate"
current_app.logger.debug(log_data)
pending_certificate_service.delete(cert.id)
2018-10-22 23:51:13 +02:00
@celery.task()
def clean_all_sources():
"""
This function will clean unused certificates from sources. This is a destructive operation and should only
be ran periodically. This function triggers one celery task per source.
"""
sources = validate_sources("all")
for source in sources:
current_app.logger.debug("Creating celery task to clean source {}".format(source.label))
clean_source.delay(source.label)
@celery.task()
def clean_source(source):
"""
This celery task will clean the specified source. This is a destructive operation that will delete unused
certificates from each source.
:param source:
:return:
"""
current_app.logger.debug("Cleaning source {}".format(source))
clean([source], True)
2018-10-29 17:10:43 +01:00
@celery.task()
def sync_all_sources():
"""
This function will sync certificates from all sources. This function triggers one celery task per source.
"""
2018-10-29 17:10:43 +01:00
sources = validate_sources("all")
for source in sources:
current_app.logger.debug("Creating celery task to sync source {}".format(source.label))
sync_source.delay(source.label)
@celery.task()
def sync_source(source):
"""
This celery task will sync the specified source.
:param source:
:return:
"""
current_app.logger.debug("Syncing source {}".format(source))
2018-10-29 21:22:50 +01:00
sync([source])