Merge pull request #2945 from Netflix/celery_metrics

Add default celery metrics and logging using celery signals
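
The heart of the change is a set of handlers hooked to Celery's task signals (task_received, task_success, task_failure, task_revoked) that emit a metric and a log entry for every task lifecycle event, plus a per-task last-success timestamp kept in Redis. A minimal, self-contained sketch of that signal-to-metrics pattern; the send_metric helper is a stand-in for Lemur's metrics.send, not code from this diff:

    # Sketch only: signal handlers that emit one datapoint per task event.
    from celery.signals import task_failure, task_success

    def send_metric(name, metric_type, value, metric_tags=None):
        # Stand-in metrics client; Lemur routes this through its metrics plugins.
        print(name, metric_type, value, metric_tags or {})

    @task_success.connect
    def on_task_success(sender=None, **kwargs):
        # One datapoint per successfully finished task, tagged with the task name.
        send_metric("celery.successful_task", "TIMER", 1,
                    metric_tags={"task_name": sender.name})

    @task_failure.connect
    def on_task_failure(sender=None, exception=None, **kwargs):
        # One datapoint per failed task, tagged with the task name and the error.
        send_metric("celery.failed_task", "TIMER", 1,
                    metric_tags={"task_name": sender.name, "error": repr(exception)})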
Curtis 2020-04-08 12:23:04 -04:00 committed by GitHub
commit 7b2d5c9103
1 changed file with 156 additions and 44 deletions


@@ -10,27 +10,27 @@ command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
 import copy
 import sys
 import time
+from datetime import datetime, timezone, timedelta
 from celery import Celery
+from celery.app.task import Context
 from celery.exceptions import SoftTimeLimitExceeded
+from celery.signals import task_failure, task_received, task_revoked, task_success
-from datetime import datetime, timezone, timedelta
 from flask import current_app
 from lemur.authorities.service import get as get_authority
+from lemur.certificates import cli as cli_certificate
 from lemur.common.redis import RedisHandler
 from lemur.destinations import service as destinations_service
+from lemur.dns_providers import cli as cli_dns_providers
+from lemur.endpoints import cli as cli_endpoints
 from lemur.extensions import metrics, sentry
 from lemur.factory import create_app
+from lemur.notifications import cli as cli_notification
 from lemur.notifications.messaging import send_pending_failure_notification
 from lemur.pending_certificates import service as pending_certificate_service
 from lemur.plugins.base import plugins
 from lemur.sources.cli import clean, sync, validate_sources
 from lemur.sources.service import add_aws_destination_to_sources
-from lemur.certificates import cli as cli_certificate
-from lemur.dns_providers import cli as cli_dns_providers
-from lemur.notifications import cli as cli_notification
-from lemur.endpoints import cli as cli_endpoints
 
 if current_app:
     flask_app = current_app
@@ -67,7 +67,7 @@ def is_task_active(fun, task_id, args):
     from celery.task.control import inspect
 
     if not args:
-        args = '()'  # empty args
+        args = "()"  # empty args
     i = inspect()
     active_tasks = i.active()
@@ -80,6 +80,37 @@ def is_task_active(fun, task_id, args):
     return False
 
+
+def get_celery_request_tags(**kwargs):
+    request = kwargs.get("request")
+    sender_hostname = "unknown"
+    sender = kwargs.get("sender")
+    if sender:
+        try:
+            sender_hostname = sender.hostname
+        except AttributeError:
+            sender_hostname = vars(sender.request).get("origin", "unknown")
+    if request and not isinstance(
+        request, Context
+    ):  # unlike others, task_revoked sends a Context for `request`
+        task_name = request.name
+        task_id = request.id
+        receiver_hostname = request.hostname
+    else:
+        task_name = sender.name
+        task_id = sender.request.id
+        receiver_hostname = sender.request.hostname
+    tags = {
+        "task_name": task_name,
+        "task_id": task_id,
+        "sender_hostname": sender_hostname,
+        "receiver_hostname": receiver_hostname,
+    }
+    if kwargs.get("exception"):
+        tags["error"] = repr(kwargs["exception"])
+    return tags
+
+
 @celery.task()
 def report_celery_last_success_metrics():
     """
@@ -89,7 +120,6 @@ def report_celery_last_success_metrics():
     report_celery_last_success_metrics should be ran periodically to emit metrics on when a task was last successful.
     Admins can then alert when tasks are not ran when intended. Admins should also alert when no metrics are emitted
     from this function.
-
     """
     function = f"{__name__}.{sys._getframe().f_code.co_name}"
     task_id = None
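
The docstring above says this task should run periodically. A hypothetical CELERYBEAT_SCHEDULE entry (not part of this diff) that would run the reporter once a minute so the staleness gauges keep flowing:

    # Hypothetical Lemur config entry; cadence and expiry are illustrative.
    from celery.schedules import crontab

    CELERYBEAT_SCHEDULE = {
        "report_celery_last_success_metrics": {
            "task": "lemur.common.celery.report_celery_last_success_metrics",
            "options": {"expires": 180},
            "schedule": crontab(minute="*"),
        },
    }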
@@ -108,15 +138,91 @@ def report_celery_last_success_metrics():
         return
 
     current_time = int(time.time())
-    schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
+    schedule = current_app.config.get("CELERYBEAT_SCHEDULE")
     for _, t in schedule.items():
         task = t.get("task")
         last_success = int(red.get(f"{task}.last_success") or 0)
-        metrics.send(f"{task}.time_since_last_success", 'gauge', current_time - last_success)
+        metrics.send(
+            f"{task}.time_since_last_success", "gauge", current_time - last_success
+        )
     red.set(
         f"{function}.last_success", int(time.time())
     )  # Alert if this metric is not seen
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+
+
+@task_received.connect
+def report_number_pending_tasks(**kwargs):
+    """
+    Report the number of pending tasks to our metrics broker every time a task is published. This metric can be used
+    for autoscaling workers.
+    https://docs.celeryproject.org/en/latest/userguide/signals.html#task-received
+    """
+    with flask_app.app_context():
+        metrics.send(
+            "celery.new_pending_task",
+            "TIMER",
+            1,
+            metric_tags=get_celery_request_tags(**kwargs),
+        )
+
+
+@task_success.connect
+def report_successful_task(**kwargs):
+    """
+    Report a generic success metric as tasks to our metrics broker every time a task finished correctly.
+    This metric can be used for autoscaling workers.
+    https://docs.celeryproject.org/en/latest/userguide/signals.html#task-success
+    """
+    with flask_app.app_context():
+        tags = get_celery_request_tags(**kwargs)
+        red.set(f"{tags['task_name']}.last_success", int(time.time()))
+        metrics.send("celery.successful_task", "TIMER", 1, metric_tags=tags)
+
+
+@task_failure.connect
+def report_failed_task(**kwargs):
+    """
+    Report a generic failure metric as tasks to our metrics broker every time a task fails.
+    This metric can be used for alerting.
+    https://docs.celeryproject.org/en/latest/userguide/signals.html#task-failure
+    """
+    with flask_app.app_context():
+        log_data = {
+            "function": f"{__name__}.{sys._getframe().f_code.co_name}",
+            "Message": "Celery Task Failure",
+        }
+
+        # Add traceback if exception info is in the kwargs
+        einfo = kwargs.get("einfo")
+        if einfo:
+            log_data["traceback"] = einfo.traceback
+
+        error_tags = get_celery_request_tags(**kwargs)
+
+        log_data.update(error_tags)
+        current_app.logger.error(log_data)
+        metrics.send("celery.failed_task", "TIMER", 1, metric_tags=error_tags)
+
+
+@task_revoked.connect
+def report_revoked_task(**kwargs):
+    """
+    Report a generic failure metric as tasks to our metrics broker every time a task is revoked.
+    This metric can be used for alerting.
+    https://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked
+    """
+    with flask_app.app_context():
+        log_data = {
+            "function": f"{__name__}.{sys._getframe().f_code.co_name}",
+            "Message": "Celery Task Revoked",
+        }
+
+        error_tags = get_celery_request_tags(**kwargs)
+
+        log_data.update(error_tags)
+        current_app.logger.error(log_data)
+        metrics.send("celery.revoked_task", "TIMER", 1, metric_tags=error_tags)
 
 
 @celery.task(soft_time_limit=600)
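
All four handlers above tag their datapoints via the get_celery_request_tags helper added earlier in this diff. For reference, a rough illustration of its output; the sender object below is a stand-in, not a real Celery signal payload:

    # Illustration only: fake sender mimicking the attributes the helper reads.
    from lemur.common.celery import get_celery_request_tags

    class FakeRequest:
        id = "abc123"
        hostname = "celery@worker-1"

    class FakeSender:
        name = "lemur.common.celery.fetch_all_pending_acme_certs"
        hostname = "celery@worker-1"
        request = FakeRequest()

    print(get_celery_request_tags(sender=FakeSender()))
    # {'task_name': 'lemur.common.celery.fetch_all_pending_acme_certs',
    #  'task_id': 'abc123', 'sender_hostname': 'celery@worker-1',
    #  'receiver_hostname': 'celery@worker-1'}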
@@ -217,15 +323,15 @@ def fetch_acme_cert(id):
     log_data["failed"] = failed
     log_data["wrong_issuer"] = wrong_issuer
     current_app.logger.debug(log_data)
-    metrics.send(f"{function}.resolved", 'gauge', new)
-    metrics.send(f"{function}.failed", 'gauge', failed)
-    metrics.send(f"{function}.wrong_issuer", 'gauge', wrong_issuer)
+    metrics.send(f"{function}.resolved", "gauge", new)
+    metrics.send(f"{function}.failed", "gauge", failed)
+    metrics.send(f"{function}.wrong_issuer", "gauge", wrong_issuer)
     print(
         "[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
             new=new, failed=failed, wrong_issuer=wrong_issuer
         )
     )
-    red.set(f'{function}.last_success', int(time.time()))
+    return log_data
 
 
 @celery.task()
@@ -262,8 +368,8 @@ def fetch_all_pending_acme_certs():
         current_app.logger.debug(log_data)
         fetch_acme_cert.delay(cert.id)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task()
@@ -296,8 +402,8 @@ def remove_old_acme_certs():
         current_app.logger.debug(log_data)
         pending_certificate_service.delete(cert)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task()
@@ -328,8 +434,8 @@ def clean_all_sources():
         current_app.logger.debug(log_data)
         clean_source.delay(source.label)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=3600)
@@ -366,6 +472,7 @@ def clean_source(source):
         current_app.logger.error(log_data)
         sentry.captureException()
         metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
+    return log_data
 
 
 @celery.task()
@@ -395,8 +502,8 @@ def sync_all_sources():
         current_app.logger.debug(log_data)
         sync_source.delay(source.label)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=7200)
@@ -428,19 +535,23 @@ def sync_source(source):
     current_app.logger.debug(log_data)
     try:
         sync([source])
-        metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
+        metrics.send(
+            f"{function}.success", "counter", 1, metric_tags={"source": source}
+        )
     except SoftTimeLimitExceeded:
         log_data["message"] = "Error syncing source: Time limit exceeded."
         current_app.logger.error(log_data)
         sentry.captureException()
-        metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source})
+        metrics.send(
+            "sync_source_timeout", "counter", 1, metric_tags={"source": source}
+        )
         metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
         return
 
     log_data["message"] = "Done syncing source"
     current_app.logger.debug(log_data)
-    metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
-    red.set(f'{function}.last_success', int(time.time()))
+    metrics.send(f"{function}.success", "counter", 1, metric_tags={"source": source})
+    return log_data
 
 
 @celery.task()
@@ -477,8 +588,8 @@ def sync_source_destination():
     log_data["message"] = "completed Syncing AWS destinations and sources"
     current_app.logger.debug(log_data)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=3600)
@@ -515,8 +626,8 @@ def certificate_reissue():
     log_data["message"] = "reissuance completed"
     current_app.logger.debug(log_data)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=3600)
@@ -534,7 +645,6 @@ def certificate_rotate():
         "function": function,
         "message": "rotating certificates",
         "task_id": task_id,
     }
-
     if task_id and is_task_active(function, task_id, None):
@@ -554,8 +664,8 @@ def certificate_rotate():
     log_data["message"] = "rotation completed"
     current_app.logger.debug(log_data)
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=3600)
@@ -590,8 +700,8 @@ def endpoints_expire():
         metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
         return
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=600)
@@ -626,8 +736,8 @@ def get_all_zones():
         metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
         return
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=3600)
@@ -662,8 +772,8 @@ def check_revoked():
         metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
         return
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
 
 
 @celery.task(soft_time_limit=3600)
@@ -690,7 +800,9 @@ def notify_expirations():
     current_app.logger.debug(log_data)
     try:
-        cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
+        cli_notification.expirations(
+            current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])
+        )
     except SoftTimeLimitExceeded:
         log_data["message"] = "Notify expiring Time limit exceeded."
         current_app.logger.error(log_data)
@@ -698,5 +810,5 @@ def notify_expirations():
         metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
         return
 
-    red.set(f'{function}.last_success', int(time.time()))
-    metrics.send(f"{function}.success", 'counter', 1)
+    metrics.send(f"{function}.success", "counter", 1)
+    return log_data
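
With the per-task red.set(f'{function}.last_success', ...) calls removed, the last-success bookkeeping now lives in the task_success handler alone. A hypothetical operational check (not part of this diff) that reads those keys back, assuming the Redis instance Lemur uses is reachable locally:

    # Hypothetical staleness check against the keys written by report_successful_task.
    import time
    import redis

    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    now = int(time.time())
    for key in r.scan_iter(match="*.last_success"):
        last = int(r.get(key) or 0)
        print(f"{key}: {now - last} seconds since last success")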