Python3 (#417)
* Fixing tests. * Fixing issue where decrypted credentials were not returning valid strings. * Fixing issues with python3 authentication.
This commit is contained in:
parent
7e6278684c
commit
53d0636574
|
@ -8,11 +8,9 @@
|
||||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import unicode_literals
|
import sys
|
||||||
from builtins import bytes
|
|
||||||
import jwt
|
import jwt
|
||||||
import json
|
import json
|
||||||
import base64
|
|
||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
@ -34,19 +32,6 @@ from lemur.auth.permissions import CertificateCreatorNeed, \
|
||||||
AuthorityCreatorNeed, ViewRoleCredentialsNeed
|
AuthorityCreatorNeed, ViewRoleCredentialsNeed
|
||||||
|
|
||||||
|
|
||||||
def base64url_decode(data):
|
|
||||||
rem = len(data) % 4
|
|
||||||
|
|
||||||
if rem > 0:
|
|
||||||
data += '=' * (4 - rem)
|
|
||||||
|
|
||||||
return base64.urlsafe_b64decode(bytes(data.encode('latin-1')))
|
|
||||||
|
|
||||||
|
|
||||||
def base64url_encode(data):
|
|
||||||
return base64.urlsafe_b64encode(data).replace('=', '')
|
|
||||||
|
|
||||||
|
|
||||||
def get_rsa_public_key(n, e):
|
def get_rsa_public_key(n, e):
|
||||||
"""
|
"""
|
||||||
Retrieve an RSA public key based on a module and exponent as provided by the JWKS format.
|
Retrieve an RSA public key based on a module and exponent as provided by the JWKS format.
|
||||||
|
@ -55,8 +40,13 @@ def get_rsa_public_key(n, e):
|
||||||
:param e:
|
:param e:
|
||||||
:return: a RSA Public Key in PEM format
|
:return: a RSA Public Key in PEM format
|
||||||
"""
|
"""
|
||||||
n = int(binascii.hexlify(base64url_decode(n)), 16)
|
if sys.version_info >= (3, 0):
|
||||||
e = int(binascii.hexlify(base64url_decode(e)), 16)
|
n = int(binascii.hexlify(jwt.utils.base64url_decode(bytes(n, 'utf-8'))), 16)
|
||||||
|
e = int(binascii.hexlify(jwt.utils.base64url_decode(bytes(e, 'utf-8'))), 16)
|
||||||
|
else:
|
||||||
|
n = int(binascii.hexlify(jwt.utils.base64url_decode(n)), 16)
|
||||||
|
e = int(binascii.hexlify(jwt.utils.base64url_decode(e, 'utf-8')), 16)
|
||||||
|
|
||||||
pub = RSAPublicNumbers(e, n).public_key(default_backend())
|
pub = RSAPublicNumbers(e, n).public_key(default_backend())
|
||||||
return pub.public_bytes(
|
return pub.public_bytes(
|
||||||
encoding=serialization.Encoding.PEM,
|
encoding=serialization.Encoding.PEM,
|
||||||
|
@ -138,13 +128,13 @@ def fetch_token_header(token):
|
||||||
raise jwt.DecodeError('Not enough segments')
|
raise jwt.DecodeError('Not enough segments')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return json.loads(base64url_decode(header_segment))
|
if sys.version_info >= (3, 0):
|
||||||
|
return json.loads(jwt.utils.base64url_decode(header_segment).decode('utf-8'))
|
||||||
|
else:
|
||||||
|
return json.loads(jwt.utils.base64url_decode(header_segment))
|
||||||
except TypeError as e:
|
except TypeError as e:
|
||||||
current_app.logger.exception(e)
|
current_app.logger.exception(e)
|
||||||
raise jwt.DecodeError('Invalid header padding')
|
raise jwt.DecodeError('Invalid header padding')
|
||||||
except binascii.Error as e:
|
|
||||||
current_app.logger.exception(e)
|
|
||||||
raise jwt.DecodeError('Invalid header padding')
|
|
||||||
|
|
||||||
|
|
||||||
@identity_loaded.connect
|
@identity_loaded.connect
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
:license: Apache, see LICENSE for more details.
|
:license: Apache, see LICENSE for more details.
|
||||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||||
"""
|
"""
|
||||||
|
import sys
|
||||||
import jwt
|
import jwt
|
||||||
import base64
|
import base64
|
||||||
import requests
|
import requests
|
||||||
|
@ -140,8 +141,14 @@ class Ping(Resource):
|
||||||
user_api_url = current_app.config.get('PING_USER_API_URL')
|
user_api_url = current_app.config.get('PING_USER_API_URL')
|
||||||
|
|
||||||
# the secret and cliendId will be given to you when you signup for the provider
|
# the secret and cliendId will be given to you when you signup for the provider
|
||||||
basic = base64.b64encode('{0}:{1}'.format(args['clientId'], current_app.config.get("PING_SECRET")))
|
token = '{0}:{1}'.format(args['clientId'], current_app.config.get("PING_SECRET"))
|
||||||
headers = {'Authorization': 'Basic {0}'.format(basic)}
|
|
||||||
|
if sys.version_info >= (3, 0):
|
||||||
|
basic = base64.b64encode(bytes(token, 'utf-8'))
|
||||||
|
headers = {'authorization': 'basic {0}'.format(basic.decode('utf-8'))}
|
||||||
|
else:
|
||||||
|
basic = base64.b64encode(token, 'utf-8')
|
||||||
|
headers = {'authorization': 'basic {0}'.format(basic)}
|
||||||
|
|
||||||
# exchange authorization code for access token.
|
# exchange authorization code for access token.
|
||||||
|
|
||||||
|
@ -165,7 +172,10 @@ class Ping(Resource):
|
||||||
|
|
||||||
# validate your token based on the key it was signed with
|
# validate your token based on the key it was signed with
|
||||||
try:
|
try:
|
||||||
jwt.decode(id_token, secret, algorithms=[algo], audience=args['clientId'])
|
if sys.version_info >= (3, 0):
|
||||||
|
jwt.decode(id_token, secret.decode('utf-8'), algorithms=[algo], audience=args['clientId'])
|
||||||
|
else:
|
||||||
|
jwt.decode(id_token, secret, algorithms=[algo], audience=args['clientId'])
|
||||||
except jwt.DecodeError:
|
except jwt.DecodeError:
|
||||||
return dict(message='Token is invalid'), 403
|
return dict(message='Token is invalid'), 403
|
||||||
except jwt.ExpiredSignatureError:
|
except jwt.ExpiredSignatureError:
|
||||||
|
|
Loading…
Reference in New Issue