From 0149f8b0d3b9c86739000c8372d92d7ac2d2ace3 Mon Sep 17 00:00:00 2001 From: csine-nflx Date: Thu, 26 Mar 2020 22:15:10 -0700 Subject: [PATCH 1/8] add support for wildcard and naked domains to PowerDNS module --- lemur/plugins/lemur_acme/powerdns.py | 210 +++++++++++++----- .../plugins/lemur_acme/tests/test_powerdns.py | 63 +++++- 2 files changed, 207 insertions(+), 66 deletions(-) diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index a26faaac..e3f7e575 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -49,16 +49,20 @@ class Record: return self._data["name"] @property - def disabled(self): - return self._data["disabled"] + def type(self): + return self._data["type"] + + @property + def ttl(self): + return self._data["ttl"] @property def content(self): return self._data["content"] @property - def ttl(self): - return self._data["ttl"] + def disabled(self): + return self._data["disabled"] def get_zones(account_number): @@ -92,42 +96,32 @@ def get_zones(account_number): def create_txt_record(domain, token, account_number): """ Create a TXT record for the given domain and token and return a change_id tuple """ _check_conf() - zone_name = _get_zone_name(domain, account_number) - server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") - zone_id = zone_name + "." - domain_id = domain + "." - path = f"/api/v1/servers/{server_id}/zones/{zone_id}" - payload = { - "rrsets": [ - { - "name": domain_id, - "type": "TXT", - "ttl": 300, - "changetype": "REPLACE", - "records": [ - { - "content": f"\"{token}\"", - "disabled": False - } - ], - "comments": [] - } - ] - } + function = sys._getframe().f_code.co_name log_data = { "function": function, "fqdn": domain, "token": token, } + + # Create new record + domain_id = domain + "." + records = [Record({'name': domain_id, 'content': f"\"{token}\"", 'disabled': False})] + + # Get current records + cur_records = _get_txt_records(domain) + for record in cur_records: + if record.content != token: + records.append(record) + try: - _patch(path, payload) - log_data["message"] = "TXT record successfully created" + _patch_txt_records(domain, account_number, records) + log_data["message"] = "TXT record(s) successfully created" current_app.logger.debug(log_data) except Exception as e: sentry.captureException() log_data["Exception"] = e - log_data["message"] = "Unable to create TXT record" + log_data["message"] = "Unable to create TXT record(s)" current_app.logger.debug(log_data) change_id = (domain, token) @@ -173,43 +167,78 @@ def wait_for_dns_change(change_id, account_number=None): def delete_txt_record(change_id, account_number, domain, token): """ Delete the TXT record for the given domain and token """ _check_conf() - zone_name = _get_zone_name(domain, account_number) - server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") - zone_id = zone_name + "." - domain_id = domain + "." - path = f"/api/v1/servers/{server_id}/zones/{zone_id}" - payload = { - "rrsets": [ - { - "name": domain_id, - "type": "TXT", - "ttl": 300, - "changetype": "DELETE", - "records": [ - { - "content": f"\"{token}\"", - "disabled": False - } - ], - "comments": [] - } - ] - } + function = sys._getframe().f_code.co_name log_data = { "function": function, "fqdn": domain, - "token": token + "token": token, } - try: - _patch(path, payload) - log_data["message"] = "TXT record successfully deleted" - current_app.logger.debug(log_data) - except Exception as e: - sentry.captureException() - log_data["Exception"] = e - log_data["message"] = "Unable to delete TXT record" + + # Determine if we can delete whole RRset or just one record + cur_records = _get_txt_records(domain) + found = False + new_records = [] + for record in cur_records: + if record.content == f"\"{token}\"": + found = True + else: + new_records.append(record) + + if not found: # Record not found in DNS + log_data["message"] = "Unable to delete TXT record: TXT record not found" current_app.logger.debug(log_data) + return + + elif new_records: # Removing Record from RRSet via Patch + try: + _patch_txt_records(domain, account_number, new_records) + log_data["message"] = "TXT record successfully deleted" + current_app.logger.debug(log_data) + except Exception as e: + sentry.captureException() + log_data["Exception"] = e + log_data["message"] = "Unable to delete TXT record: patching exception" + current_app.logger.debug(log_data) + + else: # Delete current records + zone_name = _get_zone_name(domain, account_number) + server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") + zone_id = zone_name + "." + domain_id = domain + "." + path = f"/api/v1/servers/{server_id}/zones/{zone_id}" + payload = { + "rrsets": [ + { + "name": domain_id, + "type": "TXT", + "ttl": 300, + "changetype": "DELETE", + "records": [ + { + "content": f"\"{token}\"", + "disabled": False + } + ], + "comments": [] + } + ] + } + function = sys._getframe().f_code.co_name + log_data = { + "function": function, + "fqdn": domain, + "token": token + } + try: + _patch(path, payload) + log_data["message"] = "TXT record successfully deleted" + current_app.logger.debug(log_data) + except Exception as e: + sentry.captureException() + log_data["Exception"] = e + log_data["message"] = "Unable to delete TXT record" + current_app.logger.debug(log_data) def _check_conf(): @@ -243,6 +272,33 @@ def _get_zone_name(domain, account_number): return zone_name +def _get_txt_records(domain): + """Retrieve TXT records for a given domain and return list of Record Objects""" + server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") + + path = f"/api/v1/servers/{server_id}/search-data?q={domain}&max=100&object_type=record" + function = sys._getframe().f_code.co_name + log_data = { + "function": function + } + try: + records = _get(path) + log_data["message"] = "Retrieved TXT Records Successfully" + current_app.logger.debug(log_data) + + except Exception as e: + sentry.captureException() + log_data["message"] = "Failed to Retrieve TXT Records" + current_app.logger.debug(log_data) + raise + + txt_records = [] + for record in records: + cur_record = Record(record) + txt_records.append(cur_record) + return txt_records + + def _get(path, params=None): """ Execute a GET request on the given URL (base_uri + path) and return response as JSON object """ base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN") @@ -257,6 +313,40 @@ def _get(path, params=None): return resp.json() +def _patch_txt_records(domain, account_number, records): + """Send Patch request to PowerDNS Server""" + + domain_id = domain + "." + + # Create records + txt_records = [] + for record in records: + txt_records.append( + {'content': record.content, 'disabled': record.disabled} + ) + + # Create RRSet + payload = { + "rrsets": [ + { + "name": domain_id, + "type": "TXT", + "ttl": 300, + "changetype": "REPLACE", + "records": txt_records, + "comments": [] + } + ] + } + + # Create Txt Records + server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") + zone_name = _get_zone_name(domain, account_number) + zone_id = zone_name + "." + path = f"/api/v1/servers/{server_id}/zones/{zone_id}" + _patch(path, payload) + + def _patch(path, payload): """ Execute a Patch request on the given URL (base_uri + path) with given payload """ base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN") diff --git a/lemur/plugins/lemur_acme/tests/test_powerdns.py b/lemur/plugins/lemur_acme/tests/test_powerdns.py index c8b0a11e..d24827bb 100644 --- a/lemur/plugins/lemur_acme/tests/test_powerdns.py +++ b/lemur/plugins/lemur_acme/tests/test_powerdns.py @@ -48,13 +48,14 @@ class TestPowerdns(unittest.TestCase): self.assertEqual(result, zone) @patch("lemur.plugins.lemur_acme.powerdns.current_app") - def test_create_txt_record(self, mock_current_app): + def test_create_txt_record_write_only(self, mock_current_app): domain = "_acme_challenge.test.example.com" zone = "test.example.com" token = "ABCDEFGHIJ" account_number = "1234567890" change_id = (domain, token) powerdns._check_conf = Mock() + powerdns._get_txt_records = Mock(return_value=[]) powerdns._get_zone_name = Mock(return_value=zone) mock_current_app.logger.debug = Mock() mock_current_app.config.get = Mock(return_value="localhost") @@ -63,24 +64,74 @@ class TestPowerdns(unittest.TestCase): "function": "create_txt_record", "fqdn": domain, "token": token, - "message": "TXT record successfully created" + "message": "TXT record(s) successfully created" } result = powerdns.create_txt_record(domain, token, account_number) mock_current_app.logger.debug.assert_called_with(log_data) self.assertEqual(result, change_id) + @patch("lemur.plugins.lemur_acme.powerdns.current_app") + def test_create_txt_record_append(self, mock_current_app): + domain = "_acme_challenge.test.example.com" + zone = "test.example.com" + token = "ABCDEFGHIJ" + account_number = "1234567890" + change_id = (domain, token) + powerdns._check_conf = Mock() + cur_token = "123456" + cur_records = [powerdns.Record({'name': domain, 'content': cur_token, 'disabled': False})] + powerdns._get_txt_records = Mock(return_value=cur_records) + powerdns._get_zone_name = Mock(return_value=zone) + mock_current_app.logger.debug = Mock() + mock_current_app.config.get = Mock(return_value="localhost") + powerdns._patch = Mock() + log_data = { + "function": "create_txt_record", + "fqdn": domain, + "token": token, + "message": "TXT record(s) successfully created" + } + expected_path = f"/api/v1/servers/localhost/zones/test.example.com." + expected_payload = { + "rrsets": [ + { + "name": domain + ".", + "type": "TXT", + "ttl": 300, + "changetype": "REPLACE", + "records": [ + { + "content": f"\"{token}\"", + "disabled": False + }, + { + "content": f"\"{cur_token}\"", + "disabled": False + } + ], + "comments": [] + } + ] + } + + result = powerdns.create_txt_record(domain, token, account_number) + mock_current_app.logger.debug.assert_called_with(log_data) + powerdns._patch.assert_called_with(expected_path, expected_payload) + self.assertEqual(result, change_id) + @patch("lemur.plugins.lemur_acme.powerdns.dnsutil") @patch("lemur.plugins.lemur_acme.powerdns.current_app") @patch("lemur.extensions.metrics") @patch("time.sleep") def test_wait_for_dns_change(self, mock_sleep, mock_metrics, mock_current_app, mock_dnsutil): domain = "_acme-challenge.test.example.com" - token = "ABCDEFG" + token1 = "ABCDEFG" + token2 = "HIJKLMN" zone_name = "test.example.com" nameserver = "1.1.1.1" - change_id = (domain, token) + change_id = (domain, token1) powerdns._check_conf = Mock() - mock_records = (token,) + mock_records = (token2, token1) mock_current_app.config.get = Mock(return_value=1) powerdns._get_zone_name = Mock(return_value=zone_name) mock_dnsutil.get_authoritative_nameserver = Mock(return_value=nameserver) @@ -114,7 +165,7 @@ class TestPowerdns(unittest.TestCase): "function": "delete_txt_record", "fqdn": domain, "token": token, - "message": "TXT record successfully deleted" + "message": "Unable to delete TXT record: TXT record not found" } powerdns.delete_txt_record(change_id, account_number, domain, token) mock_current_app.logger.debug.assert_called_with(log_data) From 0e314d00281fbdcd31769cda2335d27df22d0d0b Mon Sep 17 00:00:00 2001 From: csine-nflx Date: Fri, 27 Mar 2020 10:18:38 -0700 Subject: [PATCH 2/8] adding documentation and final cleanup --- lemur/plugins/lemur_acme/powerdns.py | 97 ++++++++++++++++--- .../plugins/lemur_acme/tests/test_powerdns.py | 2 +- 2 files changed, 82 insertions(+), 17 deletions(-) diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index e3f7e575..c988fac9 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -1,11 +1,10 @@ -import time -import requests import json import sys +import time import lemur.common.utils as utils import lemur.dns_providers.util as dnsutil - +import requests from flask import current_app from lemur.extensions import metrics, sentry @@ -17,7 +16,9 @@ REQUIRED_VARIABLES = [ class Zone: - """ This class implements a PowerDNS zone in JSON. """ + """ + This class implements a PowerDNS zone in JSON. + """ def __init__(self, _data): self._data = _data @@ -39,7 +40,9 @@ class Zone: class Record: - """ This class implements a PowerDNS record. """ + """ + This class implements a PowerDNS record. + """ def __init__(self, _data): self._data = _data @@ -66,7 +69,12 @@ class Record: def get_zones(account_number): - """Retrieve authoritative zones from the PowerDNS API and return a list""" + """ + Retrieve authoritative zones from the PowerDNS API and return a list + :param account_number: + :raise: Exception + :return: list of Zone Objects + """ _check_conf() server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") path = f"/api/v1/servers/{server_id}/zones" @@ -94,7 +102,14 @@ def get_zones(account_number): def create_txt_record(domain, token, account_number): - """ Create a TXT record for the given domain and token and return a change_id tuple """ + """ + Create a TXT record for the given domain and token and return a change_id tuple + + :param domain: FQDN + :param token: challenge value + :param account_number: + :return: tuple of domain/token + """ _check_conf() function = sys._getframe().f_code.co_name @@ -130,8 +145,11 @@ def create_txt_record(domain, token, account_number): def wait_for_dns_change(change_id, account_number=None): """ - Checks the authoritative DNS Server to see if changes have propagated to DNS - Retries and waits until successful. + Checks the authoritative DNS Server to see if changes have propagated. + + :param change_id: tuple of domain/token + :param account_number: + :return: """ _check_conf() domain, token = change_id @@ -165,7 +183,15 @@ def wait_for_dns_change(change_id, account_number=None): def delete_txt_record(change_id, account_number, domain, token): - """ Delete the TXT record for the given domain and token """ + """ + Delete the TXT record for the given domain and token + + :param change_id: tuple of domain/token + :param account_number: + :param domain: FQDN + :param token: challenge to delete + :return: + """ _check_conf() function = sys._getframe().f_code.co_name @@ -242,11 +268,20 @@ def delete_txt_record(change_id, account_number, domain, token): def _check_conf(): + """ + Verifies required configuration variables are set + + :return: + """ utils.validate_conf(current_app, REQUIRED_VARIABLES) def _generate_header(): - """Generate a PowerDNS API header and return it as a dictionary""" + """ + Generate a PowerDNS API header and return it as a dictionary + + :return: Dict of header parameters + """ api_key_name = current_app.config.get("ACME_POWERDNS_APIKEYNAME") api_key = current_app.config.get("ACME_POWERDNS_APIKEY") headers = {api_key_name: api_key} @@ -254,7 +289,13 @@ def _generate_header(): def _get_zone_name(domain, account_number): - """Get most specific matching zone for the given domain and return as a String""" + """ + Get most specific matching zone for the given domain and return as a String + + :param domain: FQDN + :param account_number: + :return: FQDN of domain + """ zones = get_zones(account_number) zone_name = "" for z in zones: @@ -273,7 +314,13 @@ def _get_zone_name(domain, account_number): def _get_txt_records(domain): - """Retrieve TXT records for a given domain and return list of Record Objects""" + """ + Retrieve TXT records for a given domain and return list of Record Objects + + :param domain: FQDN + :raise: Exception + :return: list of Record objects + """ server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") path = f"/api/v1/servers/{server_id}/search-data?q={domain}&max=100&object_type=record" @@ -300,7 +347,13 @@ def _get_txt_records(domain): def _get(path, params=None): - """ Execute a GET request on the given URL (base_uri + path) and return response as JSON object """ + """ + Execute a GET request on the given URL (base_uri + path) and return response as JSON object + + :param path: Relative URL path + :param params: additional parameters + :return: json response + """ base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN") verify_value = current_app.config.get("ACME_POWERDNS_VERIFY", True) resp = requests.get( @@ -314,8 +367,14 @@ def _get(path, params=None): def _patch_txt_records(domain, account_number, records): - """Send Patch request to PowerDNS Server""" + """ + Send Patch request to PowerDNS Server + :param domain: FQDN + :param account_number: + :param records: List of Record objects + :return: + """ domain_id = domain + "." # Create records @@ -348,7 +407,13 @@ def _patch_txt_records(domain, account_number, records): def _patch(path, payload): - """ Execute a Patch request on the given URL (base_uri + path) with given payload """ + """ + Execute a Patch request on the given URL (base_uri + path) with given payload + + :param path: + :param payload: + :return: + """ base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN") verify_value = current_app.config.get("ACME_POWERDNS_VERIFY", True) resp = requests.patch( diff --git a/lemur/plugins/lemur_acme/tests/test_powerdns.py b/lemur/plugins/lemur_acme/tests/test_powerdns.py index d24827bb..707ce9e4 100644 --- a/lemur/plugins/lemur_acme/tests/test_powerdns.py +++ b/lemur/plugins/lemur_acme/tests/test_powerdns.py @@ -79,7 +79,7 @@ class TestPowerdns(unittest.TestCase): change_id = (domain, token) powerdns._check_conf = Mock() cur_token = "123456" - cur_records = [powerdns.Record({'name': domain, 'content': cur_token, 'disabled': False})] + cur_records = [powerdns.Record({'name': domain, 'content': f"\"{cur_token}\"", 'disabled': False})] powerdns._get_txt_records = Mock(return_value=cur_records) powerdns._get_zone_name = Mock(return_value=zone) mock_current_app.logger.debug = Mock() From d6cc8a8a9a86f6c1353d9ab15c9c0391856a139f Mon Sep 17 00:00:00 2001 From: csine-nflx Date: Mon, 30 Mar 2020 09:01:28 -0700 Subject: [PATCH 3/8] fixing whitespace --- lemur/plugins/lemur_acme/powerdns.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index c988fac9..1542ad7b 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -71,6 +71,7 @@ class Record: def get_zones(account_number): """ Retrieve authoritative zones from the PowerDNS API and return a list + :param account_number: :raise: Exception :return: list of Zone Objects From 6f3ba23fa0bb3f046fd76ac04cc2629dcf48eac7 Mon Sep 17 00:00:00 2001 From: csine-nflx Date: Mon, 30 Mar 2020 13:34:24 -0700 Subject: [PATCH 4/8] updating sinlge line of comments --- lemur/plugins/lemur_acme/powerdns.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index 1542ad7b..c1e5e19b 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -70,8 +70,8 @@ class Record: def get_zones(account_number): """ - Retrieve authoritative zones from the PowerDNS API and return a list - + Retrieve authoritative zones from the PowerDNS API and return a list of zones + :param account_number: :raise: Exception :return: list of Zone Objects From 5add64714883d5c07b688a50f5fc5bbc9722839f Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Fri, 3 Apr 2020 16:51:24 -0700 Subject: [PATCH 5/8] # emitting the count of certificates on the source --- lemur/sources/service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lemur/sources/service.py b/lemur/sources/service.py index f4783313..408d411a 100644 --- a/lemur/sources/service.py +++ b/lemur/sources/service.py @@ -193,6 +193,11 @@ def sync_certificates(source, user): s = plugins.get(source.plugin_name) certificates = s.get_certificates(source.options) + # emitting the count of certificates on the source + metrics.send("sync_certificates_count", + "gauge", len(certificates), + metric_tags={"source": source.label}) + for certificate in certificates: exists, updated_by_hash = find_cert(certificate) From f82ec24dfaf6a1e8fb56fac3a394f2990085b711 Mon Sep 17 00:00:00 2001 From: csine-nflx Date: Sun, 5 Apr 2020 21:46:33 -0700 Subject: [PATCH 6/8] updating _get_txt_records return values and docstrings --- lemur/plugins/lemur_acme/powerdns.py | 24 +++++++++++++------ .../plugins/lemur_acme/tests/test_powerdns.py | 2 +- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index c1e5e19b..a5d02353 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -202,7 +202,11 @@ def delete_txt_record(change_id, account_number, domain, token): "token": token, } - # Determine if we can delete whole RRset or just one record + """ + Get existing TXT records matching the domain from DNS + The token to be deleted should already exist + There may be other records with different tokens as well + """ cur_records = _get_txt_records(domain) found = False new_records = [] @@ -212,12 +216,16 @@ def delete_txt_record(change_id, account_number, domain, token): else: new_records.append(record) - if not found: # Record not found in DNS - log_data["message"] = "Unable to delete TXT record: TXT record not found" + # Since the matching token is not in DNS, there is nothing to delete + if not found: + log_data["message"] = "Unable to delete TXT record: Token not found in existing TXT records" current_app.logger.debug(log_data) return - elif new_records: # Removing Record from RRSet via Patch + # The record to delete has been found AND there are other tokens set on the same domain + # Since we only want to delete one token value from the RRSet, we need to use the Patch command to + # overwrite the current RRSet with the existing records. + elif new_records: try: _patch_txt_records(domain, account_number, new_records) log_data["message"] = "TXT record successfully deleted" @@ -228,7 +236,9 @@ def delete_txt_record(change_id, account_number, domain, token): log_data["message"] = "Unable to delete TXT record: patching exception" current_app.logger.debug(log_data) - else: # Delete current records + # The record to delete has been found AND there are no other token values set on the same domain + # Use the Delete command to delete the whole RRSet. + else: zone_name = _get_zone_name(domain, account_number) server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") zone_id = zone_name + "." @@ -319,7 +329,6 @@ def _get_txt_records(domain): Retrieve TXT records for a given domain and return list of Record Objects :param domain: FQDN - :raise: Exception :return: list of Record objects """ server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost") @@ -336,9 +345,10 @@ def _get_txt_records(domain): except Exception as e: sentry.captureException() + log_data["Exception"] = e log_data["message"] = "Failed to Retrieve TXT Records" current_app.logger.debug(log_data) - raise + return [] txt_records = [] for record in records: diff --git a/lemur/plugins/lemur_acme/tests/test_powerdns.py b/lemur/plugins/lemur_acme/tests/test_powerdns.py index 707ce9e4..167381f2 100644 --- a/lemur/plugins/lemur_acme/tests/test_powerdns.py +++ b/lemur/plugins/lemur_acme/tests/test_powerdns.py @@ -165,7 +165,7 @@ class TestPowerdns(unittest.TestCase): "function": "delete_txt_record", "fqdn": domain, "token": token, - "message": "Unable to delete TXT record: TXT record not found" + "message": "Unable to delete TXT record: Token not found in existing TXT records" } powerdns.delete_txt_record(change_id, account_number, domain, token) mock_current_app.logger.debug.assert_called_with(log_data) From eb138fc96011167138159590178f9584d6972014 Mon Sep 17 00:00:00 2001 From: Curtis Castrapel Date: Wed, 8 Apr 2020 08:38:40 -0700 Subject: [PATCH 7/8] Add default celery metrics and logging using celery signals --- lemur/common/celery.py | 223 +++++++++++++++++++++++++++++++++-------- 1 file changed, 180 insertions(+), 43 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index ebf85ed7..b0193515 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -10,27 +10,27 @@ command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B import copy import sys import time -from datetime import datetime, timezone, timedelta - from celery import Celery +from celery.app.task import Context from celery.exceptions import SoftTimeLimitExceeded +from celery.signals import task_failure, task_received, task_revoked, task_success +from datetime import datetime, timezone, timedelta from flask import current_app from lemur.authorities.service import get as get_authority +from lemur.certificates import cli as cli_certificate from lemur.common.redis import RedisHandler from lemur.destinations import service as destinations_service +from lemur.dns_providers import cli as cli_dns_providers +from lemur.endpoints import cli as cli_endpoints from lemur.extensions import metrics, sentry from lemur.factory import create_app +from lemur.notifications import cli as cli_notification from lemur.notifications.messaging import send_pending_failure_notification from lemur.pending_certificates import service as pending_certificate_service from lemur.plugins.base import plugins from lemur.sources.cli import clean, sync, validate_sources from lemur.sources.service import add_aws_destination_to_sources -from lemur.certificates import cli as cli_certificate -from lemur.dns_providers import cli as cli_dns_providers -from lemur.notifications import cli as cli_notification -from lemur.endpoints import cli as cli_endpoints - if current_app: flask_app = current_app @@ -67,7 +67,7 @@ def is_task_active(fun, task_id, args): from celery.task.control import inspect if not args: - args = '()' # empty args + args = "()" # empty args i = inspect() active_tasks = i.active() @@ -80,6 +80,37 @@ def is_task_active(fun, task_id, args): return False +def get_celery_request_tags(**kwargs): + request = kwargs.get("request") + sender_hostname = "unknown" + sender = kwargs.get("sender") + if sender: + try: + sender_hostname = sender.hostname + except AttributeError: + sender_hostname = vars(sender.request).get("origin", "unknown") + if request and not isinstance( + request, Context + ): # unlike others, task_revoked sends a Context for `request` + task_name = request.name + task_id = request.id + receiver_hostname = request.hostname + else: + task_name = sender.name + task_id = sender.request.id + receiver_hostname = sender.request.hostname + + tags = { + "task_name": task_name, + "task_id": task_id, + "sender_hostname": sender_hostname, + "receiver_hostname": receiver_hostname, + } + if kwargs.get("exception"): + tags["error"] = repr(kwargs["exception"]) + return tags + + @celery.task() def report_celery_last_success_metrics(): """ @@ -108,15 +139,115 @@ def report_celery_last_success_metrics(): return current_time = int(time.time()) - schedule = current_app.config.get('CELERYBEAT_SCHEDULE') + schedule = current_app.config.get("CELERYBEAT_SCHEDULE") for _, t in schedule.items(): task = t.get("task") last_success = int(red.get(f"{task}.last_success") or 0) - metrics.send(f"{task}.time_since_last_success", 'gauge', current_time - last_success) + metrics.send( + f"{task}.time_since_last_success", "gauge", current_time - last_success + ) red.set( f"{function}.last_success", int(time.time()) ) # Alert if this metric is not seen - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + + +@task_received.connect +def report_number_pending_tasks(**kwargs): + """ + Report the number of pending tasks to our metrics broker every time a task is published. This metric can be used + for autoscaling workers. + https://docs.celeryproject.org/en/latest/userguide/signals.html#task-received + + :param sender: + :param headers: + :param body: + :param kwargs: + :return: + """ + with flask_app.app_context(): + metrics.send( + "celery.new_pending_task", + "TIMER", + 1, + metric_tags=get_celery_request_tags(**kwargs), + ) + + +@task_success.connect +def report_successful_task(**kwargs): + """ + Report a generic success metric as tasks to our metrics broker every time a task finished correctly. + This metric can be used for autoscaling workers. + https://docs.celeryproject.org/en/latest/userguide/signals.html#task-success + + :param sender: + :param headers: + :param body: + :param kwargs: + :return: + """ + with flask_app.app_context(): + tags = get_celery_request_tags(**kwargs) + red.set(f"{tags['task_name']}.last_success", int(time.time())) + metrics.send("celery.successful_task", "TIMER", 1, metric_tags=tags) + + +@task_failure.connect +def report_failed_task(**kwargs): + """ + Report a generic failure metric as tasks to our metrics broker every time a task fails. + This metric can be used for alerting. + https://docs.celeryproject.org/en/latest/userguide/signals.html#task-failure + + :param sender: + :param headers: + :param body: + :param kwargs: + :return: + """ + with flask_app.app_context(): + log_data = { + "function": f"{__name__}.{sys._getframe().f_code.co_name}", + "Message": "Celery Task Failure", + } + + # Add traceback if exception info is in the kwargs + einfo = kwargs.get("einfo") + if einfo: + log_data["traceback"] = einfo.traceback + + error_tags = get_celery_request_tags(**kwargs) + + log_data.update(error_tags) + current_app.logger.error(log_data) + metrics.send("celery.failed_task", "TIMER", 1, metric_tags=error_tags) + + +@task_revoked.connect +def report_revoked_task(**kwargs): + """ + Report a generic failure metric as tasks to our metrics broker every time a task is revoked. + This metric can be used for alerting. + https://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked + + :param sender: + :param headers: + :param body: + :param kwargs: + :return: + """ + with flask_app.app_context(): + log_data = { + "function": f"{__name__}.{sys._getframe().f_code.co_name}", + "Message": "Celery Task Revoked", + } + + error_tags = get_celery_request_tags(**kwargs) + + log_data.update(error_tags) + current_app.logger.error(log_data) + metrics.send("celery.revoked_task", "TIMER", 1, metric_tags=error_tags) @celery.task(soft_time_limit=600) @@ -217,15 +348,15 @@ def fetch_acme_cert(id): log_data["failed"] = failed log_data["wrong_issuer"] = wrong_issuer current_app.logger.debug(log_data) - metrics.send(f"{function}.resolved", 'gauge', new) - metrics.send(f"{function}.failed", 'gauge', failed) - metrics.send(f"{function}.wrong_issuer", 'gauge', wrong_issuer) + metrics.send(f"{function}.resolved", "gauge", new) + metrics.send(f"{function}.failed", "gauge", failed) + metrics.send(f"{function}.wrong_issuer", "gauge", wrong_issuer) print( "[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format( new=new, failed=failed, wrong_issuer=wrong_issuer ) ) - red.set(f'{function}.last_success', int(time.time())) + return log_data @celery.task() @@ -262,8 +393,8 @@ def fetch_all_pending_acme_certs(): current_app.logger.debug(log_data) fetch_acme_cert.delay(cert.id) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task() @@ -296,8 +427,8 @@ def remove_old_acme_certs(): current_app.logger.debug(log_data) pending_certificate_service.delete(cert) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task() @@ -328,8 +459,8 @@ def clean_all_sources(): current_app.logger.debug(log_data) clean_source.delay(source.label) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=3600) @@ -366,6 +497,7 @@ def clean_source(source): current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) + return log_data @celery.task() @@ -395,8 +527,8 @@ def sync_all_sources(): current_app.logger.debug(log_data) sync_source.delay(source.label) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=7200) @@ -428,19 +560,23 @@ def sync_source(source): current_app.logger.debug(log_data) try: sync([source]) - metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) + metrics.send( + f"{function}.success", "counter", 1, metric_tags={"source": source} + ) except SoftTimeLimitExceeded: log_data["message"] = "Error syncing source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() - metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source}) + metrics.send( + "sync_source_timeout", "counter", 1, metric_tags={"source": source} + ) metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data["message"] = "Done syncing source" current_app.logger.debug(log_data) - metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source}) - red.set(f'{function}.last_success', int(time.time())) + metrics.send(f"{function}.success", "counter", 1, metric_tags={"source": source}) + return log_data @celery.task() @@ -477,8 +613,8 @@ def sync_source_destination(): log_data["message"] = "completed Syncing AWS destinations and sources" current_app.logger.debug(log_data) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=3600) @@ -515,8 +651,8 @@ def certificate_reissue(): log_data["message"] = "reissuance completed" current_app.logger.debug(log_data) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=3600) @@ -534,7 +670,6 @@ def certificate_rotate(): "function": function, "message": "rotating certificates", "task_id": task_id, - } if task_id and is_task_active(function, task_id, None): @@ -554,8 +689,8 @@ def certificate_rotate(): log_data["message"] = "rotation completed" current_app.logger.debug(log_data) - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=3600) @@ -590,8 +725,8 @@ def endpoints_expire(): metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=600) @@ -626,8 +761,8 @@ def get_all_zones(): metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=3600) @@ -662,8 +797,8 @@ def check_revoked(): metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data @celery.task(soft_time_limit=3600) @@ -690,7 +825,9 @@ def notify_expirations(): current_app.logger.debug(log_data) try: - cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) + cli_notification.expirations( + current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []) + ) except SoftTimeLimitExceeded: log_data["message"] = "Notify expiring Time limit exceeded." current_app.logger.error(log_data) @@ -698,5 +835,5 @@ def notify_expirations(): metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return - red.set(f'{function}.last_success', int(time.time())) - metrics.send(f"{function}.success", 'counter', 1) + metrics.send(f"{function}.success", "counter", 1) + return log_data From 11b15e7e234e0d5b7b494e35697d2ced5fbb5972 Mon Sep 17 00:00:00 2001 From: Curtis Castrapel Date: Wed, 8 Apr 2020 08:41:48 -0700 Subject: [PATCH 8/8] Clean up docstrings --- lemur/common/celery.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index b0193515..7c183dc9 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -120,7 +120,6 @@ def report_celery_last_success_metrics(): report_celery_last_success_metrics should be ran periodically to emit metrics on when a task was last successful. Admins can then alert when tasks are not ran when intended. Admins should also alert when no metrics are emitted from this function. - """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None @@ -158,12 +157,6 @@ def report_number_pending_tasks(**kwargs): Report the number of pending tasks to our metrics broker every time a task is published. This metric can be used for autoscaling workers. https://docs.celeryproject.org/en/latest/userguide/signals.html#task-received - - :param sender: - :param headers: - :param body: - :param kwargs: - :return: """ with flask_app.app_context(): metrics.send( @@ -180,12 +173,6 @@ def report_successful_task(**kwargs): Report a generic success metric as tasks to our metrics broker every time a task finished correctly. This metric can be used for autoscaling workers. https://docs.celeryproject.org/en/latest/userguide/signals.html#task-success - - :param sender: - :param headers: - :param body: - :param kwargs: - :return: """ with flask_app.app_context(): tags = get_celery_request_tags(**kwargs) @@ -199,12 +186,6 @@ def report_failed_task(**kwargs): Report a generic failure metric as tasks to our metrics broker every time a task fails. This metric can be used for alerting. https://docs.celeryproject.org/en/latest/userguide/signals.html#task-failure - - :param sender: - :param headers: - :param body: - :param kwargs: - :return: """ with flask_app.app_context(): log_data = { @@ -230,12 +211,6 @@ def report_revoked_task(**kwargs): Report a generic failure metric as tasks to our metrics broker every time a task is revoked. This metric can be used for alerting. https://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked - - :param sender: - :param headers: - :param body: - :param kwargs: - :return: """ with flask_app.app_context(): log_data = {