Merge pull request #2828 from hosseinsh/improved-logging-alerting-v2
Improved logging alerting v2
commit ee151bdc43
--- a/lemur/common/celery.py
+++ b/lemur/common/celery.py
@@ -9,6 +9,7 @@ command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
 """
 import copy
 import sys
+import time
 from datetime import datetime, timezone, timedelta
 
 from celery import Celery
@@ -16,6 +17,7 @@ from celery.exceptions import SoftTimeLimitExceeded
 from flask import current_app
 
 from lemur.authorities.service import get as get_authority
+from lemur.common.redis import RedisHandler
 from lemur.destinations import service as destinations_service
 from lemur.extensions import metrics, sentry
 from lemur.factory import create_app
@@ -30,6 +32,8 @@ if current_app:
 else:
     flask_app = create_app()
 
+red = RedisHandler().redis()
+
 
 def make_celery(app):
     celery = Celery(
@@ -68,6 +72,30 @@ def is_task_active(fun, task_id, args):
     return False
 
 
+@celery.task()
+def report_celery_last_success_metrics():
+    """
+    For each celery task, this will determine the number of seconds since it was last successful.
+
+    Celery tasks should be emitting redis stats with a deterministic key (in our case, `f"{task}.last_success"`).
+    report_celery_last_success_metrics should be run periodically to emit metrics on when a task was last successful.
+    Admins can then alert when tasks are not run as intended. Admins should also alert when no metrics are emitted
+    from this function.
+
+    """
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
+    current_time = int(time.time())
+    schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
+    for _, t in schedule.items():
+        task = t.get("task")
+        last_success = int(red.get(f"{task}.last_success") or 0)
+        metrics.send(f"{task}.time_since_last_success", 'gauge', current_time - last_success)
+    red.set(
+        f"{function}.last_success", int(time.time())
+    )  # Alert if this metric is not seen
+    metrics.send(f"{function}.success", 'counter', 1)
+
+
 @celery.task(soft_time_limit=600)
 def fetch_acme_cert(id):
     """
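report_celery_last_success_metrics iterates over CELERYBEAT_SCHEDULE, so each scheduled entry's "task" path doubles as the prefix of its redis key. For reference, a schedule of the shape this task expects might look like the following sketch for a Lemur config file; the task names and intervals here are illustrative, not part of this diff:

    # Hypothetical CELERYBEAT_SCHEDULE excerpt. Each entry's "task" value is
    # what report_celery_last_success_metrics reads to build the
    # f"{task}.last_success" redis key it gauges against.
    from datetime import timedelta

    from celery.schedules import crontab

    CELERYBEAT_SCHEDULE = {
        'fetch_all_pending_acme_certs': {
            'task': 'lemur.common.celery.fetch_all_pending_acme_certs',
            'options': {'expires': 180},
            'schedule': timedelta(minutes=3),
        },
        'report_celery_last_success_metrics': {
            'task': 'lemur.common.celery.report_celery_last_success_metrics',
            'options': {'expires': 180},
            'schedule': crontab(minute='*'),  # emit freshness gauges every minute
        },
    }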
@@ -80,8 +108,9 @@ def fetch_acme_cert(id):
     if celery.current_task:
         task_id = celery.current_task.request.id
 
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
     log_data = {
-        "function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
+        "function": function,
         "message": "Resolving pending certificate {}".format(id),
         "task_id": task_id,
         "id": id,
@@ -165,11 +194,15 @@ def fetch_acme_cert(id):
     log_data["failed"] = failed
     log_data["wrong_issuer"] = wrong_issuer
     current_app.logger.debug(log_data)
+    metrics.send(f"{function}.resolved", 'gauge', new)
+    metrics.send(f"{function}.failed", 'gauge', failed)
+    metrics.send(f"{function}.wrong_issuer", 'gauge', wrong_issuer)
     print(
         "[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
             new=new, failed=failed, wrong_issuer=wrong_issuer
         )
     )
+    red.set(f'{function}.last_success', int(time.time()))
 
 
 @celery.task()
@@ -177,8 +210,9 @@ def fetch_all_pending_acme_certs():
     """Instantiate celery workers to resolve all pending Acme certificates"""
     pending_certs = pending_certificate_service.get_unresolved_pending_certs()
 
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
     log_data = {
-        "function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
+        "function": function,
         "message": "Starting job.",
     }
 
@@ -195,11 +229,18 @@ def fetch_all_pending_acme_certs():
             current_app.logger.debug(log_data)
             fetch_acme_cert.delay(cert.id)
 
+    red.set(f'{function}.last_success', int(time.time()))
+    metrics.send(f"{function}.success", 'counter', 1)
+
 
 @celery.task()
 def remove_old_acme_certs():
     """Prune old pending acme certificates from the database"""
-    log_data = {"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name)}
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
+    log_data = {
+        "function": function,
+        "message": "Starting job.",
+    }
     pending_certs = pending_certificate_service.get_pending_certs("all")
 
     # Delete pending certs more than a week old
@@ -211,6 +252,9 @@ def remove_old_acme_certs():
             current_app.logger.debug(log_data)
             pending_certificate_service.delete(cert)
 
+    red.set(f'{function}.last_success', int(time.time()))
+    metrics.send(f"{function}.success", 'counter', 1)
+
 
 @celery.task()
 def clean_all_sources():
@@ -218,6 +262,7 @@ def clean_all_sources():
     This function will clean unused certificates from sources. This is a destructive operation and should only
     be run periodically. This function triggers one celery task per source.
     """
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
     sources = validate_sources("all")
     for source in sources:
         current_app.logger.debug(
@@ -225,6 +270,9 @@ def clean_all_sources():
         )
         clean_source.delay(source.label)
 
+    red.set(f'{function}.last_success', int(time.time()))
+    metrics.send(f"{function}.success", 'counter', 1)
+
 
 @celery.task()
 def clean_source(source):
@@ -244,6 +292,7 @@ def sync_all_sources():
     """
     This function will sync certificates from all sources. This function triggers one celery task per source.
     """
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
     sources = validate_sources("all")
     for source in sources:
         current_app.logger.debug(
@@ -251,6 +300,9 @@ def sync_all_sources():
         )
         sync_source.delay(source.label)
 
+    red.set(f'{function}.last_success', int(time.time()))
+    metrics.send(f"{function}.success", 'counter', 1)
+
 
 @celery.task(soft_time_limit=7200)
 def sync_source(source):
@@ -261,7 +313,7 @@ def sync_source(source):
     :return:
     """
 
-    function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
     task_id = None
     if celery.current_task:
         task_id = celery.current_task.request.id
@@ -279,6 +331,7 @@ def sync_source(source):
         return
     try:
         sync([source])
+        metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
     except SoftTimeLimitExceeded:
         log_data["message"] = "Error syncing source: Time limit exceeded."
         current_app.logger.error(log_data)
@@ -290,6 +343,8 @@ def sync_source(source):
 
     log_data["message"] = "Done syncing source"
     current_app.logger.debug(log_data)
+    metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
+    red.set(f'{function}.last_success', int(time.time()))
 
 
 @celery.task()
@@ -302,9 +357,12 @@ def sync_source_destination():
     We rely on account numbers to avoid duplicates.
     """
     current_app.logger.debug("Syncing AWS destinations and sources")
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
 
     for dst in destinations_service.get_all():
         if add_aws_destination_to_sources(dst):
             current_app.logger.debug("Source: %s added", dst.label)
 
     current_app.logger.debug("Completed Syncing AWS destinations and sources")
+    red.set(f'{function}.last_success', int(time.time()))
+    metrics.send(f"{function}.success", 'counter', 1)
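With these changes, every task records f"{function}.last_success" in redis on completion, and report_celery_last_success_metrics turns those timestamps into time_since_last_success gauges. As a rough sketch of the operator-side alerting the docstring describes, a standalone staleness probe might look like this; the connection details, task name, and threshold are assumptions for illustration, not part of this commit:

    # Hypothetical staleness probe against the last_success keys written above.
    import time

    import redis

    red = redis.StrictRedis(host="localhost", port=6379, db=0, decode_responses=True)

    def seconds_since_last_success(task_name):
        # A missing key reads as 0, so a task that has never succeeded
        # shows up as maximally stale rather than silently healthy.
        last_success = int(red.get(f"{task_name}.last_success") or 0)
        return int(time.time()) - last_success

    age = seconds_since_last_success("lemur.common.celery.sync_all_sources")
    if age > 3600:  # no recorded success in the past hour
        print(f"[!] sync_all_sources last succeeded {age}s ago")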
--- /dev/null
+++ b/lemur/common/redis.py
@@ -0,0 +1,38 @@
+"""
+Helper Class for Redis
+
+"""
+import redis
+from flask import current_app
+from lemur.factory import create_app
+
+if current_app:
+    flask_app = current_app
+else:
+    flask_app = create_app()
+
+
+class RedisHandler:
+    def __init__(self, host=flask_app.config.get('REDIS_HOST', 'localhost'),
+                 port=flask_app.config.get('REDIS_PORT', 6379),
+                 db=flask_app.config.get('REDIS_DB', 0)):
+        self.host = host
+        self.port = port
+        self.db = db
+
+    def redis(self, db=0):
+        # The decode_responses flag here directs the client to convert the responses from Redis into Python strings
+        # using the default encoding utf-8. This is client specific.
+        red = redis.StrictRedis(host=self.host, port=self.port, db=self.db, encoding="utf-8", decode_responses=True)
+        return red
+
+
+def redis_get(key, default=None):
+    red = RedisHandler().redis()
+    try:
+        v = red.get(key)
+    except redis.exceptions.ConnectionError:
+        v = None
+    if not v:
+        return default
+    return v
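A minimal sketch of how the new helper might be exercised, assuming an installed Lemur with an initialized app context and a reachable redis; the key and value are illustrative:

    # Illustrative use of RedisHandler and redis_get from the file above.
    from lemur.common.redis import RedisHandler, redis_get

    red = RedisHandler().redis()
    red.set("lemur.common.celery.sync_all_sources.last_success", 1562000000)

    # decode_responses=True means reads come back as str, not bytes.
    print(red.get("lemur.common.celery.sync_all_sources.last_success"))  # "1562000000"

    # redis_get swallows connection errors and returns the default instead,
    # so callers can degrade gracefully when redis is unavailable.
    print(redis_get("nonexistent.key", default=0))  # 0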
@@ -0,0 +1,13 @@
+import fakeredis
+import time
+import sys
+
+
+def test_write_and_read_from_redis():
+    function = f"{__name__}.{sys._getframe().f_code.co_name}"
+
+    red = fakeredis.FakeStrictRedis()
+    key = f"{function}.last_success"
+    value = int(time.time())
+    assert red.set(key, value) is True
+    assert (int(red.get(key)) == value) is True
@@ -5,6 +5,7 @@ black
 coverage
 factory-boy
 Faker
+fakeredis
 freezegun
 moto
 nose
@@ -28,6 +28,7 @@ docutils==0.14            # via botocore
 ecdsa==0.13.2             # via python-jose, sshpubkeys
 factory-boy==2.12.0
 faker==1.0.7
+fakeredis==1.0.3
 flask==1.1.1              # via pytest-flask
 freezegun==0.3.12
 future==0.17.1            # via aws-xray-sdk, python-jose
@@ -64,6 +65,7 @@ python-dateutil==2.8.0    # via botocore, faker, freezegun, moto
 python-jose==3.0.1        # via moto
 pytz==2019.1              # via datetime, moto
 pyyaml==5.1.1
+redis==3.2.1              # via fakeredis
 requests-mock==1.6.0
 requests==2.22.0          # via cfn-lint, docker, moto, requests-mock, responses
 responses==0.10.6         # via moto
@@ -72,6 +74,7 @@ s3transfer==0.2.1         # via boto3
 six==1.12.0               # via aws-sam-translator, bandit, cfn-lint, cryptography, docker, faker, freezegun, jsonschema, mock, moto, packaging, pyrsistent, python-dateutil, python-jose, requests-mock, responses, stevedore, websocket-client
 smmap2==2.0.5             # via gitdb2
 sshpubkeys==3.1.0         # via moto
+sortedcontainers==2.1.0   # via fakeredis
 stevedore==1.30.1         # via bandit
 text-unidecode==1.2       # via faker
 toml==0.10.0              # via black