Making CLI command ; Running black

This commit is contained in:
Curtis Castrapel 2020-04-28 12:16:46 -07:00
parent 273c3e2793
commit 863af7a3e5
3 changed files with 43 additions and 30 deletions

View File

@ -5,29 +5,18 @@
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import sys
import multiprocessing
from tabulate import tabulate
from sqlalchemy import or_
import sys
from flask import current_app
from flask_script import Manager
from flask_principal import Identity, identity_changed
from flask_script import Manager
from sqlalchemy import or_
from tabulate import tabulate
from lemur import database
from lemur.extensions import sentry
from lemur.extensions import metrics
from lemur.plugins.base import plugins
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
from lemur.deployment import service as deployment_service
from lemur.endpoints import service as endpoint_service
from lemur.notifications.messaging import send_rotation_notification
from lemur.domains.models import Domain
from lemur.authorities.models import Authority
from lemur.certificates.schemas import CertificateOutputSchema
from lemur.certificates.models import Certificate
from lemur.certificates.schemas import CertificateOutputSchema
from lemur.certificates.service import (
reissue_certificate,
get_certificate_primitives,
@ -35,9 +24,16 @@ from lemur.certificates.service import (
get_by_name,
get_all_certs,
get,
get_all_certs_attached_to_endpoint_without_autorotate,
)
from lemur.certificates.verify import verify_string
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
from lemur.deployment import service as deployment_service
from lemur.domains.models import Domain
from lemur.endpoints import service as endpoint_service
from lemur.extensions import sentry, metrics
from lemur.notifications.messaging import send_rotation_notification
from lemur.plugins.base import plugins
manager = Manager(usage="Handles all certificate related tasks.")
@ -482,3 +478,23 @@ def check_revoked():
cert.status = "unknown"
database.update(cert)
@manager.command
def automatically_enable_autorotate():
"""
This function automatically enables autorotation for unexpired certificates that are
attached to an endpoint but do not have autorotate enabled.
"""
log_data = {
"function": f"{__name__}.{sys._getframe().f_code.co_name}",
}
eligible_certs = get_all_certs_attached_to_endpoint_without_autorotate()
for cert in eligible_certs:
log_data["certificate"] = cert.name
log_data["certificate_id"] = cert.id
log_data["message"] = "Enabling auto-rotate for certificate"
current_app.logger.info(log_data)
cert.rotation = True
database.update(cert)

View File

@ -118,7 +118,7 @@ def get_all_pending_cleaning_expired(source):
)
def get_all_certs_attached_to_endpoint_without_rotate():
def get_all_certs_attached_to_endpoint_without_autorotate():
"""
Retrieves all certificates that are attached to an endpoint, but that do not have autorotate enabled.

View File

@ -17,10 +17,8 @@ from celery.signals import task_failure, task_received, task_revoked, task_succe
from datetime import datetime, timezone, timedelta
from flask import current_app
from lemur import database
from lemur.authorities.service import get as get_authority
from lemur.certificates import cli as cli_certificate
from lemur.certificates.service import get_all_certs_attached_to_endpoint_without_rotate
from lemur.common.redis import RedisHandler
from lemur.destinations import service as destinations_service
from lemur.dns_providers import cli as cli_dns_providers
@ -818,21 +816,20 @@ def notify_expirations():
@celery.task(soft_time_limit=3600)
def enable_autorotate_for_certs_attached_to_endpoint():
function = f"{__name__}.{sys._getframe().f_code.co_name}"
"""
This celery task automatically enables autorotation for unexpired certificates that are
attached to an endpoint but do not have autorotate enabled.
:return:
"""
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"function": function,
"function": f"{__name__}.{sys._getframe().f_code.co_name}",
"task_id": task_id,
"message": "Enabling autorotate to eligible certificates",
}
current_app.logger.debug(log_data)
eligible_certs = get_all_certs_attached_to_endpoint_without_rotate()
for cert in eligible_certs:
log_data["certificate"] = cert.name
log_data["certificate_id"] = cert.id
log_data["message"] = "Enabling auto-rotate for certificate"
current_app.logger.info(log_data)
cert.rotation = True
database.update(cert)
cli_certificate.automatically_enable_autorotate()