Check that stored certificate chain matches certificate
Similar to how the private key is checked.
This commit is contained in:
@ -192,12 +192,16 @@ class Certificate(db.Model):
|
||||
|
||||
def check_integrity(self):
|
||||
"""
|
||||
Integrity checks: Does the cert have a matching private key?
|
||||
Integrity checks: Does the cert have a valid chain and matching private key?
|
||||
"""
|
||||
if self.private_key:
|
||||
validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert,
|
||||
error_class=AssertionError)
|
||||
|
||||
if self.chain:
|
||||
chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain)
|
||||
validators.verify_cert_chain(chain, error_class=AssertionError)
|
||||
|
||||
@cached_property
|
||||
def parsed_cert(self):
|
||||
assert self.body, "Certificate body not set"
|
||||
|
@ -245,8 +245,7 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
|
||||
external_id = fields.String(missing=None, allow_none=True)
|
||||
private_key = fields.String()
|
||||
body = fields.String(required=True)
|
||||
chain = fields.String(validate=validators.public_certificate, missing=None,
|
||||
allow_none=True) # TODO this could be multiple certificates
|
||||
chain = fields.String(missing=None, allow_none=True)
|
||||
|
||||
destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True)
|
||||
notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True)
|
||||
@ -260,7 +259,7 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
|
||||
raise ValidationError('Destinations require private key.')
|
||||
|
||||
@validates_schema
|
||||
def validate_cert_private_key(self, data):
|
||||
def validate_cert_private_key_chain(self, data):
|
||||
cert = None
|
||||
key = None
|
||||
if data.get('body'):
|
||||
@ -279,6 +278,15 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
|
||||
# Throws ValidationError
|
||||
validators.verify_private_key_match(key, cert)
|
||||
|
||||
if data.get('chain'):
|
||||
try:
|
||||
chain = utils.parse_cert_chain(data['chain'])
|
||||
except ValueError:
|
||||
raise ValidationError("Invalid certificate in certificate chain.", field_names=['chain'])
|
||||
|
||||
# Throws ValidationError
|
||||
validators.verify_cert_chain([cert] + chain)
|
||||
|
||||
|
||||
class CertificateExportInputSchema(LemurInputSchema):
|
||||
plugin = fields.Nested(PluginInputSchema)
|
||||
|
Reference in New Issue
Block a user