Merge branch 'master' into lemur_vault_plugin

This commit is contained in:
alwaysjolley 2019-03-01 09:58:43 -05:00 committed by GitHub
commit 20518bc377
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 567 additions and 89 deletions

View File

@ -54,7 +54,7 @@ def get_by_name(name):
def get_by_serial(serial):
"""
Retrieves certificate by it's Serial.
Retrieves certificate(s) by serial number.
:param serial:
:return:
"""
@ -64,6 +64,22 @@ def get_by_serial(serial):
return Certificate.query.filter(Certificate.serial == serial).all()
def get_by_attributes(conditions):
"""
Retrieves certificate(s) by conditions given in a hash of given key=>value pairs.
:param serial:
:return:
"""
# Ensure that each of the given conditions corresponds to actual columns
# if not, silently remove it
for attr in conditions.keys():
if attr not in Certificate.__table__.columns:
conditions.pop(attr)
query = database.session_query(Certificate)
return database.find_all(query, Certificate, conditions).all()
def delete(cert_id):
"""
Delete's a certificate.
@ -221,11 +237,6 @@ def upload(**kwargs):
else:
kwargs['roles'] = roles
if kwargs.get('private_key'):
private_key = kwargs['private_key']
if not isinstance(private_key, bytes):
kwargs['private_key'] = private_key.encode('utf-8')
cert = Certificate(**kwargs)
cert.authority = kwargs.get('authority')
cert = database.create(cert)
@ -432,10 +443,7 @@ def create_csr(**csr_config):
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it
encryption_algorithm=serialization.NoEncryption()
)
if isinstance(private_key, bytes):
private_key = private_key.decode('utf-8')
).decode('utf-8')
csr = request.public_bytes(
encoding=serialization.Encoding.PEM

View File

@ -6,6 +6,7 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import base64
import arrow
from builtins import str
from flask import Blueprint, make_response, jsonify, g
@ -660,6 +661,51 @@ class Certificates(AuthenticatedResource):
log_service.create(g.current_user, 'update_cert', certificate=cert)
return cert
def delete(self, certificate_id, data=None):
"""
.. http:delete:: /certificates/1
Delete a certificate
**Example request**:
.. sourcecode:: http
DELETE /certificates/1 HTTP/1.1
Host: example.com
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
:reqheader Authorization: OAuth token to authenticate
:statuscode 204: no error
:statuscode 403: unauthenticated
:statusoode 404: certificate not found
"""
cert = service.get(certificate_id)
if not cert:
return dict(message="Cannot find specified certificate"), 404
# allow creators
if g.current_user != cert.user:
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to delete this certificate'), 403
if arrow.get(cert.not_after) > arrow.utcnow():
return dict(message='Certificate is still valid, only expired certificates can be deleted'), 412
service.update(certificate_id, deleted=True)
log_service.create(g.current_user, 'delete_cert', certificate=cert)
return '', 204
class NotificationCertificatesList(AuthenticatedResource):
""" Defines the 'certificates' endpoint """

View File

@ -3,6 +3,8 @@ import unicodedata
from cryptography import x509
from flask import current_app
from lemur.common.utils import is_selfsigned
from lemur.extensions import sentry
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
@ -229,15 +231,22 @@ def issuer(cert):
"""
Gets a sane issuer slug from a given certificate, stripping non-alphanumeric characters.
:param cert:
For self-signed certificates, the special value '<selfsigned>' is returned.
If issuer cannot be determined, '<unknown>' is returned.
:param cert: Parsed certificate object
:return: Issuer slug
"""
# If certificate is self-signed, we return a special value -- there really is no distinct "issuer" for it
if is_selfsigned(cert):
return '<selfsigned>'
# Try Common Name or fall back to Organization name
attrs = (cert.issuer.get_attributes_for_oid(x509.OID_COMMON_NAME) or
cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME))
if not attrs:
current_app.logger.error("Unable to get issuer! Cert serial {:x}".format(cert.serial_number))
return "Unknown"
return '<unknown>'
return text_to_slug(attrs[0].value, '')

View File

@ -11,9 +11,10 @@ import string
import sqlalchemy
from cryptography import x509
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from flask_restful.reqparse import RequestParser
from sqlalchemy import and_, func
@ -48,24 +49,22 @@ def parse_certificate(body):
:param body:
:return:
"""
if isinstance(body, str):
body = body.encode('utf-8')
assert isinstance(body, str)
return x509.load_pem_x509_certificate(body, default_backend())
return x509.load_pem_x509_certificate(body.encode('utf-8'), default_backend())
def parse_private_key(private_key):
"""
Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm).
Raises ValueError for an invalid string.
Raises ValueError for an invalid string. Raises AssertionError when passed value is not str-type.
:param private_key: String containing PEM private key
"""
if isinstance(private_key, str):
private_key = private_key.encode('utf8')
assert isinstance(private_key, str)
return load_pem_private_key(private_key, password=None, backend=default_backend())
return load_pem_private_key(private_key.encode('utf8'), password=None, backend=default_backend())
def parse_csr(csr):
@ -75,10 +74,9 @@ def parse_csr(csr):
:param csr:
:return:
"""
if isinstance(csr, str):
csr = csr.encode('utf-8')
assert isinstance(csr, str)
return x509.load_pem_x509_csr(csr, default_backend())
return x509.load_pem_x509_csr(csr.encode('utf-8'), default_backend())
def get_authority_key(body):
@ -146,6 +144,42 @@ def generate_private_key(key_type):
)
def check_cert_signature(cert, issuer_public_key):
"""
Check a certificate's signature against an issuer public key.
Before EC validation, make sure we support the algorithm, otherwise raise UnsupportedAlgorithm
On success, returns None; on failure, raises UnsupportedAlgorithm or InvalidSignature.
"""
if isinstance(issuer_public_key, rsa.RSAPublicKey):
# RSA requires padding, just to make life difficult for us poor developers :(
if cert.signature_algorithm_oid == x509.SignatureAlgorithmOID.RSASSA_PSS:
# In 2005, IETF devised a more secure padding scheme to replace PKCS #1 v1.5. To make sure that
# nobody can easily support or use it, they mandated lots of complicated parameters, unlike any
# other X.509 signature scheme.
# https://tools.ietf.org/html/rfc4056
raise UnsupportedAlgorithm("RSASSA-PSS not supported")
else:
padder = padding.PKCS1v15()
issuer_public_key.verify(cert.signature, cert.tbs_certificate_bytes, padder, cert.signature_hash_algorithm)
elif isinstance(issuer_public_key, ec.EllipticCurvePublicKey) and isinstance(ec.ECDSA(cert.signature_hash_algorithm), ec.ECDSA):
issuer_public_key.verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm))
else:
raise UnsupportedAlgorithm("Unsupported Algorithm '{var}'.".format(var=cert.signature_algorithm_oid._name))
def is_selfsigned(cert):
"""
Returns True if the certificate is self-signed.
Returns False for failed verification or unsupported signing algorithm.
"""
try:
check_cert_signature(cert, cert.public_key())
# If verification was successful, it's self-signed.
return True
except InvalidSignature:
return False
def is_weekend(date):
"""
Determines if a given date is on a weekend.

View File

@ -18,6 +18,6 @@ class Log(db.Model):
__tablename__ = 'logs'
id = Column(Integer, primary_key=True)
certificate_id = Column(Integer, ForeignKey('certificates.id'))
log_type = Column(Enum('key_view', 'create_cert', 'update_cert', 'revoke_cert', name='log_type'), nullable=False)
log_type = Column(Enum('key_view', 'create_cert', 'update_cert', 'revoke_cert', 'delete_cert', name='log_type'), nullable=False)
logged_at = Column(ArrowType(), PassiveDefault(func.now()), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)

View File

@ -0,0 +1,22 @@
""" Add delete_cert to log_type enum
Revision ID: 9f79024fe67b
Revises: ee827d1e1974
Create Date: 2019-01-03 15:36:59.181911
"""
# revision identifiers, used by Alembic.
revision = '9f79024fe67b'
down_revision = 'ee827d1e1974'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.sync_enum_values('public', 'log_type', ['create_cert', 'key_view', 'revoke_cert', 'update_cert'], ['create_cert', 'delete_cert', 'key_view', 'revoke_cert', 'update_cert'])
def downgrade():
op.sync_enum_values('public', 'log_type', ['create_cert', 'delete_cert', 'key_view', 'revoke_cert', 'update_cert'], ['create_cert', 'key_view', 'revoke_cert', 'update_cert'])

View File

@ -0,0 +1,6 @@
"""Set the version information."""
try:
VERSION = __import__('pkg_resources') \
.get_distribution(__name__).version
except Exception as e:
VERSION = 'unknown'

View File

@ -0,0 +1,116 @@
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
import requests
from lemur.plugins import lemur_adcs as ADCS
from certsrv import Certsrv
from OpenSSL import crypto
from flask import current_app
class ADCSIssuerPlugin(IssuerPlugin):
title = 'ADCS'
slug = 'adcs-issuer'
description = 'Enables the creation of certificates by ADCS (Active Directory Certificate Services)'
version = ADCS.VERSION
author = 'sirferl'
author_url = 'https://github.com/sirferl/lemur'
def __init__(self, *args, **kwargs):
"""Initialize the issuer with the appropriate details."""
self.session = requests.Session()
super(ADCSIssuerPlugin, self).__init__(*args, **kwargs)
@staticmethod
def create_authority(options):
"""Create an authority.
Creates an authority, this authority is then used by Lemur to
allow a user to specify which Certificate Authority they want
to sign their certificate.
:param options:
:return:
"""
adcs_root = current_app.config.get('ADCS_ROOT')
adcs_issuing = current_app.config.get('ADCS_ISSUING')
role = {'username': '', 'password': '', 'name': 'adcs'}
return adcs_root, adcs_issuing, [role]
def create_certificate(self, csr, issuer_options):
adcs_server = current_app.config.get('ADCS_SERVER')
adcs_user = current_app.config.get('ADCS_USER')
adcs_pwd = current_app.config.get('ADCS_PWD')
adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD')
adcs_template = current_app.config.get('ADCS_TEMPLATE')
ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method)
current_app.logger.info("Requesting CSR: {0}".format(csr))
current_app.logger.info("Issuer options: {0}".format(issuer_options))
cert, req_id = ca_server.get_cert(csr, adcs_template, encoding='b64').decode('utf-8').replace('\r\n', '\n')
chain = ca_server.get_ca_cert(encoding='b64').decode('utf-8').replace('\r\n', '\n')
return cert, chain, req_id
def revoke_certificate(self, certificate, comments):
raise NotImplementedError('Not implemented\n', self, certificate, comments)
def get_ordered_certificate(self, order_id):
raise NotImplementedError('Not implemented\n', self, order_id)
def canceled_ordered_certificate(self, pending_cert, **kwargs):
raise NotImplementedError('Not implemented\n', self, pending_cert, **kwargs)
class ADCSSourcePlugin(SourcePlugin):
title = 'ADCS'
slug = 'adcs-source'
description = 'Enables the collecion of certificates'
version = ADCS.VERSION
author = 'sirferl'
author_url = 'https://github.com/sirferl/lemur'
options = [
{
'name': 'dummy',
'type': 'str',
'required': False,
'validation': '/^[0-9]{12,12}$/',
'helpMessage': 'Just to prevent error'
}
]
def get_certificates(self, options, **kwargs):
adcs_server = current_app.config.get('ADCS_SERVER')
adcs_user = current_app.config.get('ADCS_USER')
adcs_pwd = current_app.config.get('ADCS_PWD')
adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD')
adcs_start = current_app.config.get('ADCS_START')
adcs_stop = current_app.config.get('ADCS_STOP')
ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method)
out_certlist = []
for id in range(adcs_start, adcs_stop):
try:
cert = ca_server.get_existing_cert(id, encoding='b64').decode('utf-8').replace('\r\n', '\n')
except Exception as err:
if '{0}'.format(err).find("CERTSRV_E_PROPERTY_EMPTY"):
# this error indicates end of certificate list(?), so we stop
break
else:
# We do nothing in case there is no certificate returned for other reasons
current_app.logger.info("Error with id {0}: {1}".format(id, err))
else:
# we have a certificate
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
# loop through extensions to see if we find "TLS Web Server Authentication"
for e_id in range(0, pubkey.get_extension_count() - 1):
try:
extension = '{0}'.format(pubkey.get_extension(e_id))
except Exception:
extensionn = ''
if extension.find("TLS Web Server Authentication") != -1:
out_certlist.append({
'name': format(pubkey.get_subject().CN),
'body': cert})
break
return out_certlist
def get_endpoints(self, options, **kwargs):
# There are no endpoints in the ADCS
raise NotImplementedError('Not implemented\n', self, options, **kwargs)

View File

@ -64,6 +64,7 @@ def upload_cert(name, body, private_key, path, cert_chain=None, **kwargs):
:param path:
:return:
"""
assert isinstance(private_key, str)
client = kwargs.pop('client')
if not path or path == '/':
@ -72,8 +73,6 @@ def upload_cert(name, body, private_key, path, cert_chain=None, **kwargs):
name = name + '-' + path.strip('/')
try:
if isinstance(private_key, bytes):
private_key = private_key.decode("utf-8")
if cert_chain:
return client.upload_server_certificate(
Path=path,

View File

@ -14,6 +14,7 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from lemur.common.utils import parse_private_key
from lemur.plugins.bases import IssuerPlugin
from lemur.plugins import lemur_cryptography as cryptography_issuer
@ -40,7 +41,8 @@ def issue_certificate(csr, options, private_key=None):
if options.get("authority"):
# Issue certificate signed by an existing lemur_certificates authority
issuer_subject = options['authority'].authority_certificate.subject
issuer_private_key = options['authority'].authority_certificate.private_key
assert private_key is None, "Private would be ignored, authority key used instead"
private_key = options['authority'].authority_certificate.private_key
chain_cert_pem = options['authority'].authority_certificate.body
authority_key_identifier_public = options['authority'].authority_certificate.public_key
authority_key_identifier_subject = x509.SubjectKeyIdentifier.from_public_key(authority_key_identifier_public)
@ -52,7 +54,6 @@ def issue_certificate(csr, options, private_key=None):
else:
# Issue certificate that is self-signed (new lemur_certificates root authority)
issuer_subject = csr.subject
issuer_private_key = private_key
chain_cert_pem = ""
authority_key_identifier_public = csr.public_key()
authority_key_identifier_subject = None
@ -112,11 +113,7 @@ def issue_certificate(csr, options, private_key=None):
# FIXME: Not implemented in lemur/schemas.py yet https://github.com/Netflix/lemur/issues/662
pass
private_key = serialization.load_pem_private_key(
bytes(str(issuer_private_key).encode('utf-8')),
password=None,
backend=default_backend()
)
private_key = parse_private_key(private_key)
cert = builder.sign(private_key, hashes.SHA256(), default_backend())
cert_pem = cert.public_bytes(

View File

@ -38,14 +38,9 @@ def create_csr(cert, chain, csr_tmp, key):
:param csr_tmp:
:param key:
"""
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f:

View File

@ -15,6 +15,8 @@ from cryptography.fernet import Fernet
from lemur.utils import mktempfile, mktemppath
from lemur.plugins.bases import ExportPlugin
from lemur.plugins import lemur_java as java
from lemur.common.utils import parse_certificate
from lemur.common.defaults import common_name
def run_process(command):
@ -59,11 +61,8 @@ def split_chain(chain):
def create_truststore(cert, chain, jks_tmp, alias, passphrase):
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f:
@ -98,14 +97,9 @@ def create_truststore(cert, chain, jks_tmp, alias, passphrase):
def create_keystore(cert, chain, jks_tmp, key, alias, passphrase):
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
# Create PKCS12 keystore from private key and public certificate
with mktempfile() as cert_tmp:
@ -241,7 +235,7 @@ class JavaKeystoreExportPlugin(ExportPlugin):
if self.get_option('alias', options):
alias = self.get_option('alias', options)
else:
alias = "blah"
alias = common_name(parse_certificate(body))
with mktemppath() as jks_tmp:
create_keystore(body, chain, jks_tmp, key, alias, passphrase)

View File

@ -14,7 +14,8 @@ from flask import current_app
from lemur.utils import mktempfile, mktemppath
from lemur.plugins.bases import ExportPlugin
from lemur.plugins import lemur_openssl as openssl
from lemur.common.utils import get_psuedo_random_string
from lemur.common.utils import get_psuedo_random_string, parse_certificate
from lemur.common.defaults import common_name
def run_process(command):
@ -44,14 +45,9 @@ def create_pkcs12(cert, chain, p12_tmp, key, alias, passphrase):
:param alias:
:param passphrase:
"""
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f:
@ -127,7 +123,7 @@ class OpenSSLExportPlugin(ExportPlugin):
if self.get_option('alias', options):
alias = self.get_option('alias', options)
else:
alias = "blah"
alias = common_name(parse_certificate(body))
type = self.get_option('type', options)

View File

@ -116,7 +116,12 @@ def sync_certificates(source, user):
for certificate in certificates:
exists = False
if certificate.get('name'):
if certificate.get('search', None):
conditions = certificate.pop('search')
exists = certificate_service.get_by_attributes(conditions)
if not exists and certificate.get('name'):
result = certificate_service.get_by_name(certificate['name'])
if result:
exists = [result]

View File

@ -4,18 +4,20 @@ import datetime
import pytest
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives import hashes
from flask import current_app
from flask_principal import identity_changed, Identity
from lemur import create_app
from lemur.common.utils import parse_private_key
from lemur.database import db as _db
from lemur.auth.service import create_token
from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, CryptoAuthorityFactory
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, InvalidCertificateFactory, \
CryptoAuthorityFactory
def pytest_runtest_setup(item):
@ -168,6 +170,15 @@ def pending_certificate(session):
return p
@pytest.fixture
def invalid_certificate(session):
u = UserFactory()
a = AsyncAuthorityFactory()
i = InvalidCertificateFactory(user=u, authority=a)
session.commit()
return i
@pytest.fixture
def admin_user(session):
u = UserFactory()
@ -235,12 +246,12 @@ def logged_in_admin(session, app):
@pytest.fixture
def private_key():
return load_pem_private_key(SAN_CERT_KEY.encode(), password=None, backend=default_backend())
return parse_private_key(SAN_CERT_KEY)
@pytest.fixture
def issuer_private_key():
return load_pem_private_key(INTERMEDIATE_KEY.encode(), password=None, backend=default_backend())
return parse_private_key(INTERMEDIATE_KEY)
@pytest.fixture
@ -254,6 +265,12 @@ def cert_builder(private_key):
.not_valid_after(datetime.datetime(2040, 1, 1)))
@pytest.fixture
def selfsigned_cert(cert_builder, private_key):
# cert_builder uses the same cert public key as 'private_key'
return cert_builder.sign(private_key, hashes.SHA256(), default_backend())
@pytest.fixture(scope='function')
def aws_credentials():
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'

View File

@ -20,7 +20,7 @@ from lemur.policies.models import RotationPolicy
from lemur.api_keys.models import ApiKey
from .vectors import SAN_CERT_STR, SAN_CERT_KEY, CSR_STR, INTERMEDIATE_CERT_STR, ROOTCA_CERT_STR, INTERMEDIATE_KEY, \
WILDCARD_CERT_KEY
WILDCARD_CERT_KEY, INVALID_CERT_STR
class BaseFactory(SQLAlchemyModelFactory):
@ -137,6 +137,11 @@ class CACertificateFactory(CertificateFactory):
private_key = INTERMEDIATE_KEY
class InvalidCertificateFactory(CertificateFactory):
body = INVALID_CERT_STR
private_key = ''
class AuthorityFactory(BaseFactory):
"""Authority factory."""
name = Sequence(lambda n: 'authority{0}'.format(n))

View File

@ -41,6 +41,89 @@ def test_get_or_increase_name(session, certificate):
assert get_or_increase_name('certificate1', int(serial, 16)) == 'certificate1-{}-1'.format(serial)
def test_get_all_certs(session, certificate):
from lemur.certificates.service import get_all_certs
assert len(get_all_certs()) > 1
def test_get_by_name(session, certificate):
from lemur.certificates.service import get_by_name
found = get_by_name(certificate.name)
assert found
def test_get_by_serial(session, certificate):
from lemur.certificates.service import get_by_serial
found = get_by_serial(certificate.serial)
assert found
def test_delete_cert(session):
from lemur.certificates.service import delete, get
from lemur.tests.factories import CertificateFactory
delete_this = CertificateFactory(name='DELETEME')
session.commit()
cert_exists = get(delete_this.id)
# it needs to exist first
assert cert_exists
delete(delete_this.id)
cert_exists = get(delete_this.id)
# then not exist after delete
assert not cert_exists
def test_get_by_attributes(session, certificate):
from lemur.certificates.service import get_by_attributes
# Should get one cert
certificate1 = get_by_attributes({
'name': 'SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231'
})
# Should get one cert using multiple attrs
certificate2 = get_by_attributes({
'name': 'test-cert-11111111-1',
'cn': 'san.example.org'
})
# Should get multiple certs
multiple = get_by_attributes({
'cn': 'LemurTrust Unittests Class 1 CA 2018',
'issuer': 'LemurTrustUnittestsRootCA2018'
})
assert len(certificate1) == 1
assert len(certificate2) == 1
assert len(multiple) > 1
def test_find_duplicates(session):
from lemur.certificates.service import find_duplicates
cert = {
'body': SAN_CERT_STR,
'chain': INTERMEDIATE_CERT_STR
}
dups1 = find_duplicates(cert)
cert['chain'] = ''
dups2 = find_duplicates(cert)
assert len(dups1) > 0
assert len(dups2) > 0
def test_get_certificate_primitives(certificate):
from lemur.certificates.service import get_certificate_primitives
@ -653,15 +736,26 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin):
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 412),
(VALID_ADMIN_API_TOKEN, 412),
('', 401)
])
def test_certificate_delete(client, token, status):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 204),
(VALID_ADMIN_API_TOKEN, 204),
('', 401)
])
def test_invalid_certificate_delete(client, invalid_certificate, token, status):
assert client.delete(
api.url_for(Certificates, certificate_id=invalid_certificate.id), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),

View File

@ -81,6 +81,13 @@ def test_create_name(client):
datetime(2015, 5, 12, 0, 0, 0),
False
) == 'xn--mnchen-3ya.de-VertrauenswurdigAutoritat-20150507-20150512'
assert certificate_name(
'selfie.example.org',
'<selfsigned>',
datetime(2015, 5, 7, 0, 0, 0),
datetime(2025, 5, 12, 13, 37, 0),
False
) == 'selfie.example.org-selfsigned-20150507-20250512'
def test_issuer(client, cert_builder, issuer_private_key):
@ -106,4 +113,9 @@ def test_issuer(client, cert_builder, issuer_private_key):
cert = (cert_builder
.issuer_name(x509.Name([]))
.sign(issuer_private_key, hashes.SHA256(), default_backend()))
assert issuer(cert) == 'Unknown'
assert issuer(cert) == '<unknown>'
def test_issuer_selfsigned(selfsigned_cert):
from lemur.common.defaults import issuer
assert issuer(selfsigned_cert) == '<selfsigned>'

View File

@ -1,5 +1,7 @@
import pytest
from lemur.tests.vectors import SAN_CERT, INTERMEDIATE_CERT, ROOTCA_CERT, EC_CERT_EXAMPLE, ECDSA_PRIME256V1_CERT, ECDSA_SECP384r1_CERT, DSA_CERT
def test_generate_private_key():
from lemur.common.utils import generate_private_key
@ -71,3 +73,21 @@ KFfxwrO1
-----END CERTIFICATE-----'''
authority_key = get_authority_key(test_cert)
assert authority_key == 'feacb541be81771293affa412d8dc9f66a3ebb80'
def test_is_selfsigned(selfsigned_cert):
from lemur.common.utils import is_selfsigned
assert is_selfsigned(selfsigned_cert) is True
assert is_selfsigned(SAN_CERT) is False
assert is_selfsigned(INTERMEDIATE_CERT) is False
# Root CA certificates are also technically self-signed
assert is_selfsigned(ROOTCA_CERT) is True
assert is_selfsigned(EC_CERT_EXAMPLE) is False
# selfsigned certs
assert is_selfsigned(ECDSA_PRIME256V1_CERT) is True
assert is_selfsigned(ECDSA_SECP384r1_CERT) is True
# unsupported algorithm (DSA)
with pytest.raises(Exception):
is_selfsigned(DSA_CERT)

View File

@ -45,6 +45,7 @@ ssvobJ6Xe2D4cCVjUmsqtFEztMgdqgmlcWyGdUKeXdi7CMoeTb4uO+9qRQq46wYW
n7K1z+W0Kp5yhnnPAoOioAP4vjASDx3z3RnLaZvMmcO7YdCIwhE5oGV0
-----END CERTIFICATE-----
"""
ROOTCA_CERT = parse_certificate(ROOTCA_CERT_STR)
ROOTCA_KEY = """\
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAvyVpe0tfIzri3l3PYH2r7hW86wKF58GLY+Ua52rEO5E3eXQq
@ -393,3 +394,98 @@ zm3Cn4Ul8DO26w9QS4fmZjmnPOZFXYMWoOR6osHzb62PWQ8FBMqXcdToBV2Q9Iw4
PiFAxlc0tVjlLqQ=
-----END CERTIFICATE REQUEST-----
"""
EC_CERT_STR = """
-----BEGIN CERTIFICATE-----
MIIDxzCCAq+gAwIBAgIIHsJeci1JWAkwDQYJKoZIhvcNAQELBQAwVDELMAkGA1UE
BhMCVVMxHjAcBgNVBAoTFUdvb2dsZSBUcnVzdCBTZXJ2aWNlczElMCMGA1UEAxMc
R29vZ2xlIEludGVybmV0IEF1dGhvcml0eSBHMzAeFw0xOTAyMTMxNTM1NTdaFw0x
OTA1MDgxNTM1MDBaMGgxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlh
MRYwFAYDVQQHDA1Nb3VudGFpbiBWaWV3MRMwEQYDVQQKDApHb29nbGUgTExDMRcw
FQYDVQQDDA53d3cuZ29vZ2xlLmNvbTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IA
BKwMlIbd4rAwf6eWoa6RrR2w0s5k1M40XOORPf96PByPmld+qhjRMLvA/xcAxdCR
XdcMfaX6EUr0Zw8CepitMB2jggFSMIIBTjATBgNVHSUEDDAKBggrBgEFBQcDATAO
BgNVHQ8BAf8EBAMCB4AwGQYDVR0RBBIwEIIOd3d3Lmdvb2dsZS5jb20waAYIKwYB
BQUHAQEEXDBaMC0GCCsGAQUFBzAChiFodHRwOi8vcGtpLmdvb2cvZ3NyMi9HVFNH
SUFHMy5jcnQwKQYIKwYBBQUHMAGGHWh0dHA6Ly9vY3NwLnBraS5nb29nL0dUU0dJ
QUczMB0GA1UdDgQWBBQLovm8GG0oG91gOGCL58YPNoAlejAMBgNVHRMBAf8EAjAA
MB8GA1UdIwQYMBaAFHfCuFCaZ3Z2sS3ChtCDoH6mfrpLMCEGA1UdIAQaMBgwDAYK
KwYBBAHWeQIFAzAIBgZngQwBAgIwMQYDVR0fBCowKDAmoCSgIoYgaHR0cDovL2Ny
bC5wa2kuZ29vZy9HVFNHSUFHMy5jcmwwDQYJKoZIhvcNAQELBQADggEBAKFbmNOA
e3pJ7UVI5EmkAMZgSDRdrsLHV6F7WluuyYCyE/HFpZjBd6y8xgGtYWcask6edwrq
zrcXNEN/GY34AYre0M+p0xAs+lKSwkrJd2sCgygmzsBFtGwjW6lhjm+rg83zPHhH
mQZ0ShUR1Kp4TvzXgxj44RXOsS5ZyDe3slGiG4aw/hl+igO8Y8JMvcv/Tpzo+V75
BkDAFmLRi08NayfeyCqK/TcRpzxKMKhS7jEHK8Pzu5P+FyFHKqIsobi+BA+psOix
5nZLhrweLdKNz387mE2lSSKzr7qeLGHSOMt+ajQtZio4YVyZqJvg4Y++J0n5+Rjw
MXp8GrvTfn1DQ+o=
-----END CERTIFICATE-----
"""
EC_CERT_EXAMPLE = parse_certificate(EC_CERT_STR)
ECDSA_PRIME256V1_CERT_STR = """
-----BEGIN CERTIFICATE-----
MIICUTCCAfYCCQCvH7H/e2nuiDAKBggqhkjOPQQDAjCBrzELMAkGA1UEBhMCVVMx
EzARBgNVBAgMCkNhbGlmb3JuaWExEjAQBgNVBAcMCUxvcyBHYXRvczEjMCEGA1UE
CgwaTGVtdXJUcnVzdCBFbnRlcnByaXNlcyBMdGQxJjAkBgNVBAsMHVVuaXR0ZXN0
aW5nIE9wZXJhdGlvbnMgQ2VudGVyMSowKAYDVQQDDCFMZW11clRydXN0IFVuaXR0
ZXN0cyBSb290IENBIDIwMTkwHhcNMTkwMjI2MTgxMTUyWhcNMjkwMjIzMTgxMTUy
WjCBrzELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExEjAQBgNVBAcM
CUxvcyBHYXRvczEjMCEGA1UECgwaTGVtdXJUcnVzdCBFbnRlcnByaXNlcyBMdGQx
JjAkBgNVBAsMHVVuaXR0ZXN0aW5nIE9wZXJhdGlvbnMgQ2VudGVyMSowKAYDVQQD
DCFMZW11clRydXN0IFVuaXR0ZXN0cyBSb290IENBIDIwMTkwWTATBgcqhkjOPQIB
BggqhkjOPQMBBwNCAAQsnAVUtpDCFMK/k9Chynu8BWRVUBUYbGQ9Q9xeLR60J4fD
uBt48YpTqg5RMZEclVknMReXqTmqphOBo37/YVdlMAoGCCqGSM49BAMCA0kAMEYC
IQDQZ6xfBiCTHxY4GM4+zLeG1iPBUSfIJOjkFNViFZY/XAIhAJYmrkVQb/YjWCdd
Vl89McYhmV4IV7WDgUmUhkUSFXgy
-----END CERTIFICATE-----
"""
ECDSA_PRIME256V1_CERT = parse_certificate(ECDSA_PRIME256V1_CERT_STR)
ECDSA_SECP384r1_CERT_STR = """
-----BEGIN CERTIFICATE-----
MIICjjCCAhMCCQD2UadeQ7ub1jAKBggqhkjOPQQDAjCBrzELMAkGA1UEBhMCVVMx
EzARBgNVBAgMCkNhbGlmb3JuaWExEjAQBgNVBAcMCUxvcyBHYXRvczEjMCEGA1UE
CgwaTGVtdXJUcnVzdCBFbnRlcnByaXNlcyBMdGQxJjAkBgNVBAsMHVVuaXR0ZXN0
aW5nIE9wZXJhdGlvbnMgQ2VudGVyMSowKAYDVQQDDCFMZW11clRydXN0IFVuaXR0
ZXN0cyBSb290IENBIDIwMTgwHhcNMTkwMjI2MTgxODU2WhcNMjkwMjIzMTgxODU2
WjCBrzELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExEjAQBgNVBAcM
CUxvcyBHYXRvczEjMCEGA1UECgwaTGVtdXJUcnVzdCBFbnRlcnByaXNlcyBMdGQx
JjAkBgNVBAsMHVVuaXR0ZXN0aW5nIE9wZXJhdGlvbnMgQ2VudGVyMSowKAYDVQQD
DCFMZW11clRydXN0IFVuaXR0ZXN0cyBSb290IENBIDIwMTgwdjAQBgcqhkjOPQIB
BgUrgQQAIgNiAARuKyHIRp2e6PB5UcY8L/bUdavkL5Zf3IegNKvaAsvkDenhDGAI
zwWgsk3rOo7jmpMibn7yJQn404uZovwyeKcApn8uVv8ltheeYAx+ySzzn/APxNGy
cye/nv1D9cDW628wCgYIKoZIzj0EAwIDaQAwZgIxANl1ljDH4ykNK2OaRqKOkBOW
cKk1SvtiEZDS/wytiZGCeaxYteSYF+3GE8V2W1geWAIxAI8D7DY0HU5zw+oxAlTD
Uw/TeHA6q0QV4otPvrINW3V09iXDwFSPe265fTkHSfT6hQ==
-----END CERTIFICATE-----
"""
ECDSA_SECP384r1_CERT = parse_certificate(ECDSA_SECP384r1_CERT_STR)
DSA_CERT_STR = """
-----BEGIN CERTIFICATE-----
MIIDmTCCA1YCCQD5h/cM7xYO9jALBglghkgBZQMEAwIwga8xCzAJBgNVBAYTAlVT
MRMwEQYDVQQIDApDYWxpZm9ybmlhMRIwEAYDVQQHDAlMb3MgR2F0b3MxIzAhBgNV
BAoMGkxlbXVyVHJ1c3QgRW50ZXJwcmlzZXMgTHRkMSYwJAYDVQQLDB1Vbml0dGVz
dGluZyBPcGVyYXRpb25zIENlbnRlcjEqMCgGA1UEAwwhTGVtdXJUcnVzdCBVbml0
dGVzdHMgUm9vdCBDQSAyMDE4MB4XDTE5MDIyNjE4MjUyMloXDTI5MDIyMzE4MjUy
Mlowga8xCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRIwEAYDVQQH
DAlMb3MgR2F0b3MxIzAhBgNVBAoMGkxlbXVyVHJ1c3QgRW50ZXJwcmlzZXMgTHRk
MSYwJAYDVQQLDB1Vbml0dGVzdGluZyBPcGVyYXRpb25zIENlbnRlcjEqMCgGA1UE
AwwhTGVtdXJUcnVzdCBVbml0dGVzdHMgUm9vdCBDQSAyMDE4MIIBtjCCASsGByqG
SM44BAEwggEeAoGBAO2+6wO20rn9K7RtXJ7/kCSVFzYZsY1RKvmJ6BBkMFIepBkz
2pk62tRhJgNH07GKF7pyTPRRKqt38CaPK4ERUpavx3Ok6vZ3PKq8tMac/PMKBmT1
Xfpch54KDlCdreEMJqYiCwbIyiSCR4+PCH+7xC5Uh0PIZo6otNWe3Wkk53CfAhUA
8d4YAtto6D30f7qkEa7DMAccUS8CgYAiv8r0k0aUEaeioblcCAjmhvE0v8/tD5u1
anHO4jZIIv7uOrNFIGfqcNEOBs5AQkt5Bxn6x0b/VvtZ0FSrD0j4f36pTgro6noG
/0oRt0JngxsMSfo0LV4+bY62v21A0SneNgTgY+ugdfgGWvb0+9tpsIhiY69T+7c8
Oa0S6OWSPAOBhAACgYB5wa+nJJNZPoTWFum27JlWGYLO2flg5EpWlOvcEE0o5RfB
FPnMM033kKQQEI0YpCAq9fIMKhhUMk1X4mKUBUTt+Nrn1pY2l/wt5G6AQdHI8QXz
P1ecBbHPNZtWe3iVnfOgz/Pd8tU9slcXP9z5XbZ7R/oGcF/TPRTtbLEkYZNaDDAL
BglghkgBZQMEAwIDMAAwLQIVANubSNMSLt8plN9ZV3cp4pe3lMYCAhQPLLE7rTgm
92X+hWfyz000QEpYEQ==
-----END CERTIFICATE-----
"""
DSA_CERT = parse_certificate(DSA_CERT_STR)

View File

@ -4,4 +4,5 @@ flake8==3.5.0 # flake8 3.6.0 is giving erroneous "W605 invalid escape sequence"
pre-commit
invoke
twine
nodeenv
nodeenv
pyyaml>=4.2b1

View File

@ -14,7 +14,6 @@ flake8==3.5.0
identify==1.3.0 # via pre-commit
idna==2.8 # via requests
importlib-metadata==0.8 # via pre-commit
importlib-resources==1.0.2 # via pre-commit
invoke==1.2.0
mccabe==0.6.1 # via flake8
nodeenv==1.3.3
@ -23,7 +22,7 @@ pre-commit==1.14.4
pycodestyle==2.3.1 # via flake8
pyflakes==1.6.0 # via flake8
pygments==2.3.1 # via readme-renderer
pyyaml==3.13 # via aspy.yaml, pre-commit
pyyaml==5.1b1
readme-renderer==24.0 # via twine
requests-toolbelt==0.9.1 # via twine
requests==2.21.0 # via requests-toolbelt, twine

View File

@ -17,10 +17,11 @@ babel==2.6.0 # via sphinx
bcrypt==3.1.6
billiard==3.5.0.5
blinker==1.4
boto3==1.9.101
botocore==1.12.101
boto3==1.9.102
botocore==1.12.102
celery[redis]==4.2.1
certifi==2018.11.29
certsrv==2.1.1
cffi==1.12.1
chardet==3.0.4
click==7.0
@ -75,7 +76,7 @@ pyrfc3339==1.1
python-dateutil==2.8.0
python-editor==1.0.4
pytz==2018.9
pyyaml==3.13
pyyaml==5.1b1
raven[flask]==6.10.0
redis==2.10.6
requests-toolbelt==0.9.1

View File

@ -11,3 +11,4 @@ pytest
pytest-flask
pytest-mock
requests-mock
pyyaml>=4.2b1

View File

@ -8,9 +8,9 @@ asn1crypto==0.24.0 # via cryptography
atomicwrites==1.3.0 # via pytest
attrs==18.2.0 # via pytest
aws-xray-sdk==0.95 # via moto
boto3==1.9.101 # via moto
boto3==1.9.102 # via moto
boto==2.49.0 # via moto
botocore==1.12.101 # via boto3, moto, s3transfer
botocore==1.12.102 # via boto3, moto, s3transfer
certifi==2018.11.29 # via requests
cffi==1.12.1 # via cryptography
chardet==3.0.4 # via requests
@ -50,7 +50,7 @@ pytest==4.3.0
python-dateutil==2.8.0 # via botocore, faker, freezegun, moto
python-jose==2.0.2 # via moto
pytz==2018.9 # via moto
pyyaml==3.13 # via pyaml
pyyaml==5.1b1
requests-mock==1.5.2
requests==2.21.0 # via aws-xray-sdk, docker, moto, requests-mock, responses
responses==0.10.5 # via moto

View File

@ -8,6 +8,7 @@ boto3
botocore
celery[redis]
certifi
certsrv
CloudFlare
cryptography
dnspython3
@ -44,3 +45,4 @@ six
SQLAlchemy-Utils
tabulate
xmltodict
pyyaml>=4.2b1 #high severity alert

View File

@ -15,10 +15,11 @@ asyncpool==1.0
bcrypt==3.1.6 # via flask-bcrypt, paramiko
billiard==3.5.0.5 # via celery
blinker==1.4 # via flask-mail, flask-principal, raven
boto3==1.9.101
botocore==1.12.101
boto3==1.9.102
botocore==1.12.102
celery[redis]==4.2.1
certifi==2018.11.29
certsrv==2.1.1
cffi==1.12.1 # via bcrypt, cryptography, pynacl
chardet==3.0.4 # via requests
click==7.0 # via flask
@ -70,7 +71,7 @@ python-dateutil==2.8.0 # via alembic, arrow, botocore
python-editor==1.0.4 # via alembic
python-ldap==3.1.0
pytz==2018.9 # via acme, celery, flask-restful, pyrfc3339
pyyaml==3.13 # via cloudflare
pyyaml==5.1b1
raven[flask]==6.10.0
redis==2.10.6
requests-toolbelt==0.9.1 # via acme

View File

@ -155,7 +155,9 @@ setup(
'digicert_cis_source = lemur.plugins.lemur_digicert.plugin:DigiCertCISSourcePlugin',
'csr_export = lemur.plugins.lemur_csr.plugin:CSRExportPlugin',
'sftp_destination = lemur.plugins.lemur_sftp.plugin:SFTPDestinationPlugin',
'vault_desination = lemur.plugins.lemur_vault_dest.plugin:VaultDestinationPlugin'
'vault_desination = lemur.plugins.lemur_vault_dest.plugin:VaultDestinationPlugin',
'adcs_issuer = lemur.plugins.lemur_adcs.plugin:ADCSIssuerPlugin',
'adcs_source = lemur.plugins.lemur_adcs.plugin:ADCSSourcePlugin'
],
},
classifiers=[