lemur/lemur/certificates/models.py

411 lines
14 KiB
Python

"""
.. module: lemur.certificates.models
:platform: Unix
:copyright: (c) 2018 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
from datetime import timedelta
import arrow
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import current_app
from idna.core import InvalidCodepoint
from sqlalchemy import event, Integer, ForeignKey, String, PassiveDefault, func, Column, Text, Boolean, Index
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from sqlalchemy.sql.expression import case, extract
from sqlalchemy_utils.types.arrow import ArrowType
from werkzeug.utils import cached_property
from lemur.common import defaults, utils, validators
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
from lemur.database import db
from lemur.domains.models import Domain
from lemur.extensions import metrics
from lemur.extensions import sentry
from lemur.models import certificate_associations, certificate_source_associations, \
certificate_destination_associations, certificate_notification_associations, \
certificate_replacement_associations, roles_certificates, pending_cert_replacement_associations
from lemur.plugins.base import plugins
from lemur.policies.models import RotationPolicy
from lemur.utils import Vault
def get_sequence(name):
if '-' not in name:
return name, None
parts = name.split('-')
# see if we have an int at the end of our name
try:
seq = int(parts[-1])
except ValueError:
return name, None
# we might have a date at the end of our name
if len(parts[-1]) == 8:
return name, None
root = '-'.join(parts[:-1])
return root, seq
def get_or_increase_name(name, serial):
certificates = Certificate.query.filter(Certificate.name.ilike('{0}%'.format(name))).all()
if not certificates:
return name
serial_name = '{0}-{1}'.format(name, hex(int(serial))[2:].upper())
certificates = Certificate.query.filter(Certificate.name.ilike('{0}%'.format(serial_name))).all()
if not certificates:
return serial_name
ends = [0]
root, end = get_sequence(serial_name)
for cert in certificates:
root, end = get_sequence(cert.name)
if end:
ends.append(end)
return '{0}-{1}'.format(root, max(ends) + 1)
class Certificate(db.Model):
__tablename__ = 'certificates'
__table_args__ = (
Index('ix_certificates_cn', "cn",
postgresql_ops={"cn": "gin_trgm_ops"},
postgresql_using='gin'),
Index('ix_certificates_name', "name",
postgresql_ops={"name": "gin_trgm_ops"},
postgresql_using='gin'),
)
id = Column(Integer, primary_key=True)
ix = Index('ix_certificates_id_desc', id.desc(), postgresql_using='btree', unique=True)
external_id = Column(String(128))
owner = Column(String(128), nullable=False)
name = Column(String(256), unique=True)
description = Column(String(1024))
notify = Column(Boolean, default=True)
body = Column(Text(), nullable=False)
chain = Column(Text())
csr = Column(Text())
private_key = Column(Vault)
issuer = Column(String(128))
serial = Column(String(128))
cn = Column(String(128))
deleted = Column(Boolean, index=True, default=False)
dns_provider_id = Column(Integer(), ForeignKey('dns_providers.id', ondelete='CASCADE'), nullable=True)
not_before = Column(ArrowType)
not_after = Column(ArrowType)
date_created = Column(ArrowType, PassiveDefault(func.now()), nullable=False)
signing_algorithm = Column(String(128))
status = Column(String(128))
bits = Column(Integer())
san = Column(String(1024)) # TODO this should be migrated to boolean
rotation = Column(Boolean, default=False)
user_id = Column(Integer, ForeignKey('users.id'))
authority_id = Column(Integer, ForeignKey('authorities.id', ondelete="CASCADE"))
root_authority_id = Column(Integer, ForeignKey('authorities.id', ondelete="CASCADE"))
rotation_policy_id = Column(Integer, ForeignKey('rotation_policies.id'))
notifications = relationship('Notification', secondary=certificate_notification_associations, backref='certificate')
destinations = relationship('Destination', secondary=certificate_destination_associations, backref='certificate')
sources = relationship('Source', secondary=certificate_source_associations, backref='certificate')
domains = relationship('Domain', secondary=certificate_associations, backref='certificate')
roles = relationship('Role', secondary=roles_certificates, backref='certificate')
replaces = relationship('Certificate',
secondary=certificate_replacement_associations,
primaryjoin=id == certificate_replacement_associations.c.certificate_id, # noqa
secondaryjoin=id == certificate_replacement_associations.c.replaced_certificate_id, # noqa
backref='replaced')
replaced_by_pending = relationship('PendingCertificate',
secondary=pending_cert_replacement_associations,
backref='pending_replace',
viewonly=True)
logs = relationship('Log', backref='certificate')
endpoints = relationship('Endpoint', backref='certificate')
rotation_policy = relationship("RotationPolicy")
sensitive_fields = ('private_key',)
def __init__(self, **kwargs):
self.body = kwargs['body'].strip()
cert = self.parsed_cert
self.issuer = defaults.issuer(cert)
self.cn = defaults.common_name(cert)
self.san = defaults.san(cert)
self.not_before = defaults.not_before(cert)
self.not_after = defaults.not_after(cert)
self.serial = defaults.serial(cert)
# when destinations are appended they require a valid name.
if kwargs.get('name'):
self.name = get_or_increase_name(defaults.text_to_slug(kwargs['name']), self.serial)
else:
self.name = get_or_increase_name(
defaults.certificate_name(self.cn, self.issuer, self.not_before, self.not_after, self.san), self.serial)
self.owner = kwargs['owner']
if kwargs.get('private_key'):
self.private_key = kwargs['private_key'].strip()
if kwargs.get('chain'):
self.chain = kwargs['chain'].strip()
if kwargs.get('csr'):
self.csr = kwargs['csr'].strip()
self.notify = kwargs.get('notify', True)
self.destinations = kwargs.get('destinations', [])
self.notifications = kwargs.get('notifications', [])
self.description = kwargs.get('description')
self.roles = list(set(kwargs.get('roles', [])))
self.replaces = kwargs.get('replaces', [])
self.rotation = kwargs.get('rotation')
self.rotation_policy = kwargs.get('rotation_policy')
self.signing_algorithm = defaults.signing_algorithm(cert)
self.bits = defaults.bitstrength(cert)
self.external_id = kwargs.get('external_id')
self.authority_id = kwargs.get('authority_id')
self.dns_provider_id = kwargs.get('dns_provider_id')
for domain in defaults.domains(cert):
self.domains.append(Domain(name=domain))
# Check integrity before saving anything into the database.
# For user-facing API calls, validation should also be done in schema validators.
self.check_integrity()
def check_integrity(self):
"""
Integrity checks: Does the cert have a valid chain and matching private key?
"""
if self.private_key:
validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert,
error_class=AssertionError)
if self.chain:
chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain)
validators.verify_cert_chain(chain, error_class=AssertionError)
@cached_property
def parsed_cert(self):
assert self.body, "Certificate body not set"
return utils.parse_certificate(self.body)
@property
def active(self):
return self.notify
@property
def organization(self):
return defaults.organization(self.parsed_cert)
@property
def organizational_unit(self):
return defaults.organizational_unit(self.parsed_cert)
@property
def country(self):
return defaults.country(self.parsed_cert)
@property
def state(self):
return defaults.state(self.parsed_cert)
@property
def location(self):
return defaults.location(self.parsed_cert)
@property
def distinguished_name(self):
return self.parsed_cert.subject.rfc4514_string()
@property
def key_type(self):
if isinstance(self.parsed_cert.public_key(), rsa.RSAPublicKey):
return 'RSA{key_size}'.format(key_size=self.parsed_cert.public_key().key_size)
@property
def validity_remaining(self):
return abs(self.not_after - arrow.utcnow())
@property
def validity_range(self):
return self.not_after - self.not_before
@property
def subject(self):
return self.parsed_cert.subject
@property
def public_key(self):
return self.parsed_cert.public_key()
@hybrid_property
def expired(self):
if self.not_after <= arrow.utcnow():
return True
@expired.expression
def expired(cls):
return case(
[
(cls.not_after <= arrow.utcnow(), True)
],
else_=False
)
@hybrid_property
def revoked(self):
if 'revoked' == self.status:
return True
@revoked.expression
def revoked(cls):
return case(
[
(cls.status == 'revoked', True)
],
else_=False
)
@hybrid_property
def in_rotation_window(self):
"""
Determines if a certificate is available for rotation based
on the rotation policy associated.
:return:
"""
now = arrow.utcnow()
end = now + timedelta(days=self.rotation_policy.days)
if self.not_after <= end:
return True
@in_rotation_window.expression
def in_rotation_window(cls):
"""
Determines if a certificate is available for rotation based
on the rotation policy associated.
:return:
"""
return case(
[
(extract('day', cls.not_after - func.now()) <= RotationPolicy.days, True)
],
else_=False
)
@property
def extensions(self):
# setup default values
return_extensions = {
'sub_alt_names': {'names': []}
}
try:
for extension in self.parsed_cert.extensions:
value = extension.value
if isinstance(value, x509.BasicConstraints):
return_extensions['basic_constraints'] = value
elif isinstance(value, x509.SubjectAlternativeName):
return_extensions['sub_alt_names']['names'] = value
elif isinstance(value, x509.ExtendedKeyUsage):
return_extensions['extended_key_usage'] = value
elif isinstance(value, x509.KeyUsage):
return_extensions['key_usage'] = value
elif isinstance(value, x509.SubjectKeyIdentifier):
return_extensions['subject_key_identifier'] = {'include_ski': True}
elif isinstance(value, x509.AuthorityInformationAccess):
return_extensions['certificate_info_access'] = {'include_aia': True}
elif isinstance(value, x509.AuthorityKeyIdentifier):
aki = {
'use_key_identifier': False,
'use_authority_cert': False
}
if value.key_identifier:
aki['use_key_identifier'] = True
if value.authority_cert_issuer:
aki['use_authority_cert'] = True
return_extensions['authority_key_identifier'] = aki
elif isinstance(value, x509.CRLDistributionPoints):
return_extensions['crl_distribution_points'] = {'include_crl_dp': value}
# TODO: Not supporting custom OIDs yet. https://github.com/Netflix/lemur/issues/665
else:
current_app.logger.warning('Custom OIDs not yet supported for clone operation.')
except InvalidCodepoint as e:
sentry.captureException()
current_app.logger.warning('Unable to parse extensions due to underscore in dns name')
except ValueError as e:
sentry.captureException()
current_app.logger.warning('Unable to parse')
current_app.logger.exception(e)
return return_extensions
def __repr__(self):
return "Certificate(name={name})".format(name=self.name)
@event.listens_for(Certificate.destinations, 'append')
def update_destinations(target, value, initiator):
"""
Attempt to upload certificate to the new destination
:param target:
:param value:
:param initiator:
:return:
"""
destination_plugin = plugins.get(value.plugin_name)
status = FAILURE_METRIC_STATUS
try:
if target.private_key or not destination_plugin.requires_key:
destination_plugin.upload(target.name, target.body, target.private_key, target.chain, value.options)
status = SUCCESS_METRIC_STATUS
except Exception as e:
sentry.captureException()
raise
metrics.send('destination_upload', 'counter', 1,
metric_tags={'status': status, 'certificate': target.name, 'destination': value.label})
@event.listens_for(Certificate.replaces, 'append')
def update_replacement(target, value, initiator):
"""
When a certificate is marked as 'replaced' we should not notify.
:param target:
:param value:
:param initiator:
:return:
"""
value.notify = False