Initial work allowing certificates to be revoked. (#941)

* Initial work allowing for certificates to be revoked.
This commit is contained in:
kevgliss
2017-09-28 18:27:56 -07:00
committed by GitHub
parent ea6f5c920b
commit bb08b1e637
23 changed files with 286 additions and 47 deletions

View File

@@ -21,3 +21,6 @@ class IssuerPlugin(Plugin):
def create_authority(self, options):
raise NotImplementedError
def revoke_certificate(self, certificate, comments):
raise NotImplementedError

View File

@@ -49,7 +49,8 @@ class CfsslIssuerPlugin(IssuerPlugin):
response_json = json.loads(response.content.decode('utf_8'))
cert = response_json['result']['certificate']
return cert, current_app.config.get('CFSSL_INTERMEDIATE'),
# TODO add external ID
return cert, current_app.config.get('CFSSL_INTERMEDIATE'), None
@staticmethod
def create_authority(options):

View File

@@ -187,7 +187,7 @@ class CryptographyIssuerPlugin(IssuerPlugin):
"""
current_app.logger.debug("Issuing new cryptography certificate with options: {0}".format(options))
cert_pem, chain_cert_pem = issue_certificate(csr, options)
return cert_pem, chain_cert_pem
return cert_pem, chain_cert_pem, None
@staticmethod
def create_authority(options):

View File

@@ -312,7 +312,17 @@ class DigiCertIssuerPlugin(IssuerPlugin):
# retrieve certificate
certificate_url = "{0}/services/v2/certificate/{1}/download/format/pem_all".format(base_url, certificate_id)
end_entity, intermediate, root = pem.parse(self.session.get(certificate_url).content)
return "\n".join(str(end_entity).splitlines()), "\n".join(str(intermediate).splitlines())
return "\n".join(str(end_entity).splitlines()), "\n".join(str(intermediate).splitlines()), certificate_id
def revoke_certificate(self, certificate, comments):
"""Revoke a Digicert certificate."""
base_url = current_app.config.get('DIGICERT_URL')
# make certificate revoke request
create_url = '{0}/certificate/{1}/revoke'.format(base_url, certificate.external_id)
metrics.send('digicert_revoke_certificate', 'counter', 1)
response = self.session.put(create_url, data=json.dumps({'comments': comments}))
return handle_response(response)
@staticmethod
def create_authority(options):
@@ -379,7 +389,22 @@ class DigiCertCISIssuerPlugin(IssuerPlugin):
self.session.headers.pop('Accept')
end_entity = pem.parse(certificate_pem)[0]
return "\n".join(str(end_entity).splitlines()), current_app.config.get('DIGICERT_CIS_INTERMEDIATE')
return "\n".join(str(end_entity).splitlines()), current_app.config.get('DIGICERT_CIS_INTERMEDIATE'), data['id']
def revoke_certificate(self, certificate, comments):
"""Revoke a Digicert certificate."""
base_url = current_app.config.get('DIGICERT_CIS_URL')
# make certificate revoke request
revoke_url = '{0}/platform/cis/certificate/{1}/revoke'.format(base_url, certificate.external_id)
metrics.send('digicert_revoke_certificate_success', 'counter', 1)
response = self.session.put(revoke_url, data=json.dumps({'comments': comments}))
if response.status_code != 204:
metrics.send('digicert_revoke_certificate_failure', 'counter', 1)
raise Exception('Failed to revoke certificate.')
metrics.send('digicert_revoke_certificate_success', 'counter', 1)
@staticmethod
def create_authority(options):

View File

@@ -171,7 +171,7 @@ ghi
adapter.register_uri('GET', 'mock://www.digicert.com/services/v2/certificate/cert123/download/format/pem_all', text=pem_fixture)
subject.session.mount('mock', adapter)
cert, intermediate = subject.create_certificate("", {'common_name': 'test.com'})
cert, intermediate, external_id = subject.create_certificate("", {'common_name': 'test.com'})
assert cert == "-----BEGIN CERTIFICATE-----\nabc\n-----END CERTIFICATE-----"
assert intermediate == "-----BEGIN CERTIFICATE-----\ndef\n-----END CERTIFICATE-----"

View File

@@ -195,7 +195,8 @@ class VerisignIssuerPlugin(IssuerPlugin):
response = self.session.post(url, data=data)
cert = handle_response(response.content)['Response']['Certificate']
return cert, current_app.config.get('VERISIGN_INTERMEDIATE'),
# TODO add external id
return cert, current_app.config.get('VERISIGN_INTERMEDIATE'), None
@staticmethod
def create_authority(options):