Adding Digicert CIS Sourceplugin (#959)

* Adding necessary features to complete backfill

* Fixing pagination logic.
This commit is contained in:
kevgliss 2017-10-04 16:56:01 -07:00 committed by GitHub
parent 9e2578be1e
commit a6305a5cae
5 changed files with 91 additions and 4 deletions

View File

@ -227,7 +227,8 @@ def reissue(old_certificate_name, commit):
@manager.option('-f', '--fqdns', dest='fqdns', help='FQDNs to query. Multiple fqdns specified via comma.')
@manager.option('-i', '--issuer', dest='issuer', help='Issuer to query for.')
@manager.option('-o', '--owner', dest='owner', help='Owner to query for.')
def query(fqdns, issuer, owner):
@manager.option('-e', '--expired', dest='expired', type=bool, default=False, help='Include expired certificates.')
def query(fqdns, issuer, owner, expired):
"""Prints certificates that match the query params."""
table = []
@ -246,6 +247,9 @@ def query(fqdns, issuer, owner):
q = q.filter(Certificate.owner.ilike('%{0}%'.format(owner)))
if not expired:
q = q.filter(Certificate.expired == False) # noqa
for f in fqdns.split(','):
q = q.filter(
or_(

View File

@ -57,6 +57,15 @@ def get_by_name(name):
return database.get(Certificate, name, field='name')
def get_by_serial(serial):
"""
Retrieves certificate by it's Serial.
:param serial:
:return:
"""
return database.get(Certificate, serial, field='serial')
def delete(cert_id):
"""
Delete's a certificate.

View File

@ -339,6 +339,69 @@ class DigiCertIssuerPlugin(IssuerPlugin):
return current_app.config.get('DIGICERT_ROOT'), "", [role]
class DigiCertCISSourcePlugin(SourcePlugin):
"""Wrap the Digicert CIS Certifcate API."""
title = 'DigiCert'
slug = 'digicert-cis-source'
description = "Enables the use of Digicert as a source of existing certificates."
version = digicert.VERSION
author = 'Kevin Glisson'
author_url = 'https://github.com/netflix/lemur.git'
def __init__(self, *args, **kwargs):
"""Initialize source with appropriate details."""
required_vars = [
'DIGICERT_CIS_API_KEY',
'DIGICERT_CIS_URL',
'DIGICERT_CIS_ROOT',
'DIGICERT_CIS_INTERMEDIATE',
'DIGICERT_CIS_PROFILE_NAME'
]
validate_conf(current_app, required_vars)
self.session = requests.Session()
self.session.headers.update(
{
'X-DC-DEVKEY': current_app.config['DIGICERT_CIS_API_KEY'],
'Content-Type': 'application/json'
}
)
self.session.hooks = dict(response=log_status_code)
super(DigiCertCISSourcePlugin, self).__init__(*args, **kwargs)
def get_certificates(self, options, **kwargs):
"""Fetch all Digicert certificates."""
base_url = current_app.config.get('DIGICERT_CIS_URL')
# make request
search_url = '{0}/platform/cis/certificate/search'.format(base_url)
certs = []
page = 1
while True:
response = self.session.get(search_url, params={'status': ['issued'], 'page': page})
data = handle_cis_response(response)
for c in data['certificates']:
download_url = '{0}/platform/cis/certificate/{1}'.format(base_url, c['id'])
certificate = self.session.get(download_url)
# normalize serial
serial = str(int(c['serial_number'], 16))
cert = {'body': certificate.content, 'serial': serial, 'external_id': c['id']}
certs.append(cert)
if page == data['total_pages']:
break
page += 1
return certs
class DigiCertCISIssuerPlugin(IssuerPlugin):
"""Wrap the Digicert Certificate Issuing API."""
title = 'DigiCert CIS'

View File

@ -17,6 +17,8 @@ from lemur.endpoints import service as endpoint_service
from lemur.destinations import service as destination_service
from lemur.certificates.schemas import CertificateUploadInputSchema
from lemur.common.utils import parse_certificate
from lemur.common.defaults import serial
from lemur.plugins.base import plugins
@ -112,7 +114,15 @@ def sync_certificates(source, user):
certificates = s.get_certificates(source.options)
for certificate in certificates:
exists = certificate_service.get_by_name(certificate['name'])
if certificate.get('name'):
exists = certificate_service.get_by_name(certificate['name'])
elif certificate.get('serial'):
exists = certificate_service.get_by_serial(certificate['serial'])
else:
cert = parse_certificate(certificate['body'])
exists = certificate_service.get_by_serial(serial(cert))
if not certificate.get('owner'):
certificate['owner'] = user.email
@ -120,12 +130,12 @@ def sync_certificates(source, user):
certificate['creator'] = user
if not exists:
current_app.logger.debug("Creating Certificate. Name: {name}".format(name=certificate['name']))
certificate_create(certificate, source)
new += 1
else:
current_app.logger.debug("Updating Certificate. Name: {name}".format(name=certificate['name']))
if certificate.get('external_id'):
exists.external_id = certificate['external_id']
certificate_update(exists, source)
updated += 1

View File

@ -196,6 +196,7 @@ setup(
'cfssl_issuer = lemur.plugins.lemur_cfssl.plugin:CfsslIssuerPlugin',
'digicert_issuer = lemur.plugins.lemur_digicert.plugin:DigiCertIssuerPlugin',
'digicert_cis_issuer = lemur.plugins.lemur_digicert.plugin:DigiCertCISIssuerPlugin',
'digicert_cis_source = lemur.plugins.lemur_digicert.plugin:DigiCertCISSourcePlugin'
],
},
classifiers=[