From 2628ed1a8272e100c2c36e852cf502349c9b7e96 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Thu, 11 Jul 2019 23:00:35 -0700 Subject: [PATCH 1/7] better alerting --- lemur/common/celery.py | 65 ++++++++++++++++++++++++++++++++++++++++-- lemur/common/redis.py | 33 +++++++++++++++++++++ 2 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 lemur/common/redis.py diff --git a/lemur/common/celery.py b/lemur/common/celery.py index d3cc7621..b775396a 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -9,6 +9,7 @@ command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B """ import copy import sys +import time from datetime import datetime, timezone, timedelta from celery import Celery @@ -16,6 +17,7 @@ from celery.exceptions import SoftTimeLimitExceeded from flask import current_app from lemur.authorities.service import get as get_authority +from lemur.common.redis import RedisHandler from lemur.destinations import service as destinations_service from lemur.extensions import metrics, sentry from lemur.factory import create_app @@ -30,6 +32,9 @@ if current_app: else: flask_app = create_app() +red = RedisHandler(host=current_app.config.get('REDIS_HOST', 'localhost'), + port=current_app.config.get('REDIS_PORT', 6379), + db=current_app.config.get('REDIS_DB', 0)).redis() def make_celery(app): celery = Celery( @@ -68,6 +73,30 @@ def is_task_active(fun, task_id, args): return False +@celery.task() +def report_celery_last_success_metrics(): + """ + For each celery task, this will determine the number of seconds since it has last been successful. + + Celery tasks should be emitting redis stats with a deterministic key (In our case, `f"{task}.last_success"`. + report_celery_last_success_metrics should be ran periodically to emit metrics on when a task was last successful. + Admins can then alert when tasks are not ran when intended. Admins should also alert when no metrics are emitted + from this function. + + """ + function = f"{__name__}.{sys._getframe().f_code.co_name}" + current_time = int(time.time()) + schedule = current_app.config.get('CELERYBEAT_SCHEDULE') + for _, t in schedule.items(): + task = t.get("task") + last_success = int(red.get(f"{task}.last_success") or 0) + metrics.send(f"{task}.time_since_last_success", 'gauge', current_time - last_success) + red.set( + f"{function}.last_success", int(time.time()) + ) # Alert if this metric is not seen + metrics.send(f"{function}.success", 'counter', 1) + + @celery.task(soft_time_limit=600) def fetch_acme_cert(id): """ @@ -80,8 +109,9 @@ def fetch_acme_cert(id): if celery.current_task: task_id = celery.current_task.request.id + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) log_data = { - "function": "{}.{}".format(__name__, sys._getframe().f_code.co_name), + "function": function, "message": "Resolving pending certificate {}".format(id), "task_id": task_id, "id": id, @@ -165,11 +195,15 @@ def fetch_acme_cert(id): log_data["failed"] = failed log_data["wrong_issuer"] = wrong_issuer current_app.logger.debug(log_data) + metrics.send(f"{function}.resolved", 'gauge', new) + metrics.send(f"{function}.failed", 'gauge', failed) + metrics.send(f"{function}.wrong_issuer", 'gauge', wrong_issuer) print( "[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format( new=new, failed=failed, wrong_issuer=wrong_issuer ) ) + red.set(f'{function}.last_success', int(time.time())) @celery.task() @@ -177,8 +211,9 @@ def fetch_all_pending_acme_certs(): """Instantiate celery workers to resolve all pending Acme certificates""" pending_certs = pending_certificate_service.get_unresolved_pending_certs() + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) log_data = { - "function": "{}.{}".format(__name__, sys._getframe().f_code.co_name), + "function": function, "message": "Starting job.", } @@ -195,11 +230,18 @@ def fetch_all_pending_acme_certs(): current_app.logger.debug(log_data) fetch_acme_cert.delay(cert.id) + red.set(f'{function}.last_success', int(time.time())) + metrics.send(f"{function}.success", 'counter', 1) + @celery.task() def remove_old_acme_certs(): """Prune old pending acme certificates from the database""" - log_data = {"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name)} + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + log_data = { + "function": function, + "message": "Starting job.", + } pending_certs = pending_certificate_service.get_pending_certs("all") # Delete pending certs more than a week old @@ -211,6 +253,9 @@ def remove_old_acme_certs(): current_app.logger.debug(log_data) pending_certificate_service.delete(cert) + red.set(f'{function}.last_success', int(time.time())) + metrics.send(f"{function}.success", 'counter', 1) + @celery.task() def clean_all_sources(): @@ -218,6 +263,7 @@ def clean_all_sources(): This function will clean unused certificates from sources. This is a destructive operation and should only be ran periodically. This function triggers one celery task per source. """ + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) sources = validate_sources("all") for source in sources: current_app.logger.debug( @@ -225,6 +271,9 @@ def clean_all_sources(): ) clean_source.delay(source.label) + red.set(f'{function}.last_success', int(time.time())) + metrics.send(f"{function}.success", 'counter', 1) + @celery.task() def clean_source(source): @@ -244,6 +293,7 @@ def sync_all_sources(): """ This function will sync certificates from all sources. This function triggers one celery task per source. """ + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) sources = validate_sources("all") for source in sources: current_app.logger.debug( @@ -251,6 +301,9 @@ def sync_all_sources(): ) sync_source.delay(source.label) + red.set(f'{function}.last_success', int(time.time())) + metrics.send(f"{function}.success", 'counter', 1) + @celery.task(soft_time_limit=7200) def sync_source(source): @@ -279,6 +332,7 @@ def sync_source(source): return try: sync([source]) + metrics.send(f"{function}.success", 'counter', '1', metric_tags={"source": source}) except SoftTimeLimitExceeded: log_data["message"] = "Error syncing source: Time limit exceeded." current_app.logger.error(log_data) @@ -290,6 +344,8 @@ def sync_source(source): log_data["message"] = "Done syncing source" current_app.logger.debug(log_data) + metrics.send(f"{function}.success", 'counter', 1, metric_tags=source) + red.set(f'{function}.last_success', int(time.time())) @celery.task() @@ -302,9 +358,12 @@ def sync_source_destination(): We rely on account numbers to avoid duplicates. """ current_app.logger.debug("Syncing AWS destinations and sources") + function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) for dst in destinations_service.get_all(): if add_aws_destination_to_sources(dst): current_app.logger.debug("Source: %s added", dst.label) current_app.logger.debug("Completed Syncing AWS destinations and sources") + red.set(f'{function}.last_success', int(time.time())) + metrics.send(f"{function}.success", 'counter', 1) diff --git a/lemur/common/redis.py b/lemur/common/redis.py new file mode 100644 index 00000000..a996ad67 --- /dev/null +++ b/lemur/common/redis.py @@ -0,0 +1,33 @@ +""" +Helper Class for Redis + +""" +import redis +#from flask import current_app + + +class RedisHandler: + #def __init__(self, host=current_app.config.get('REDIS_HOST', 'localhost'), + # port=current_app.config.get('REDIS_PORT', 6379), + # db=current_app.config.get('REDIS_DB', 0)): + def __init__(self, host, port, db): + self.host = host + self.port = port + self.db = db + + def redis(self, db=0): + # The decode_responses flag here directs the client to convert the responses from Redis into Python strings + # using the default encoding utf-8. This is client specific. + red = redis.StrictRedis(host=self.host, port=self.port, db=self.db, charset="utf-8", decode_responses=True) + return red + + +def redis_get(key, default=None): + red = RedisHandler().redis() + try: + v = red.get(key) + except redis.exceptions.ConnectionError: + v = None + if not v: + return default + return v From 97d74bfa1d4a946e8002042eb2c20032353dc1e7 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Fri, 12 Jul 2019 08:47:39 -0700 Subject: [PATCH 2/7] fixing the app context issue. we will create an app if no current_app available --- lemur/common/celery.py | 5 ++--- lemur/common/redis.py | 14 +++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index b775396a..05e66926 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -32,9 +32,8 @@ if current_app: else: flask_app = create_app() -red = RedisHandler(host=current_app.config.get('REDIS_HOST', 'localhost'), - port=current_app.config.get('REDIS_PORT', 6379), - db=current_app.config.get('REDIS_DB', 0)).redis() +red = RedisHandler().redis() + def make_celery(app): celery = Celery( diff --git a/lemur/common/redis.py b/lemur/common/redis.py index a996ad67..4af39aef 100644 --- a/lemur/common/redis.py +++ b/lemur/common/redis.py @@ -3,14 +3,18 @@ Helper Class for Redis """ import redis -#from flask import current_app +from flask import current_app +from lemur.factory import create_app +if current_app: + flask_app = current_app +else: + flask_app = create_app() class RedisHandler: - #def __init__(self, host=current_app.config.get('REDIS_HOST', 'localhost'), - # port=current_app.config.get('REDIS_PORT', 6379), - # db=current_app.config.get('REDIS_DB', 0)): - def __init__(self, host, port, db): + def __init__(self, host=flask_app.config.get('REDIS_HOST', 'localhost'), + port=flask_app.config.get('REDIS_PORT', 6379), + db=flask_app.config.get('REDIS_DB', 0)): self.host = host self.port = port self.db = db From 1b1bdbb261c3f6b03bf317cf91a154c3b19d06c9 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Fri, 12 Jul 2019 10:25:37 -0700 Subject: [PATCH 3/7] spacing --- lemur/common/redis.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lemur/common/redis.py b/lemur/common/redis.py index 4af39aef..0bddf9b4 100644 --- a/lemur/common/redis.py +++ b/lemur/common/redis.py @@ -11,6 +11,7 @@ if current_app: else: flask_app = create_app() + class RedisHandler: def __init__(self, host=flask_app.config.get('REDIS_HOST', 'localhost'), port=flask_app.config.get('REDIS_PORT', 6379), From cd1aeb15f179061ab02adb3b02c01909f6f4b19b Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Fri, 12 Jul 2019 11:50:12 -0700 Subject: [PATCH 4/7] adding testing for redis --- lemur/common/redis.py | 2 +- lemur/tests/test_redis.py | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 lemur/tests/test_redis.py diff --git a/lemur/common/redis.py b/lemur/common/redis.py index 0bddf9b4..34a8778f 100644 --- a/lemur/common/redis.py +++ b/lemur/common/redis.py @@ -23,7 +23,7 @@ class RedisHandler: def redis(self, db=0): # The decode_responses flag here directs the client to convert the responses from Redis into Python strings # using the default encoding utf-8. This is client specific. - red = redis.StrictRedis(host=self.host, port=self.port, db=self.db, charset="utf-8", decode_responses=True) + red = redis.StrictRedis(host=self.host, port=self.port, db=self.db, encoding="utf-8", decode_responses=True) return red diff --git a/lemur/tests/test_redis.py b/lemur/tests/test_redis.py new file mode 100644 index 00000000..aab2e397 --- /dev/null +++ b/lemur/tests/test_redis.py @@ -0,0 +1,13 @@ +import fakeredis +import time +import sys + + +def test_write_and_read_from_redis(): + function = f"{__name__}.{sys._getframe().f_code.co_name}" + + red = fakeredis.FakeStrictRedis() + key = f"{function}.last_success" + value = int(time.time()) + assert red.set(key, value) is True + assert (int(red.get(key)) == value) is True From 0ed00c5011665b316ff66180a3cbd58dd9f29a8b Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 16 Jul 2019 09:01:04 -0700 Subject: [PATCH 5/7] updating test requirement --- requirements-tests.in | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-tests.in b/requirements-tests.in index d315cf7a..5d152fce 100644 --- a/requirements-tests.in +++ b/requirements-tests.in @@ -5,6 +5,7 @@ black coverage factory-boy Faker +fakeredis-1.0.3 freezegun moto nose From 54ecda4e1a129abbe03440da8ab051e7cca679b4 Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 16 Jul 2019 09:09:12 -0700 Subject: [PATCH 6/7] updating fakeredis --- requirements-tests.in | 2 +- requirements-tests.txt | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/requirements-tests.in b/requirements-tests.in index 5d152fce..610f26f9 100644 --- a/requirements-tests.in +++ b/requirements-tests.in @@ -5,7 +5,7 @@ black coverage factory-boy Faker -fakeredis-1.0.3 +fakeredis freezegun moto nose diff --git a/requirements-tests.txt b/requirements-tests.txt index 77bc92af..1c4b276e 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -27,6 +27,7 @@ docutils==0.14 # via botocore ecdsa==0.13.2 # via python-jose factory-boy==2.12.0 faker==1.0.7 +fakeredis==1.0.3 flask==1.0.3 # via pytest-flask freezegun==0.3.12 future==0.17.1 # via aws-xray-sdk, python-jose @@ -62,13 +63,15 @@ python-dateutil==2.8.0 # via botocore, faker, freezegun, moto python-jose==3.0.1 # via moto pytz==2019.1 # via moto pyyaml==5.1 +redis==3.2.1 # via fakeredis requests-mock==1.6.0 requests==2.22.0 # via cfn-lint, docker, moto, requests-mock, responses responses==0.10.6 # via moto rsa==4.0 # via python-jose s3transfer==0.2.0 # via boto3 -six==1.12.0 # via aws-sam-translator, bandit, cfn-lint, cryptography, docker, faker, freezegun, mock, moto, packaging, pytest, python-dateutil, python-jose, requests-mock, responses, stevedore, websocket-client +six==1.12.0 # via aws-sam-translator, bandit, cfn-lint, cryptography, docker, faker, fakeredis, freezegun, mock, moto, packaging, pytest, python-dateutil, python-jose, requests-mock, responses, stevedore, websocket-client smmap2==2.0.5 # via gitdb2 +sortedcontainers==2.1.0 # via fakeredis stevedore==1.30.1 # via bandit text-unidecode==1.2 # via faker toml==0.10.0 # via black @@ -79,3 +82,6 @@ werkzeug==0.15.4 # via flask, moto, pytest-flask wrapt==1.11.1 # via aws-xray-sdk xmltodict==0.12.0 # via moto zipp==0.5.1 # via importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# setuptools==41.0.1 # via cfn-lint From 09c0fa0f940a9d4706f704362b7f3e2ee2692d7d Mon Sep 17 00:00:00 2001 From: Hossein Shafagh Date: Tue, 16 Jul 2019 17:21:01 -0700 Subject: [PATCH 7/7] updating the function declaration --- lemur/common/celery.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lemur/common/celery.py b/lemur/common/celery.py index 05e66926..67780957 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -108,7 +108,7 @@ def fetch_acme_cert(id): if celery.current_task: task_id = celery.current_task.request.id - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" log_data = { "function": function, "message": "Resolving pending certificate {}".format(id), @@ -210,7 +210,7 @@ def fetch_all_pending_acme_certs(): """Instantiate celery workers to resolve all pending Acme certificates""" pending_certs = pending_certificate_service.get_unresolved_pending_certs() - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" log_data = { "function": function, "message": "Starting job.", @@ -236,7 +236,7 @@ def fetch_all_pending_acme_certs(): @celery.task() def remove_old_acme_certs(): """Prune old pending acme certificates from the database""" - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" log_data = { "function": function, "message": "Starting job.", @@ -262,7 +262,7 @@ def clean_all_sources(): This function will clean unused certificates from sources. This is a destructive operation and should only be ran periodically. This function triggers one celery task per source. """ - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" sources = validate_sources("all") for source in sources: current_app.logger.debug( @@ -292,7 +292,7 @@ def sync_all_sources(): """ This function will sync certificates from all sources. This function triggers one celery task per source. """ - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" sources = validate_sources("all") for source in sources: current_app.logger.debug( @@ -313,7 +313,7 @@ def sync_source(source): :return: """ - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id @@ -357,7 +357,7 @@ def sync_source_destination(): We rely on account numbers to avoid duplicates. """ current_app.logger.debug("Syncing AWS destinations and sources") - function = "{}.{}".format(__name__, sys._getframe().f_code.co_name) + function = f"{__name__}.{sys._getframe().f_code.co_name}" for dst in destinations_service.get_all(): if add_aws_destination_to_sources(dst):