Merge pull request #2837 from hosseinsh/moving-cronjobs-to-celery-v2

moving all cron jobs to become celery jobs
This commit is contained in:
Hossein Shafagh 2019-07-31 14:11:25 -07:00 committed by GitHub
commit bbc3bf513d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 140 additions and 11 deletions

View File

@ -26,6 +26,11 @@ from lemur.pending_certificates import service as pending_certificate_service
from lemur.plugins.base import plugins from lemur.plugins.base import plugins
from lemur.sources.cli import clean, sync, validate_sources from lemur.sources.cli import clean, sync, validate_sources
from lemur.sources.service import add_aws_destination_to_sources from lemur.sources.service import add_aws_destination_to_sources
from lemur.certificates import cli as cli_certificate
from lemur.dns_providers import cli as cli_dns_providers
from lemur.notifications import cli as cli_notification
from lemur.endpoints import cli as cli_endpoints
if current_app: if current_app:
flask_app = current_app flask_app = current_app
@ -263,11 +268,14 @@ def clean_all_sources():
be ran periodically. This function triggers one celery task per source. be ran periodically. This function triggers one celery task per source.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "Creating celery task to clean source",
}
sources = validate_sources("all") sources = validate_sources("all")
for source in sources: for source in sources:
current_app.logger.debug( log_data["source"] = source.label
"Creating celery task to clean source {}".format(source.label) current_app.logger.debug(log_data)
)
clean_source.delay(source.label) clean_source.delay(source.label)
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
@ -283,7 +291,13 @@ def clean_source(source):
:param source: :param source:
:return: :return:
""" """
current_app.logger.debug("Cleaning source {}".format(source)) function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "Cleaning source",
"source": source,
}
current_app.logger.debug(log_data)
clean([source], True) clean([source], True)
@ -293,11 +307,14 @@ def sync_all_sources():
This function will sync certificates from all sources. This function triggers one celery task per source. This function will sync certificates from all sources. This function triggers one celery task per source.
""" """
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "creating celery task to sync source",
}
sources = validate_sources("all") sources = validate_sources("all")
for source in sources: for source in sources:
current_app.logger.debug( log_data["source"] = source.label
"Creating celery task to sync source {}".format(source.label) current_app.logger.debug(log_data)
)
sync_source.delay(source.label) sync_source.delay(source.label)
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
@ -356,13 +373,125 @@ def sync_source_destination():
The destination sync_as_source_name reveals the name of the suitable source-plugin. The destination sync_as_source_name reveals the name of the suitable source-plugin.
We rely on account numbers to avoid duplicates. We rely on account numbers to avoid duplicates.
""" """
current_app.logger.debug("Syncing AWS destinations and sources")
function = f"{__name__}.{sys._getframe().f_code.co_name}" function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "syncing AWS destinations and sources",
}
current_app.logger.debug(log_data)
for dst in destinations_service.get_all(): for dst in destinations_service.get_all():
if add_aws_destination_to_sources(dst): if add_aws_destination_to_sources(dst):
current_app.logger.debug("Source: %s added", dst.label) log_data["message"] = "new source added"
log_data["source"] = dst.label
current_app.logger.debug(log_data)
current_app.logger.debug("Completed Syncing AWS destinations and sources") log_data["message"] = "completed Syncing AWS destinations and sources"
current_app.logger.debug(log_data)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def certificate_reissue():
"""
This celery task reissues certificates which are pending reissue
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "reissuing certificates",
}
current_app.logger.debug(log_data)
cli_certificate.reissue(None, True)
log_data["message"] = "reissuance completed"
current_app.logger.debug(log_data)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def certificate_rotate():
"""
This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "rotating certificates",
}
current_app.logger.debug(log_data)
cli_certificate.rotate(None, None, None, None, True)
log_data["message"] = "rotation completed"
current_app.logger.debug(log_data)
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def endpoints_expire():
"""
This celery task removes all endpoints that have not been recently updated
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "endpoints expire",
}
current_app.logger.debug(log_data)
cli_endpoints.expire(2) # Time in hours
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def get_all_zones():
"""
This celery syncs all zones from the available dns providers
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "refresh all zones from available DNS providers",
}
current_app.logger.debug(log_data)
cli_dns_providers.get_all_zones()
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def check_revoked():
"""
This celery task attempts to check if any certs are expired
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "check if any certificates are revoked revoked",
}
current_app.logger.debug(log_data)
cli_certificate.check_revoked()
red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1)
@celery.task()
def notify_expirations():
"""
This celery task notifies about expiring certs
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": "notify for cert expiration",
}
current_app.logger.debug(log_data)
cli_notification.expirations(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []))
red.set(f'{function}.last_success', int(time.time())) red.set(f'{function}.last_success', int(time.time()))
metrics.send(f"{function}.success", 'counter', 1) metrics.send(f"{function}.success", 'counter', 1)