Fix LetsEncrypt Dyn flow for duplicate CN/SAN
This commit is contained in:
parent
42ffeda90d
commit
7f88c24e83
|
@ -12,6 +12,7 @@ import string
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
from cryptography.hazmat.primitives.asymmetric import rsa, ec
|
from cryptography.hazmat.primitives.asymmetric import rsa, ec
|
||||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||||
from flask_restful.reqparse import RequestParser
|
from flask_restful.reqparse import RequestParser
|
||||||
|
@ -226,3 +227,13 @@ def truthiness(s):
|
||||||
"""If input string resembles something truthy then return True, else False."""
|
"""If input string resembles something truthy then return True, else False."""
|
||||||
|
|
||||||
return s.lower() in ('true', 'yes', 'on', 't', '1')
|
return s.lower() in ('true', 'yes', 'on', 't', '1')
|
||||||
|
|
||||||
|
|
||||||
|
def find_matching_certificates_by_hash(cert, matching_certs):
|
||||||
|
"""Given a Cryptography-formatted certificate cert, and Lemur-formatted certificates (matching_certs),
|
||||||
|
determine if any of the certificate hashes match and return the matches."""
|
||||||
|
matching = []
|
||||||
|
for c in matching_certs:
|
||||||
|
if parse_certificate(c).fingerprint(hashes.SHA256()) == cert.body.fingerprint(hashes.SHA256()):
|
||||||
|
matching.append(c)
|
||||||
|
return matching
|
||||||
|
|
|
@ -5,7 +5,7 @@ import dns.exception
|
||||||
import dns.name
|
import dns.name
|
||||||
import dns.query
|
import dns.query
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
from dyn.tm.errors import DynectCreateError
|
from dyn.tm.errors import DynectCreateError, DynectGetError
|
||||||
from dyn.tm.session import DynectSession
|
from dyn.tm.session import DynectSession
|
||||||
from dyn.tm.zones import Node, Zone, get_all_zones
|
from dyn.tm.zones import Node, Zone, get_all_zones
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
@ -119,7 +119,11 @@ def delete_txt_record(change_id, account_number, domain, token):
|
||||||
zone = Zone(zone_name)
|
zone = Zone(zone_name)
|
||||||
node = Node(zone_name, fqdn)
|
node = Node(zone_name, fqdn)
|
||||||
|
|
||||||
all_txt_records = node.get_all_records_by_type('TXT')
|
try:
|
||||||
|
all_txt_records = node.get_all_records_by_type('TXT')
|
||||||
|
except DynectGetError:
|
||||||
|
# No Text Records remain or host is not in the zone anymore because all records have been deleted.
|
||||||
|
return
|
||||||
for txt_record in all_txt_records:
|
for txt_record in all_txt_records:
|
||||||
if txt_record.txtdata == ("{}".format(token)):
|
if txt_record.txtdata == ("{}".format(token)):
|
||||||
current_app.logger.debug("Deleting TXT record name: {0}".format(fqdn))
|
current_app.logger.debug("Deleting TXT record name: {0}".format(fqdn))
|
||||||
|
|
|
@ -17,7 +17,7 @@ from lemur.endpoints import service as endpoint_service
|
||||||
from lemur.destinations import service as destination_service
|
from lemur.destinations import service as destination_service
|
||||||
|
|
||||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||||
from lemur.common.utils import parse_certificate
|
from lemur.common.utils import find_matching_certificates_by_hash, parse_certificate
|
||||||
from lemur.common.defaults import serial
|
from lemur.common.defaults import serial
|
||||||
|
|
||||||
from lemur.plugins.base import plugins
|
from lemur.plugins.base import plugins
|
||||||
|
@ -126,7 +126,8 @@ def sync_certificates(source, user):
|
||||||
|
|
||||||
if not exists:
|
if not exists:
|
||||||
cert = parse_certificate(certificate['body'])
|
cert = parse_certificate(certificate['body'])
|
||||||
exists = certificate_service.get_by_serial(serial(cert))
|
matching_serials = certificate_service.get_by_serial(serial(cert))
|
||||||
|
exists = find_matching_certificates_by_hash(cert, matching_serials)
|
||||||
|
|
||||||
if not certificate.get('owner'):
|
if not certificate.get('owner'):
|
||||||
certificate['owner'] = user.email
|
certificate['owner'] = user.email
|
||||||
|
|
Loading…
Reference in New Issue