""" .. module: lemur.certificates.models :platform: Unix :copyright: (c) 2018 by Netflix Inc., see AUTHORS for more :license: Apache, see LICENSE for more details. .. moduleauthor:: Kevin Glisson """ from datetime import timedelta import arrow from cryptography import x509 from flask import current_app from idna.core import InvalidCodepoint from sqlalchemy import ( event, Integer, ForeignKey, String, DefaultClause, func, Column, Text, Boolean, Index, ) from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship from sqlalchemy.sql.expression import case, extract from sqlalchemy_utils.types.arrow import ArrowType from werkzeug.utils import cached_property from lemur.common import defaults, utils, validators from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS from lemur.database import db from lemur.domains.models import Domain from lemur.extensions import metrics from lemur.extensions import sentry from lemur.models import ( certificate_associations, certificate_source_associations, certificate_destination_associations, certificate_notification_associations, certificate_replacement_associations, roles_certificates, pending_cert_replacement_associations, ) from lemur.plugins.base import plugins from lemur.policies.models import RotationPolicy from lemur.utils import Vault def get_sequence(name): if "-" not in name: return name, None parts = name.split("-") # see if we have an int at the end of our name try: seq = int(parts[-1]) except ValueError: return name, None # we might have a date at the end of our name if len(parts[-1]) == 8: return name, None root = "-".join(parts[:-1]) return root, seq def get_or_increase_name(name, serial): certificates = Certificate.query.filter(Certificate.name == name).all() if not certificates: return name serial_name = "{0}-{1}".format(name, hex(int(serial))[2:].upper()) certificates = Certificate.query.filter(Certificate.name == serial_name).all() if not certificates: return serial_name certificates = Certificate.query.filter( Certificate.name.ilike("{0}%".format(serial_name)) ).all() ends = [0] root, end = get_sequence(serial_name) for cert in certificates: root, end = get_sequence(cert.name) if end: ends.append(end) return "{0}-{1}".format(root, max(ends) + 1) class Certificate(db.Model): __tablename__ = "certificates" __table_args__ = ( Index( "ix_certificates_cn", "cn", postgresql_ops={"cn": "gin_trgm_ops"}, postgresql_using="gin", ), Index( "ix_certificates_name", "name", postgresql_ops={"name": "gin_trgm_ops"}, postgresql_using="gin", ), ) id = Column(Integer, primary_key=True) ix = Index( "ix_certificates_id_desc", id.desc(), postgresql_using="btree", unique=True ) external_id = Column(String(128)) owner = Column(String(128), nullable=False) name = Column(String(256), unique=True) description = Column(String(1024)) notify = Column(Boolean, default=True) body = Column(Text(), nullable=False) chain = Column(Text()) csr = Column(Text()) private_key = Column(Vault) issuer = Column(String(128)) serial = Column(String(128)) cn = Column(String(128)) deleted = Column(Boolean, index=True, default=False) dns_provider_id = Column( Integer(), ForeignKey("dns_providers.id", ondelete="CASCADE"), nullable=True ) not_before = Column(ArrowType) not_after = Column(ArrowType) not_after_ix = Index("ix_certificates_not_after", not_after.desc()) date_created = Column(ArrowType, DefaultClause(func.now()), nullable=False) signing_algorithm = Column(String(128)) status = Column(String(128)) bits = Column(Integer()) san = Column(String(1024)) # TODO this should be migrated to boolean rotation = Column(Boolean, default=False) user_id = Column(Integer, ForeignKey("users.id")) authority_id = Column(Integer, ForeignKey("authorities.id", ondelete="CASCADE")) root_authority_id = Column( Integer, ForeignKey("authorities.id", ondelete="CASCADE") ) rotation_policy_id = Column(Integer, ForeignKey("rotation_policies.id")) key_type = Column(String(128)) notifications = relationship( "Notification", secondary=certificate_notification_associations, backref="certificate", ) destinations = relationship( "Destination", secondary=certificate_destination_associations, backref="certificate", ) sources = relationship( "Source", secondary=certificate_source_associations, backref="certificate" ) domains = relationship( "Domain", secondary=certificate_associations, backref="certificate" ) roles = relationship("Role", secondary=roles_certificates, backref="certificate") replaces = relationship( "Certificate", secondary=certificate_replacement_associations, primaryjoin=id == certificate_replacement_associations.c.certificate_id, # noqa secondaryjoin=id == certificate_replacement_associations.c.replaced_certificate_id, # noqa backref="replaced", ) replaced_by_pending = relationship( "PendingCertificate", secondary=pending_cert_replacement_associations, backref="pending_replace", ) logs = relationship("Log", backref="certificate") endpoints = relationship("Endpoint", backref="certificate") rotation_policy = relationship("RotationPolicy") sensitive_fields = ("private_key",) def __init__(self, **kwargs): self.body = kwargs["body"].strip() cert = self.parsed_cert self.issuer = defaults.issuer(cert) self.cn = defaults.common_name(cert) self.san = defaults.san(cert) self.not_before = defaults.not_before(cert) self.not_after = defaults.not_after(cert) self.serial = defaults.serial(cert) # when destinations are appended they require a valid name. if kwargs.get("name"): self.name = get_or_increase_name( defaults.text_to_slug(kwargs["name"]), self.serial ) else: self.name = get_or_increase_name( defaults.certificate_name( self.cn, self.issuer, self.not_before, self.not_after, self.san ), self.serial, ) self.owner = kwargs["owner"] if kwargs.get("private_key"): self.private_key = kwargs["private_key"].strip() if kwargs.get("chain"): self.chain = kwargs["chain"].strip() if kwargs.get("csr"): self.csr = kwargs["csr"].strip() self.notify = kwargs.get("notify", True) self.destinations = kwargs.get("destinations", []) self.notifications = kwargs.get("notifications", []) self.description = kwargs.get("description") self.roles = list(set(kwargs.get("roles", []))) self.replaces = kwargs.get("replaces", []) self.rotation = kwargs.get("rotation") self.rotation_policy = kwargs.get("rotation_policy") self.key_type = kwargs.get("key_type") self.signing_algorithm = defaults.signing_algorithm(cert) self.bits = defaults.bitstrength(cert) self.external_id = kwargs.get("external_id") self.authority_id = kwargs.get("authority_id") self.dns_provider_id = kwargs.get("dns_provider_id") for domain in defaults.domains(cert): self.domains.append(Domain(name=domain)) # Check integrity before saving anything into the database. # For user-facing API calls, validation should also be done in schema validators. self.check_integrity() def check_integrity(self): """ Integrity checks: Does the cert have a valid chain and matching private key? """ if self.private_key: validators.verify_private_key_match( utils.parse_private_key(self.private_key), self.parsed_cert, error_class=AssertionError, ) if self.chain: chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain) validators.verify_cert_chain(chain, error_class=AssertionError) @cached_property def parsed_cert(self): assert self.body, "Certificate body not set" return utils.parse_certificate(self.body) @property def active(self): return self.notify @property def organization(self): return defaults.organization(self.parsed_cert) @property def organizational_unit(self): return defaults.organizational_unit(self.parsed_cert) @property def country(self): return defaults.country(self.parsed_cert) @property def state(self): return defaults.state(self.parsed_cert) @property def location(self): return defaults.location(self.parsed_cert) @property def distinguished_name(self): return self.parsed_cert.subject.rfc4514_string() """ # Commenting this property as key_type is now added as a column. This code can be removed in future. @property def key_type(self): if isinstance(self.parsed_cert.public_key(), rsa.RSAPublicKey): return "RSA{key_size}".format( key_size=self.parsed_cert.public_key().key_size ) elif isinstance(self.parsed_cert.public_key(), ec.EllipticCurvePublicKey): return get_key_type_from_ec_curve(self.parsed_cert.public_key().curve.name) """ @property def validity_remaining(self): return abs(self.not_after - arrow.utcnow()) @property def validity_range(self): return self.not_after - self.not_before @property def subject(self): return self.parsed_cert.subject @property def public_key(self): return self.parsed_cert.public_key() @hybrid_property def expired(self): # can't compare offset-naive and offset-aware datetimes if arrow.Arrow.fromdatetime(self.not_after) <= arrow.utcnow(): return True @expired.expression def expired(cls): return case([(cls.not_after <= arrow.utcnow(), True)], else_=False) @hybrid_property def revoked(self): if "revoked" == self.status: return True @revoked.expression def revoked(cls): return case([(cls.status == "revoked", True)], else_=False) @hybrid_property def has_private_key(self): return self.private_key is not None @has_private_key.expression def has_private_key(cls): return case([(cls.private_key.is_(None), True)], else_=False) @hybrid_property def in_rotation_window(self): """ Determines if a certificate is available for rotation based on the rotation policy associated. :return: """ now = arrow.utcnow() end = now + timedelta(days=self.rotation_policy.days) if self.not_after <= end: return True @in_rotation_window.expression def in_rotation_window(cls): """ Determines if a certificate is available for rotation based on the rotation policy associated. :return: """ return case( [(extract("day", cls.not_after - func.now()) <= RotationPolicy.days, True)], else_=False, ) @property def extensions(self): # setup default values return_extensions = {"sub_alt_names": {"names": []}} try: for extension in self.parsed_cert.extensions: value = extension.value if isinstance(value, x509.BasicConstraints): return_extensions["basic_constraints"] = value elif isinstance(value, x509.SubjectAlternativeName): return_extensions["sub_alt_names"]["names"] = value elif isinstance(value, x509.ExtendedKeyUsage): return_extensions["extended_key_usage"] = value elif isinstance(value, x509.KeyUsage): return_extensions["key_usage"] = value elif isinstance(value, x509.SubjectKeyIdentifier): return_extensions["subject_key_identifier"] = {"include_ski": True} elif isinstance(value, x509.AuthorityInformationAccess): return_extensions["certificate_info_access"] = {"include_aia": True} elif isinstance(value, x509.AuthorityKeyIdentifier): aki = {"use_key_identifier": False, "use_authority_cert": False} if value.key_identifier: aki["use_key_identifier"] = True if value.authority_cert_issuer: aki["use_authority_cert"] = True return_extensions["authority_key_identifier"] = aki elif isinstance(value, x509.CRLDistributionPoints): return_extensions["crl_distribution_points"] = { "include_crl_dp": value } # TODO: Not supporting custom OIDs yet. https://github.com/Netflix/lemur/issues/665 else: current_app.logger.warning( "Custom OIDs not yet supported for clone operation." ) except InvalidCodepoint as e: sentry.captureException() current_app.logger.warning( "Unable to parse extensions due to underscore in dns name" ) except ValueError as e: sentry.captureException() current_app.logger.warning("Unable to parse") current_app.logger.exception(e) return return_extensions def __repr__(self): return "Certificate(name={name})".format(name=self.name) @event.listens_for(Certificate.destinations, "append") def update_destinations(target, value, initiator): """ Attempt to upload certificate to the new destination :param target: :param value: :param initiator: :return: """ destination_plugin = plugins.get(value.plugin_name) status = FAILURE_METRIC_STATUS if target.expired: return try: if target.private_key or not destination_plugin.requires_key: destination_plugin.upload( target.name, target.body, target.private_key, target.chain, value.options, ) status = SUCCESS_METRIC_STATUS except Exception as e: sentry.captureException() raise metrics.send( "destination_upload", "counter", 1, metric_tags={ "status": status, "certificate": target.name, "destination": value.label, }, ) @event.listens_for(Certificate.replaces, "append") def update_replacement(target, value, initiator): """ When a certificate is marked as 'replaced' we should not notify. :param target: :param value: :param initiator: :return: """ value.notify = False