Azure-Dest: Linted

This commit is contained in:
sirferl 2020-11-14 13:03:27 +01:00
parent 62230228a7
commit 48302b6acc
1 changed files with 23 additions and 27 deletions

View File

@ -9,14 +9,11 @@
.. moduleauthor:: sirferl .. moduleauthor:: sirferl
""" """
import os
import re
from flask import current_app from flask import current_app
from lemur.common.defaults import common_name, country, state, location, organizational_unit, organization from lemur.common.defaults import common_name
from lemur.common.utils import parse_certificate, parse_private_key from lemur.common.utils import parse_certificate, parse_private_key
from lemur.plugins.bases import DestinationPlugin from lemur.plugins.bases import DestinationPlugin
from lemur.plugins.bases import SourcePlugin
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
import requests import requests
@ -26,8 +23,8 @@ import base64
def base64encode(string): def base64encode(string):
# Performs Base64 encoding of string to string using the base64.b64encode() function # Performs Base64 encoding of string to string using the base64.b64encode() function
# which encodes bytes to bytes. # which encodes bytes to bytes.
return base64.b64encode(string.encode()).decode() return base64.b64encode(string.encode()).decode()
@ -80,13 +77,13 @@ def get_access_token(tenant, appID, password, self):
# prepare the call for the access_token # prepare the call for the access_token
auth_url = f"https://login.microsoftonline.com/{tenant}/oauth2/token" auth_url = f"https://login.microsoftonline.com/{tenant}/oauth2/token"
post_data = { post_data = {
'grant_type' : 'client_credentials', 'grant_type': 'client_credentials',
'client_id' : appID, 'client_id': appID,
'client_secret' : password, 'client_secret': password,
'resource' : 'https://vault.azure.net' 'resource': 'https://vault.azure.net'
} }
try: try:
response = self.session.post(auth_url, data = post_data) response = self.session.post(auth_url, data=post_data)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
current_app.logger.exception(f"AZURE: Error for POST {e}") current_app.logger.exception(f"AZURE: Error for POST {e}")
@ -139,7 +136,6 @@ class AzureDestinationPlugin(DestinationPlugin):
self.session = requests.Session() self.session = requests.Session()
super(AzureDestinationPlugin, self).__init__(*args, **kwargs) super(AzureDestinationPlugin, self).__init__(*args, **kwargs)
def upload(self, name, body, private_key, cert_chain, options, **kwargs): def upload(self, name, body, private_key, cert_chain, options, **kwargs):
""" """
Upload certificate and private key Upload certificate and private key
@ -152,18 +148,18 @@ class AzureDestinationPlugin(DestinationPlugin):
# we use the common name to identify the certificate # we use the common name to identify the certificate
# Azure does not allow "." in the certificate name we replace them with "-" # Azure does not allow "." in the certificate name we replace them with "-"
cert = parse_certificate(body) cert = parse_certificate(body)
certificate_name = common_name(cert).replace(".","-") certificate_name = common_name(cert).replace(".", "-")
vault_URI = self.get_option("vaultUrl", options) vault_URI = self.get_option("vaultUrl", options)
tenant = self.get_option("azureTenant", options) tenant = self.get_option("azureTenant", options)
app_id = self.get_option("appID", options) app_id = self.get_option("appID", options)
password = self.get_option("azurePassword", options) password = self.get_option("azurePassword", options)
access_token = get_access_token(tenant, app_id, password, self) access_token = get_access_token(tenant, app_id, password, self)
cert_url = f"{vault_URI}/certificates/{certificate_name}/import?api-version=7.1" cert_url = f"{vault_URI}/certificates/{certificate_name}/import?api-version=7.1"
post_header = { post_header = {
"Authorization" : f"Bearer {access_token}" "Authorization": f"Bearer {access_token}"
} }
key_pkcs8 = parse_private_key(private_key).private_bytes( key_pkcs8 = parse_private_key(private_key).private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
@ -171,26 +167,26 @@ class AzureDestinationPlugin(DestinationPlugin):
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )
key_pkcs8 = key_pkcs8.decode("utf-8").replace('\\n', '\n') key_pkcs8 = key_pkcs8.decode("utf-8").replace('\\n', '\n')
cert_package = f"{body}\n{key_pkcs8}" cert_package = f"{body}\n{key_pkcs8}"
current_app.logger.debug(f"AZURE: encoded certificate: {cert_package}") current_app.logger.debug(f"AZURE: encoded certificate: {cert_package}")
post_body = { post_body = {
"value" : cert_package, "value": cert_package,
"policy" : { "policy": {
"key_props": { "key_props": {
"exportable" : True, "exportable": True,
"kty" : "RSA", "kty": "RSA",
"key_size" : 2048, "key_size": 2048,
"reuse_key" : True "reuse_key": True
}, },
"secret_props": { "secret_props":{
"contentType": "application/x-pem-file" "contentType": "application/x-pem-file"
} }
} }
} }
try: try:
response = self.session.post(cert_url, headers = post_header, json = post_body) response = self.session.post(cert_url, headers=post_header, json=post_body)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
current_app.logger.exception(f"AZURE: Error for POST {e}") current_app.logger.exception(f"AZURE: Error for POST {e}")
treturn_value = handle_response(response) treturn_value = handle_response(response)