Merge branch 'master' of github.com:Netflix/lemur into improving-cert-lookup-time

This commit is contained in:
Hossein Shafagh
2019-05-30 08:55:49 -07:00
227 changed files with 9420 additions and 5972 deletions

View File

@ -34,7 +34,7 @@ from lemur.certificates.service import (
get_all_pending_reissue,
get_by_name,
get_all_certs,
get
get,
)
from lemur.certificates.verify import verify_string
@ -56,11 +56,14 @@ def print_certificate_details(details):
"\t[+] Authority: {authority_name}\n"
"\t[+] Validity Start: {validity_start}\n"
"\t[+] Validity End: {validity_end}\n".format(
common_name=details['commonName'],
sans=",".join(x['value'] for x in details['extensions']['subAltNames']['names']) or None,
authority_name=details['authority']['name'],
validity_start=details['validityStart'],
validity_end=details['validityEnd']
common_name=details["commonName"],
sans=",".join(
x["value"] for x in details["extensions"]["subAltNames"]["names"]
)
or None,
authority_name=details["authority"]["name"],
validity_start=details["validityStart"],
validity_end=details["validityEnd"],
)
)
@ -120,13 +123,11 @@ def request_rotation(endpoint, certificate, message, commit):
except Exception as e:
print(
"[!] Failed to rotate endpoint {0} to certificate {1} reason: {2}".format(
endpoint.name,
certificate.name,
e
endpoint.name, certificate.name, e
)
)
metrics.send('endpoint_rotation', 'counter', 1, metric_tags={'status': status})
metrics.send("endpoint_rotation", "counter", 1, metric_tags={"status": status})
def request_reissue(certificate, commit):
@ -154,17 +155,52 @@ def request_reissue(certificate, commit):
except Exception as e:
sentry.captureException(extra={"certificate_name": str(certificate.name)})
current_app.logger.exception(f"Error reissuing certificate: {certificate.name}", exc_info=True)
current_app.logger.exception(
f"Error reissuing certificate: {certificate.name}", exc_info=True
)
print(f"[!] Failed to reissue certificate: {certificate.name}. Reason: {e}")
metrics.send('certificate_reissue', 'counter', 1, metric_tags={'status': status, 'certificate': certificate.name})
metrics.send(
"certificate_reissue",
"counter",
1,
metric_tags={"status": status, "certificate": certificate.name},
)
@manager.option('-e', '--endpoint', dest='endpoint_name', help='Name of the endpoint you wish to rotate.')
@manager.option('-n', '--new-certificate', dest='new_certificate_name', help='Name of the certificate you wish to rotate to.')
@manager.option('-o', '--old-certificate', dest='old_certificate_name', help='Name of the certificate you wish to rotate.')
@manager.option('-a', '--notify', dest='message', action='store_true', help='Send a rotation notification to the certificates owner.')
@manager.option('-c', '--commit', dest='commit', action='store_true', default=False, help='Persist changes.')
@manager.option(
"-e",
"--endpoint",
dest="endpoint_name",
help="Name of the endpoint you wish to rotate.",
)
@manager.option(
"-n",
"--new-certificate",
dest="new_certificate_name",
help="Name of the certificate you wish to rotate to.",
)
@manager.option(
"-o",
"--old-certificate",
dest="old_certificate_name",
help="Name of the certificate you wish to rotate.",
)
@manager.option(
"-a",
"--notify",
dest="message",
action="store_true",
help="Send a rotation notification to the certificates owner.",
)
@manager.option(
"-c",
"--commit",
dest="commit",
action="store_true",
default=False,
help="Persist changes.",
)
def rotate(endpoint_name, new_certificate_name, old_certificate_name, message, commit):
"""
Rotates an endpoint and reissues it if it has not already been replaced. If it has
@ -183,7 +219,9 @@ def rotate(endpoint_name, new_certificate_name, old_certificate_name, message, c
endpoint = validate_endpoint(endpoint_name)
if endpoint and new_cert:
print(f"[+] Rotating endpoint: {endpoint.name} to certificate {new_cert.name}")
print(
f"[+] Rotating endpoint: {endpoint.name} to certificate {new_cert.name}"
)
request_rotation(endpoint, new_cert, message, commit)
elif old_cert and new_cert:
@ -197,16 +235,27 @@ def rotate(endpoint_name, new_certificate_name, old_certificate_name, message, c
print("[+] Rotating all endpoints that have new certificates available")
for endpoint in endpoint_service.get_all_pending_rotation():
if len(endpoint.certificate.replaced) == 1:
print(f"[+] Rotating {endpoint.name} to {endpoint.certificate.replaced[0].name}")
request_rotation(endpoint, endpoint.certificate.replaced[0], message, commit)
print(
f"[+] Rotating {endpoint.name} to {endpoint.certificate.replaced[0].name}"
)
request_rotation(
endpoint, endpoint.certificate.replaced[0], message, commit
)
else:
metrics.send('endpoint_rotation', 'counter', 1, metric_tags={
'status': FAILURE_METRIC_STATUS,
"old_certificate_name": str(old_cert),
"new_certificate_name": str(endpoint.certificate.replaced[0].name),
"endpoint_name": str(endpoint.name),
"message": str(message),
})
metrics.send(
"endpoint_rotation",
"counter",
1,
metric_tags={
"status": FAILURE_METRIC_STATUS,
"old_certificate_name": str(old_cert),
"new_certificate_name": str(
endpoint.certificate.replaced[0].name
),
"endpoint_name": str(endpoint.name),
"message": str(message),
},
)
print(
f"[!] Failed to rotate endpoint {endpoint.name} reason: "
"Multiple replacement certificates found."
@ -222,20 +271,38 @@ def rotate(endpoint_name, new_certificate_name, old_certificate_name, message, c
"new_certificate_name": str(new_certificate_name),
"endpoint_name": str(endpoint_name),
"message": str(message),
})
}
)
metrics.send('endpoint_rotation_job', 'counter', 1, metric_tags={
"status": status,
"old_certificate_name": str(old_certificate_name),
"new_certificate_name": str(new_certificate_name),
"endpoint_name": str(endpoint_name),
"message": str(message),
"endpoint": str(globals().get("endpoint"))
})
metrics.send(
"endpoint_rotation_job",
"counter",
1,
metric_tags={
"status": status,
"old_certificate_name": str(old_certificate_name),
"new_certificate_name": str(new_certificate_name),
"endpoint_name": str(endpoint_name),
"message": str(message),
"endpoint": str(globals().get("endpoint")),
},
)
@manager.option('-o', '--old-certificate', dest='old_certificate_name', help='Name of the certificate you wish to reissue.')
@manager.option('-c', '--commit', dest='commit', action='store_true', default=False, help='Persist changes.')
@manager.option(
"-o",
"--old-certificate",
dest="old_certificate_name",
help="Name of the certificate you wish to reissue.",
)
@manager.option(
"-c",
"--commit",
dest="commit",
action="store_true",
default=False,
help="Persist changes.",
)
def reissue(old_certificate_name, commit):
"""
Reissues certificate with the same parameters as it was originally issued with.
@ -263,76 +330,94 @@ def reissue(old_certificate_name, commit):
except Exception as e:
sentry.captureException()
current_app.logger.exception("Error reissuing certificate.", exc_info=True)
print(
"[!] Failed to reissue certificates. Reason: {}".format(
e
)
)
print("[!] Failed to reissue certificates. Reason: {}".format(e))
metrics.send('certificate_reissue_job', 'counter', 1, metric_tags={'status': status})
metrics.send(
"certificate_reissue_job", "counter", 1, metric_tags={"status": status}
)
@manager.option('-f', '--fqdns', dest='fqdns', help='FQDNs to query. Multiple fqdns specified via comma.')
@manager.option('-i', '--issuer', dest='issuer', help='Issuer to query for.')
@manager.option('-o', '--owner', dest='owner', help='Owner to query for.')
@manager.option('-e', '--expired', dest='expired', type=bool, default=False, help='Include expired certificates.')
@manager.option(
"-f",
"--fqdns",
dest="fqdns",
help="FQDNs to query. Multiple fqdns specified via comma.",
)
@manager.option("-i", "--issuer", dest="issuer", help="Issuer to query for.")
@manager.option("-o", "--owner", dest="owner", help="Owner to query for.")
@manager.option(
"-e",
"--expired",
dest="expired",
type=bool,
default=False,
help="Include expired certificates.",
)
def query(fqdns, issuer, owner, expired):
"""Prints certificates that match the query params."""
table = []
q = database.session_query(Certificate)
if issuer:
sub_query = database.session_query(Authority.id) \
.filter(Authority.name.ilike('%{0}%'.format(issuer))) \
sub_query = (
database.session_query(Authority.id)
.filter(Authority.name.ilike("%{0}%".format(issuer)))
.subquery()
)
q = q.filter(
or_(
Certificate.issuer.ilike('%{0}%'.format(issuer)),
Certificate.authority_id.in_(sub_query)
Certificate.issuer.ilike("%{0}%".format(issuer)),
Certificate.authority_id.in_(sub_query),
)
)
if owner:
q = q.filter(Certificate.owner.ilike('%{0}%'.format(owner)))
q = q.filter(Certificate.owner.ilike("%{0}%".format(owner)))
if not expired:
q = q.filter(Certificate.expired == False) # noqa
if fqdns:
for f in fqdns.split(','):
for f in fqdns.split(","):
q = q.filter(
or_(
Certificate.cn.ilike('%{0}%'.format(f)),
Certificate.domains.any(Domain.name.ilike('%{0}%'.format(f)))
Certificate.cn.ilike("%{0}%".format(f)),
Certificate.domains.any(Domain.name.ilike("%{0}%".format(f))),
)
)
for c in q.all():
table.append([c.id, c.name, c.owner, c.issuer])
print(tabulate(table, headers=['Id', 'Name', 'Owner', 'Issuer'], tablefmt='csv'))
print(tabulate(table, headers=["Id", "Name", "Owner", "Issuer"], tablefmt="csv"))
def worker(data, commit, reason):
parts = [x for x in data.split(' ') if x]
parts = [x for x in data.split(" ") if x]
try:
cert = get(int(parts[0].strip()))
plugin = plugins.get(cert.authority.plugin_name)
print('[+] Revoking certificate. Id: {0} Name: {1}'.format(cert.id, cert.name))
print("[+] Revoking certificate. Id: {0} Name: {1}".format(cert.id, cert.name))
if commit:
plugin.revoke_certificate(cert, reason)
metrics.send('certificate_revoke', 'counter', 1, metric_tags={'status': SUCCESS_METRIC_STATUS})
metrics.send(
"certificate_revoke",
"counter",
1,
metric_tags={"status": SUCCESS_METRIC_STATUS},
)
except Exception as e:
sentry.captureException()
metrics.send('certificate_revoke', 'counter', 1, metric_tags={'status': FAILURE_METRIC_STATUS})
print(
"[!] Failed to revoke certificates. Reason: {}".format(
e
)
metrics.send(
"certificate_revoke",
"counter",
1,
metric_tags={"status": FAILURE_METRIC_STATUS},
)
print("[!] Failed to revoke certificates. Reason: {}".format(e))
@manager.command
@ -341,13 +426,22 @@ def clear_pending():
Function clears all pending certificates.
:return:
"""
v = plugins.get('verisign-issuer')
v = plugins.get("verisign-issuer")
v.clear_pending_certificates()
@manager.option('-p', '--path', dest='path', help='Absolute file path to a Lemur query csv.')
@manager.option('-r', '--reason', dest='reason', help='Reason to revoke certificate.')
@manager.option('-c', '--commit', dest='commit', action='store_true', default=False, help='Persist changes.')
@manager.option(
"-p", "--path", dest="path", help="Absolute file path to a Lemur query csv."
)
@manager.option("-r", "--reason", dest="reason", help="Reason to revoke certificate.")
@manager.option(
"-c",
"--commit",
dest="commit",
action="store_true",
default=False,
help="Persist changes.",
)
def revoke(path, reason, commit):
"""
Revokes given certificate.
@ -357,7 +451,7 @@ def revoke(path, reason, commit):
print("[+] Starting certificate revocation.")
with open(path, 'r') as f:
with open(path, "r") as f:
args = [[x, commit, reason] for x in f.readlines()[2:]]
with multiprocessing.Pool(processes=3) as pool:
@ -380,11 +474,11 @@ def check_revoked():
else:
status = verify_string(cert.body, "")
cert.status = 'valid' if status else 'revoked'
cert.status = "valid" if status else "revoked"
except Exception as e:
sentry.captureException()
current_app.logger.exception(e)
cert.status = 'unknown'
cert.status = "unknown"
database.update(cert)

View File

@ -12,21 +12,30 @@ import subprocess
from flask import current_app
from lemur.certificates.service import csr_created, csr_imported, certificate_issued, certificate_imported
from lemur.certificates.service import (
csr_created,
csr_imported,
certificate_issued,
certificate_imported,
)
def csr_dump_handler(sender, csr, **kwargs):
try:
subprocess.run(['openssl', 'req', '-text', '-noout', '-reqopt', 'no_sigdump,no_pubkey'],
input=csr.encode('utf8'))
subprocess.run(
["openssl", "req", "-text", "-noout", "-reqopt", "no_sigdump,no_pubkey"],
input=csr.encode("utf8"),
)
except Exception as err:
current_app.logger.warning("Error inspecting CSR: %s", err)
def cert_dump_handler(sender, certificate, **kwargs):
try:
subprocess.run(['openssl', 'x509', '-text', '-noout', '-certopt', 'no_sigdump,no_pubkey'],
input=certificate.body.encode('utf8'))
subprocess.run(
["openssl", "x509", "-text", "-noout", "-certopt", "no_sigdump,no_pubkey"],
input=certificate.body.encode("utf8"),
)
except Exception as err:
current_app.logger.warning("Error inspecting certificate: %s", err)

View File

@ -12,7 +12,18 @@ from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import current_app
from idna.core import InvalidCodepoint
from sqlalchemy import event, Integer, ForeignKey, String, PassiveDefault, func, Column, Text, Boolean, Index
from sqlalchemy import (
event,
Integer,
ForeignKey,
String,
PassiveDefault,
func,
Column,
Text,
Boolean,
Index,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from sqlalchemy.sql.expression import case, extract
@ -25,19 +36,25 @@ from lemur.database import db
from lemur.domains.models import Domain
from lemur.extensions import metrics
from lemur.extensions import sentry
from lemur.models import certificate_associations, certificate_source_associations, \
certificate_destination_associations, certificate_notification_associations, \
certificate_replacement_associations, roles_certificates, pending_cert_replacement_associations
from lemur.models import (
certificate_associations,
certificate_source_associations,
certificate_destination_associations,
certificate_notification_associations,
certificate_replacement_associations,
roles_certificates,
pending_cert_replacement_associations,
)
from lemur.plugins.base import plugins
from lemur.policies.models import RotationPolicy
from lemur.utils import Vault
def get_sequence(name):
if '-' not in name:
if "-" not in name:
return name, None
parts = name.split('-')
parts = name.split("-")
# see if we have an int at the end of our name
try:
@ -49,22 +66,26 @@ def get_sequence(name):
if len(parts[-1]) == 8:
return name, None
root = '-'.join(parts[:-1])
root = "-".join(parts[:-1])
return root, seq
def get_or_increase_name(name, serial):
certificates = Certificate.query.filter(Certificate.name.ilike('{0}%'.format(name))).all()
certificates = Certificate.query.filter(Certificate.name == name).all()
if not certificates:
return name
serial_name = '{0}-{1}'.format(name, hex(int(serial))[2:].upper())
certificates = Certificate.query.filter(Certificate.name.ilike('{0}%'.format(serial_name))).all()
serial_name = "{0}-{1}".format(name, hex(int(serial))[2:].upper())
certificates = Certificate.query.filter(Certificate.name == serial_name).all()
if not certificates:
return serial_name
certificates = Certificate.query.filter(
Certificate.name.ilike("{0}%".format(serial_name))
).all()
ends = [0]
root, end = get_sequence(serial_name)
for cert in certificates:
@ -72,21 +93,29 @@ def get_or_increase_name(name, serial):
if end:
ends.append(end)
return '{0}-{1}'.format(root, max(ends) + 1)
return "{0}-{1}".format(root, max(ends) + 1)
class Certificate(db.Model):
__tablename__ = 'certificates'
__tablename__ = "certificates"
__table_args__ = (
Index('ix_certificates_cn', "cn",
postgresql_ops={"cn": "gin_trgm_ops"},
postgresql_using='gin'),
Index('ix_certificates_name', "name",
postgresql_ops={"name": "gin_trgm_ops"},
postgresql_using='gin'),
Index(
"ix_certificates_cn",
"cn",
postgresql_ops={"cn": "gin_trgm_ops"},
postgresql_using="gin",
),
Index(
"ix_certificates_name",
"name",
postgresql_ops={"name": "gin_trgm_ops"},
postgresql_using="gin",
),
)
id = Column(Integer, primary_key=True)
ix = Index('ix_certificates_id_desc', id.desc(), postgresql_using='btree', unique=True)
ix = Index(
"ix_certificates_id_desc", id.desc(), postgresql_using="btree", unique=True
)
external_id = Column(String(128))
owner = Column(String(128), nullable=False)
name = Column(String(256), unique=True)
@ -102,7 +131,9 @@ class Certificate(db.Model):
serial = Column(String(128))
cn = Column(String(128))
deleted = Column(Boolean, index=True, default=False)
dns_provider_id = Column(Integer(), ForeignKey('dns_providers.id', ondelete='CASCADE'), nullable=True)
dns_provider_id = Column(
Integer(), ForeignKey("dns_providers.id", ondelete="CASCADE"), nullable=True
)
not_before = Column(ArrowType)
not_after = Column(ArrowType)
@ -116,34 +147,53 @@ class Certificate(db.Model):
san = Column(String(1024)) # TODO this should be migrated to boolean
rotation = Column(Boolean, default=False)
user_id = Column(Integer, ForeignKey('users.id'))
authority_id = Column(Integer, ForeignKey('authorities.id', ondelete="CASCADE"))
root_authority_id = Column(Integer, ForeignKey('authorities.id', ondelete="CASCADE"))
rotation_policy_id = Column(Integer, ForeignKey('rotation_policies.id'))
user_id = Column(Integer, ForeignKey("users.id"))
authority_id = Column(Integer, ForeignKey("authorities.id", ondelete="CASCADE"))
root_authority_id = Column(
Integer, ForeignKey("authorities.id", ondelete="CASCADE")
)
rotation_policy_id = Column(Integer, ForeignKey("rotation_policies.id"))
notifications = relationship('Notification', secondary=certificate_notification_associations, backref='certificate')
destinations = relationship('Destination', secondary=certificate_destination_associations, backref='certificate')
sources = relationship('Source', secondary=certificate_source_associations, backref='certificate')
domains = relationship('Domain', secondary=certificate_associations, backref='certificate')
roles = relationship('Role', secondary=roles_certificates, backref='certificate')
replaces = relationship('Certificate',
secondary=certificate_replacement_associations,
primaryjoin=id == certificate_replacement_associations.c.certificate_id, # noqa
secondaryjoin=id == certificate_replacement_associations.c.replaced_certificate_id, # noqa
backref='replaced')
notifications = relationship(
"Notification",
secondary=certificate_notification_associations,
backref="certificate",
)
destinations = relationship(
"Destination",
secondary=certificate_destination_associations,
backref="certificate",
)
sources = relationship(
"Source", secondary=certificate_source_associations, backref="certificate"
)
domains = relationship(
"Domain", secondary=certificate_associations, backref="certificate"
)
roles = relationship("Role", secondary=roles_certificates, backref="certificate")
replaces = relationship(
"Certificate",
secondary=certificate_replacement_associations,
primaryjoin=id == certificate_replacement_associations.c.certificate_id, # noqa
secondaryjoin=id
== certificate_replacement_associations.c.replaced_certificate_id, # noqa
backref="replaced",
)
replaced_by_pending = relationship('PendingCertificate',
secondary=pending_cert_replacement_associations,
backref='pending_replace',
viewonly=True)
replaced_by_pending = relationship(
"PendingCertificate",
secondary=pending_cert_replacement_associations,
backref="pending_replace",
viewonly=True,
)
logs = relationship('Log', backref='certificate')
endpoints = relationship('Endpoint', backref='certificate')
logs = relationship("Log", backref="certificate")
endpoints = relationship("Endpoint", backref="certificate")
rotation_policy = relationship("RotationPolicy")
sensitive_fields = ('private_key',)
sensitive_fields = ("private_key",)
def __init__(self, **kwargs):
self.body = kwargs['body'].strip()
self.body = kwargs["body"].strip()
cert = self.parsed_cert
self.issuer = defaults.issuer(cert)
@ -154,36 +204,42 @@ class Certificate(db.Model):
self.serial = defaults.serial(cert)
# when destinations are appended they require a valid name.
if kwargs.get('name'):
self.name = get_or_increase_name(defaults.text_to_slug(kwargs['name']), self.serial)
if kwargs.get("name"):
self.name = get_or_increase_name(
defaults.text_to_slug(kwargs["name"]), self.serial
)
else:
self.name = get_or_increase_name(
defaults.certificate_name(self.cn, self.issuer, self.not_before, self.not_after, self.san), self.serial)
defaults.certificate_name(
self.cn, self.issuer, self.not_before, self.not_after, self.san
),
self.serial,
)
self.owner = kwargs['owner']
self.owner = kwargs["owner"]
if kwargs.get('private_key'):
self.private_key = kwargs['private_key'].strip()
if kwargs.get("private_key"):
self.private_key = kwargs["private_key"].strip()
if kwargs.get('chain'):
self.chain = kwargs['chain'].strip()
if kwargs.get("chain"):
self.chain = kwargs["chain"].strip()
if kwargs.get('csr'):
self.csr = kwargs['csr'].strip()
if kwargs.get("csr"):
self.csr = kwargs["csr"].strip()
self.notify = kwargs.get('notify', True)
self.destinations = kwargs.get('destinations', [])
self.notifications = kwargs.get('notifications', [])
self.description = kwargs.get('description')
self.roles = list(set(kwargs.get('roles', [])))
self.replaces = kwargs.get('replaces', [])
self.rotation = kwargs.get('rotation')
self.rotation_policy = kwargs.get('rotation_policy')
self.notify = kwargs.get("notify", True)
self.destinations = kwargs.get("destinations", [])
self.notifications = kwargs.get("notifications", [])
self.description = kwargs.get("description")
self.roles = list(set(kwargs.get("roles", [])))
self.replaces = kwargs.get("replaces", [])
self.rotation = kwargs.get("rotation")
self.rotation_policy = kwargs.get("rotation_policy")
self.signing_algorithm = defaults.signing_algorithm(cert)
self.bits = defaults.bitstrength(cert)
self.external_id = kwargs.get('external_id')
self.authority_id = kwargs.get('authority_id')
self.dns_provider_id = kwargs.get('dns_provider_id')
self.external_id = kwargs.get("external_id")
self.authority_id = kwargs.get("authority_id")
self.dns_provider_id = kwargs.get("dns_provider_id")
for domain in defaults.domains(cert):
self.domains.append(Domain(name=domain))
@ -197,8 +253,11 @@ class Certificate(db.Model):
Integrity checks: Does the cert have a valid chain and matching private key?
"""
if self.private_key:
validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert,
error_class=AssertionError)
validators.verify_private_key_match(
utils.parse_private_key(self.private_key),
self.parsed_cert,
error_class=AssertionError,
)
if self.chain:
chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain)
@ -240,7 +299,9 @@ class Certificate(db.Model):
@property
def key_type(self):
if isinstance(self.parsed_cert.public_key(), rsa.RSAPublicKey):
return 'RSA{key_size}'.format(key_size=self.parsed_cert.public_key().key_size)
return "RSA{key_size}".format(
key_size=self.parsed_cert.public_key().key_size
)
@property
def validity_remaining(self):
@ -265,26 +326,16 @@ class Certificate(db.Model):
@expired.expression
def expired(cls):
return case(
[
(cls.not_after <= arrow.utcnow(), True)
],
else_=False
)
return case([(cls.not_after <= arrow.utcnow(), True)], else_=False)
@hybrid_property
def revoked(self):
if 'revoked' == self.status:
if "revoked" == self.status:
return True
@revoked.expression
def revoked(cls):
return case(
[
(cls.status == 'revoked', True)
],
else_=False
)
return case([(cls.status == "revoked", True)], else_=False)
@hybrid_property
def in_rotation_window(self):
@ -307,66 +358,65 @@ class Certificate(db.Model):
:return:
"""
return case(
[
(extract('day', cls.not_after - func.now()) <= RotationPolicy.days, True)
],
else_=False
[(extract("day", cls.not_after - func.now()) <= RotationPolicy.days, True)],
else_=False,
)
@property
def extensions(self):
# setup default values
return_extensions = {
'sub_alt_names': {'names': []}
}
return_extensions = {"sub_alt_names": {"names": []}}
try:
for extension in self.parsed_cert.extensions:
value = extension.value
if isinstance(value, x509.BasicConstraints):
return_extensions['basic_constraints'] = value
return_extensions["basic_constraints"] = value
elif isinstance(value, x509.SubjectAlternativeName):
return_extensions['sub_alt_names']['names'] = value
return_extensions["sub_alt_names"]["names"] = value
elif isinstance(value, x509.ExtendedKeyUsage):
return_extensions['extended_key_usage'] = value
return_extensions["extended_key_usage"] = value
elif isinstance(value, x509.KeyUsage):
return_extensions['key_usage'] = value
return_extensions["key_usage"] = value
elif isinstance(value, x509.SubjectKeyIdentifier):
return_extensions['subject_key_identifier'] = {'include_ski': True}
return_extensions["subject_key_identifier"] = {"include_ski": True}
elif isinstance(value, x509.AuthorityInformationAccess):
return_extensions['certificate_info_access'] = {'include_aia': True}
return_extensions["certificate_info_access"] = {"include_aia": True}
elif isinstance(value, x509.AuthorityKeyIdentifier):
aki = {
'use_key_identifier': False,
'use_authority_cert': False
}
aki = {"use_key_identifier": False, "use_authority_cert": False}
if value.key_identifier:
aki['use_key_identifier'] = True
aki["use_key_identifier"] = True
if value.authority_cert_issuer:
aki['use_authority_cert'] = True
aki["use_authority_cert"] = True
return_extensions['authority_key_identifier'] = aki
return_extensions["authority_key_identifier"] = aki
elif isinstance(value, x509.CRLDistributionPoints):
return_extensions['crl_distribution_points'] = {'include_crl_dp': value}
return_extensions["crl_distribution_points"] = {
"include_crl_dp": value
}
# TODO: Not supporting custom OIDs yet. https://github.com/Netflix/lemur/issues/665
else:
current_app.logger.warning('Custom OIDs not yet supported for clone operation.')
current_app.logger.warning(
"Custom OIDs not yet supported for clone operation."
)
except InvalidCodepoint as e:
sentry.captureException()
current_app.logger.warning('Unable to parse extensions due to underscore in dns name')
current_app.logger.warning(
"Unable to parse extensions due to underscore in dns name"
)
except ValueError as e:
sentry.captureException()
current_app.logger.warning('Unable to parse')
current_app.logger.warning("Unable to parse")
current_app.logger.exception(e)
return return_extensions
@ -375,7 +425,7 @@ class Certificate(db.Model):
return "Certificate(name={name})".format(name=self.name)
@event.listens_for(Certificate.destinations, 'append')
@event.listens_for(Certificate.destinations, "append")
def update_destinations(target, value, initiator):
"""
Attempt to upload certificate to the new destination
@ -389,17 +439,31 @@ def update_destinations(target, value, initiator):
status = FAILURE_METRIC_STATUS
try:
if target.private_key or not destination_plugin.requires_key:
destination_plugin.upload(target.name, target.body, target.private_key, target.chain, value.options)
destination_plugin.upload(
target.name,
target.body,
target.private_key,
target.chain,
value.options,
)
status = SUCCESS_METRIC_STATUS
except Exception as e:
sentry.captureException()
raise
metrics.send('destination_upload', 'counter', 1,
metric_tags={'status': status, 'certificate': target.name, 'destination': value.label})
metrics.send(
"destination_upload",
"counter",
1,
metric_tags={
"status": status,
"certificate": target.name,
"destination": value.label,
},
)
@event.listens_for(Certificate.replaces, 'append')
@event.listens_for(Certificate.replaces, "append")
def update_replacement(target, value, initiator):
"""
When a certificate is marked as 'replaced' we should not notify.

View File

@ -39,22 +39,26 @@ from lemur.users.schemas import UserNestedOutputSchema
class CertificateSchema(LemurInputSchema):
owner = fields.Email(required=True)
description = fields.String(missing='', allow_none=True)
description = fields.String(missing="", allow_none=True)
class CertificateCreationSchema(CertificateSchema):
@post_load
def default_notification(self, data):
if not data['notifications']:
data['notifications'] += notification_service.create_default_expiration_notifications(
"DEFAULT_{0}".format(data['owner'].split('@')[0].upper()),
[data['owner']],
if not data["notifications"]:
data[
"notifications"
] += notification_service.create_default_expiration_notifications(
"DEFAULT_{0}".format(data["owner"].split("@")[0].upper()),
[data["owner"]],
)
data['notifications'] += notification_service.create_default_expiration_notifications(
'DEFAULT_SECURITY',
current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL'),
current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL_INTERVALS', None)
data[
"notifications"
] += notification_service.create_default_expiration_notifications(
"DEFAULT_SECURITY",
current_app.config.get("LEMUR_SECURITY_TEAM_EMAIL"),
current_app.config.get("LEMUR_SECURITY_TEAM_EMAIL_INTERVALS", None),
)
return data
@ -71,37 +75,53 @@ class CertificateInputSchema(CertificateCreationSchema):
destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True)
notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True)
replaces = fields.Nested(AssociatedCertificateSchema, missing=[], many=True)
replacements = fields.Nested(AssociatedCertificateSchema, missing=[], many=True) # deprecated
replacements = fields.Nested(
AssociatedCertificateSchema, missing=[], many=True
) # deprecated
roles = fields.Nested(AssociatedRoleSchema, missing=[], many=True)
dns_provider = fields.Nested(AssociatedDnsProviderSchema, missing=None, allow_none=True, required=False)
dns_provider = fields.Nested(
AssociatedDnsProviderSchema, missing=None, allow_none=True, required=False
)
csr = fields.String(allow_none=True, validate=validators.csr)
key_type = fields.String(
validate=validate.OneOf(CERTIFICATE_KEY_TYPES),
missing='RSA2048')
validate=validate.OneOf(CERTIFICATE_KEY_TYPES), missing="RSA2048"
)
notify = fields.Boolean(default=True)
rotation = fields.Boolean()
rotation_policy = fields.Nested(AssociatedRotationPolicySchema, missing={'name': 'default'}, allow_none=True,
default={'name': 'default'})
rotation_policy = fields.Nested(
AssociatedRotationPolicySchema,
missing={"name": "default"},
allow_none=True,
default={"name": "default"},
)
# certificate body fields
organizational_unit = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_ORGANIZATIONAL_UNIT'))
organization = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_ORGANIZATION'))
location = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_LOCATION'))
country = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_COUNTRY'))
state = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_STATE'))
organizational_unit = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_ORGANIZATIONAL_UNIT")
)
organization = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_ORGANIZATION")
)
location = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_LOCATION")
)
country = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_COUNTRY")
)
state = fields.String(missing=lambda: current_app.config.get("LEMUR_DEFAULT_STATE"))
extensions = fields.Nested(ExtensionSchema)
@validates_schema
def validate_authority(self, data):
if isinstance(data['authority'], str):
if isinstance(data["authority"], str):
raise ValidationError("Authority not found.")
if not data['authority'].active:
raise ValidationError("The authority is inactive.", ['authority'])
if not data["authority"].active:
raise ValidationError("The authority is inactive.", ["authority"])
@validates_schema
def validate_dates(self, data):
@ -109,23 +129,19 @@ class CertificateInputSchema(CertificateCreationSchema):
@pre_load
def load_data(self, data):
if data.get('replacements'):
data['replaces'] = data['replacements'] # TODO remove when field is deprecated
if data.get('csr'):
csr_sans = cert_utils.get_sans_from_csr(data['csr'])
if not data.get('extensions'):
data['extensions'] = {
'subAltNames': {
'names': []
}
}
elif not data['extensions'].get('subAltNames'):
data['extensions']['subAltNames'] = {
'names': []
}
elif not data['extensions']['subAltNames'].get('names'):
data['extensions']['subAltNames']['names'] = []
data['extensions']['subAltNames']['names'] += csr_sans
if data.get("replacements"):
data["replaces"] = data[
"replacements"
] # TODO remove when field is deprecated
if data.get("csr"):
csr_sans = cert_utils.get_sans_from_csr(data["csr"])
if not data.get("extensions"):
data["extensions"] = {"subAltNames": {"names": []}}
elif not data["extensions"].get("subAltNames"):
data["extensions"]["subAltNames"] = {"names": []}
elif not data["extensions"]["subAltNames"].get("names"):
data["extensions"]["subAltNames"]["names"] = []
data["extensions"]["subAltNames"]["names"] += csr_sans
return missing.convert_validity_years(data)
@ -138,13 +154,17 @@ class CertificateEditInputSchema(CertificateSchema):
destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True)
notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True)
replaces = fields.Nested(AssociatedCertificateSchema, missing=[], many=True)
replacements = fields.Nested(AssociatedCertificateSchema, missing=[], many=True) # deprecated
replacements = fields.Nested(
AssociatedCertificateSchema, missing=[], many=True
) # deprecated
roles = fields.Nested(AssociatedRoleSchema, missing=[], many=True)
@pre_load
def load_data(self, data):
if data.get('replacements'):
data['replaces'] = data['replacements'] # TODO remove when field is deprecated
if data.get("replacements"):
data["replaces"] = data[
"replacements"
] # TODO remove when field is deprecated
return data
@post_load
@ -155,10 +175,15 @@ class CertificateEditInputSchema(CertificateSchema):
:param data:
:return:
"""
if data['owner']:
notification_name = "DEFAULT_{0}".format(data['owner'].split('@')[0].upper())
data['notifications'] += notification_service.create_default_expiration_notifications(notification_name,
[data['owner']])
if data["owner"]:
notification_name = "DEFAULT_{0}".format(
data["owner"].split("@")[0].upper()
)
data[
"notifications"
] += notification_service.create_default_expiration_notifications(
notification_name, [data["owner"]]
)
return data
@ -184,13 +209,13 @@ class CertificateNestedOutputSchema(LemurOutputSchema):
# Note aliasing is the first step in deprecating these fields.
cn = fields.String() # deprecated
common_name = fields.String(attribute='cn')
common_name = fields.String(attribute="cn")
not_after = fields.DateTime() # deprecated
validity_end = ArrowDateTime(attribute='not_after')
validity_end = ArrowDateTime(attribute="not_after")
not_before = fields.DateTime() # deprecated
validity_start = ArrowDateTime(attribute='not_before')
validity_start = ArrowDateTime(attribute="not_before")
issuer = fields.Nested(AuthorityNestedOutputSchema)
@ -221,22 +246,22 @@ class CertificateOutputSchema(LemurOutputSchema):
# Note aliasing is the first step in deprecating these fields.
notify = fields.Boolean()
active = fields.Boolean(attribute='notify')
active = fields.Boolean(attribute="notify")
cn = fields.String()
common_name = fields.String(attribute='cn')
common_name = fields.String(attribute="cn")
distinguished_name = fields.String()
not_after = fields.DateTime()
validity_end = ArrowDateTime(attribute='not_after')
validity_end = ArrowDateTime(attribute="not_after")
not_before = fields.DateTime()
validity_start = ArrowDateTime(attribute='not_before')
validity_start = ArrowDateTime(attribute="not_before")
owner = fields.Email()
san = fields.Boolean()
serial = fields.String()
serial_hex = Hex(attribute='serial')
serial_hex = Hex(attribute="serial")
signing_algorithm = fields.String()
status = fields.String()
@ -253,7 +278,9 @@ class CertificateOutputSchema(LemurOutputSchema):
dns_provider = fields.Nested(DnsProvidersNestedOutputSchema)
roles = fields.Nested(RoleNestedOutputSchema, many=True)
endpoints = fields.Nested(EndpointNestedOutputSchema, many=True, missing=[])
replaced_by = fields.Nested(CertificateNestedOutputSchema, many=True, attribute='replaced')
replaced_by = fields.Nested(
CertificateNestedOutputSchema, many=True, attribute="replaced"
)
rotation_policy = fields.Nested(RotationPolicyNestedOutputSchema)
@ -274,35 +301,41 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
@validates_schema
def keys(self, data):
if data.get('destinations'):
if not data.get('private_key'):
raise ValidationError('Destinations require private key.')
if data.get("destinations"):
if not data.get("private_key"):
raise ValidationError("Destinations require private key.")
@validates_schema
def validate_cert_private_key_chain(self, data):
cert = None
key = None
if data.get('body'):
if data.get("body"):
try:
cert = utils.parse_certificate(data['body'])
cert = utils.parse_certificate(data["body"])
except ValueError:
raise ValidationError("Public certificate presented is not valid.", field_names=['body'])
raise ValidationError(
"Public certificate presented is not valid.", field_names=["body"]
)
if data.get('private_key'):
if data.get("private_key"):
try:
key = utils.parse_private_key(data['private_key'])
key = utils.parse_private_key(data["private_key"])
except ValueError:
raise ValidationError("Private key presented is not valid.", field_names=['private_key'])
raise ValidationError(
"Private key presented is not valid.", field_names=["private_key"]
)
if cert and key:
# Throws ValidationError
validators.verify_private_key_match(key, cert)
if data.get('chain'):
if data.get("chain"):
try:
chain = utils.parse_cert_chain(data['chain'])
chain = utils.parse_cert_chain(data["chain"])
except ValueError:
raise ValidationError("Invalid certificate in certificate chain.", field_names=['chain'])
raise ValidationError(
"Invalid certificate in certificate chain.", field_names=["chain"]
)
# Throws ValidationError
validators.verify_cert_chain([cert] + chain)
@ -318,8 +351,10 @@ class CertificateNotificationOutputSchema(LemurOutputSchema):
name = fields.String()
owner = fields.Email()
user = fields.Nested(UserNestedOutputSchema)
validity_end = ArrowDateTime(attribute='not_after')
replaced_by = fields.Nested(CertificateNestedOutputSchema, many=True, attribute='replaced')
validity_end = ArrowDateTime(attribute="not_after")
replaced_by = fields.Nested(
CertificateNestedOutputSchema, many=True, attribute="replaced"
)
endpoints = fields.Nested(EndpointNestedOutputSchema, many=True, missing=[])

View File

@ -26,10 +26,14 @@ from lemur.plugins.base import plugins
from lemur.roles import service as role_service
from lemur.roles.models import Role
csr_created = signals.signal('csr_created', "CSR generated")
csr_imported = signals.signal('csr_imported', "CSR imported from external source")
certificate_issued = signals.signal('certificate_issued', "Authority issued a certificate")
certificate_imported = signals.signal('certificate_imported', "Certificate imported from external source")
csr_created = signals.signal("csr_created", "CSR generated")
csr_imported = signals.signal("csr_imported", "CSR imported from external source")
certificate_issued = signals.signal(
"certificate_issued", "Authority issued a certificate"
)
certificate_imported = signals.signal(
"certificate_imported", "Certificate imported from external source"
)
def get(cert_id):
@ -49,7 +53,7 @@ def get_by_name(name):
:param name:
:return:
"""
return database.get(Certificate, name, field='name')
return database.get(Certificate, name, field="name")
def get_by_serial(serial):
@ -105,8 +109,12 @@ def get_all_pending_cleaning(source):
:param source:
:return:
"""
return Certificate.query.filter(Certificate.sources.any(id=source.id)) \
.filter(not_(Certificate.endpoints.any())).filter(Certificate.expired).all()
return (
Certificate.query.filter(Certificate.sources.any(id=source.id))
.filter(not_(Certificate.endpoints.any()))
.filter(Certificate.expired)
.all()
)
def get_all_pending_reissue():
@ -119,9 +127,12 @@ def get_all_pending_reissue():
:return:
"""
return Certificate.query.filter(Certificate.rotation == True) \
.filter(not_(Certificate.replaced.any())) \
.filter(Certificate.in_rotation_window == True).all() # noqa
return (
Certificate.query.filter(Certificate.rotation == True)
.filter(not_(Certificate.replaced.any()))
.filter(Certificate.in_rotation_window == True)
.all()
) # noqa
def find_duplicates(cert):
@ -133,10 +144,12 @@ def find_duplicates(cert):
:param cert:
:return:
"""
if cert['chain']:
return Certificate.query.filter_by(body=cert['body'].strip(), chain=cert['chain'].strip()).all()
if cert["chain"]:
return Certificate.query.filter_by(
body=cert["body"].strip(), chain=cert["chain"].strip()
).all()
else:
return Certificate.query.filter_by(body=cert['body'].strip(), chain=None).all()
return Certificate.query.filter_by(body=cert["body"].strip(), chain=None).all()
def export(cert, export_plugin):
@ -148,8 +161,10 @@ def export(cert, export_plugin):
:param cert:
:return:
"""
plugin = plugins.get(export_plugin['slug'])
return plugin.export(cert.body, cert.chain, cert.private_key, export_plugin['pluginOptions'])
plugin = plugins.get(export_plugin["slug"])
return plugin.export(
cert.body, cert.chain, cert.private_key, export_plugin["pluginOptions"]
)
def update(cert_id, **kwargs):
@ -168,17 +183,19 @@ def update(cert_id, **kwargs):
def create_certificate_roles(**kwargs):
# create an role for the owner and assign it
owner_role = role_service.get_by_name(kwargs['owner'])
owner_role = role_service.get_by_name(kwargs["owner"])
if not owner_role:
owner_role = role_service.create(
kwargs['owner'],
description="Auto generated role based on owner: {0}".format(kwargs['owner'])
kwargs["owner"],
description="Auto generated role based on owner: {0}".format(
kwargs["owner"]
),
)
# ensure that the authority's owner is also associated with the certificate
if kwargs.get('authority'):
authority_owner_role = role_service.get_by_name(kwargs['authority'].owner)
if kwargs.get("authority"):
authority_owner_role = role_service.get_by_name(kwargs["authority"].owner)
return [owner_role, authority_owner_role]
return [owner_role]
@ -190,16 +207,16 @@ def mint(**kwargs):
Support for multiple authorities is handled by individual plugins.
"""
authority = kwargs['authority']
authority = kwargs["authority"]
issuer = plugins.get(authority.plugin_name)
# allow the CSR to be specified by the user
if not kwargs.get('csr'):
if not kwargs.get("csr"):
csr, private_key = create_csr(**kwargs)
csr_created.send(authority=authority, csr=csr)
else:
csr = str(kwargs.get('csr'))
csr = str(kwargs.get("csr"))
private_key = None
csr_imported.send(authority=authority, csr=csr)
@ -220,8 +237,8 @@ def import_certificate(**kwargs):
:param kwargs:
"""
if not kwargs.get('owner'):
kwargs['owner'] = current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL')[0]
if not kwargs.get("owner"):
kwargs["owner"] = current_app.config.get("LEMUR_SECURITY_TEAM_EMAIL")[0]
return upload(**kwargs)
@ -232,16 +249,16 @@ def upload(**kwargs):
"""
roles = create_certificate_roles(**kwargs)
if kwargs.get('roles'):
kwargs['roles'] += roles
if kwargs.get("roles"):
kwargs["roles"] += roles
else:
kwargs['roles'] = roles
kwargs["roles"] = roles
cert = Certificate(**kwargs)
cert.authority = kwargs.get('authority')
cert.authority = kwargs.get("authority")
cert = database.create(cert)
kwargs['creator'].certificates.append(cert)
kwargs["creator"].certificates.append(cert)
cert = database.update(cert)
certificate_imported.send(certificate=cert, authority=cert.authority)
@ -258,39 +275,45 @@ def create(**kwargs):
current_app.logger.error("Exception minting certificate", exc_info=True)
sentry.captureException()
raise
kwargs['body'] = cert_body
kwargs['private_key'] = private_key
kwargs['chain'] = cert_chain
kwargs['external_id'] = external_id
kwargs['csr'] = csr
kwargs["body"] = cert_body
kwargs["private_key"] = private_key
kwargs["chain"] = cert_chain
kwargs["external_id"] = external_id
kwargs["csr"] = csr
roles = create_certificate_roles(**kwargs)
if kwargs.get('roles'):
kwargs['roles'] += roles
if kwargs.get("roles"):
kwargs["roles"] += roles
else:
kwargs['roles'] = roles
kwargs["roles"] = roles
if cert_body:
cert = Certificate(**kwargs)
kwargs['creator'].certificates.append(cert)
kwargs["creator"].certificates.append(cert)
else:
cert = PendingCertificate(**kwargs)
kwargs['creator'].pending_certificates.append(cert)
kwargs["creator"].pending_certificates.append(cert)
cert.authority = kwargs['authority']
cert.authority = kwargs["authority"]
database.commit()
if isinstance(cert, Certificate):
certificate_issued.send(certificate=cert, authority=cert.authority)
metrics.send('certificate_issued', 'counter', 1, metric_tags=dict(owner=cert.owner, issuer=cert.issuer))
metrics.send(
"certificate_issued",
"counter",
1,
metric_tags=dict(owner=cert.owner, issuer=cert.issuer),
)
if isinstance(cert, PendingCertificate):
# We need to refresh the pending certificate to avoid "Instance is not bound to a Session; "
# "attribute refresh operation cannot proceed"
pending_cert = database.session_query(PendingCertificate).get(cert.id)
from lemur.common.celery import fetch_acme_cert
if not current_app.config.get("ACME_DISABLE_AUTORESOLVE", False):
fetch_acme_cert.apply_async((pending_cert.id,), countdown=5)
@ -306,51 +329,55 @@ def render(args):
"""
query = database.session_query(Certificate)
time_range = args.pop('time_range')
destination_id = args.pop('destination_id')
notification_id = args.pop('notification_id', None)
show = args.pop('show')
time_range = args.pop("time_range")
destination_id = args.pop("destination_id")
notification_id = args.pop("notification_id", None)
show = args.pop("show")
# owner = args.pop('owner')
# creator = args.pop('creator') # TODO we should enabling filtering by owner
filt = args.pop('filter')
filt = args.pop("filter")
if filt:
terms = filt.split(';')
term = '%{0}%'.format(terms[1])
terms = filt.split(";")
term = "%{0}%".format(terms[1])
# Exact matches for quotes. Only applies to name, issuer, and cn
if terms[1].startswith('"') and terms[1].endswith('"'):
term = terms[1][1:-1]
if 'issuer' in terms:
if "issuer" in terms:
# we can't rely on issuer being correct in the cert directly so we combine queries
sub_query = database.session_query(Authority.id) \
.filter(Authority.name.ilike(term)) \
sub_query = (
database.session_query(Authority.id)
.filter(Authority.name.ilike(term))
.subquery()
)
query = query.filter(
or_(
Certificate.issuer.ilike(term),
Certificate.authority_id.in_(sub_query)
Certificate.authority_id.in_(sub_query),
)
)
elif 'destination' in terms:
query = query.filter(Certificate.destinations.any(Destination.id == terms[1]))
elif 'notify' in filt:
elif "destination" in terms:
query = query.filter(
Certificate.destinations.any(Destination.id == terms[1])
)
elif "notify" in filt:
query = query.filter(Certificate.notify == truthiness(terms[1]))
elif 'active' in filt:
elif "active" in filt:
query = query.filter(Certificate.active == truthiness(terms[1]))
elif 'cn' in terms:
elif "cn" in terms:
query = query.filter(
or_(
Certificate.cn.ilike(term),
Certificate.domains.any(Domain.name.ilike(term))
Certificate.domains.any(Domain.name.ilike(term)),
)
)
elif 'id' in terms:
elif "id" in terms:
query = query.filter(Certificate.id == cast(terms[1], Integer))
elif 'name' in terms:
elif "name" in terms:
query = query.filter(
or_(
Certificate.name.ilike(term),
@ -362,26 +389,35 @@ def render(args):
query = database.filter(query, Certificate, terms)
if show:
sub_query = database.session_query(Role.name).filter(Role.user_id == args['user'].id).subquery()
sub_query = (
database.session_query(Role.name)
.filter(Role.user_id == args["user"].id)
.subquery()
)
query = query.filter(
or_(
Certificate.user_id == args['user'].id,
Certificate.owner.in_(sub_query)
Certificate.user_id == args["user"].id, Certificate.owner.in_(sub_query)
)
)
if destination_id:
query = query.filter(Certificate.destinations.any(Destination.id == destination_id))
query = query.filter(
Certificate.destinations.any(Destination.id == destination_id)
)
if notification_id:
query = query.filter(Certificate.notifications.any(Notification.id == notification_id))
query = query.filter(
Certificate.notifications.any(Notification.id == notification_id)
)
if time_range:
to = arrow.now().replace(weeks=+time_range).format('YYYY-MM-DD')
now = arrow.now().format('YYYY-MM-DD')
query = query.filter(Certificate.not_after <= to).filter(Certificate.not_after >= now)
to = arrow.now().replace(weeks=+time_range).format("YYYY-MM-DD")
now = arrow.now().format("YYYY-MM-DD")
query = query.filter(Certificate.not_after <= to).filter(
Certificate.not_after >= now
)
if current_app.config.get('ALLOW_CERT_DELETION', False):
if current_app.config.get("ALLOW_CERT_DELETION", False):
query = query.filter(Certificate.deleted == False) # noqa
result = database.sort_and_page(query, Certificate, args)
@ -409,18 +445,20 @@ def query_common_name(common_name, args):
:param args:
:return:
"""
owner = args.pop('owner')
owner = args.pop("owner")
if not owner:
owner = '%'
owner = "%"
# only not expired certificates
current_time = arrow.utcnow()
result = Certificate.query.filter(Certificate.cn.ilike(common_name)) \
.filter(Certificate.owner.ilike(owner))\
.filter(Certificate.not_after >= current_time.format('YYYY-MM-DD')) \
.filter(Certificate.rotation.is_(True))\
result = (
Certificate.query.filter(Certificate.cn.ilike(common_name))
.filter(Certificate.owner.ilike(owner))
.filter(Certificate.not_after >= current_time.format("YYYY-MM-DD"))
.filter(Certificate.rotation.is_(True))
.all()
)
return result
@ -432,62 +470,77 @@ def create_csr(**csr_config):
:param csr_config:
"""
private_key = generate_private_key(csr_config.get('key_type'))
private_key = generate_private_key(csr_config.get("key_type"))
builder = x509.CertificateSigningRequestBuilder()
name_list = [x509.NameAttribute(x509.OID_COMMON_NAME, csr_config['common_name'])]
if current_app.config.get('LEMUR_OWNER_EMAIL_IN_SUBJECT', True):
name_list.append(x509.NameAttribute(x509.OID_EMAIL_ADDRESS, csr_config['owner']))
if 'organization' in csr_config and csr_config['organization'].strip():
name_list.append(x509.NameAttribute(x509.OID_ORGANIZATION_NAME, csr_config['organization']))
if 'organizational_unit' in csr_config and csr_config['organizational_unit'].strip():
name_list.append(x509.NameAttribute(x509.OID_ORGANIZATIONAL_UNIT_NAME, csr_config['organizational_unit']))
if 'country' in csr_config and csr_config['country'].strip():
name_list.append(x509.NameAttribute(x509.OID_COUNTRY_NAME, csr_config['country']))
if 'state' in csr_config and csr_config['state'].strip():
name_list.append(x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, csr_config['state']))
if 'location' in csr_config and csr_config['location'].strip():
name_list.append(x509.NameAttribute(x509.OID_LOCALITY_NAME, csr_config['location']))
name_list = [x509.NameAttribute(x509.OID_COMMON_NAME, csr_config["common_name"])]
if current_app.config.get("LEMUR_OWNER_EMAIL_IN_SUBJECT", True):
name_list.append(
x509.NameAttribute(x509.OID_EMAIL_ADDRESS, csr_config["owner"])
)
if "organization" in csr_config and csr_config["organization"].strip():
name_list.append(
x509.NameAttribute(x509.OID_ORGANIZATION_NAME, csr_config["organization"])
)
if (
"organizational_unit" in csr_config
and csr_config["organizational_unit"].strip()
):
name_list.append(
x509.NameAttribute(
x509.OID_ORGANIZATIONAL_UNIT_NAME, csr_config["organizational_unit"]
)
)
if "country" in csr_config and csr_config["country"].strip():
name_list.append(
x509.NameAttribute(x509.OID_COUNTRY_NAME, csr_config["country"])
)
if "state" in csr_config and csr_config["state"].strip():
name_list.append(
x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, csr_config["state"])
)
if "location" in csr_config and csr_config["location"].strip():
name_list.append(
x509.NameAttribute(x509.OID_LOCALITY_NAME, csr_config["location"])
)
builder = builder.subject_name(x509.Name(name_list))
extensions = csr_config.get('extensions', {})
critical_extensions = ['basic_constraints', 'sub_alt_names', 'key_usage']
noncritical_extensions = ['extended_key_usage']
extensions = csr_config.get("extensions", {})
critical_extensions = ["basic_constraints", "sub_alt_names", "key_usage"]
noncritical_extensions = ["extended_key_usage"]
for k, v in extensions.items():
if v:
if k in critical_extensions:
current_app.logger.debug('Adding Critical Extension: {0} {1}'.format(k, v))
if k == 'sub_alt_names':
if v['names']:
builder = builder.add_extension(v['names'], critical=True)
current_app.logger.debug(
"Adding Critical Extension: {0} {1}".format(k, v)
)
if k == "sub_alt_names":
if v["names"]:
builder = builder.add_extension(v["names"], critical=True)
else:
builder = builder.add_extension(v, critical=True)
if k in noncritical_extensions:
current_app.logger.debug('Adding Extension: {0} {1}'.format(k, v))
current_app.logger.debug("Adding Extension: {0} {1}".format(k, v))
builder = builder.add_extension(v, critical=False)
ski = extensions.get('subject_key_identifier', {})
if ski.get('include_ski', False):
ski = extensions.get("subject_key_identifier", {})
if ski.get("include_ski", False):
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
critical=False
critical=False,
)
request = builder.sign(
private_key, hashes.SHA256(), default_backend()
)
request = builder.sign(private_key, hashes.SHA256(), default_backend())
# serialize our private key and CSR
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
csr = request.public_bytes(
encoding=serialization.Encoding.PEM
).decode('utf-8')
csr = request.public_bytes(encoding=serialization.Encoding.PEM).decode("utf-8")
return csr, private_key
@ -499,16 +552,19 @@ def stats(**kwargs):
:param kwargs:
:return:
"""
if kwargs.get('metric') == 'not_after':
if kwargs.get("metric") == "not_after":
start = arrow.utcnow()
end = start.replace(weeks=+32)
items = database.db.session.query(Certificate.issuer, func.count(Certificate.id)) \
.group_by(Certificate.issuer) \
.filter(Certificate.not_after <= end.format('YYYY-MM-DD')) \
.filter(Certificate.not_after >= start.format('YYYY-MM-DD')).all()
items = (
database.db.session.query(Certificate.issuer, func.count(Certificate.id))
.group_by(Certificate.issuer)
.filter(Certificate.not_after <= end.format("YYYY-MM-DD"))
.filter(Certificate.not_after >= start.format("YYYY-MM-DD"))
.all()
)
else:
attr = getattr(Certificate, kwargs.get('metric'))
attr = getattr(Certificate, kwargs.get("metric"))
query = database.db.session.query(attr, func.count(attr))
items = query.group_by(attr).all()
@ -519,7 +575,7 @@ def stats(**kwargs):
keys.append(key)
values.append(count)
return {'labels': keys, 'values': values}
return {"labels": keys, "values": values}
def get_account_number(arn):
@ -566,22 +622,24 @@ def get_certificate_primitives(certificate):
certificate via `create`.
"""
start, end = calculate_reissue_range(certificate.not_before, certificate.not_after)
ser = CertificateInputSchema().load(CertificateOutputSchema().dump(certificate).data)
ser = CertificateInputSchema().load(
CertificateOutputSchema().dump(certificate).data
)
assert not ser.errors, "Error re-serializing certificate: %s" % ser.errors
data = ser.data
# we can't quite tell if we are using a custom name, as this is an automated process (typically)
# we will rely on the Lemur generated name
data.pop('name', None)
data.pop("name", None)
# TODO this can be removed once we migrate away from cn
data['cn'] = data['common_name']
data["cn"] = data["common_name"]
# needed until we move off not_*
data['not_before'] = start
data['not_after'] = end
data['validity_start'] = start
data['validity_end'] = end
data["not_before"] = start
data["not_after"] = end
data["validity_start"] = start
data["validity_end"] = end
return data
@ -599,13 +657,13 @@ def reissue_certificate(certificate, replace=None, user=None):
# We do not want to re-use the CSR when creating a certificate because this defeats the purpose of rotation.
del primitives["csr"]
if not user:
primitives['creator'] = certificate.user
primitives["creator"] = certificate.user
else:
primitives['creator'] = user
primitives["creator"] = user
if replace:
primitives['replaces'] = [certificate]
primitives["replaces"] = [certificate]
new_cert = create(**primitives)

View File

@ -23,17 +23,18 @@ def get_sans_from_csr(data):
"""
sub_alt_names = []
try:
request = x509.load_pem_x509_csr(data.encode('utf-8'), default_backend())
request = x509.load_pem_x509_csr(data.encode("utf-8"), default_backend())
except Exception:
raise ValidationError('CSR presented is not valid.')
raise ValidationError("CSR presented is not valid.")
try:
alt_names = request.extensions.get_extension_for_class(x509.SubjectAlternativeName)
alt_names = request.extensions.get_extension_for_class(
x509.SubjectAlternativeName
)
for alt_name in alt_names.value:
sub_alt_names.append({
'nameType': type(alt_name).__name__,
'value': alt_name.value
})
sub_alt_names.append(
{"nameType": type(alt_name).__name__, "value": alt_name.value}
)
except x509.ExtensionNotFound:
pass

View File

@ -29,31 +29,45 @@ def ocsp_verify(cert, cert_path, issuer_chain_path):
:param issuer_chain_path:
:return bool: True if certificate is valid, False otherwise
"""
command = ['openssl', 'x509', '-noout', '-ocsp_uri', '-in', cert_path]
command = ["openssl", "x509", "-noout", "-ocsp_uri", "-in", cert_path]
p1 = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
url, err = p1.communicate()
if not url:
current_app.logger.debug("No OCSP URL in certificate {}".format(cert.serial_number))
current_app.logger.debug(
"No OCSP URL in certificate {}".format(cert.serial_number)
)
return None
p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path,
'-cert', cert_path, "-url", url.strip()],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
p2 = subprocess.Popen(
[
"openssl",
"ocsp",
"-issuer",
issuer_chain_path,
"-cert",
cert_path,
"-url",
url.strip(),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
message, err = p2.communicate()
p_message = message.decode('utf-8')
p_message = message.decode("utf-8")
if 'error' in p_message or 'Error' in p_message:
if "error" in p_message or "Error" in p_message:
raise Exception("Got error when parsing OCSP url")
elif 'revoked' in p_message:
current_app.logger.debug("OCSP reports certificate revoked: {}".format(cert.serial_number))
elif "revoked" in p_message:
current_app.logger.debug(
"OCSP reports certificate revoked: {}".format(cert.serial_number)
)
return False
elif 'good' not in p_message:
elif "good" not in p_message:
raise Exception("Did not receive a valid response")
return True
@ -73,7 +87,9 @@ def crl_verify(cert, cert_path):
x509.OID_CRL_DISTRIBUTION_POINTS
).value
except x509.ExtensionNotFound:
current_app.logger.debug("No CRLDP extension in certificate {}".format(cert.serial_number))
current_app.logger.debug(
"No CRLDP extension in certificate {}".format(cert.serial_number)
)
return None
for p in distribution_points:
@ -92,8 +108,9 @@ def crl_verify(cert, cert_path):
except ConnectionError:
raise Exception("Unable to retrieve CRL: {0}".format(point))
crl_cache[point] = x509.load_der_x509_crl(response.content,
backend=default_backend())
crl_cache[point] = x509.load_der_x509_crl(
response.content, backend=default_backend()
)
else:
current_app.logger.debug("CRL point is cached {}".format(point))
@ -110,8 +127,9 @@ def crl_verify(cert, cert_path):
except x509.ExtensionNotFound:
pass
current_app.logger.debug("CRL reports certificate "
"revoked: {}".format(cert.serial_number))
current_app.logger.debug(
"CRL reports certificate " "revoked: {}".format(cert.serial_number)
)
return False
return True
@ -125,7 +143,7 @@ def verify(cert_path, issuer_chain_path):
:param issuer_chain_path:
:return: True if valid, False otherwise
"""
with open(cert_path, 'rt') as c:
with open(cert_path, "rt") as c:
try:
cert = parse_certificate(c.read())
except ValueError as e:
@ -154,10 +172,10 @@ def verify_string(cert_string, issuer_string):
:return: True if valid, False otherwise
"""
with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f:
with open(cert_tmp, "w") as f:
f.write(cert_string)
with mktempfile() as issuer_tmp:
with open(issuer_tmp, 'w') as f:
with open(issuer_tmp, "w") as f:
f.write(issuer_string)
status = verify(cert_tmp, issuer_tmp)
return status

View File

@ -26,14 +26,14 @@ from lemur.certificates.schemas import (
certificate_upload_input_schema,
certificates_output_schema,
certificate_export_input_schema,
certificate_edit_input_schema
certificate_edit_input_schema,
)
from lemur.roles import service as role_service
from lemur.logs import service as log_service
mod = Blueprint('certificates', __name__)
mod = Blueprint("certificates", __name__)
api = Api(mod)
@ -128,8 +128,8 @@ class CertificatesListValid(AuthenticatedResource):
"""
parser = paginated_parser.copy()
args = parser.parse_args()
args['user'] = g.user
common_name = args['filter'].split(';')[1]
args["user"] = g.user
common_name = args["filter"].split(";")[1]
return service.query_common_name(common_name, args)
@ -228,16 +228,18 @@ class CertificatesNameQuery(AuthenticatedResource):
"""
parser = paginated_parser.copy()
parser.add_argument('timeRange', type=int, dest='time_range', location='args')
parser.add_argument('owner', type=inputs.boolean, location='args')
parser.add_argument('id', type=str, location='args')
parser.add_argument('active', type=inputs.boolean, location='args')
parser.add_argument('destinationId', type=int, dest="destination_id", location='args')
parser.add_argument('creator', type=str, location='args')
parser.add_argument('show', type=str, location='args')
parser.add_argument("timeRange", type=int, dest="time_range", location="args")
parser.add_argument("owner", type=inputs.boolean, location="args")
parser.add_argument("id", type=str, location="args")
parser.add_argument("active", type=inputs.boolean, location="args")
parser.add_argument(
"destinationId", type=int, dest="destination_id", location="args"
)
parser.add_argument("creator", type=str, location="args")
parser.add_argument("show", type=str, location="args")
args = parser.parse_args()
args['user'] = g.user
args["user"] = g.user
return service.query_name(certificate_name, args)
@ -336,16 +338,18 @@ class CertificatesList(AuthenticatedResource):
"""
parser = paginated_parser.copy()
parser.add_argument('timeRange', type=int, dest='time_range', location='args')
parser.add_argument('owner', type=inputs.boolean, location='args')
parser.add_argument('id', type=str, location='args')
parser.add_argument('active', type=inputs.boolean, location='args')
parser.add_argument('destinationId', type=int, dest="destination_id", location='args')
parser.add_argument('creator', type=str, location='args')
parser.add_argument('show', type=str, location='args')
parser.add_argument("timeRange", type=int, dest="time_range", location="args")
parser.add_argument("owner", type=inputs.boolean, location="args")
parser.add_argument("id", type=str, location="args")
parser.add_argument("active", type=inputs.boolean, location="args")
parser.add_argument(
"destinationId", type=int, dest="destination_id", location="args"
)
parser.add_argument("creator", type=str, location="args")
parser.add_argument("show", type=str, location="args")
args = parser.parse_args()
args['user'] = g.user
args["user"] = g.user
return service.render(args)
@validate_schema(certificate_input_schema, certificate_output_schema)
@ -463,24 +467,31 @@ class CertificatesList(AuthenticatedResource):
:statuscode 403: unauthenticated
"""
role = role_service.get_by_name(data['authority'].owner)
role = role_service.get_by_name(data["authority"].owner)
# all the authority role members should be allowed
roles = [x.name for x in data['authority'].roles]
roles = [x.name for x in data["authority"].roles]
# allow "owner" roles by team DL
roles.append(role)
authority_permission = AuthorityPermission(data['authority'].id, roles)
authority_permission = AuthorityPermission(data["authority"].id, roles)
if authority_permission.can():
data['creator'] = g.user
data["creator"] = g.user
cert = service.create(**data)
if isinstance(cert, Certificate):
# only log if created, not pending
log_service.create(g.user, 'create_cert', certificate=cert)
log_service.create(g.user, "create_cert", certificate=cert)
return cert
return dict(message="You are not authorized to use the authority: {0}".format(data['authority'].name)), 403
return (
dict(
message="You are not authorized to use the authority: {0}".format(
data["authority"].name
)
),
403,
)
class CertificatesUpload(AuthenticatedResource):
@ -583,12 +594,14 @@ class CertificatesUpload(AuthenticatedResource):
:statuscode 200: no error
"""
data['creator'] = g.user
if data.get('destinations'):
if data.get('private_key'):
data["creator"] = g.user
if data.get("destinations"):
if data.get("private_key"):
return service.upload(**data)
else:
raise Exception("Private key must be provided in order to upload certificate to AWS")
raise Exception(
"Private key must be provided in order to upload certificate to AWS"
)
return service.upload(**data)
@ -600,10 +613,12 @@ class CertificatesStats(AuthenticatedResource):
super(CertificatesStats, self).__init__()
def get(self):
self.reqparse.add_argument('metric', type=str, location='args')
self.reqparse.add_argument('range', default=32, type=int, location='args')
self.reqparse.add_argument('destinationId', dest='destination_id', location='args')
self.reqparse.add_argument('active', type=str, default='true', location='args')
self.reqparse.add_argument("metric", type=str, location="args")
self.reqparse.add_argument("range", default=32, type=int, location="args")
self.reqparse.add_argument(
"destinationId", dest="destination_id", location="args"
)
self.reqparse.add_argument("active", type=str, default="true", location="args")
args = self.reqparse.parse_args()
@ -655,12 +670,12 @@ class CertificatePrivateKey(AuthenticatedResource):
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to view this key'), 403
return dict(message="You are not authorized to view this key"), 403
log_service.create(g.current_user, 'key_view', certificate=cert)
log_service.create(g.current_user, "key_view", certificate=cert)
response = make_response(jsonify(key=cert.private_key), 200)
response.headers['cache-control'] = 'private, max-age=0, no-cache, no-store'
response.headers['pragma'] = 'no-cache'
response.headers["cache-control"] = "private, max-age=0, no-cache, no-store"
response.headers["pragma"] = "no-cache"
return response
@ -850,19 +865,25 @@ class Certificates(AuthenticatedResource):
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to update this certificate'), 403
return (
dict(message="You are not authorized to update this certificate"),
403,
)
for destination in data['destinations']:
for destination in data["destinations"]:
if destination.plugin.requires_key:
if not cert.private_key:
return dict(
message='Unable to add destination: {0}. Certificate does not have required private key.'.format(
destination.label
)
), 400
return (
dict(
message="Unable to add destination: {0}. Certificate does not have required private key.".format(
destination.label
)
),
400,
)
cert = service.update(certificate_id, **data)
log_service.create(g.current_user, 'update_cert', certificate=cert)
log_service.create(g.current_user, "update_cert", certificate=cert)
return cert
def delete(self, certificate_id, data=None):
@ -891,7 +912,7 @@ class Certificates(AuthenticatedResource):
:statuscode 405: certificate deletion is disabled
"""
if not current_app.config.get('ALLOW_CERT_DELETION', False):
if not current_app.config.get("ALLOW_CERT_DELETION", False):
return dict(message="Certificate deletion is disabled"), 405
cert = service.get(certificate_id)
@ -908,11 +929,14 @@ class Certificates(AuthenticatedResource):
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to delete this certificate'), 403
return (
dict(message="You are not authorized to delete this certificate"),
403,
)
service.update(certificate_id, deleted=True)
log_service.create(g.current_user, 'delete_cert', certificate=cert)
return 'Certificate deleted', 204
log_service.create(g.current_user, "delete_cert", certificate=cert)
return "Certificate deleted", 204
class NotificationCertificatesList(AuthenticatedResource):
@ -1012,17 +1036,19 @@ class NotificationCertificatesList(AuthenticatedResource):
"""
parser = paginated_parser.copy()
parser.add_argument('timeRange', type=int, dest='time_range', location='args')
parser.add_argument('owner', type=inputs.boolean, location='args')
parser.add_argument('id', type=str, location='args')
parser.add_argument('active', type=inputs.boolean, location='args')
parser.add_argument('destinationId', type=int, dest="destination_id", location='args')
parser.add_argument('creator', type=str, location='args')
parser.add_argument('show', type=str, location='args')
parser.add_argument("timeRange", type=int, dest="time_range", location="args")
parser.add_argument("owner", type=inputs.boolean, location="args")
parser.add_argument("id", type=str, location="args")
parser.add_argument("active", type=inputs.boolean, location="args")
parser.add_argument(
"destinationId", type=int, dest="destination_id", location="args"
)
parser.add_argument("creator", type=str, location="args")
parser.add_argument("show", type=str, location="args")
args = parser.parse_args()
args['notification_id'] = notification_id
args['user'] = g.current_user
args["notification_id"] = notification_id
args["user"] = g.current_user
return service.render(args)
@ -1195,30 +1221,48 @@ class CertificateExport(AuthenticatedResource):
if not cert:
return dict(message="Cannot find specified certificate"), 404
plugin = data['plugin']['plugin_object']
plugin = data["plugin"]["plugin_object"]
if plugin.requires_key:
if not cert.private_key:
return dict(
message='Unable to export certificate, plugin: {0} requires a private key but no key was found.'.format(
plugin.slug)), 400
return (
dict(
message="Unable to export certificate, plugin: {0} requires a private key but no key was found.".format(
plugin.slug
)
),
400,
)
else:
# allow creators
if g.current_user != cert.user:
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
permission = CertificatePermission(
owner_role, [x.name for x in cert.roles]
)
if not permission.can():
return dict(message='You are not authorized to export this certificate.'), 403
return (
dict(
message="You are not authorized to export this certificate."
),
403,
)
options = data['plugin']['plugin_options']
options = data["plugin"]["plugin_options"]
log_service.create(g.current_user, 'key_view', certificate=cert)
extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options)
log_service.create(g.current_user, "key_view", certificate=cert)
extension, passphrase, data = plugin.export(
cert.body, cert.chain, cert.private_key, options
)
# we take a hit in message size when b64 encoding
return dict(extension=extension, passphrase=passphrase, data=base64.b64encode(data).decode('utf-8'))
return dict(
extension=extension,
passphrase=passphrase,
data=base64.b64encode(data).decode("utf-8"),
)
class CertificateRevoke(AuthenticatedResource):
@ -1269,30 +1313,66 @@ class CertificateRevoke(AuthenticatedResource):
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to revoke this certificate.'), 403
return (
dict(message="You are not authorized to revoke this certificate."),
403,
)
if not cert.external_id:
return dict(message='Cannot revoke certificate. No external id found.'), 400
return dict(message="Cannot revoke certificate. No external id found."), 400
if cert.endpoints:
return dict(message='Cannot revoke certificate. Endpoints are deployed with the given certificate.'), 403
return (
dict(
message="Cannot revoke certificate. Endpoints are deployed with the given certificate."
),
403,
)
plugin = plugins.get(cert.authority.plugin_name)
plugin.revoke_certificate(cert, data)
log_service.create(g.current_user, 'revoke_cert', certificate=cert)
log_service.create(g.current_user, "revoke_cert", certificate=cert)
return dict(id=cert.id)
api.add_resource(CertificateRevoke, '/certificates/<int:certificate_id>/revoke', endpoint='revokeCertificate')
api.add_resource(CertificatesNameQuery, '/certificates/name/<string:certificate_name>', endpoint='certificatesNameQuery')
api.add_resource(CertificatesList, '/certificates', endpoint='certificates')
api.add_resource(CertificatesListValid, '/certificates/valid', endpoint='certificatesListValid')
api.add_resource(Certificates, '/certificates/<int:certificate_id>', endpoint='certificate')
api.add_resource(CertificatesStats, '/certificates/stats', endpoint='certificateStats')
api.add_resource(CertificatesUpload, '/certificates/upload', endpoint='certificateUpload')
api.add_resource(CertificatePrivateKey, '/certificates/<int:certificate_id>/key', endpoint='privateKeyCertificates')
api.add_resource(CertificateExport, '/certificates/<int:certificate_id>/export', endpoint='exportCertificate')
api.add_resource(NotificationCertificatesList, '/notifications/<int:notification_id>/certificates',
endpoint='notificationCertificates')
api.add_resource(CertificatesReplacementsList, '/certificates/<int:certificate_id>/replacements',
endpoint='replacements')
api.add_resource(
CertificateRevoke,
"/certificates/<int:certificate_id>/revoke",
endpoint="revokeCertificate",
)
api.add_resource(
CertificatesNameQuery,
"/certificates/name/<string:certificate_name>",
endpoint="certificatesNameQuery",
)
api.add_resource(CertificatesList, "/certificates", endpoint="certificates")
api.add_resource(
CertificatesListValid, "/certificates/valid", endpoint="certificatesListValid"
)
api.add_resource(
Certificates, "/certificates/<int:certificate_id>", endpoint="certificate"
)
api.add_resource(CertificatesStats, "/certificates/stats", endpoint="certificateStats")
api.add_resource(
CertificatesUpload, "/certificates/upload", endpoint="certificateUpload"
)
api.add_resource(
CertificatePrivateKey,
"/certificates/<int:certificate_id>/key",
endpoint="privateKeyCertificates",
)
api.add_resource(
CertificateExport,
"/certificates/<int:certificate_id>/export",
endpoint="exportCertificate",
)
api.add_resource(
NotificationCertificatesList,
"/notifications/<int:notification_id>/certificates",
endpoint="notificationCertificates",
)
api.add_resource(
CertificatesReplacementsList,
"/certificates/<int:certificate_id>/replacements",
endpoint="replacements",
)