diff --git a/lemur/plugins/lemur_acme/ultradns.py b/lemur/plugins/lemur_acme/ultradns.py index 40661740..dcf3e3c6 100644 --- a/lemur/plugins/lemur_acme/ultradns.py +++ b/lemur/plugins/lemur_acme/ultradns.py @@ -91,7 +91,7 @@ def get_ultradns_token(): "password": current_app.config.get("ACME_ULTRADNS_PASSWORD", ""), } base_uri = current_app.config.get("ACME_ULTRADNS_DOMAIN", "") - resp = requests.post("{0}{1}".format(base_uri, path), data=data, verify=True) + resp = requests.post(f"{base_uri}{path}", data=data, verify=True) return resp.json()["access_token"] @@ -102,7 +102,7 @@ def _generate_header(): Contains the Authorization access_key obtained from the get_ultradns_token() function. """ access_token = get_ultradns_token() - return {"Authorization": "Bearer {}".format(access_token), "Content-Type": "application/json"} + return {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"} def _paginate(path, key): @@ -120,7 +120,7 @@ def _get(path, params=None): """Function to execute a GET request on the given URL (base_uri + path) with given params""" base_uri = current_app.config.get("ACME_ULTRADNS_DOMAIN", "") resp = requests.get( - "{0}{1}".format(base_uri, path), + f"{base_uri}{path}", headers=_generate_header(), params=params, verify=True, @@ -133,7 +133,7 @@ def _delete(path): """Function to execute a DELETE request on the given URL""" base_uri = current_app.config.get("ACME_ULTRADNS_DOMAIN", "") resp = requests.delete( - "{0}{1}".format(base_uri, path), + f"{base_uri}{path}", headers=_generate_header(), verify=True, ) @@ -144,7 +144,7 @@ def _post(path, params): """Executes a POST request on given URL. Body is sent in JSON format""" base_uri = current_app.config.get("ACME_ULTRADNS_DOMAIN", "") resp = requests.post( - "{0}{1}".format(base_uri, path), + f"{base_uri}{path}", headers=_generate_header(), data=json.dumps(params), verify=True, @@ -168,13 +168,13 @@ def _has_dns_propagated(name, token, domain): txt_records.append(txt_record.decode("utf-8")) except dns.exception.DNSException: function = sys._getframe().f_code.co_name - metrics.send("{}.fail".format(function), "counter", 1) + metrics.send(f"{function}.fail", "counter", 1) return False for txt_record in txt_records: if txt_record == token: function = sys._getframe().f_code.co_name - metrics.send("{}.success".format(function), "counter", 1) + metrics.send(f"{function}.success", "counter", 1) return True return False @@ -216,11 +216,11 @@ def wait_for_dns_change(change_id, account_number=None): } current_app.logger.debug(log_data) if status: - metrics.send("{}.success".format(function), "counter", 1) + metrics.send(f"{function}.success", "counter", 1) break time.sleep(10) if not status: - metrics.send("{}.fail".format, "counter", 1, metric_tags={"fqdn": fqdn, "txt_record": token}) + metrics.send(f"{function}.fail", "counter", 1, metric_tags={"fqdn": fqdn, "txt_record": token}) sentry.captureException(extra={"fqdn": str(fqdn), "txt_record": str(token)}) return @@ -253,8 +253,8 @@ def get_zone_name(domain, account_number): zone_name = z if not zone_name: function = sys._getframe().f_code.co_name - metrics.send("{}.fail".format(function), "counter", 1) - raise Exception("No UltraDNS zone found for domain: {}".format(domain)) + metrics.send(f"{function}.fail", "counter", 1) + raise Exception(f"No UltraDNS zone found for domain: {domain}") return zone_name @@ -274,12 +274,12 @@ def create_txt_record(domain, token, account_number): zone_name = get_zone_name(domain, account_number) zone_parts = len(zone_name.split(".")) node_name = ".".join(domain.split(".")[:-zone_parts]) - fqdn = "{0}.{1}".format(node_name, zone_name) - path = "/v2/zones/{0}/rrsets/TXT/{1}".format(zone_name, node_name) + fqdn = f"{node_name}.{zone_name}" + path = f"/v2/zones/{zone_name}/rrsets/TXT/{node_name}" params = { "ttl": 5, "rdata": [ - "{}".format(token) + f"{token}" ], } @@ -334,19 +334,19 @@ def delete_txt_record(change_id, account_number, domain, token): zone_name = get_zone_name(domain, account_number) zone_parts = len(zone_name.split(".")) node_name = ".".join(domain.split(".")[:-zone_parts]) - path = "/v2/zones/{}/rrsets/16/{}".format(zone_name, node_name) + path = f"/v2/zones/{zone_name}/rrsets/16/{node_name}" try: rrsets = _get(path) record = Record(rrsets) except Exception as e: function = sys._getframe().f_code.co_name - metrics.send("{}.geterror".format(function), "counter", 1) + metrics.send(f"{function}.geterror", "counter", 1) # No Text Records remain or host is not in the zone anymore because all records have been deleted. return try: # Remove the record from the RRSet locally - record.rdata.remove("{}".format(token)) + record.rdata.remove(f"{token}") except ValueError: function = sys._getframe().f_code.co_name log_data = { @@ -394,7 +394,7 @@ def delete_acme_txt_records(domain): zone_name = get_zone_name(domain) zone_parts = len(zone_name.split(".")) node_name = ".".join(domain.split(".")[:-zone_parts]) - path = "/v2/zones/{}/rrsets/16/{}".format(zone_name, node_name) + path = f"/v2/zones/{zone_name}/rrsets/16/{node_name}" _delete(path) @@ -420,7 +420,7 @@ def get_authoritative_nameserver(domain): rcode = response.rcode() if rcode != dns.rcode.NOERROR: function = sys._getframe().f_code.co_name - metrics.send("{}.error".format(function), "counter", 1) + metrics.send(f"{function}.error", "counter", 1) if rcode == dns.rcode.NXDOMAIN: raise Exception("%s does not exist." % sub) else: