438 lines
10 KiB
Python
438 lines
10 KiB
Python
from datetime import date
|
|
|
|
from factory import Sequence, post_generation, SubFactory
|
|
from factory.alchemy import SQLAlchemyModelFactory
|
|
from factory.fuzzy import FuzzyChoice, FuzzyText, FuzzyDate, FuzzyInteger
|
|
|
|
|
|
from lemur.database import db
|
|
from lemur.authorities.models import Authority
|
|
from lemur.certificates.models import Certificate
|
|
from lemur.destinations.models import Destination
|
|
from lemur.sources.models import Source
|
|
from lemur.notifications.models import Notification
|
|
from lemur.pending_certificates.models import PendingCertificate
|
|
from lemur.users.models import User
|
|
from lemur.roles.models import Role
|
|
from lemur.endpoints.models import Policy, Endpoint
|
|
from lemur.policies.models import RotationPolicy
|
|
from lemur.api_keys.models import ApiKey
|
|
|
|
from .vectors import (
|
|
SAN_CERT_STR,
|
|
SAN_CERT_KEY,
|
|
CSR_STR,
|
|
INTERMEDIATE_CERT_STR,
|
|
ROOTCA_CERT_STR,
|
|
INTERMEDIATE_KEY,
|
|
WILDCARD_CERT_KEY,
|
|
INVALID_CERT_STR,
|
|
)
|
|
|
|
|
|
class BaseFactory(SQLAlchemyModelFactory):
|
|
"""Base factory."""
|
|
|
|
class Meta:
|
|
"""Factory configuration."""
|
|
|
|
abstract = True
|
|
sqlalchemy_session = db.session
|
|
|
|
|
|
class RotationPolicyFactory(BaseFactory):
|
|
"""Rotation Factory."""
|
|
|
|
name = Sequence(lambda n: "policy{0}".format(n))
|
|
days = 30
|
|
|
|
class Meta:
|
|
"""Factory configuration."""
|
|
|
|
model = RotationPolicy
|
|
|
|
|
|
class CertificateFactory(BaseFactory):
|
|
"""Certificate factory."""
|
|
|
|
name = Sequence(lambda n: "certificate{0}".format(n))
|
|
chain = INTERMEDIATE_CERT_STR
|
|
body = SAN_CERT_STR
|
|
private_key = SAN_CERT_KEY
|
|
owner = "joe@example.com"
|
|
status = FuzzyChoice(["valid", "revoked", "unknown"])
|
|
deleted = False
|
|
description = FuzzyText(length=128)
|
|
active = True
|
|
date_created = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
|
|
rotation_policy = SubFactory(RotationPolicyFactory)
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Certificate
|
|
|
|
@post_generation
|
|
def user(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
self.user_id = extracted.id
|
|
|
|
@post_generation
|
|
def authority(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
self.authority_id = extracted.id
|
|
|
|
@post_generation
|
|
def notifications(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for notification in extracted:
|
|
self.notifications.append(notification)
|
|
|
|
@post_generation
|
|
def destinations(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for destination in extracted:
|
|
self.destintations.append(destination)
|
|
|
|
@post_generation
|
|
def replaces(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for replace in extracted:
|
|
self.replaces.append(replace)
|
|
|
|
@post_generation
|
|
def sources(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for source in extracted:
|
|
self.sources.append(source)
|
|
|
|
@post_generation
|
|
def domains(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for domain in extracted:
|
|
self.domains.append(domain)
|
|
|
|
@post_generation
|
|
def roles(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for domain in extracted:
|
|
self.roles.append(domain)
|
|
|
|
|
|
class CACertificateFactory(CertificateFactory):
|
|
chain = ROOTCA_CERT_STR
|
|
body = INTERMEDIATE_CERT_STR
|
|
private_key = INTERMEDIATE_KEY
|
|
|
|
|
|
class InvalidCertificateFactory(CertificateFactory):
|
|
body = INVALID_CERT_STR
|
|
private_key = ""
|
|
chain = ""
|
|
|
|
|
|
class AuthorityFactory(BaseFactory):
|
|
"""Authority factory."""
|
|
|
|
name = Sequence(lambda n: "authority{0}".format(n))
|
|
owner = "joe@example.com"
|
|
plugin = {"slug": "test-issuer"}
|
|
description = FuzzyText(length=128)
|
|
authority_certificate = SubFactory(CACertificateFactory)
|
|
|
|
class Meta:
|
|
"""Factory configuration."""
|
|
|
|
model = Authority
|
|
|
|
@post_generation
|
|
def roles(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for role in extracted:
|
|
self.roles.append(role)
|
|
|
|
|
|
class AsyncAuthorityFactory(AuthorityFactory):
|
|
"""Async Authority factory."""
|
|
|
|
name = Sequence(lambda n: "authority{0}".format(n))
|
|
owner = "joe@example.com"
|
|
plugin = {"slug": "test-issuer-async"}
|
|
description = FuzzyText(length=128)
|
|
authority_certificate = SubFactory(CertificateFactory)
|
|
|
|
|
|
class CryptoAuthorityFactory(AuthorityFactory):
|
|
"""Authority factory based on 'cryptography' plugin."""
|
|
|
|
plugin = {"slug": "cryptography-issuer"}
|
|
|
|
|
|
class DestinationFactory(BaseFactory):
|
|
"""Destination factory."""
|
|
|
|
plugin_name = "test-destination"
|
|
label = Sequence(lambda n: "destination{0}".format(n))
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Destination
|
|
|
|
|
|
class SourceFactory(BaseFactory):
|
|
"""Source factory."""
|
|
|
|
plugin_name = "test-source"
|
|
label = Sequence(lambda n: "source{0}".format(n))
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Source
|
|
|
|
|
|
class NotificationFactory(BaseFactory):
|
|
"""Notification factory."""
|
|
|
|
plugin_name = "test-notification"
|
|
label = Sequence(lambda n: "notification{0}".format(n))
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Notification
|
|
|
|
|
|
class RoleFactory(BaseFactory):
|
|
"""Role factory."""
|
|
|
|
name = Sequence(lambda n: "role{0}".format(n))
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Role
|
|
|
|
@post_generation
|
|
def users(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for user in extracted:
|
|
self.users.append(user)
|
|
|
|
|
|
class UserFactory(BaseFactory):
|
|
"""User Factory."""
|
|
|
|
username = Sequence(lambda n: "user{0}".format(n))
|
|
email = Sequence(lambda n: "user{0}@example.com".format(n))
|
|
active = True
|
|
password = FuzzyText(length=24)
|
|
certificates = []
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = User
|
|
|
|
@post_generation
|
|
def roles(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for role in extracted:
|
|
self.roles.append(role)
|
|
|
|
@post_generation
|
|
def certificates(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for cert in extracted:
|
|
self.certificates.append(cert)
|
|
|
|
@post_generation
|
|
def authorities(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for authority in extracted:
|
|
self.authorities.append(authority)
|
|
|
|
|
|
class PolicyFactory(BaseFactory):
|
|
"""Policy Factory."""
|
|
|
|
name = Sequence(lambda n: "endpoint{0}".format(n))
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Policy
|
|
|
|
|
|
class EndpointFactory(BaseFactory):
|
|
"""Endpoint Factory."""
|
|
|
|
owner = "joe@example.com"
|
|
name = Sequence(lambda n: "endpoint{0}".format(n))
|
|
type = FuzzyChoice(["elb"])
|
|
active = True
|
|
port = FuzzyInteger(0, high=65535)
|
|
dnsname = "endpoint.example.com"
|
|
policy = SubFactory(PolicyFactory)
|
|
certificate = SubFactory(CertificateFactory)
|
|
source = SubFactory(SourceFactory)
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = Endpoint
|
|
|
|
|
|
class ApiKeyFactory(BaseFactory):
|
|
"""Api Key Factory."""
|
|
|
|
name = Sequence(lambda n: "api_key_{0}".format(n))
|
|
revoked = False
|
|
ttl = -1
|
|
issued_at = 1
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = ApiKey
|
|
|
|
@post_generation
|
|
def user(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
self.userId = extracted.id
|
|
|
|
|
|
class PendingCertificateFactory(BaseFactory):
|
|
"""PendingCertificate factory."""
|
|
|
|
name = Sequence(lambda n: "pending_certificate{0}".format(n))
|
|
external_id = 12345
|
|
csr = CSR_STR
|
|
chain = INTERMEDIATE_CERT_STR
|
|
private_key = WILDCARD_CERT_KEY
|
|
owner = "joe@example.com"
|
|
status = FuzzyChoice(["valid", "revoked", "unknown"])
|
|
deleted = False
|
|
description = FuzzyText(length=128)
|
|
date_created = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
|
|
number_attempts = 0
|
|
rename = False
|
|
|
|
class Meta:
|
|
"""Factory Configuration."""
|
|
|
|
model = PendingCertificate
|
|
|
|
@post_generation
|
|
def user(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
self.user_id = extracted.id
|
|
|
|
@post_generation
|
|
def authority(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
self.authority_id = extracted.id
|
|
|
|
@post_generation
|
|
def notifications(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for notification in extracted:
|
|
self.notifications.append(notification)
|
|
|
|
@post_generation
|
|
def destinations(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for destination in extracted:
|
|
self.destintations.append(destination)
|
|
|
|
@post_generation
|
|
def replaces(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for replace in extracted:
|
|
self.replaces.append(replace)
|
|
|
|
@post_generation
|
|
def sources(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for source in extracted:
|
|
self.sources.append(source)
|
|
|
|
@post_generation
|
|
def domains(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for domain in extracted:
|
|
self.domains.append(domain)
|
|
|
|
@post_generation
|
|
def roles(self, create, extracted, **kwargs):
|
|
if not create:
|
|
return
|
|
|
|
if extracted:
|
|
for domain in extracted:
|
|
self.roles.append(domain)
|