better alerting

Hossein Shafagh 2019-07-11 23:00:35 -07:00
parent ea8524f035
commit 2628ed1a82
2 changed files with 95 additions and 3 deletions

lemur/common/celery.py

@@ -9,6 +9,7 @@ command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
"""
import copy
import sys
import time
from datetime import datetime, timezone, timedelta

from celery import Celery
@@ -16,6 +17,7 @@ from celery.exceptions import SoftTimeLimitExceeded
from flask import current_app
from lemur.authorities.service import get as get_authority
from lemur.common.redis import RedisHandler
from lemur.destinations import service as destinations_service
from lemur.extensions import metrics, sentry
from lemur.factory import create_app
@@ -30,6 +32,9 @@ if current_app:
else:
    flask_app = create_app()

red = RedisHandler(host=current_app.config.get('REDIS_HOST', 'localhost'),
                   port=current_app.config.get('REDIS_PORT', 6379),
                   db=current_app.config.get('REDIS_DB', 0)).redis()


def make_celery(app):
    celery = Celery(
@@ -68,6 +73,30 @@ def is_task_active(fun, task_id, args):
    return False


@celery.task()
def report_celery_last_success_metrics():
    """
    For each celery task, this will determine the number of seconds since it was last successful.

    Celery tasks should be emitting redis stats with a deterministic key (in our case, `f"{task}.last_success"`).
    report_celery_last_success_metrics should be run periodically to emit metrics on when a task was last successful.
    Admins can then alert when tasks are not run when intended. Admins should also alert when no metrics are emitted
    from this function.
    """
    function = f"{__name__}.{sys._getframe().f_code.co_name}"
    current_time = int(time.time())
    schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
    for _, t in schedule.items():
        task = t.get("task")
        last_success = int(red.get(f"{task}.last_success") or 0)
        metrics.send(f"{task}.time_since_last_success", 'gauge', current_time - last_success)
    red.set(
        f"{function}.last_success", int(time.time())
    )  # Alert if this metric is not seen
    metrics.send(f"{function}.success", 'counter', 1)


@celery.task(soft_time_limit=600)
def fetch_acme_cert(id):
    """
@@ -80,8 +109,9 @@ def fetch_acme_cert(id):
    if celery.current_task:
        task_id = celery.current_task.request.id

    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
    log_data = {
        "function": function,
        "message": "Resolving pending certificate {}".format(id),
        "task_id": task_id,
        "id": id,
@@ -165,11 +195,15 @@ def fetch_acme_cert(id):
    log_data["failed"] = failed
    log_data["wrong_issuer"] = wrong_issuer
    current_app.logger.debug(log_data)
    metrics.send(f"{function}.resolved", 'gauge', new)
    metrics.send(f"{function}.failed", 'gauge', failed)
    metrics.send(f"{function}.wrong_issuer", 'gauge', wrong_issuer)
    print(
        "[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
            new=new, failed=failed, wrong_issuer=wrong_issuer
        )
    )
    red.set(f'{function}.last_success', int(time.time()))


@celery.task()
@@ -177,8 +211,9 @@ def fetch_all_pending_acme_certs():
    """Instantiate celery workers to resolve all pending Acme certificates"""
    pending_certs = pending_certificate_service.get_unresolved_pending_certs()

    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
    log_data = {
        "function": function,
        "message": "Starting job.",
    }
@@ -195,11 +230,18 @@ def fetch_all_pending_acme_certs():
            current_app.logger.debug(log_data)
            fetch_acme_cert.delay(cert.id)

    red.set(f'{function}.last_success', int(time.time()))
    metrics.send(f"{function}.success", 'counter', 1)


@celery.task()
def remove_old_acme_certs():
    """Prune old pending acme certificates from the database"""
    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
    log_data = {
        "function": function,
        "message": "Starting job.",
    }
    pending_certs = pending_certificate_service.get_pending_certs("all")

    # Delete pending certs more than a week old
@@ -211,6 +253,9 @@ def remove_old_acme_certs():
            current_app.logger.debug(log_data)
            pending_certificate_service.delete(cert)

    red.set(f'{function}.last_success', int(time.time()))
    metrics.send(f"{function}.success", 'counter', 1)


@celery.task()
def clean_all_sources():
@@ -218,6 +263,7 @@ def clean_all_sources():
    This function will clean unused certificates from sources. This is a destructive operation and should only
    be run periodically. This function triggers one celery task per source.
    """
    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
    sources = validate_sources("all")
    for source in sources:
        current_app.logger.debug(
@@ -225,6 +271,9 @@ def clean_all_sources():
        )
        clean_source.delay(source.label)

    red.set(f'{function}.last_success', int(time.time()))
    metrics.send(f"{function}.success", 'counter', 1)


@celery.task()
def clean_source(source):
@@ -244,6 +293,7 @@ def sync_all_sources():
    """
    This function will sync certificates from all sources. This function triggers one celery task per source.
    """
    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
    sources = validate_sources("all")
    for source in sources:
        current_app.logger.debug(
@@ -251,6 +301,9 @@ def sync_all_sources():
        )
        sync_source.delay(source.label)

    red.set(f'{function}.last_success', int(time.time()))
    metrics.send(f"{function}.success", 'counter', 1)


@celery.task(soft_time_limit=7200)
def sync_source(source):
@@ -279,6 +332,7 @@ def sync_source(source):
        return
    try:
        sync([source])
        metrics.send(f"{function}.success", 'counter', '1', metric_tags={"source": source})
    except SoftTimeLimitExceeded:
        log_data["message"] = "Error syncing source: Time limit exceeded."
        current_app.logger.error(log_data)
@@ -290,6 +344,8 @@ def sync_source(source):
    log_data["message"] = "Done syncing source"
    current_app.logger.debug(log_data)
    metrics.send(f"{function}.success", 'counter', 1, metric_tags=source)
    red.set(f'{function}.last_success', int(time.time()))


@celery.task()
@@ -302,9 +358,12 @@ def sync_source_destination():
    We rely on account numbers to avoid duplicates.
    """
    current_app.logger.debug("Syncing AWS destinations and sources")
    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
    for dst in destinations_service.get_all():
        if add_aws_destination_to_sources(dst):
            current_app.logger.debug("Source: %s added", dst.label)

    current_app.logger.debug("Completed Syncing AWS destinations and sources")
    red.set(f'{function}.last_success', int(time.time()))
    metrics.send(f"{function}.success", 'counter', 1)
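The alerting described above only pays off if report_celery_last_success_metrics is itself scheduled and the Redis connection settings are configured. Neither is part of this diff, so the following is only a minimal sketch of what the relevant lemur.conf.py entries might look like; the intervals and the specific REDIS_* values are illustrative assumptions, not code from this commit.

# Hypothetical configuration sketch -- not part of this commit.
from celery.schedules import crontab

# Connection settings read by celery.py when it builds the module-level RedisHandler.
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

CELERYBEAT_SCHEDULE = {
    # report_celery_last_success_metrics iterates over these entries, reads f"{task}.last_success"
    # from Redis, and emits a f"{task}.time_since_last_success" gauge for each one.
    'report_celery_last_success_metrics': {
        'task': 'lemur.common.celery.report_celery_last_success_metrics',
        'schedule': crontab(),  # assumed: every minute
    },
    'fetch_all_pending_acme_certs': {
        'task': 'lemur.common.celery.fetch_all_pending_acme_certs',
        'schedule': crontab(minute="*/5"),  # assumed interval
    },
}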

lemur/common/redis.py (new file)

@@ -0,0 +1,33 @@
"""
Helper Class for Redis
"""
import redis

# from flask import current_app


class RedisHandler:
    # def __init__(self, host=current_app.config.get('REDIS_HOST', 'localhost'),
    #              port=current_app.config.get('REDIS_PORT', 6379),
    #              db=current_app.config.get('REDIS_DB', 0)):
    def __init__(self, host='localhost', port=6379, db=0):
        # Defaults mirror the commented-out current_app fallbacks so RedisHandler() works without arguments.
        self.host = host
        self.port = port
        self.db = db

    def redis(self, db=0):
        # The decode_responses flag here directs the client to convert the responses from Redis into Python strings
        # using the default encoding utf-8. This is client specific.
        red = redis.StrictRedis(host=self.host, port=self.port, db=self.db, charset="utf-8", decode_responses=True)
        return red


def redis_get(key, default=None):
    red = RedisHandler().redis()
    try:
        v = red.get(key)
    except redis.exceptions.ConnectionError:
        v = None

    if not v:
        return default
    return v
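For reference, a minimal usage sketch of the new helper, assuming a Redis instance on localhost and the last_success key convention used in celery.py above; the snippet is illustrative, not part of the commit.

import time

from lemur.common.redis import RedisHandler, redis_get

# celery.py builds the same handler from the REDIS_HOST / REDIS_PORT / REDIS_DB config values.
red = RedisHandler(host='localhost', port=6379, db=0).redis()

# A task records its last successful run as a unix timestamp...
red.set('lemur.common.celery.fetch_all_pending_acme_certs.last_success', int(time.time()))

# ...and a reader pulls it back, falling back to the default when the key is unset or Redis is unreachable.
last_success = int(redis_get('lemur.common.celery.fetch_all_pending_acme_certs.last_success', default=0))
print("seconds since last success:", int(time.time()) - last_success)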