lemur/lemur/tests/factories.py

438 lines
10 KiB
Python

from datetime import date
from factory import Sequence, post_generation, SubFactory
from factory.alchemy import SQLAlchemyModelFactory
from factory.fuzzy import FuzzyChoice, FuzzyText, FuzzyDate, FuzzyInteger
from lemur.database import db
from lemur.authorities.models import Authority
from lemur.certificates.models import Certificate
from lemur.destinations.models import Destination
from lemur.sources.models import Source
from lemur.notifications.models import Notification
from lemur.pending_certificates.models import PendingCertificate
from lemur.users.models import User
from lemur.roles.models import Role
from lemur.endpoints.models import Policy, Endpoint
from lemur.policies.models import RotationPolicy
from lemur.api_keys.models import ApiKey
from .vectors import (
SAN_CERT_STR,
SAN_CERT_KEY,
CSR_STR,
INTERMEDIATE_CERT_STR,
ROOTCA_CERT_STR,
INTERMEDIATE_KEY,
WILDCARD_CERT_KEY,
INVALID_CERT_STR,
)
class BaseFactory(SQLAlchemyModelFactory):
"""Base factory."""
class Meta:
"""Factory configuration."""
abstract = True
sqlalchemy_session = db.session
class RotationPolicyFactory(BaseFactory):
"""Rotation Factory."""
name = Sequence(lambda n: "policy{0}".format(n))
days = 30
class Meta:
"""Factory configuration."""
model = RotationPolicy
class CertificateFactory(BaseFactory):
"""Certificate factory."""
name = Sequence(lambda n: "certificate{0}".format(n))
chain = INTERMEDIATE_CERT_STR
body = SAN_CERT_STR
private_key = SAN_CERT_KEY
owner = "joe@example.com"
status = FuzzyChoice(["valid", "revoked", "unknown"])
deleted = False
description = FuzzyText(length=128)
active = True
date_created = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
rotation_policy = SubFactory(RotationPolicyFactory)
class Meta:
"""Factory Configuration."""
model = Certificate
@post_generation
def user(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.user_id = extracted.id
@post_generation
def authority(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.authority_id = extracted.id
@post_generation
def notifications(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for notification in extracted:
self.notifications.append(notification)
@post_generation
def destinations(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for destination in extracted:
self.destintations.append(destination)
@post_generation
def replaces(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for replace in extracted:
self.replaces.append(replace)
@post_generation
def sources(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for source in extracted:
self.sources.append(source)
@post_generation
def domains(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for domain in extracted:
self.domains.append(domain)
@post_generation
def roles(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for domain in extracted:
self.roles.append(domain)
class CACertificateFactory(CertificateFactory):
chain = ROOTCA_CERT_STR
body = INTERMEDIATE_CERT_STR
private_key = INTERMEDIATE_KEY
class InvalidCertificateFactory(CertificateFactory):
body = INVALID_CERT_STR
private_key = ""
chain = ""
class AuthorityFactory(BaseFactory):
"""Authority factory."""
name = Sequence(lambda n: "authority{0}".format(n))
owner = "joe@example.com"
plugin = {"slug": "test-issuer"}
description = FuzzyText(length=128)
authority_certificate = SubFactory(CACertificateFactory)
class Meta:
"""Factory configuration."""
model = Authority
@post_generation
def roles(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for role in extracted:
self.roles.append(role)
class AsyncAuthorityFactory(AuthorityFactory):
"""Async Authority factory."""
name = Sequence(lambda n: "authority{0}".format(n))
owner = "joe@example.com"
plugin = {"slug": "test-issuer-async"}
description = FuzzyText(length=128)
authority_certificate = SubFactory(CertificateFactory)
class CryptoAuthorityFactory(AuthorityFactory):
"""Authority factory based on 'cryptography' plugin."""
plugin = {"slug": "cryptography-issuer"}
class DestinationFactory(BaseFactory):
"""Destination factory."""
plugin_name = "test-destination"
label = Sequence(lambda n: "destination{0}".format(n))
class Meta:
"""Factory Configuration."""
model = Destination
class SourceFactory(BaseFactory):
"""Source factory."""
plugin_name = "test-source"
label = Sequence(lambda n: "source{0}".format(n))
class Meta:
"""Factory Configuration."""
model = Source
class NotificationFactory(BaseFactory):
"""Notification factory."""
plugin_name = "test-notification"
label = Sequence(lambda n: "notification{0}".format(n))
class Meta:
"""Factory Configuration."""
model = Notification
class RoleFactory(BaseFactory):
"""Role factory."""
name = Sequence(lambda n: "role{0}".format(n))
class Meta:
"""Factory Configuration."""
model = Role
@post_generation
def users(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for user in extracted:
self.users.append(user)
class UserFactory(BaseFactory):
"""User Factory."""
username = Sequence(lambda n: "user{0}".format(n))
email = Sequence(lambda n: "user{0}@example.com".format(n))
active = True
password = FuzzyText(length=24)
certificates = []
class Meta:
"""Factory Configuration."""
model = User
@post_generation
def roles(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for role in extracted:
self.roles.append(role)
@post_generation
def certificates(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for cert in extracted:
self.certificates.append(cert)
@post_generation
def authorities(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for authority in extracted:
self.authorities.append(authority)
class PolicyFactory(BaseFactory):
"""Policy Factory."""
name = Sequence(lambda n: "endpoint{0}".format(n))
class Meta:
"""Factory Configuration."""
model = Policy
class EndpointFactory(BaseFactory):
"""Endpoint Factory."""
owner = "joe@example.com"
name = Sequence(lambda n: "endpoint{0}".format(n))
type = FuzzyChoice(["elb"])
active = True
port = FuzzyInteger(0, high=65535)
dnsname = "endpoint.example.com"
policy = SubFactory(PolicyFactory)
certificate = SubFactory(CertificateFactory)
source = SubFactory(SourceFactory)
class Meta:
"""Factory Configuration."""
model = Endpoint
class ApiKeyFactory(BaseFactory):
"""Api Key Factory."""
name = Sequence(lambda n: "api_key_{0}".format(n))
revoked = False
ttl = -1
issued_at = 1
class Meta:
"""Factory Configuration."""
model = ApiKey
@post_generation
def user(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.userId = extracted.id
class PendingCertificateFactory(BaseFactory):
"""PendingCertificate factory."""
name = Sequence(lambda n: "pending_certificate{0}".format(n))
external_id = 12345
csr = CSR_STR
chain = INTERMEDIATE_CERT_STR
private_key = WILDCARD_CERT_KEY
owner = "joe@example.com"
status = FuzzyChoice(["valid", "revoked", "unknown"])
deleted = False
description = FuzzyText(length=128)
date_created = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
number_attempts = 0
rename = False
class Meta:
"""Factory Configuration."""
model = PendingCertificate
@post_generation
def user(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.user_id = extracted.id
@post_generation
def authority(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.authority_id = extracted.id
@post_generation
def notifications(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for notification in extracted:
self.notifications.append(notification)
@post_generation
def destinations(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for destination in extracted:
self.destintations.append(destination)
@post_generation
def replaces(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for replace in extracted:
self.replaces.append(replace)
@post_generation
def sources(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for source in extracted:
self.sources.append(source)
@post_generation
def domains(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for domain in extracted:
self.domains.append(domain)
@post_generation
def roles(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for domain in extracted:
self.roles.append(domain)