Celery integration

This commit is contained in:
Curtis Castrapel
2018-09-13 10:35:54 -07:00
parent 3c521f66a5
commit 23382b2777
8 changed files with 181 additions and 48 deletions

View File

@ -5,32 +5,27 @@
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import arrow
from flask import current_app
from cryptography import x509
from sqlalchemy import func, or_, not_, cast, Integer
from cryptography import x509
import arrow
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from flask import current_app
from lemur import database
from lemur.extensions import metrics, sentry, signals
from lemur.plugins.base import plugins
from lemur.common.utils import generate_private_key, truthiness
from lemur.roles.models import Role
from lemur.domains.models import Domain
from lemur.authorities.models import Authority
from lemur.destinations.models import Destination
from lemur.certificates.models import Certificate
from lemur.certificates.schemas import CertificateOutputSchema, CertificateInputSchema
from lemur.common.utils import generate_private_key, truthiness
from lemur.destinations.models import Destination
from lemur.domains.models import Domain
from lemur.extensions import metrics, sentry, signals
from lemur.notifications.models import Notification
from lemur.pending_certificates.models import PendingCertificate
from lemur.certificates.schemas import CertificateOutputSchema, CertificateInputSchema
from lemur.plugins.base import plugins
from lemur.roles import service as role_service
from lemur.roles.models import Role
csr_created = signals.signal('csr_created', "CSR generated")
csr_imported = signals.signal('csr_imported', "CSR imported from external source")
@ -95,8 +90,8 @@ def get_all_pending_cleaning(source):
:param source:
:return:
"""
return Certificate.query.filter(Certificate.sources.any(id=source.id))\
.filter(not_(Certificate.endpoints.any())).all()
return Certificate.query.filter(Certificate.sources.any(id=source.id)) \
.filter(not_(Certificate.endpoints.any())).all()
def get_all_pending_reissue():
@ -109,8 +104,8 @@ def get_all_pending_reissue():
:return:
"""
return Certificate.query.filter(Certificate.rotation == True)\
.filter(not_(Certificate.replaced.any()))\
return Certificate.query.filter(Certificate.rotation == True) \
.filter(not_(Certificate.replaced.any())) \
.filter(Certificate.in_rotation_window == True).all() # noqa
@ -280,6 +275,11 @@ def create(**kwargs):
if isinstance(cert, Certificate):
certificate_issued.send(certificate=cert, authority=cert.authority)
metrics.send('certificate_issued', 'counter', 1, metric_tags=dict(owner=cert.owner, issuer=cert.issuer))
if isinstance(cert, PendingCertificate) and cert.authority.plugin_name == 'acme-issuer':
# Call Celery to create acme-issuer (LetsEncrypt) certificates
from lemur.common.celery import fetch_acme_cert
fetch_acme_cert.delay(cert.id)
return cert
@ -310,8 +310,8 @@ def render(args):
if 'issuer' in terms:
# we can't rely on issuer being correct in the cert directly so we combine queries
sub_query = database.session_query(Authority.id)\
.filter(Authority.name.ilike(term))\
sub_query = database.session_query(Authority.id) \
.filter(Authority.name.ilike(term)) \
.subquery()
query = query.filter(
@ -450,8 +450,8 @@ def stats(**kwargs):
if kwargs.get('metric') == 'not_after':
start = arrow.utcnow()
end = start.replace(weeks=+32)
items = database.db.session.query(Certificate.issuer, func.count(Certificate.id))\
.group_by(Certificate.issuer)\
items = database.db.session.query(Certificate.issuer, func.count(Certificate.id)) \
.group_by(Certificate.issuer) \
.filter(Certificate.not_after <= end.format('YYYY-MM-DD')) \
.filter(Certificate.not_after >= start.format('YYYY-MM-DD')).all()

119
lemur/common/celery.py Normal file
View File

@ -0,0 +1,119 @@
"""
This module controls defines celery tasks and their applicable schedules. The celery beat server and workers will start
when invoked.
When ran in development mode (LEMUR_CONFIG=<location of development configuration file. To run both the celery
beat scheduler and a worker simultaneously, and to have jobs kick off starting at the next minute, run the following
command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
"""
import copy
import sys
from celery import Celery
from flask import current_app
from lemur.authorities.service import get as get_authority
from lemur.factory import create_app
from lemur.notifications.messaging import send_pending_failure_notification
from lemur.pending_certificates import service as pending_certificate_service
from lemur.plugins.base import plugins
from lemur.users import service as user_service
flask_app = create_app()
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(flask_app)
@celery.task()
def fetch_acme_cert(id):
"""
Attempt to get the full certificate for the pending certificate listed.
Args:
id: an id of a PendingCertificate
"""
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name)
}
pending_certs = pending_certificate_service.get_pending_certs([id])
user = user_service.get_by_username('lemur')
new = 0
failed = 0
wrong_issuer = 0
acme_certs = []
# We only care about certs using the acme-issuer plugin
for cert in pending_certs:
cert_authority = get_authority(cert.authority_id)
if cert_authority.plugin_name == 'acme-issuer':
acme_certs.append(cert)
else:
wrong_issuer += 1
authority = plugins.get("acme-issuer")
resolved_certs = authority.get_ordered_certificates(acme_certs)
for cert in resolved_certs:
real_cert = cert.get("cert")
# It's necessary to reload the pending cert due to detached instance: http://sqlalche.me/e/bhk3
pending_cert = pending_certificate_service.get(cert.get("pending_cert").id)
if real_cert:
# If a real certificate was returned from issuer, then create it in Lemur and delete
# the pending certificate
pending_certificate_service.create_certificate(pending_cert, real_cert, user)
pending_certificate_service.delete_by_id(pending_cert.id)
# add metrics to metrics extension
new += 1
else:
failed += 1
error_log = copy.deepcopy(log_data)
error_log["message"] = "Pending certificate creation failure"
error_log["pending_cert_id"] = pending_cert.id
error_log["last_error"] = cert.get("last_error")
error_log["cn"] = pending_cert.cn
if pending_cert.number_attempts > 4:
error_log["message"] = "Deleting pending certificate"
send_pending_failure_notification(pending_cert, notify_owner=pending_cert.notify)
pending_certificate_service.delete(pending_certificate_service.cancel(pending_cert))
else:
pending_certificate_service.increment_attempt(pending_cert)
pending_certificate_service.update(
cert.get("pending_cert").id,
status=str(cert.get("last_error"))
)
# Add failed pending cert task back to queue
fetch_acme_cert.delay(id)
current_app.logger.error(error_log)
log_data["message"] = "Complete"
log_data["new"] = new
log_data["failed"] = failed
log_data["wrong_issuer"] = wrong_issuer
current_app.logger.debug(log_data)
print(
"[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
new=new,
failed=failed,
wrong_issuer=wrong_issuer
)
)