basic ldap support (#842)

This commit is contained in:
Ian Stahnke
2017-09-04 13:41:43 +10:00
committed by kevgliss
parent c0784b40e0
commit 79d12578c7
8 changed files with 378 additions and 4 deletions

180
lemur/auth/ldap.py Normal file
View File

@ -0,0 +1,180 @@
"""
.. module: lemur.auth.ldap
:platform: Unix
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Ian Stahnke <ian.stahnke@myob.com>
"""
import ldap
from flask import current_app
from lemur.users import service as user_service
from lemur.roles import service as role_service
from lemur.common.utils import validate_conf, get_psuedo_random_string
class LdapPrincipal():
"""
Provides methods for authenticating against an LDAP server.
"""
def __init__(self, args):
self._ldap_validate_conf()
# setup ldap config
if not args['username']:
raise Exception("missing ldap username")
if not args['password']:
self.error_message = "missing ldap password"
raise Exception("missing ldap password")
self.ldap_principal = args['username']
self.ldap_email_domain = current_app.config.get("LDAP_EMAIL_DOMAIN", None)
if '@' not in self.ldap_principal:
self.ldap_principal = '%s@%s' % (self.ldap_principal, self.ldap_email_domain)
self.ldap_username = args['username']
if '@' in self.ldap_username:
self.ldap_username = args['username'].split("@")[0]
self.ldap_password = args['password']
self.ldap_server = current_app.config.get('LDAP_BIND_URI', None)
self.ldap_base_dn = current_app.config.get("LDAP_BASE_DN", None)
self.ldap_use_tls = current_app.config.get("LDAP_USE_TLS", False)
self.ldap_cacert_file = current_app.config.get("LDAP_CACERT_FILE", None)
self.ldap_default_role = current_app.config.get("LEMUR_DEFAULT_ROLE", None)
self.ldap_required_group = current_app.config.get("LDAP_REQUIRED_GROUP", None)
self.ldap_groups_to_roles = current_app.config.get("LDAP_GROUPS_TO_ROLES", None)
self.ldap_attrs = ['memberOf']
self.ldap_client = None
self.ldap_groups = None
def _update_user(self, roles):
"""
create or update a local user instance.
"""
# try to get user from local database
user = user_service.get_by_email(self.ldap_principal)
# create them a local account
if not user:
user = user_service.create(
self.ldap_username,
get_psuedo_random_string(),
self.ldap_principal,
True,
'', # thumbnailPhotoUrl
list(roles)
)
else:
# we add 'lemur' specific roles, so they do not get marked as removed
for ur in user.roles:
if ur.authority_id:
roles.add(ur)
# update any changes to the user
user_service.update(
user.id,
self.ldap_username,
self.ldap_principal,
user.active,
user.profile_picture,
list(roles)
)
return user
def _authorize(self):
"""
check groups and roles to confirm access.
return a list of roles if ok.
raise an exception on error.
"""
if not self.ldap_principal:
return None
if self.ldap_required_group:
# ensure the user has the required group in their group list
if self.ldap_required_group not in self.ldap_groups:
return None
roles = set()
if self.ldap_default_role:
role = role_service.get_by_name(self.ldap_default_role)
if role:
roles.add(role)
# update their 'roles'
role = role_service.get_by_name(self.ldap_principal)
if not role:
description = "auto generated role based on owner: {0}".format(self.ldap_principal)
role = role_service.create(self.ldap_principal, description=description)
roles.add(role)
if not self.ldap_groups_to_roles:
return roles
for ldap_group_name, role_name in self.ldap_groups_to_roles.items():
role = role_service.get_by_name(role_name)
if role:
if ldap_group_name in self.ldap_groups:
current_app.logger.debug("assigning role {0} to ldap user {1}".format(self.ldap_principal, role))
roles.add(role)
return roles
def authenticate(self):
"""
orchestrate the ldap login.
raise an exception on error.
"""
self._bind()
roles = self._authorize()
if not roles:
raise Exception('ldap authorization failed')
return self._update_user(roles)
def _bind(self):
"""
authenticate an ldap user.
list groups for a user.
raise an exception on error.
"""
if '@' not in self.ldap_principal:
self.ldap_principal = '%s@%s' % (self.ldap_principal, self.ldap_email_domain)
ldap_filter = 'userPrincipalName=%s' % self.ldap_principal
# query ldap for auth
try:
# build a client
if not self.ldap_client:
self.ldap_client = ldap.initialize(self.ldap_server)
# perform a synchronous bind
self.ldap_client.set_option(ldap.OPT_REFERRALS, 0)
if self.ldap_use_tls:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
self.ldap_client.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
self.ldap_client.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
self.ldap_client.set_option(ldap.OPT_X_TLS_DEMAND, True)
self.ldap_client.set_option(ldap.OPT_DEBUG_LEVEL, 255)
if self.ldap_cacert_file:
self.ldap_client.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ldap_cacert_file)
self.ldap_client.simple_bind_s(self.ldap_principal, self.ldap_password)
except ldap.INVALID_CREDENTIALS:
self.ldap_client.unbind()
raise Exception('The supplied ldap credentials are invalid')
except ldap.SERVER_DOWN:
raise Exception('ldap server unavailable')
except ldap.LDAPError as e:
raise Exception("ldap error: {0}".format(e))
lgroups = self.ldap_client.search_s(self.ldap_base_dn,
ldap.SCOPE_SUBTREE, ldap_filter, self.ldap_attrs)[0][1]['memberOf']
# lgroups is a list of utf-8 encoded strings
# convert to a single string of groups to allow matching
self.ldap_groups = b''.join(lgroups).decode('ascii')
self.ldap_client.unbind()
def _ldap_validate_conf(self):
"""
Confirms required ldap config settings exist.
"""
required_vars = [
'LDAP_BIND_URI',
'LDAP_BASE_DN',
'LDAP_EMAIL_DOMAIN',
]
validate_conf(current_app, required_vars)

View File

@ -21,6 +21,7 @@ from lemur.common.utils import get_psuedo_random_string
from lemur.users import service as user_service
from lemur.roles import service as role_service
from lemur.auth.service import create_token, fetch_token_header, get_rsa_public_key
import lemur.auth.ldap as ldap
mod = Blueprint('auth', __name__)
@ -94,6 +95,7 @@ class Login(Resource):
else:
user = user_service.get_by_username(args['username'])
# default to local authentication
if user and user.check_password(args['password']) and user.active:
# Tell Flask-Principal the identity changed
identity_changed.send(current_app._get_current_object(),
@ -102,6 +104,24 @@ class Login(Resource):
metrics.send('successful_login', 'counter', 1)
return dict(token=create_token(user))
# try ldap login
if current_app.config.get("LDAP_AUTH"):
try:
ldap_principal = ldap.LdapPrincipal(args)
user = ldap_principal.authenticate()
if user and user.active:
# Tell Flask-Principal the identity changed
identity_changed.send(current_app._get_current_object(),
identity=Identity(user.id))
metrics.send('successful_login', 'counter', 1)
return dict(token=create_token(user))
except Exception as e:
current_app.logger.error("ldap error: {0}".format(e))
ldap_message = 'ldap error: %s' % e
metrics.send('invalid_login', 'counter', 1)
return dict(message=ldap_message), 403
# if not valid user - no certificates for you
metrics.send('invalid_login', 'counter', 1)
return dict(message='The supplied credentials are invalid'), 403

View File

@ -8,7 +8,7 @@
</button>
</div>
</div>
<div class="login-or">
<div class="login-or" ng-if="providers.length > 0">
<hr class="hr-or">
<span class="span-or">or</span>
</div>

View File

@ -177,3 +177,10 @@ ACME_URL = 'https://acme-v01.api.letsencrypt.org'
ACME_EMAIL = 'jim@example.com'
ACME_TEL = '4088675309'
ACME_DIRECTORY_URL = 'https://acme-v01.api.letsencrypt.org'
LDAP_AUTH = True
LDAP_BIND_URI = 'ldap://localhost'
LDAP_BASE_DN = 'dc=example,dc=com'
LDAP_EMAIL_DOMAIN = 'example.com'
LDAP_REQUIRED_GROUP = 'Lemur Access'
LDAP_DEFAULT_ROLE = 'role1'

64
lemur/tests/test_ldap.py Normal file
View File

@ -0,0 +1,64 @@
import pytest
from lemur.auth.ldap import * # noqa
from mock import patch, MagicMock
class LdapPrincipalTester(LdapPrincipal):
def __init__(self, args):
super().__init__(args)
self.ldap_server = 'ldap://localhost'
def bind_test(self):
groups = [('user', {'memberOf': ['CN=Lemur Access,OU=Groups,DC=example,DC=com'.encode('utf-8'),
'CN=Pen Pushers,OU=Groups,DC=example,DC=com'.encode('utf-8')]})]
self.ldap_client = MagicMock()
self.ldap_client.search_s.return_value = groups
self._bind()
def authorize_test_groups_to_roles_admin(self):
self.ldap_groups = ''.join(['CN=Pen Pushers,OU=Groups,DC=example,DC=com',
'CN=Lemur Admins,OU=Groups,DC=example,DC=com',
'CN=Lemur Read Only,OU=Groups,DC=example,DC=com'])
self.ldap_required_group = None
self.ldap_groups_to_roles = {'Lemur Admins': 'admin', 'Lemur Read Only': 'read-only'}
return self._authorize()
def authorize_test_required_group(self, group):
self.ldap_groups = ''.join(['CN=Lemur Access,OU=Groups,DC=example,DC=com',
'CN=Pen Pushers,OU=Groups,DC=example,DC=com'])
self.ldap_required_group = group
return self._authorize()
@pytest.fixture()
def principal(session):
args = {'username': 'user', 'password': 'p4ssw0rd'}
yield LdapPrincipalTester(args)
class TestLdapPrincipal:
@patch('ldap.initialize')
def test_bind(self, app, principal):
self.test_ldap_user = principal
self.test_ldap_user.bind_test()
group = 'Pen Pushers'
assert group in self.test_ldap_user.ldap_groups
assert self.test_ldap_user.ldap_principal == 'user@example.com'
def test_authorize_groups_to_roles_admin(self, app, principal):
self.test_ldap_user = principal
roles = self.test_ldap_user.authorize_test_groups_to_roles_admin()
assert any(x.name == "admin" for x in roles)
def test_authorize_required_group_missing(self, app, principal):
self.test_ldap_user = principal
roles = self.test_ldap_user.authorize_test_required_group('Not Allowed')
assert not roles
def test_authorize_required_group_access(self, session, principal):
self.test_ldap_user = principal
roles = self.test_ldap_user.authorize_test_required_group('Lemur Access')
assert len(roles) >= 1
assert any(x.name == "user@example.com" for x in roles)