""" This module controls defines celery tasks and their applicable schedules. The celery beat server and workers will start when invoked. When ran in development mode (LEMUR_CONFIG= ACME_ADDITIONAL_ATTEMPTS: error_log["message"] = "Deleting pending certificate" send_pending_failure_notification( pending_cert, notify_owner=pending_cert.notify ) # Mark the pending cert as resolved pending_certificate_service.update( cert.get("pending_cert").id, resolved=True ) else: pending_certificate_service.increment_attempt(pending_cert) pending_certificate_service.update( cert.get("pending_cert").id, status=str(cert.get("last_error")) ) # Add failed pending cert task back to queue fetch_acme_cert.delay(id) current_app.logger.error(error_log) log_data["message"] = "Complete" log_data["new"] = new log_data["failed"] = failed log_data["wrong_issuer"] = wrong_issuer current_app.logger.debug(log_data) metrics.send(f"{function}.resolved", "gauge", new) metrics.send(f"{function}.failed", "gauge", failed) metrics.send(f"{function}.wrong_issuer", "gauge", wrong_issuer) print( "[+] Certificates: New: {new} Failed: {failed} Not using ACME: {wrong_issuer}".format( new=new, failed=failed, wrong_issuer=wrong_issuer ) ) return log_data @celery.task() def fetch_all_pending_acme_certs(): """Instantiate celery workers to resolve all pending Acme certificates""" function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "Starting job.", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) pending_certs = pending_certificate_service.get_unresolved_pending_certs() # We only care about certs using the acme-issuer plugin for cert in pending_certs: cert_authority = get_authority(cert.authority_id) if cert_authority.plugin_name == "acme-issuer": if datetime.now(timezone.utc) - cert.last_updated > timedelta(minutes=5): log_data["message"] = "Triggering job for cert {}".format(cert.name) log_data["cert_name"] = cert.name log_data["cert_id"] = cert.id current_app.logger.debug(log_data) fetch_acme_cert.delay(cert.id) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task() def remove_old_acme_certs(): """Prune old pending acme certificates from the database""" function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "Starting job.", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return pending_certs = pending_certificate_service.get_pending_certs("all") # Delete pending certs more than a week old for cert in pending_certs: if datetime.now(timezone.utc) - cert.last_updated > timedelta(days=7): log_data["pending_cert_id"] = cert.id log_data["pending_cert_name"] = cert.name log_data["message"] = "Deleting pending certificate" current_app.logger.debug(log_data) pending_certificate_service.delete(cert) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task() def clean_all_sources(): """ This function will clean unused certificates from sources. This is a destructive operation and should only be ran periodically. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "Creating celery task to clean source", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return sources = validate_sources("all") for source in sources: log_data["source"] = source.label current_app.logger.debug(log_data) clean_source.delay(source.label) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def clean_source(source): """ This celery task will clean the specified source. This is a destructive operation that will delete unused certificates from each source. :param source: :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "Cleaning source", "source": source, "task_id": task_id, } if task_id and is_task_active(function, task_id, (source,)): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: clean([source], True) except SoftTimeLimitExceeded: log_data["message"] = "Clean source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data @celery.task() def sync_all_sources(): """ This function will sync certificates from all sources. This function triggers one celery task per source. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "creating celery task to sync source", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return sources = validate_sources("all") for source in sources: log_data["source"] = source.label current_app.logger.debug(log_data) sync_source.delay(source.label) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=7200) def sync_source(source): """ This celery task will sync the specified source. :param source: :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "Syncing source", "source": source, "task_id": task_id, } if task_id and is_task_active(function, task_id, (source,)): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: sync([source]) metrics.send( f"{function}.success", "counter", 1, metric_tags={"source": source} ) except SoftTimeLimitExceeded: log_data["message"] = "Error syncing source: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send( "sync_source_timeout", "counter", 1, metric_tags={"source": source} ) metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data["message"] = "Done syncing source" current_app.logger.debug(log_data) metrics.send(f"{function}.success", "counter", 1, metric_tags={"source": source}) return log_data @celery.task() def sync_source_destination(): """ This celery task will sync destination and source, to make sure all new destinations are also present as source. Some destinations do not qualify as sources, and hence should be excluded from being added as sources We identify qualified destinations based on the sync_as_source attributed of the plugin. The destination sync_as_source_name reveals the name of the suitable source-plugin. We rely on account numbers to avoid duplicates. """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "syncing AWS destinations and sources", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) for dst in destinations_service.get_all(): if add_aws_destination_to_sources(dst): log_data["message"] = "new source added" log_data["source"] = dst.label current_app.logger.debug(log_data) log_data["message"] = "completed Syncing AWS destinations and sources" current_app.logger.debug(log_data) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def certificate_reissue(): """ This celery task reissues certificates which are pending reissue :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "reissuing certificates", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_certificate.reissue(None, True) except SoftTimeLimitExceeded: log_data["message"] = "Certificate reissue: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data["message"] = "reissuance completed" current_app.logger.debug(log_data) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def certificate_rotate(**kwargs): """ This celery task rotates certificates which are reissued but having endpoints attached to the replaced cert :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id region = kwargs.get("region") log_data = { "function": function, "message": "rotating certificates", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: notify = current_app.config.get("ENABLE_ROTATION_NOTIFICATION", None) if region: log_data["region"] = region cli_certificate.rotate_region(None, None, None, notify, True, region) else: cli_certificate.rotate(None, None, None, notify, True) except SoftTimeLimitExceeded: log_data["message"] = "Certificate rotate: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return log_data["message"] = "rotation completed" current_app.logger.debug(log_data) metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def endpoints_expire(): """ This celery task removes all endpoints that have not been recently updated :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "endpoints expire", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_endpoints.expire(2) # Time in hours except SoftTimeLimitExceeded: log_data["message"] = "endpoint expire: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=600) def get_all_zones(): """ This celery syncs all zones from the available dns providers :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "refresh all zones from available DNS providers", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_dns_providers.get_all_zones() except SoftTimeLimitExceeded: log_data["message"] = "get all zones: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def check_revoked(): """ This celery task attempts to check if any certs are expired :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "check if any valid certificate is revoked", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_certificate.check_revoked() except SoftTimeLimitExceeded: log_data["message"] = "Checking revoked: Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def notify_expirations(): """ This celery task notifies about expiring certs :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "notify for cert expiration", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_notification.expirations( current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", []) ) except SoftTimeLimitExceeded: log_data["message"] = "Notify expiring Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def notify_authority_expirations(): """ This celery task notifies about expiring certificate authority certs :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "notify for certificate authority cert expiration", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_notification.authority_expirations() except SoftTimeLimitExceeded: log_data["message"] = "Notify expiring CA Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def send_security_expiration_summary(): """ This celery task sends a summary about expiring certificates to the security team. :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "send summary for certificate expiration", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_notification.security_expiration_summary(current_app.config.get("EXCLUDE_CN_FROM_NOTIFICATION", [])) except SoftTimeLimitExceeded: log_data["message"] = "Send summary for expiring certs Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def enable_autorotate_for_certs_attached_to_endpoint(): """ This celery task automatically enables autorotation for unexpired certificates that are attached to an endpoint but do not have autorotate enabled. :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "task_id": task_id, "message": "Enabling autorotate to eligible certificates", } current_app.logger.debug(log_data) cli_certificate.automatically_enable_autorotate() metrics.send(f"{function}.success", "counter", 1) return log_data @celery.task(soft_time_limit=3600) def deactivate_entrust_test_certificates(): """ This celery task attempts to deactivate all not yet deactivated Entrust certificates, and should only run in TEST :return: """ function = f"{__name__}.{sys._getframe().f_code.co_name}" task_id = None if celery.current_task: task_id = celery.current_task.request.id log_data = { "function": function, "message": "deactivate entrust certificates", "task_id": task_id, } if task_id and is_task_active(function, task_id, None): log_data["message"] = "Skipping task: Task is already active" current_app.logger.debug(log_data) return current_app.logger.debug(log_data) try: cli_certificate.deactivate_entrust_certificates() except SoftTimeLimitExceeded: log_data["message"] = "Time limit exceeded." current_app.logger.error(log_data) sentry.captureException() metrics.send("celery.timeout", "counter", 1, metric_tags={"function": function}) return metrics.send(f"{function}.success", "counter", 1) return log_data