"""
|
2015-07-23 17:52:56 +02:00
|
|
|
.. module: lemur.plugins.lemur_aws.iam
|
2015-06-22 22:47:27 +02:00
|
|
|
:platform: Unix
|
|
|
|
:synopsis: Contains helper functions for interactive with AWS IAM Apis.
|
|
|
|
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
|
|
|
|
:license: Apache, see LICENSE for more details.
|
|
|
|
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
|
|
|
"""
from lemur.plugins.lemur_aws.sts import assume_service
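def get_name_from_arn(arn):
    """
    Extract the certificate name from an arn.

    The name is everything after the first "/" in the arn, so for a
    certificate uploaded under an IAM path (for example
    ``.../server-certificate/prod/example``) the path prefix is kept.

    :param arn: IAM SSL arn
    :return: name of the certificate as uploaded to AWS
    :raises ValueError: if the arn contains no "/" separator
    """
    # partition keeps the "split at the first slash" behaviour but lets a
    # malformed arn be reported clearly instead of surfacing a bare IndexError.
    _, separator, name = arn.partition("/")
    if not separator:
        raise ValueError("Malformed IAM server certificate arn: {0}".format(arn))
    return name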
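def upload_cert(account_number, cert, private_key, cert_chain=None):
    """
    Upload a certificate to AWS

    The certificate body, private key and (if given) chain are stringified
    before being handed to the IAM connection returned by ``assume_service``.

    :param account_number: AWS account the certificate is uploaded into
    :param cert: certificate object; ``cert.name`` becomes the IAM server
        certificate name and ``cert.body`` its PEM body
    :param private_key: PEM private key matching ``cert``
    :param cert_chain: optional PEM chain; left out of the upload when None
    :return: whatever ``upload_server_cert`` returns
    """
    # Only stringify a chain that was actually supplied: str(None) would
    # upload the literal text "None" as the certificate chain.
    chain = str(cert_chain) if cert_chain is not None else None
    iam = assume_service(account_number, 'iam')
    return iam.upload_server_cert(cert.name, str(cert.body), str(private_key),
                                  cert_chain=chain)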
def delete_cert(account_number, cert):
    """
    Delete a certificate from AWS

    :param account_number: AWS account that holds the certificate
    :param cert: certificate object whose ``name`` is the IAM server
        certificate name to delete
    :return: whatever ``delete_server_cert`` returns
    """
    iam = assume_service(account_number, 'iam')
    return iam.delete_server_cert(cert.name)
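def get_all_server_certs(account_number):
    """
    Use STS to fetch all of the SSL certificates from a given account

    Pages through ``get_all_server_certs`` using the returned marker until
    the response reports that it is no longer truncated.

    :param account_number: AWS account to list server certificates from
    :return: list of server certificate metadata dicts, in the order AWS
        returned them
    """
    # Obtain the IAM connection once instead of once per page; each
    # assume_service call presumably performs its own STS round trip
    # (TODO confirm against lemur_aws.sts).
    iam = assume_service(account_number, 'iam')
    marker = None
    certs = []
    while True:
        response = iam.get_all_server_certs(marker=marker)
        result = response['list_server_certificates_response']['list_server_certificates_result']
        certs.extend(result['server_certificate_metadata_list'])

        # The truncation flag is compared as the string 'true', matching the
        # shape of the response this code consumes.
        if result['is_truncated'] != 'true':
            return certs
        marker = result['marker']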
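def get_cert_from_arn(arn):
    """
    Retrieves an SSL certificate from a given ARN.

    :param arn: IAM server certificate arn of the form
        ``arn:aws:iam::<account>:server-certificate/<path>/<name>``
    :return: ``(body, chain)`` tuple as produced by
        ``digest_aws_cert_response``
    :raises ValueError: if the arn has no "/" or fewer than five
        ":"-separated fields
    """
    fields = arn.split(":")
    if len(fields) < 5:
        raise ValueError("Malformed IAM server certificate arn: {0}".format(arn))
    # NOTE(review): the account number comes from the arn itself and is what
    # assume_service is handed, so it selects which account's role is
    # assumed. Callers should only pass arns obtained from trusted sources.
    account_number = fields[4]

    # IAM looks certificates up by bare name, so any IAM path in the arn is
    # dropped and only the last "/" component is kept.
    _, separator, name = arn.rpartition("/")
    if not separator:
        raise ValueError("Malformed IAM server certificate arn: {0}".format(arn))

    response = assume_service(account_number, 'iam').get_server_certificate(name.strip())
    return digest_aws_cert_response(response)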
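def digest_aws_cert_response(response):
    """
    Processes an AWS certificate response and retrieves the certificate body and chain.

    :param response: parsed ``get_server_certificate`` response dict
    :return: ``(body, chain)`` tuple of strings; ``chain`` is None when the
        response carries no ``certificate_chain``
    """
    result = response['get_server_certificate_response']['get_server_certificate_result']
    cert = result['server_certificate']
    body = str(cert['certificate_body'])

    # A missing chain stays None rather than becoming the literal text "None".
    chain = cert.get('certificate_chain')
    if chain is not None:
        chain = str(chain)
    return body, chain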