unit test
This commit is contained in:
parent
2381d0a4bb
commit
e074a14ee9
|
@ -138,7 +138,6 @@ class Certificate(db.Model):
|
||||||
logs = relationship('Log', backref='certificate')
|
logs = relationship('Log', backref='certificate')
|
||||||
endpoints = relationship('Endpoint', backref='certificate')
|
endpoints = relationship('Endpoint', backref='certificate')
|
||||||
rotation_policy = relationship("RotationPolicy")
|
rotation_policy = relationship("RotationPolicy")
|
||||||
|
|
||||||
sensitive_fields = ('private_key',)
|
sensitive_fields = ('private_key',)
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
|
|
|
@ -281,6 +281,7 @@ def create(**kwargs):
|
||||||
# "attribute refresh operation cannot proceed"
|
# "attribute refresh operation cannot proceed"
|
||||||
pending_cert = database.session_query(PendingCertificate).get(cert.id)
|
pending_cert = database.session_query(PendingCertificate).get(cert.id)
|
||||||
from lemur.common.celery import fetch_acme_cert
|
from lemur.common.celery import fetch_acme_cert
|
||||||
|
if not current_app.config.get("ACME_DISABLE_AUTORESOLVE", False):
|
||||||
fetch_acme_cert.delay(pending_cert.id)
|
fetch_acme_cert.delay(pending_cert.id)
|
||||||
|
|
||||||
return cert
|
return cert
|
||||||
|
|
|
@ -25,8 +25,8 @@ flask_app = create_app()
|
||||||
|
|
||||||
|
|
||||||
def make_celery(app):
|
def make_celery(app):
|
||||||
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
|
celery = Celery(app.import_name, backend=app.config.get('CELERY_RESULT_BACKEND'),
|
||||||
broker=app.config['CELERY_BROKER_URL'])
|
broker=app.config.get('CELERY_BROKER_URL'))
|
||||||
celery.conf.update(app.config)
|
celery.conf.update(app.config)
|
||||||
TaskBase = celery.Task
|
TaskBase = celery.Task
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ from lemur.logs.models import Log # noqa
|
||||||
from lemur.endpoints.models import Endpoint # noqa
|
from lemur.endpoints.models import Endpoint # noqa
|
||||||
from lemur.policies.models import RotationPolicy # noqa
|
from lemur.policies.models import RotationPolicy # noqa
|
||||||
from lemur.pending_certificates.models import PendingCertificate # noqa
|
from lemur.pending_certificates.models import PendingCertificate # noqa
|
||||||
|
from lemur.dns_providers.models import DnsProvider # noqa
|
||||||
|
|
||||||
manager = Manager(create_app)
|
manager = Manager(create_app)
|
||||||
manager.add_option('-c', '--config', dest='config')
|
manager.add_option('-c', '--config', dest='config')
|
||||||
|
|
|
@ -333,9 +333,12 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(ACMEIssuerPlugin, self).__init__(*args, **kwargs)
|
super(ACMEIssuerPlugin, self).__init__(*args, **kwargs)
|
||||||
self.acme = AcmeHandler()
|
self.acme = None
|
||||||
|
|
||||||
def get_dns_provider(self, type):
|
def get_dns_provider(self, type):
|
||||||
|
if not self.acme:
|
||||||
|
self.acme = AcmeHandler()
|
||||||
|
|
||||||
provider_types = {
|
provider_types = {
|
||||||
'cloudflare': cloudflare,
|
'cloudflare': cloudflare,
|
||||||
'dyn': dyn,
|
'dyn': dyn,
|
||||||
|
@ -347,12 +350,16 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||||
return provider
|
return provider
|
||||||
|
|
||||||
def get_all_zones(self, dns_provider):
|
def get_all_zones(self, dns_provider):
|
||||||
|
if not self.acme:
|
||||||
|
self.acme = AcmeHandler()
|
||||||
dns_provider_options = json.loads(dns_provider.credentials)
|
dns_provider_options = json.loads(dns_provider.credentials)
|
||||||
account_number = dns_provider_options.get("account_id")
|
account_number = dns_provider_options.get("account_id")
|
||||||
dns_provider_plugin = self.get_dns_provider(dns_provider.provider_type)
|
dns_provider_plugin = self.get_dns_provider(dns_provider.provider_type)
|
||||||
return dns_provider_plugin.get_zones(account_number=account_number)
|
return dns_provider_plugin.get_zones(account_number=account_number)
|
||||||
|
|
||||||
def get_ordered_certificate(self, pending_cert):
|
def get_ordered_certificate(self, pending_cert):
|
||||||
|
if not self.acme:
|
||||||
|
self.acme = AcmeHandler()
|
||||||
acme_client, registration = self.acme.setup_acme_client(pending_cert.authority)
|
acme_client, registration = self.acme.setup_acme_client(pending_cert.authority)
|
||||||
order_info = authorization_service.get(pending_cert.external_id)
|
order_info = authorization_service.get(pending_cert.external_id)
|
||||||
if pending_cert.dns_provider_id:
|
if pending_cert.dns_provider_id:
|
||||||
|
@ -388,6 +395,8 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||||
return cert
|
return cert
|
||||||
|
|
||||||
def get_ordered_certificates(self, pending_certs):
|
def get_ordered_certificates(self, pending_certs):
|
||||||
|
if not self.acme:
|
||||||
|
self.acme = AcmeHandler()
|
||||||
pending = []
|
pending = []
|
||||||
certs = []
|
certs = []
|
||||||
for pending_cert in pending_certs:
|
for pending_cert in pending_certs:
|
||||||
|
@ -470,6 +479,8 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||||
:param issuer_options:
|
:param issuer_options:
|
||||||
:return: :raise Exception:
|
:return: :raise Exception:
|
||||||
"""
|
"""
|
||||||
|
if not self.acme:
|
||||||
|
self.acme = AcmeHandler()
|
||||||
authority = issuer_options.get('authority')
|
authority = issuer_options.get('authority')
|
||||||
create_immediately = issuer_options.get('create_immediately', False)
|
create_immediately = issuer_options.get('create_immediately', False)
|
||||||
acme_client, registration = self.acme.setup_acme_client(authority)
|
acme_client, registration = self.acme.setup_acme_client(authority)
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
|
|
||||||
# This is just Python which means you can inherit and tweak settings
|
# This is just Python which means you can inherit and tweak settings
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
_basedir = os.path.abspath(os.path.dirname(__file__))
|
_basedir = os.path.abspath(os.path.dirname(__file__))
|
||||||
|
|
||||||
THREADS_PER_PAGE = 8
|
THREADS_PER_PAGE = 8
|
||||||
|
@ -78,14 +78,12 @@ DIGICERT_API_KEY = 'api-key'
|
||||||
DIGICERT_ORG_ID = 111111
|
DIGICERT_ORG_ID = 111111
|
||||||
DIGICERT_ROOT = "ROOT"
|
DIGICERT_ROOT = "ROOT"
|
||||||
|
|
||||||
|
|
||||||
VERISIGN_URL = 'http://example.com'
|
VERISIGN_URL = 'http://example.com'
|
||||||
VERISIGN_PEM_PATH = '~/'
|
VERISIGN_PEM_PATH = '~/'
|
||||||
VERISIGN_FIRST_NAME = 'Jim'
|
VERISIGN_FIRST_NAME = 'Jim'
|
||||||
VERISIGN_LAST_NAME = 'Bob'
|
VERISIGN_LAST_NAME = 'Bob'
|
||||||
VERSIGN_EMAIL = 'jim@example.com'
|
VERSIGN_EMAIL = 'jim@example.com'
|
||||||
|
|
||||||
|
|
||||||
ACME_AWS_ACCOUNT_NUMBER = '11111111111'
|
ACME_AWS_ACCOUNT_NUMBER = '11111111111'
|
||||||
|
|
||||||
ACME_PRIVATE_KEY = '''
|
ACME_PRIVATE_KEY = '''
|
||||||
|
@ -180,6 +178,7 @@ ACME_URL = 'https://acme-v01.api.letsencrypt.org'
|
||||||
ACME_EMAIL = 'jim@example.com'
|
ACME_EMAIL = 'jim@example.com'
|
||||||
ACME_TEL = '4088675309'
|
ACME_TEL = '4088675309'
|
||||||
ACME_DIRECTORY_URL = 'https://acme-v01.api.letsencrypt.org'
|
ACME_DIRECTORY_URL = 'https://acme-v01.api.letsencrypt.org'
|
||||||
|
ACME_DISABLE_AUTORESOLVE = True
|
||||||
|
|
||||||
LDAP_AUTH = True
|
LDAP_AUTH = True
|
||||||
LDAP_BIND_URI = 'ldap://localhost'
|
LDAP_BIND_URI = 'ldap://localhost'
|
||||||
|
|
|
@ -2,11 +2,10 @@ import json
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from lemur.pending_certificates.views import * # noqa
|
||||||
from .vectors import CSR_STR, INTERMEDIATE_CERT_STR, VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, \
|
from .vectors import CSR_STR, INTERMEDIATE_CERT_STR, VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, \
|
||||||
VALID_USER_HEADER_TOKEN, WILDCARD_CERT_STR
|
VALID_USER_HEADER_TOKEN, WILDCARD_CERT_STR
|
||||||
|
|
||||||
from lemur.pending_certificates.views import * # noqa
|
|
||||||
|
|
||||||
|
|
||||||
def test_increment_attempt(pending_certificate):
|
def test_increment_attempt(pending_certificate):
|
||||||
from lemur.pending_certificates.service import increment_attempt
|
from lemur.pending_certificates.service import increment_attempt
|
||||||
|
@ -17,7 +16,8 @@ def test_increment_attempt(pending_certificate):
|
||||||
|
|
||||||
def test_create_pending_certificate(async_issuer_plugin, async_authority, user):
|
def test_create_pending_certificate(async_issuer_plugin, async_authority, user):
|
||||||
from lemur.certificates.service import create
|
from lemur.certificates.service import create
|
||||||
pending_cert = create(authority=async_authority, csr=CSR_STR, owner='joe@example.com', creator=user['user'], common_name='ACommonName')
|
pending_cert = create(authority=async_authority, csr=CSR_STR, owner='joe@example.com', creator=user['user'],
|
||||||
|
common_name='ACommonName')
|
||||||
assert pending_cert.external_id == '12345'
|
assert pending_cert.external_id == '12345'
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue