|
|
@ -48,15 +48,10 @@ def split_cert(body):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sign_certificate(common_name, public_key, authority_private_key, user, extensions, not_before, not_after):
|
|
|
|
def sign_certificate(common_name, public_key, authority_private_key, user, extensions, not_before, not_after):
|
|
|
|
private_key = parse_private_key(authority_private_key).private_bytes(
|
|
|
|
|
|
|
|
encoding=serialization.Encoding.PEM,
|
|
|
|
|
|
|
|
format=serialization.PrivateFormat.OpenSSH,
|
|
|
|
|
|
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
|
|
|
|
|
|
).decode()
|
|
|
|
|
|
|
|
with mktempfile() as issuer_tmp:
|
|
|
|
with mktempfile() as issuer_tmp:
|
|
|
|
cmd = ['ssh-keygen', '-s', issuer_tmp]
|
|
|
|
cmd = ['ssh-keygen', '-s', issuer_tmp]
|
|
|
|
with open(issuer_tmp, 'w') as i:
|
|
|
|
with open(issuer_tmp, 'w') as i:
|
|
|
|
i.writelines(private_key)
|
|
|
|
i.writelines(authority_private_key)
|
|
|
|
if 'extendedKeyUsage' in extensions and extensions['extendedKeyUsage'].get('useClientAuthentication'):
|
|
|
|
if 'extendedKeyUsage' in extensions and extensions['extendedKeyUsage'].get('useClientAuthentication'):
|
|
|
|
cmd.extend(['-I', user['username'] + ' user key',
|
|
|
|
cmd.extend(['-I', user['username'] + ' user key',
|
|
|
|
'-n', user['username']])
|
|
|
|
'-n', user['username']])
|
|
|
@ -68,9 +63,9 @@ def sign_certificate(common_name, public_key, authority_private_key, user, exten
|
|
|
|
cmd.extend(['-I', common_name + ' host key',
|
|
|
|
cmd.extend(['-I', common_name + ' host key',
|
|
|
|
'-n', ','.join(domains),
|
|
|
|
'-n', ','.join(domains),
|
|
|
|
'-h'])
|
|
|
|
'-h'])
|
|
|
|
# something like 20201024102030
|
|
|
|
# something like 20201024
|
|
|
|
ssh_not_before = datetime.fromisoformat(not_before).strftime("%Y%m%d%H%M%S")
|
|
|
|
ssh_not_before = datetime.fromisoformat(not_before).strftime("%Y%m%d")
|
|
|
|
ssh_not_after = datetime.fromisoformat(not_after).strftime("%Y%m%d%H%M%S")
|
|
|
|
ssh_not_after = datetime.fromisoformat(not_after).strftime("%Y%m%d")
|
|
|
|
cmd.extend(['-V', ssh_not_before + ':' + ssh_not_after])
|
|
|
|
cmd.extend(['-V', ssh_not_before + ':' + ssh_not_after])
|
|
|
|
with mktempfile() as cert_tmp:
|
|
|
|
with mktempfile() as cert_tmp:
|
|
|
|
with open(cert_tmp, 'w') as f:
|
|
|
|
with open(cert_tmp, 'w') as f:
|
|
|
@ -107,8 +102,6 @@ class OpenSSHIssuerPlugin(CryptographyIssuerPlugin):
|
|
|
|
return cert_pem, private_key, chain_cert_pem, roles
|
|
|
|
return cert_pem, private_key, chain_cert_pem, roles
|
|
|
|
|
|
|
|
|
|
|
|
def wrap_certificate(self, cert):
|
|
|
|
def wrap_certificate(self, cert):
|
|
|
|
if 'body' not in cert:
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
# get public_key in OpenSSH format
|
|
|
|
# get public_key in OpenSSH format
|
|
|
|
public_key = parse_certificate(cert['body']).public_key().public_bytes(
|
|
|
|
public_key = parse_certificate(cert['body']).public_key().public_bytes(
|
|
|
|
encoding=serialization.Encoding.OpenSSH,
|
|
|
|
encoding=serialization.Encoding.OpenSSH,
|
|
|
@ -116,11 +109,10 @@ class OpenSSHIssuerPlugin(CryptographyIssuerPlugin):
|
|
|
|
).decode()
|
|
|
|
).decode()
|
|
|
|
public_key += ' ' + cert['user']['email']
|
|
|
|
public_key += ' ' + cert['user']['email']
|
|
|
|
# sign it with authority private key
|
|
|
|
# sign it with authority private key
|
|
|
|
if 'root_authority' in cert and cert['root_authority']:
|
|
|
|
if 'root_authority' in cert:
|
|
|
|
authority = cert['root_authority']
|
|
|
|
root_authority = cert['root_authority']
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
authority = cert['authority']
|
|
|
|
root_authority = get_by_root_authority(cert['authority']['id'])
|
|
|
|
root_authority = get_by_root_authority(authority['id'])
|
|
|
|
|
|
|
|
authority_private_key = root_authority.private_key
|
|
|
|
authority_private_key = root_authority.private_key
|
|
|
|
cert['body'] = sign_certificate(
|
|
|
|
cert['body'] = sign_certificate(
|
|
|
|
cert['common_name'],
|
|
|
|
cert['common_name'],
|
|
|
|