lemur/lemur/certificates/models.py

495 lines
15 KiB
Python

"""
.. module: lemur.certificates.models
:platform: Unix
:copyright: (c) 2018 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
from datetime import timedelta
import arrow
from cryptography import x509
from flask import current_app
from idna.core import InvalidCodepoint
from sqlalchemy import (
event,
Integer,
ForeignKey,
String,
PassiveDefault,
func,
Column,
Text,
Boolean,
Index,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from sqlalchemy.sql.expression import case, extract
from sqlalchemy_utils.types.arrow import ArrowType
from werkzeug.utils import cached_property
from lemur.common import defaults, utils, validators
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
from lemur.database import db
from lemur.domains.models import Domain
from lemur.extensions import metrics
from lemur.extensions import sentry
from lemur.models import (
certificate_associations,
certificate_source_associations,
certificate_destination_associations,
certificate_notification_associations,
certificate_replacement_associations,
roles_certificates,
pending_cert_replacement_associations,
)
from lemur.plugins.base import plugins
from lemur.policies.models import RotationPolicy
from lemur.utils import Vault
def get_sequence(name):
    """
    Split a trailing integer sequence off a dash-separated name.

    :param name: name to inspect, e.g. ``"example-com-3"``
    :return: ``(root, sequence)`` when the text after the last dash parses as
        an integer; ``(name, None)`` when there is no dash, the tail is not
        an integer, or the tail is eight characters long.
    """
    root, dash, tail = name.rpartition("-")

    # rpartition yields an empty separator when the name contains no dash
    if not dash:
        return name, None

    try:
        sequence = int(tail)
    except ValueError:
        # the last segment is not a number, so there is no sequence
        return name, None

    # an eight character numeric tail is treated as a date, not a sequence
    if len(tail) == 8:
        return name, None

    return root, sequence
def get_or_increase_name(name, serial):
    """
    Return a certificate name derived from ``name`` that is not yet taken.

    Candidates are tried in this order:

    1. ``name`` itself;
    2. ``<name>-<SERIAL>`` where ``SERIAL`` is the serial in uppercase hex;
    3. ``<name>-<SERIAL>-<N>`` where ``N`` is one more than the highest
       sequence number already used with that exact prefix.

    :param name: desired certificate name
    :param serial: certificate serial number; anything ``int()`` accepts
    :return: the first candidate that does not match an existing certificate
    """
    certificates = Certificate.query.filter(Certificate.name == name).all()
    if not certificates:
        return name

    # format(n, "X") gives the same digits as hex(n)[2:].upper() for
    # non-negative serials, without slicing into the "-0x" prefix that hex()
    # produces for negative ones.
    serial_name = "{0}-{1}".format(name, format(int(serial), "X"))
    certificates = Certificate.query.filter(Certificate.name == serial_name).all()
    if not certificates:
        return serial_name

    certificates = Certificate.query.filter(
        Certificate.name.ilike("{0}%".format(serial_name))
    ).all()

    highest = 0
    for cert in certificates:
        root, end = get_sequence(cert.name)
        # The prefix match is case-insensitive and also returns names that
        # merely start with serial_name (including serial_name itself, whose
        # all-digit serial would otherwise be mistaken for a sequence).
        # Only count sequences attached to exactly this serial name.
        if root == serial_name and end:
            highest = max(highest, end)

    # Always build on serial_name rather than on whichever row came last.
    return "{0}-{1}".format(serial_name, highest + 1)
class Certificate(db.Model):
    """
    SQLAlchemy model for a row of the ``certificates`` table.

    The descriptive columns (issuer, cn, san, validity dates, serial, signing
    algorithm, bit strength) are derived from the certificate ``body`` when
    the object is constructed; see ``__init__``.
    """

    __tablename__ = "certificates"
    __table_args__ = (
        Index(
            "ix_certificates_cn",
            "cn",
            postgresql_ops={"cn": "gin_trgm_ops"},
            postgresql_using="gin",
        ),
        Index(
            "ix_certificates_name",
            "name",
            postgresql_ops={"name": "gin_trgm_ops"},
            postgresql_using="gin",
        ),
    )
    id = Column(Integer, primary_key=True)
    ix = Index(
        "ix_certificates_id_desc", id.desc(), postgresql_using="btree", unique=True
    )
    external_id = Column(String(128))
    owner = Column(String(128), nullable=False)
    name = Column(String(256), unique=True)
    description = Column(String(1024))
    notify = Column(Boolean, default=True)

    body = Column(Text(), nullable=False)
    chain = Column(Text())
    csr = Column(Text())
    private_key = Column(Vault)

    issuer = Column(String(128))
    serial = Column(String(128))
    cn = Column(String(128))
    deleted = Column(Boolean, index=True, default=False)
    dns_provider_id = Column(
        Integer(), ForeignKey("dns_providers.id", ondelete="CASCADE"), nullable=True
    )

    not_before = Column(ArrowType)
    not_after = Column(ArrowType)
    not_after_ix = Index("ix_certificates_not_after", not_after.desc())

    date_created = Column(ArrowType, PassiveDefault(func.now()), nullable=False)

    signing_algorithm = Column(String(128))
    status = Column(String(128))
    bits = Column(Integer())
    san = Column(String(1024))  # TODO this should be migrated to boolean

    rotation = Column(Boolean, default=False)
    user_id = Column(Integer, ForeignKey("users.id"))
    authority_id = Column(Integer, ForeignKey("authorities.id", ondelete="CASCADE"))
    root_authority_id = Column(
        Integer, ForeignKey("authorities.id", ondelete="CASCADE")
    )
    rotation_policy_id = Column(Integer, ForeignKey("rotation_policies.id"))
    key_type = Column(String(128))

    notifications = relationship(
        "Notification",
        secondary=certificate_notification_associations,
        backref="certificate",
    )
    destinations = relationship(
        "Destination",
        secondary=certificate_destination_associations,
        backref="certificate",
    )
    sources = relationship(
        "Source", secondary=certificate_source_associations, backref="certificate"
    )
    domains = relationship(
        "Domain", secondary=certificate_associations, backref="certificate"
    )
    roles = relationship("Role", secondary=roles_certificates, backref="certificate")
    replaces = relationship(
        "Certificate",
        secondary=certificate_replacement_associations,
        primaryjoin=id == certificate_replacement_associations.c.certificate_id,  # noqa
        secondaryjoin=id
        == certificate_replacement_associations.c.replaced_certificate_id,  # noqa
        backref="replaced",
    )

    replaced_by_pending = relationship(
        "PendingCertificate",
        secondary=pending_cert_replacement_associations,
        backref="pending_replace",
        viewonly=True,
    )

    logs = relationship("Log", backref="certificate")
    endpoints = relationship("Endpoint", backref="certificate")
    rotation_policy = relationship("RotationPolicy")
    sensitive_fields = ("private_key",)

    def __init__(self, **kwargs):
        """
        Build a certificate from keyword arguments.

        ``body`` and ``owner`` are required (they are read with
        ``kwargs[...]``); every other key is optional. Fields that can be
        derived from the certificate itself are taken from the parsed
        ``body`` rather than from ``kwargs``.

        :raises KeyError: if ``body`` or ``owner`` is missing
        """
        self.body = kwargs["body"].strip()
        cert = self.parsed_cert

        self.issuer = defaults.issuer(cert)
        self.cn = defaults.common_name(cert)
        self.san = defaults.san(cert)
        self.not_before = defaults.not_before(cert)
        self.not_after = defaults.not_after(cert)
        self.serial = defaults.serial(cert)

        # when destinations are appended they require a valid name.
        if kwargs.get("name"):
            self.name = get_or_increase_name(
                defaults.text_to_slug(kwargs["name"]), self.serial
            )
        else:
            self.name = get_or_increase_name(
                defaults.certificate_name(
                    self.cn, self.issuer, self.not_before, self.not_after, self.san
                ),
                self.serial,
            )

        self.owner = kwargs["owner"]

        if kwargs.get("private_key"):
            self.private_key = kwargs["private_key"].strip()

        if kwargs.get("chain"):
            self.chain = kwargs["chain"].strip()

        if kwargs.get("csr"):
            self.csr = kwargs["csr"].strip()

        self.notify = kwargs.get("notify", True)
        self.destinations = kwargs.get("destinations", [])
        self.notifications = kwargs.get("notifications", [])
        self.description = kwargs.get("description")
        # de-duplicate the roles that were passed in
        self.roles = list(set(kwargs.get("roles", [])))
        self.replaces = kwargs.get("replaces", [])
        self.rotation = kwargs.get("rotation")
        self.rotation_policy = kwargs.get("rotation_policy")
        self.key_type = kwargs.get("key_type")
        self.signing_algorithm = defaults.signing_algorithm(cert)
        self.bits = defaults.bitstrength(cert)
        self.external_id = kwargs.get("external_id")
        self.authority_id = kwargs.get("authority_id")
        self.dns_provider_id = kwargs.get("dns_provider_id")

        for domain in defaults.domains(cert):
            self.domains.append(Domain(name=domain))

        # Check integrity before saving anything into the database.
        # For user-facing API calls, validation should also be done in schema validators.
        self.check_integrity()

    def check_integrity(self):
        """
        Integrity checks: Does the cert have a valid chain and matching private key?

        Each check only runs when the corresponding field is set. The
        validators are called with ``error_class=AssertionError``.
        """
        if self.private_key:
            validators.verify_private_key_match(
                utils.parse_private_key(self.private_key),
                self.parsed_cert,
                error_class=AssertionError,
            )

        if self.chain:
            chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain)
            validators.verify_cert_chain(chain, error_class=AssertionError)

    @cached_property
    def parsed_cert(self):
        """
        The certificate ``body`` parsed by ``utils.parse_certificate``,
        computed once per instance.

        :raises AssertionError: if ``body`` is empty or unset
        """
        # Raised explicitly (instead of ``assert``) so the check still runs
        # when Python is started with -O; type and message are unchanged.
        if not self.body:
            raise AssertionError("Certificate body not set")
        return utils.parse_certificate(self.body)

    @property
    def active(self):
        """Alias for the ``notify`` flag."""
        return self.notify

    @property
    def organization(self):
        """Organization as extracted by ``defaults.organization``."""
        return defaults.organization(self.parsed_cert)

    @property
    def organizational_unit(self):
        """Organizational unit as extracted by ``defaults.organizational_unit``."""
        return defaults.organizational_unit(self.parsed_cert)

    @property
    def country(self):
        """Country as extracted by ``defaults.country``."""
        return defaults.country(self.parsed_cert)

    @property
    def state(self):
        """State as extracted by ``defaults.state``."""
        return defaults.state(self.parsed_cert)

    @property
    def location(self):
        """Location as extracted by ``defaults.location``."""
        return defaults.location(self.parsed_cert)

    @property
    def distinguished_name(self):
        """Subject of the parsed certificate as an RFC 4514 string."""
        return self.parsed_cert.subject.rfc4514_string()

    # NOTE: ``key_type`` used to be a property computed from the public key;
    # it is now a regular column (see above).

    @property
    def validity_remaining(self):
        """Absolute difference between ``not_after`` and the current UTC time."""
        return abs(self.not_after - arrow.utcnow())

    @property
    def validity_range(self):
        """Length of the validity period (``not_after - not_before``)."""
        return self.not_after - self.not_before

    @property
    def subject(self):
        """Subject of the parsed certificate."""
        return self.parsed_cert.subject

    @property
    def public_key(self):
        """Public key of the parsed certificate."""
        return self.parsed_cert.public_key()

    @hybrid_property
    def expired(self):
        """
        Whether ``not_after`` is at or before the current UTC time.

        :return: ``True`` or ``False`` (never ``None``), matching the SQL
            expression below.
        """
        # can't compare offset-naive and offset-aware datetimes
        return arrow.Arrow.fromdatetime(self.not_after) <= arrow.utcnow()

    @expired.expression
    def expired(cls):
        """SQL form of ``expired``; the current time is bound when the query is built."""
        return case([(cls.not_after <= arrow.utcnow(), True)], else_=False)

    @hybrid_property
    def revoked(self):
        """
        Whether ``status`` is the string ``"revoked"``.

        :return: ``True`` or ``False`` (never ``None``), matching the SQL
            expression below.
        """
        return "revoked" == self.status

    @revoked.expression
    def revoked(cls):
        """SQL form of ``revoked``."""
        return case([(cls.status == "revoked", True)], else_=False)

    @hybrid_property
    def has_private_key(self):
        """Whether a private key is stored for this certificate."""
        return self.private_key is not None

    @has_private_key.expression
    def has_private_key(cls):
        """SQL form of ``has_private_key``."""
        # Must agree with the instance-level property: true when the column
        # is NOT NULL. (It previously tested ``is_(None)``, i.e. the inverse.)
        return case([(cls.private_key.isnot(None), True)], else_=False)

    @hybrid_property
    def in_rotation_window(self):
        """
        Determines if a certificate is available for rotation based
        on the rotation policy associated.

        :return: ``True`` when ``not_after`` falls within
            ``rotation_policy.days`` days from now, otherwise ``False``.
        """
        now = arrow.utcnow()
        end = now + timedelta(days=self.rotation_policy.days)
        return self.not_after <= end

    @in_rotation_window.expression
    def in_rotation_window(cls):
        """
        Determines if a certificate is available for rotation based
        on the rotation policy associated.

        NOTE(review): this compares against ``RotationPolicy.days`` without
        joining that table here; presumably callers add the join - confirm.

        :return:
        """
        return case(
            [(extract("day", cls.not_after - func.now()) <= RotationPolicy.days, True)],
            else_=False,
        )

    @property
    def extensions(self):
        """
        Collect the recognised X.509 extensions of the parsed certificate.

        :return: dict keyed by extension name. ``sub_alt_names`` is always
            present (defaulting to an empty ``names`` list); other keys
            appear only when the certificate carries that extension.
            Parsing errors are reported and logged, and whatever was
            collected up to that point is returned.
        """
        # setup default values
        return_extensions = {"sub_alt_names": {"names": []}}

        try:
            for extension in self.parsed_cert.extensions:
                value = extension.value
                if isinstance(value, x509.BasicConstraints):
                    return_extensions["basic_constraints"] = value

                elif isinstance(value, x509.SubjectAlternativeName):
                    return_extensions["sub_alt_names"]["names"] = value

                elif isinstance(value, x509.ExtendedKeyUsage):
                    return_extensions["extended_key_usage"] = value

                elif isinstance(value, x509.KeyUsage):
                    return_extensions["key_usage"] = value

                elif isinstance(value, x509.SubjectKeyIdentifier):
                    return_extensions["subject_key_identifier"] = {"include_ski": True}

                elif isinstance(value, x509.AuthorityInformationAccess):
                    return_extensions["certificate_info_access"] = {"include_aia": True}

                elif isinstance(value, x509.AuthorityKeyIdentifier):
                    aki = {"use_key_identifier": False, "use_authority_cert": False}
                    if value.key_identifier:
                        aki["use_key_identifier"] = True
                    if value.authority_cert_issuer:
                        aki["use_authority_cert"] = True
                    return_extensions["authority_key_identifier"] = aki

                elif isinstance(value, x509.CRLDistributionPoints):
                    return_extensions["crl_distribution_points"] = {
                        "include_crl_dp": value
                    }

                # TODO: Not supporting custom OIDs yet. https://github.com/Netflix/lemur/issues/665
                else:
                    current_app.logger.warning(
                        "Custom OIDs not yet supported for clone operation."
                    )
        except InvalidCodepoint:
            sentry.captureException()
            current_app.logger.warning(
                "Unable to parse extensions due to underscore in dns name"
            )
        except ValueError as e:
            sentry.captureException()
            current_app.logger.warning("Unable to parse")
            current_app.logger.exception(e)

        return return_extensions

    def __repr__(self):
        return "Certificate(name={name})".format(name=self.name)
@event.listens_for(Certificate.destinations, "append")
def update_destinations(target, value, initiator):
    """
    Attempt to upload certificate to the new destination

    Nothing is uploaded (and no metric is sent) for an expired certificate.
    The upload is also skipped when the certificate has no private key and
    the destination plugin requires one; that case is reported with the
    failure status. A ``destination_upload`` counter is emitted for every
    non-expired certificate, including when the upload raises.

    :param target: certificate the destination is being appended to
    :param value: destination being appended
    :param initiator: SQLAlchemy event initiator (unused)
    :return:
    :raises Exception: whatever the plugin upload raises, after it has been
        reported to sentry
    """
    destination_plugin = plugins.get(value.plugin_name)
    status = FAILURE_METRIC_STATUS

    if target.expired:
        return

    try:
        if target.private_key or not destination_plugin.requires_key:
            destination_plugin.upload(
                target.name,
                target.body,
                target.private_key,
                target.chain,
                value.options,
            )
            status = SUCCESS_METRIC_STATUS
    except Exception:
        sentry.captureException()
        raise
    finally:
        # Sent from ``finally`` so a failed upload is counted as well; the
        # re-raise above used to skip the metric entirely.
        metrics.send(
            "destination_upload",
            "counter",
            1,
            metric_tags={
                "status": status,
                "certificate": target.name,
                "destination": value.label,
            },
        )
@event.listens_for(Certificate.replaces, "append")
def update_replacement(target, value, initiator):
    """
    Turn off notifications for a certificate once it is recorded as replaced.

    :param target: certificate whose ``replaces`` collection is appended to
    :param value: certificate being replaced; its ``notify`` flag is cleared
    :param initiator: SQLAlchemy event initiator (unused)
    :return:
    """
    value.notify = False