Merge pull request #1711 from castrapel/celery_092018

Celery integration
This commit is contained in:
Curtis 2018-09-17 15:36:06 -07:00 committed by GitHub
commit c7b57e21a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 243 additions and 72 deletions

View File

@ -6,31 +6,25 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import arrow
from flask import current_app
from sqlalchemy import func, or_, not_, cast, Integer
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from flask import current_app
from sqlalchemy import func, or_, not_, cast, Integer
from lemur import database
from lemur.extensions import metrics, sentry, signals
from lemur.plugins.base import plugins
from lemur.common.utils import generate_private_key, truthiness
from lemur.roles.models import Role
from lemur.domains.models import Domain
from lemur.authorities.models import Authority
from lemur.destinations.models import Destination
from lemur.certificates.models import Certificate
from lemur.certificates.schemas import CertificateOutputSchema, CertificateInputSchema
from lemur.common.utils import generate_private_key, truthiness
from lemur.destinations.models import Destination
from lemur.domains.models import Domain
from lemur.extensions import metrics, sentry, signals
from lemur.notifications.models import Notification
from lemur.pending_certificates.models import PendingCertificate
from lemur.certificates.schemas import CertificateOutputSchema, CertificateInputSchema
from lemur.plugins.base import plugins
from lemur.roles import service as role_service
from lemur.roles.models import Role
csr_created = signals.signal('csr_created', "CSR generated")
csr_imported = signals.signal('csr_imported', "CSR imported from external source")
@ -95,8 +89,8 @@ def get_all_pending_cleaning(source):
:param source:
:return:
"""
return Certificate.query.filter(Certificate.sources.any(id=source.id))\
.filter(not_(Certificate.endpoints.any())).all()
return Certificate.query.filter(Certificate.sources.any(id=source.id)) \
.filter(not_(Certificate.endpoints.any())).all()
def get_all_pending_reissue():
@ -109,8 +103,8 @@ def get_all_pending_reissue():
:return:
"""
return Certificate.query.filter(Certificate.rotation == True)\
.filter(not_(Certificate.replaced.any()))\
return Certificate.query.filter(Certificate.rotation == True) \
.filter(not_(Certificate.replaced.any())) \
.filter(Certificate.in_rotation_window == True).all() # noqa
@ -280,6 +274,7 @@ def create(**kwargs):
if isinstance(cert, Certificate):
certificate_issued.send(certificate=cert, authority=cert.authority)
metrics.send('certificate_issued', 'counter', 1, metric_tags=dict(owner=cert.owner, issuer=cert.issuer))
return cert
@ -310,8 +305,8 @@ def render(args):
if 'issuer' in terms:
# we can't rely on issuer being correct in the cert directly so we combine queries
sub_query = database.session_query(Authority.id)\
.filter(Authority.name.ilike(term))\
sub_query = database.session_query(Authority.id) \
.filter(Authority.name.ilike(term)) \
.subquery()
query = query.filter(
@ -450,8 +445,8 @@ def stats(**kwargs):
if kwargs.get('metric') == 'not_after':
start = arrow.utcnow()
end = start.replace(weeks=+32)
items = database.db.session.query(Certificate.issuer, func.count(Certificate.id))\
.group_by(Certificate.issuer)\
items = database.db.session.query(Certificate.issuer, func.count(Certificate.id)) \
.group_by(Certificate.issuer) \
.filter(Certificate.not_after <= end.format('YYYY-MM-DD')) \
.filter(Certificate.not_after >= start.format('YYYY-MM-DD')).all()

135
lemur/common/celery.py Normal file
View File

@ -0,0 +1,135 @@
"""
This module controls defines celery tasks and their applicable schedules. The celery beat server and workers will start
when invoked.
When ran in development mode (LEMUR_CONFIG=<location of development configuration file. To run both the celery
beat scheduler and a worker simultaneously, and to have jobs kick off starting at the next minute, run the following
command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
"""
import copy
import datetime
import sys
from datetime import timezone
from celery import Celery
from flask import current_app
from lemur.authorities.service import get as get_authority
from lemur.factory import create_app
from lemur.notifications.messaging import send_pending_failure_notification
from lemur.pending_certificates import service as pending_certificate_service
from lemur.plugins.base import plugins
from lemur.users import service as user_service
flask_app = create_app()
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(flask_app)
@celery.task()
def fetch_acme_cert(id):
"""
Attempt to get the full certificate for the pending certificate listed.
Args:
id: an id of a PendingCertificate
"""
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name)
}
pending_certs = pending_certificate_service.get_pending_certs([id])
user = user_service.get_by_username('lemur')
new = 0
failed = 0
wrong_issuer = 0
acme_certs = []
# We only care about certs using the acme-issuer plugin
for cert in pending_certs:
cert_authority = get_authority(cert.authority_id)
if cert_authority.plugin_name == 'acme-issuer':
acme_certs.append(cert)
else:
wrong_issuer += 1
authority = plugins.get("acme-issuer")
resolved_certs = authority.get_ordered_certificates(acme_certs)
for cert in resolved_certs:
real_cert = cert.get("cert")
# It's necessary to reload the pending cert due to detached instance: http://sqlalche.me/e/bhk3
pending_cert = pending_certificate_service.get(cert.get("pending_cert").id)
if real_cert:
# If a real certificate was returned from issuer, then create it in Lemur and delete
# the pending certificate
pending_certificate_service.create_certificate(pending_cert, real_cert, user)
pending_certificate_service.delete_by_id(pending_cert.id)
# add metrics to metrics extension
new += 1
else:
failed += 1
error_log = copy.deepcopy(log_data)
error_log["message"] = "Pending certificate creation failure"
error_log["pending_cert_id"] = pending_cert.id
error_log["last_error"] = cert.get("last_error")
error_log["cn"] = pending_cert.cn
if pending_cert.number_attempts > 4:
error_log["message"] = "Deleting pending certificate"
send_pending_failure_notification(pending_cert, notify_owner=pending_cert.notify)
pending_certificate_service.delete(pending_certificate_service.cancel(pending_cert))
else:
pending_certificate_service.increment_attempt(pending_cert)
pending_certificate_service.update(
cert.get("pending_cert").id,
status=str(cert.get("last_error"))
)
# Add failed pending cert task back to queue
fetch_acme_cert.delay(id)
current_app.logger.error(error_log)
log_data["message"] = "Complete"
log_data["new"] = new
log_data["failed"] = failed
log_data["wrong_issuer"] = wrong_issuer
current_app.logger.debug(log_data)
print(
"[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
new=new,
failed=failed,
wrong_issuer=wrong_issuer
)
)
@celery.task()
def fetch_all_pending_acme_certs():
"""Instantiate celery workers to resolve all pending Acme certificates"""
pending_certs = pending_certificate_service.get_pending_certs('all')
# We only care about certs using the acme-issuer plugin
for cert in pending_certs:
cert_authority = get_authority(cert.authority_id)
if cert_authority.plugin_name == 'acme-issuer':
if cert.last_updated == cert.date_created or datetime.datetime.now(
timezone.utc) - cert.last_updated > datetime.timedelta(minutes=3):
fetch_acme_cert.delay(cert.id)

View File

@ -0,0 +1,24 @@
"""Add last_updated field to Pending Certs
Revision ID: 9392b9f9a805
Revises: 5ae0ecefb01f
Create Date: 2018-09-17 08:33:37.087488
"""
# revision identifiers, used by Alembic.
revision = '9392b9f9a805'
down_revision = '5ae0ecefb01f'
from alembic import op
from sqlalchemy_utils import ArrowType
import sqlalchemy as sa
def upgrade():
op.add_column('pending_certs', sa.Column('last_updated', ArrowType, server_default=sa.text('now()'), onupdate=sa.text('now()'),
nullable=False))
def downgrade():
op.drop_column('pending_certs', 'last_updated')

View File

@ -5,19 +5,18 @@
"""
from datetime import datetime as dt
from sqlalchemy.orm import relationship
from sqlalchemy import Integer, ForeignKey, String, PassiveDefault, func, Column, Text, Boolean
from sqlalchemy_utils.types.arrow import ArrowType
from sqlalchemy.orm import relationship
from sqlalchemy_utils import JSONType
from sqlalchemy_utils.types.arrow import ArrowType
from lemur.certificates.models import get_or_increase_name
from lemur.common import defaults, utils
from lemur.database import db
from lemur.utils import Vault
from lemur.models import pending_cert_source_associations, \
pending_cert_destination_associations, pending_cert_notification_associations, \
pending_cert_replacement_associations, pending_cert_role_associations
from lemur.utils import Vault
class PendingCertificate(db.Model):
@ -40,6 +39,7 @@ class PendingCertificate(db.Model):
dns_provider_id = Column(Integer, ForeignKey('dns_providers.id', ondelete="CASCADE"))
status = Column(Text(), nullable=True)
last_updated = Column(ArrowType, PassiveDefault(func.now()), onupdate=func.now(), nullable=False)
rotation = Column(Boolean, default=False)
user_id = Column(Integer, ForeignKey('users.id'))
@ -47,9 +47,12 @@ class PendingCertificate(db.Model):
root_authority_id = Column(Integer, ForeignKey('authorities.id', ondelete="CASCADE"))
rotation_policy_id = Column(Integer, ForeignKey('rotation_policies.id'))
notifications = relationship('Notification', secondary=pending_cert_notification_associations, backref='pending_cert', passive_deletes=True)
destinations = relationship('Destination', secondary=pending_cert_destination_associations, backref='pending_cert', passive_deletes=True)
sources = relationship('Source', secondary=pending_cert_source_associations, backref='pending_cert', passive_deletes=True)
notifications = relationship('Notification', secondary=pending_cert_notification_associations,
backref='pending_cert', passive_deletes=True)
destinations = relationship('Destination', secondary=pending_cert_destination_associations, backref='pending_cert',
passive_deletes=True)
sources = relationship('Source', secondary=pending_cert_source_associations, backref='pending_cert',
passive_deletes=True)
roles = relationship('Role', secondary=pending_cert_role_associations, backref='pending_cert', passive_deletes=True)
replaces = relationship('Certificate',
secondary=pending_cert_replacement_associations,
@ -77,7 +80,7 @@ class PendingCertificate(db.Model):
# TODO: Fix auto-generated name, it should be renamed on creation
self.name = get_or_increase_name(
defaults.certificate_name(kwargs['common_name'], kwargs['authority'].name,
dt.now(), dt.now(), False), self.external_id)
dt.now(), dt.now(), False), self.external_id)
self.rename = True
self.cn = defaults.common_name(utils.parse_csr(self.csr))

View File

@ -1,5 +1,14 @@
from marshmallow import fields, post_load
from lemur.authorities.schemas import AuthorityNestedOutputSchema
from lemur.certificates.schemas import CertificateNestedOutputSchema
from lemur.common.schema import LemurInputSchema, LemurOutputSchema
from lemur.destinations.schemas import DestinationNestedOutputSchema
from lemur.domains.schemas import DomainNestedOutputSchema
from lemur.notifications import service as notification_service
from lemur.notifications.schemas import NotificationNestedOutputSchema
from lemur.policies.schemas import RotationPolicyNestedOutputSchema
from lemur.roles.schemas import RoleNestedOutputSchema
from lemur.schemas import (
AssociatedCertificateSchema,
AssociatedDestinationSchema,
@ -8,18 +17,7 @@ from lemur.schemas import (
EndpointNestedOutputSchema,
ExtensionSchema
)
from lemur.common.schema import LemurInputSchema, LemurOutputSchema
from lemur.users.schemas import UserNestedOutputSchema
from lemur.authorities.schemas import AuthorityNestedOutputSchema
from lemur.certificates.schemas import CertificateNestedOutputSchema
from lemur.destinations.schemas import DestinationNestedOutputSchema
from lemur.domains.schemas import DomainNestedOutputSchema
from lemur.notifications.schemas import NotificationNestedOutputSchema
from lemur.roles.schemas import RoleNestedOutputSchema
from lemur.policies.schemas import RotationPolicyNestedOutputSchema
from lemur.notifications import service as notification_service
class PendingCertificateSchema(LemurInputSchema):
@ -38,6 +36,7 @@ class PendingCertificateOutputSchema(LemurOutputSchema):
name = fields.String()
number_attempts = fields.Integer()
date_created = fields.Date()
last_updated = fields.Date()
rotation = fields.Boolean()
@ -88,7 +87,8 @@ class PendingCertificateEditInputSchema(PendingCertificateSchema):
"""
if data['owner']:
notification_name = "DEFAULT_{0}".format(data['owner'].split('@')[0].upper())
data['notifications'] += notification_service.create_default_expiration_notifications(notification_name, [data['owner']])
data['notifications'] += notification_service.create_default_expiration_notifications(notification_name,
[data['owner']])
return data

View File

@ -5,26 +5,26 @@
# pip-compile --no-index --output-file requirements-dev.txt requirements-dev.in
#
aspy.yaml==1.1.1 # via pre-commit
cached-property==1.4.3 # via pre-commit
cached-property==1.5.1 # via pre-commit
certifi==2018.8.24 # via requests
cfgv==1.1.0 # via pre-commit
chardet==3.0.4 # via requests
flake8==3.5.0
identify==1.1.4 # via pre-commit
identify==1.1.6 # via pre-commit
idna==2.7 # via requests
invoke==1.1.1
invoke==1.2.0
mccabe==0.6.1 # via flake8
nodeenv==1.3.2
pkginfo==1.4.2 # via twine
pre-commit==1.10.5
pre-commit==1.11.0
pycodestyle==2.3.1 # via flake8
pyflakes==1.6.0 # via flake8
pyyaml==3.13 # via aspy.yaml, pre-commit
requests-toolbelt==0.8.0 # via twine
requests==2.19.1 # via requests-toolbelt, twine
six==1.11.0 # via cfgv, pre-commit
toml==0.9.4 # via pre-commit
tqdm==4.25.0 # via twine
toml==0.9.6 # via pre-commit
tqdm==4.26.0 # via twine
twine==1.11.0
urllib3==1.23 # via requests
virtualenv==16.0.0 # via pre-commit

View File

@ -4,19 +4,22 @@
#
# pip-compile --no-index --output-file requirements-docs.txt requirements-docs.in
#
acme==0.26.1
acme==0.27.1
alabaster==0.7.11 # via sphinx
alembic-autogenerate-enums==0.0.2
alembic==1.0.0
amqp==2.3.2
aniso8601==3.0.2
arrow==0.12.1
asn1crypto==0.24.0
asyncpool==1.0
babel==2.6.0 # via sphinx
bcrypt==3.1.4
billiard==3.5.0.4
blinker==1.4
boto3==1.8.1
botocore==1.11.1
boto3==1.7.79
botocore==1.10.84
celery[redis]==4.2.1
certifi==2018.8.24
cffi==1.11.5
chardet==3.0.4
@ -39,13 +42,14 @@ flask==0.12
future==0.16.0
gunicorn==19.9.0
idna==2.7
imagesize==1.0.0 # via sphinx
imagesize==1.1.0 # via sphinx
inflection==0.3.1
itsdangerous==0.24
jinja2==2.10
jmespath==0.9.3
josepy==1.1.0
jsonlines==1.2.0
kombu==4.2.1
lockfile==0.12.2
mako==1.0.7
markupsafe==1.0
@ -72,6 +76,7 @@ python-editor==1.0.3
pytz==2018.5
pyyaml==3.13
raven[flask]==6.9.0
redis==2.10.6
requests-toolbelt==0.8.0
requests[security]==2.19.1
retrying==1.3.3
@ -79,12 +84,13 @@ s3transfer==0.1.13
six==1.11.0
snowballstemmer==1.2.1 # via sphinx
sphinx-rtd-theme==0.4.1
sphinx==1.7.7
sphinx==1.8.0
sphinxcontrib-httpdomain==1.7.0
sphinxcontrib-websupport==1.1.0 # via sphinx
sqlalchemy-utils==0.33.3
sqlalchemy==1.2.10
sqlalchemy-utils==0.33.4
sqlalchemy==1.2.11
tabulate==0.8.2
urllib3==1.23
vine==1.1.4
werkzeug==0.14.1
xmltodict==0.11.0

View File

@ -4,7 +4,7 @@ coverage
factory-boy
Faker
freezegun
moto
moto==1.3.4 # Issue with moto: https://github.com/spulec/moto/issues/1813
nose
pyflakes
pytest

View File

@ -5,12 +5,12 @@
# pip-compile --no-index --output-file requirements-tests.txt requirements-tests.in
#
asn1crypto==0.24.0 # via cryptography
atomicwrites==1.2.0 # via pytest
attrs==18.1.0 # via pytest
atomicwrites==1.2.1 # via pytest
attrs==18.2.0 # via pytest
aws-xray-sdk==0.95 # via moto
boto3==1.8.1 # via moto
boto3==1.9.4 # via moto
boto==2.49.0 # via moto
botocore==1.11.1 # via boto3, moto, s3transfer
botocore==1.12.4 # via boto3, moto, s3transfer
certifi==2018.8.24 # via requests
cffi==1.11.5 # via cryptography
chardet==3.0.4 # via requests
@ -23,7 +23,7 @@ docker==3.5.0 # via moto
docutils==0.14 # via botocore
ecdsa==0.13 # via python-jose
factory-boy==2.11.1
faker==0.9.0
faker==0.9.1
flask==1.0.2 # via pytest-flask
freezegun==0.3.10
future==0.16.0 # via python-jose
@ -40,14 +40,14 @@ moto==1.3.4
nose==1.3.7
pbr==4.2.0 # via mock
pluggy==0.7.1 # via pytest
py==1.5.4 # via pytest
py==1.6.0 # via pytest
pyaml==17.12.1 # via moto
pycparser==2.18 # via cffi
pycryptodome==3.6.6 # via python-jose
pyflakes==2.0.0
pytest-flask==0.10.0
pytest-flask==0.12.0
pytest-mock==1.10.0
pytest==3.7.3
pytest==3.8.0
python-dateutil==2.7.3 # via botocore, faker, freezegun, moto
python-jose==2.0.2 # via moto
pytz==2018.5 # via moto
@ -59,7 +59,7 @@ s3transfer==0.1.13 # via boto3
six==1.11.0 # via cryptography, docker, docker-pycreds, faker, freezegun, mock, more-itertools, moto, pytest, python-dateutil, python-jose, requests-mock, responses, websocket-client
text-unidecode==1.2 # via faker
urllib3==1.23 # via botocore, requests
websocket-client==0.51.0 # via docker
websocket-client==0.53.0 # via docker
werkzeug==0.14.1 # via flask, moto, pytest-flask
wrapt==1.10.11 # via aws-xray-sdk
xmltodict==0.11.0 # via moto

View File

@ -4,7 +4,9 @@ acme
alembic-autogenerate-enums
arrow
asyncpool
boto3==1.7.79
boto3==1.7.79 # Issue with moto: https://github.com/spulec/moto/issues/1813
botocore== 1.10.84 # Issue with moto: https://github.com/spulec/moto/issues/1813
celery[redis]
certifi
CloudFlare
cryptography

View File

@ -4,17 +4,20 @@
#
# pip-compile --no-index --output-file requirements.txt requirements.in
#
acme==0.26.1
acme==0.27.1
alembic-autogenerate-enums==0.0.2
alembic==1.0.0 # via flask-migrate
amqp==2.3.2 # via kombu
aniso8601==3.0.2 # via flask-restful
arrow==0.12.1
asn1crypto==0.24.0 # via cryptography
asyncpool==1.0
bcrypt==3.1.4 # via flask-bcrypt, paramiko
billiard==3.5.0.4 # via celery
blinker==1.4 # via flask-mail, flask-principal, raven
boto3==1.7.79
botocore==1.10.84 # via boto3, s3transfer
botocore==1.10.84
celery[redis]==4.2.1
certifi==2018.8.24
cffi==1.11.5 # via bcrypt, cryptography, pynacl
chardet==3.0.4 # via requests
@ -43,11 +46,12 @@ jinja2==2.10
jmespath==0.9.3 # via boto3, botocore
josepy==1.1.0 # via acme
jsonlines==1.2.0 # via cloudflare
kombu==4.2.1 # via celery
lockfile==0.12.2
mako==1.0.7 # via alembic
markupsafe==1.0 # via jinja2, mako
marshmallow-sqlalchemy==0.14.1
marshmallow==2.15.4
marshmallow==2.15.5
mock==2.0.0 # via acme
ndg-httpsclient==0.5.1
paramiko==2.4.1
@ -64,17 +68,19 @@ pyrfc3339==1.1 # via acme
python-dateutil==2.7.3 # via alembic, arrow, botocore
python-editor==1.0.3 # via alembic
python-ldap==3.1.0
pytz==2018.5 # via acme, flask-restful, pyrfc3339
pytz==2018.5 # via acme, celery, flask-restful, pyrfc3339
pyyaml==3.13 # via cloudflare
raven[flask]==6.9.0
redis==2.10.6 # via celery
requests-toolbelt==0.8.0 # via acme
requests[security]==2.19.1
retrying==1.3.3
s3transfer==0.1.13 # via boto3
six==1.11.0
sqlalchemy-utils==0.33.3
sqlalchemy-utils==0.33.4
sqlalchemy==1.2.11 # via alembic, flask-sqlalchemy, marshmallow-sqlalchemy, sqlalchemy-utils
tabulate==0.8.2
urllib3==1.23 # via requests
vine==1.1.4 # via amqp
werkzeug==0.14.1 # via flask
xmltodict==0.11.0