Merge branch 'master' into up-dependencies-20Sep2019

This commit is contained in:
Hossein Shafagh
2019-09-20 15:19:25 -07:00
committed by GitHub
18 changed files with 1156 additions and 54 deletions

View File

@ -9,6 +9,7 @@ command: celery -A lemur.common.celery worker --loglevel=info -l DEBUG -B
"""
import copy
import sys
import time
from datetime import datetime, timezone, timedelta
from celery import Celery
@ -16,6 +17,7 @@ from celery.exceptions import SoftTimeLimitExceeded
from flask import current_app
from lemur.authorities.service import get as get_authority
from lemur.common.redis import RedisHandler
from lemur.destinations import service as destinations_service
from lemur.extensions import metrics, sentry
from lemur.factory import create_app
@ -24,12 +26,19 @@ from lemur.pending_certificates import service as pending_certificate_service
from lemur.plugins.base import plugins
from lemur.sources.cli import clean, sync, validate_sources
from lemur.sources.service import add_aws_destination_to_sources
from lemur.certificates import cli as cli_certificate
from lemur.dns_providers import cli as cli_dns_providers
from lemur.notifications import cli as cli_notification
from lemur.endpoints import cli as cli_endpoints
if current_app:
flask_app = current_app
else:
flask_app = create_app()
red = RedisHandler().redis()
def make_celery(app):
celery = Celery(
@ -57,6 +66,9 @@ celery = make_celery(flask_app)
def is_task_active(fun, task_id, args):
from celery.task.control import inspect
if not args:
args = '()' # empty args
i = inspect()
active_tasks = i.active()
for _, tasks in active_tasks.items():
@ -68,6 +80,45 @@ def is_task_active(fun, task_id, args):
return False
@celery.task()
def report_celery_last_success_metrics():
"""
For each celery task, this will determine the number of seconds since it has last been successful.
Celery tasks should be emitting redis stats with a deterministic key (In our case, `f"{task}.last_success"`.
report_celery_last_success_metrics should be ran periodically to emit metrics on when a task was last successful.
Admins can then alert when tasks are not ran when intended. Admins should also alert when no metrics are emitted
from this function.
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "recurrent task",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_time = int(time.time())
schedule = current_app.config.get('CELERYBEAT_SCHEDULE')
for _, t in schedule.items():
task = t.get("task")
last_success = int(red.get(f"{task}.last_success") or 0)
metrics.send(f"{task}.time_since_last_success", 'gauge', current_time - last_success)
red.set(
f"{function}.last_success", int(time.time())
) # Alert if this metric is not seen
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=600)
def fetch_acme_cert(id):
"""
@ -80,8 +131,9 @@ def fetch_acme_cert(id):
if celery.current_task:
task_id = celery.current_task.request.id
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
"function": function,
"message": "Resolving pending certificate {}".format(id),
"task_id": task_id,
"id": id,
@ -165,24 +217,39 @@ def fetch_acme_cert(id):
log_data["failed"] = failed
log_data["wrong_issuer"] = wrong_issuer
current_app.logger.debug(log_data)
metrics.send(f"{function}.resolved", 'gauge', new)
metrics.send(f"{function}.failed", 'gauge', failed)
metrics.send(f"{function}.wrong_issuer", 'gauge', wrong_issuer)
print(
"[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format(
new=new, failed=failed, wrong_issuer=wrong_issuer
)
)
red.set(f'{function}.last_success', int(time.time()))
@celery.task()
def fetch_all_pending_acme_certs():
"""Instantiate celery workers to resolve all pending Acme certificates"""
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name),
"function": function,
"message": "Starting job.",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
pending_certs = pending_certificate_service.get_unresolved_pending_certs()
# We only care about certs using the acme-issuer plugin
for cert in pending_certs:
@ -195,11 +262,29 @@ def fetch_all_pending_acme_certs():
current_app.logger.debug(log_data)
fetch_acme_cert.delay(cert.id)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def remove_old_acme_certs():
"""Prune old pending acme certificates from the database"""
log_data = {"function": "{}.{}".format(__name__, sys._getframe().f_code.co_name)}
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "Starting job.",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
pending_certs = pending_certificate_service.get_pending_certs("all")
# Delete pending certs more than a week old
@ -211,6 +296,9 @@ def remove_old_acme_certs():
current_app.logger.debug(log_data)
pending_certificate_service.delete(cert)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def clean_all_sources():
@ -218,15 +306,33 @@ def clean_all_sources():
This function will clean unused certificates from sources. This is a destructive operation and should only
be ran periodically. This function triggers one celery task per source.
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "Creating celery task to clean source",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
sources = validate_sources("all")
for source in sources:
current_app.logger.debug(
"Creating celery task to clean source {}".format(source.label)
)
log_data["source"] = source.label
current_app.logger.debug(log_data)
clean_source.delay(source.label)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
@celery.task(soft_time_limit=600)
def clean_source(source):
"""
This celery task will clean the specified source. This is a destructive operation that will delete unused
@ -235,8 +341,31 @@ def clean_source(source):
:param source:
:return:
"""
current_app.logger.debug("Cleaning source {}".format(source))
clean([source], True)
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "Cleaning source",
"source": source,
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, (source,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
clean([source], True)
except SoftTimeLimitExceeded:
log_data["message"] = "Clean source: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
@celery.task()
@ -244,13 +373,31 @@ def sync_all_sources():
"""
This function will sync certificates from all sources. This function triggers one celery task per source.
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "creating celery task to sync source",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
sources = validate_sources("all")
for source in sources:
current_app.logger.debug(
"Creating celery task to sync source {}".format(source.label)
)
log_data["source"] = source.label
current_app.logger.debug(log_data)
sync_source.delay(source.label)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=7200)
def sync_source(source):
@ -261,35 +408,39 @@ def sync_source(source):
:return:
"""
function = "{}.{}".format(__name__, sys._getframe().f_code.co_name)
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "Syncing source",
"source": source,
"task_id": task_id,
}
current_app.logger.debug(log_data)
if task_id and is_task_active(function, task_id, (source,)):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
sync([source])
metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
except SoftTimeLimitExceeded:
log_data["message"] = "Error syncing source: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send(
"sync_source_timeout", "counter", 1, metric_tags={"source": source}
)
metrics.send("sync_source_timeout", "counter", 1, metric_tags={"source": source})
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
log_data["message"] = "Done syncing source"
current_app.logger.debug(log_data)
metrics.send(f"{function}.success", 'counter', 1, metric_tags={"source": source})
red.set(f'{function}.last_success', int(time.time()))
@celery.task()
@ -301,10 +452,251 @@ def sync_source_destination():
The destination sync_as_source_name reveals the name of the suitable source-plugin.
We rely on account numbers to avoid duplicates.
"""
current_app.logger.debug("Syncing AWS destinations and sources")
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "syncing AWS destinations and sources",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
for dst in destinations_service.get_all():
if add_aws_destination_to_sources(dst):
current_app.logger.debug("Source: %s added", dst.label)
log_data["message"] = "new source added"
log_data["source"] = dst.label
current_app.logger.debug(log_data)
current_app.logger.debug("Completed Syncing AWS destinations and sources")
log_data["message"] = "completed Syncing AWS destinations and sources"
current_app.logger.debug(log_data)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=3600)
def certificate_reissue():
"""
This celery task reissues certificates which are pending reissue
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "reissuing certificates",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
cli_certificate.reissue(None, True)
except SoftTimeLimitExceeded:
log_data["message"] = "Certificate reissue: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
log_data["message"] = "reissuance completed"
current_app.logger.debug(log_data)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=3600)
def certificate_rotate():
"""
This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "rotating certificates",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
cli_certificate.rotate(None, None, None, None, True)
except SoftTimeLimitExceeded:
log_data["message"] = "Certificate rotate: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
log_data["message"] = "rotation completed"
current_app.logger.debug(log_data)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=3600)
def endpoints_expire():
"""
This celery task removes all endpoints that have not been recently updated
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "endpoints expire",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
cli_endpoints.expire(2) # Time in hours
except SoftTimeLimitExceeded:
log_data["message"] = "endpoint expire: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=600)
def get_all_zones():
"""
This celery syncs all zones from the available dns providers
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "refresh all zones from available DNS providers",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
cli_dns_providers.get_all_zones()
except SoftTimeLimitExceeded:
log_data["message"] = "get all zones: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=3600)
def check_revoked():
"""
This celery task attempts to check if any certs are expired
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "check if any certificates are revoked revoked",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
cli_certificate.check_revoked()
except SoftTimeLimitExceeded:
log_data["message"] = "Checking revoked: Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task(soft_time_limit=3600)
def notify_expirations():
"""
This celery task notifies about expiring certs
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"message": "notify for cert expiration",
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
log_data["message"] = "Skipping task: Task is already active"
current_app.logger.debug(log_data)
return
current_app.logger.debug(log_data)
try:
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
except SoftTimeLimitExceeded:
log_data["message"] = "Notify expiring Time limit exceeded."
current_app.logger.error(log_data)
sentry.captureException()
metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function})
return
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)

52
lemur/common/redis.py Normal file
View File

@ -0,0 +1,52 @@
"""
Helper Class for Redis
"""
import redis
import sys
from flask import current_app
from lemur.extensions import sentry
from lemur.factory import create_app
if current_app:
flask_app = current_app
else:
flask_app = create_app()
class RedisHandler:
def __init__(self, host=flask_app.config.get('REDIS_HOST', 'localhost'),
port=flask_app.config.get('REDIS_PORT', 6379),
db=flask_app.config.get('REDIS_DB', 0)):
self.host = host
self.port = port
self.db = db
def redis(self, db=0):
# The decode_responses flag here directs the client to convert the responses from Redis into Python strings
# using the default encoding utf-8. This is client specific.
function = f"{__name__}.{sys._getframe().f_code.co_name}"
try:
red = redis.StrictRedis(host=self.host, port=self.port, db=self.db, encoding="utf-8", decode_responses=True)
red.set("test", 0)
except redis.ConnectionError:
log_data = {
"function": function,
"message": "Redis Connection error",
"host": self.host,
"port": self.port
}
current_app.logger.error(log_data)
sentry.captureException()
return red
def redis_get(key, default=None):
red = RedisHandler().redis()
try:
v = red.get(key)
except redis.exceptions.ConnectionError:
v = None
if not v:
return default
return v