Updated DB_Helper by adding firebase methods.
This commit is contained in:
parent
485cc3bbba
commit
c82121d036
1810 changed files with 537281 additions and 1 deletions
56
venv/Lib/site-packages/jws/__init__.py
Normal file
56
venv/Lib/site-packages/jws/__init__.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import json
|
||||
|
||||
import jws.utils as utils
|
||||
|
||||
# local
|
||||
import jws.algos as algos
|
||||
import jws.header as header
|
||||
from jws.exceptions import *
|
||||
|
||||
##############
|
||||
# public api #
|
||||
##############
|
||||
def sign(head, payload, key=None, is_json=False):
|
||||
data = {
|
||||
'key': key,
|
||||
'header': json.loads(head) if is_json else head,
|
||||
'payload': json.loads(payload) if is_json else payload,
|
||||
'signer': None
|
||||
}
|
||||
# TODO: re-evaluate whether to pass ``data`` by reference, or to copy and reassign
|
||||
header.process(data, 'sign')
|
||||
if not data['key']:
|
||||
raise MissingKey("Key was not passed as a param and a key could not be found from the header")
|
||||
if not data['signer']:
|
||||
raise MissingSigner("Header was processed, but no algorithm was found to sign the message")
|
||||
signer = data['signer']
|
||||
signature = signer(_signing_input(head, payload, is_json), key)
|
||||
return utils.to_base64(signature)
|
||||
|
||||
|
||||
def verify(head, payload, encoded_signature, key=None, is_json=False):
|
||||
data = {
|
||||
'key': key,
|
||||
'header': json.loads(head) if is_json else head,
|
||||
'payload': json.loads(payload) if is_json else payload,
|
||||
'verifier': None
|
||||
}
|
||||
# TODO: re-evaluate whether to pass ``data`` by reference, or to copy and reassign
|
||||
header.process(data, 'verify')
|
||||
if not data['key']:
|
||||
raise MissingKey("Key was not passed as a param and a key could not be found from the header")
|
||||
if not data['verifier']:
|
||||
raise MissingVerifier("Header was processed, but no algorithm was found to sign the message")
|
||||
verifier = data['verifier']
|
||||
signature = utils.from_base64(encoded_signature)
|
||||
return verifier(_signing_input(head, payload, is_json), signature, key)
|
||||
|
||||
####################
|
||||
# semi-private api #
|
||||
####################
|
||||
def _signing_input(head, payload, is_json=False):
|
||||
enc = utils.to_base64 if is_json else utils.encode
|
||||
head_input, payload_input = map(enc, [head, payload])
|
||||
return "%s.%s" % (head_input, payload_input)
|
BIN
venv/Lib/site-packages/jws/__pycache__/__init__.cpython-36.pyc
Normal file
BIN
venv/Lib/site-packages/jws/__pycache__/__init__.cpython-36.pyc
Normal file
Binary file not shown.
BIN
venv/Lib/site-packages/jws/__pycache__/algos.cpython-36.pyc
Normal file
BIN
venv/Lib/site-packages/jws/__pycache__/algos.cpython-36.pyc
Normal file
Binary file not shown.
BIN
venv/Lib/site-packages/jws/__pycache__/exceptions.cpython-36.pyc
Normal file
BIN
venv/Lib/site-packages/jws/__pycache__/exceptions.cpython-36.pyc
Normal file
Binary file not shown.
BIN
venv/Lib/site-packages/jws/__pycache__/header.cpython-36.pyc
Normal file
BIN
venv/Lib/site-packages/jws/__pycache__/header.cpython-36.pyc
Normal file
Binary file not shown.
BIN
venv/Lib/site-packages/jws/__pycache__/utils.cpython-36.pyc
Normal file
BIN
venv/Lib/site-packages/jws/__pycache__/utils.cpython-36.pyc
Normal file
Binary file not shown.
187
venv/Lib/site-packages/jws/algos.py
Normal file
187
venv/Lib/site-packages/jws/algos.py
Normal file
|
@ -0,0 +1,187 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import sys
|
||||
import re
|
||||
|
||||
from .exceptions import SignatureError, RouteMissingError, RouteEndpointError
|
||||
from .utils import to_bytes_2and3, constant_time_compare
|
||||
|
||||
class AlgorithmBase(object):
|
||||
"""Base for algorithm support classes."""
|
||||
pass
|
||||
|
||||
class HasherBase(AlgorithmBase):
|
||||
"""
|
||||
Base for algos which need a hash function. The ``bits`` param can be
|
||||
passed in from the capturing group of the routing regexp
|
||||
"""
|
||||
supported_bits = (256, 384, 512)
|
||||
def __init__(self, bits):
|
||||
"""
|
||||
Determine if the algorithm supports the requested bit depth and set up
|
||||
matching hash method from ``hashlib`` if necessary.
|
||||
"""
|
||||
self.bits = int(bits)
|
||||
if self.bits not in self.supported_bits:
|
||||
raise NotImplementedError("%s implements %s bit algorithms (given %d)" %
|
||||
(self.__class__, ', '.join(self.supported_bits), self.bits))
|
||||
if not getattr(self, 'hasher', None):
|
||||
import hashlib
|
||||
self.hasher = getattr(hashlib, 'sha%d' % self.bits)
|
||||
|
||||
class HMAC(HasherBase):
|
||||
"""
|
||||
Support for HMAC signing.
|
||||
"""
|
||||
def sign(self, msg, key):
|
||||
import hmac
|
||||
if sys.version < '3':
|
||||
utfkey = unicode(key).encode('utf8')
|
||||
else:
|
||||
utfkey = to_bytes_2and3(key)
|
||||
msg = to_bytes_2and3(msg)
|
||||
return hmac.new(utfkey, msg, self.hasher).digest()
|
||||
|
||||
def verify(self, msg, crypto, key):
|
||||
if not constant_time_compare(self.sign(msg, key), crypto):
|
||||
raise SignatureError("Could not validate signature")
|
||||
return True
|
||||
|
||||
class RSABase(HasherBase):
|
||||
"""
|
||||
Support for RSA signing.
|
||||
|
||||
The ``Crypto`` package >= 2.5 is required.
|
||||
|
||||
"""
|
||||
supported_bits = (256,384,512,) #:Seems to worka > 256
|
||||
|
||||
def __init__(self, padder, bits):
|
||||
super(RSABase,self).__init__(bits)
|
||||
self.padder = padder
|
||||
from Crypto.Hash import SHA256,SHA384,SHA512
|
||||
self.hashm = __import__('Crypto.Hash.SHA%d'%self.bits, globals(), locals(), ['*']).new()
|
||||
|
||||
def sign(self, msg, key):
|
||||
"""
|
||||
Signs a message with an RSA PrivateKey and hash method
|
||||
"""
|
||||
import Crypto.PublicKey.RSA as RSA
|
||||
|
||||
self.hashm.update(msg.encode('UTF-8'))
|
||||
## assume we are dealing with a real key
|
||||
# private_key = RSA.importKey(key)
|
||||
return self.padder.new(key).sign(self.hashm) # pycrypto 2.5
|
||||
|
||||
def verify(self, msg, crypto, key):
|
||||
"""
|
||||
Verifies a message using RSA cryptographic signature and key.
|
||||
|
||||
``crypto`` is the cryptographic signature
|
||||
``key`` is the verifying key. Can be a real key object or a string.
|
||||
"""
|
||||
import Crypto.PublicKey.RSA as RSA
|
||||
|
||||
self.hashm.update(msg.encode('UTF-8'))
|
||||
private_key = key
|
||||
if not isinstance(key, RSA._RSAobj):
|
||||
private_key = RSA.importKey(key)
|
||||
if not self.padder.new( private_key ).verify(self.hashm, crypto): #:pycrypto 2.5
|
||||
raise SignatureError("Could not validate signature")
|
||||
return True
|
||||
|
||||
class RSA_PKCS1_5(RSABase):
|
||||
def __init__(self, bits):
|
||||
import Crypto.Signature.PKCS1_v1_5 as PKCS
|
||||
super(RSA_PKCS1_5,self).__init__(PKCS, bits)
|
||||
|
||||
class RSA_PSS(RSABase):
|
||||
def __init__(self, bits):
|
||||
import Crypto.Signature.PKCS1_PSS as PSS
|
||||
super(RSA_PSS,self).__init__(PSS, bits)
|
||||
|
||||
class ECDSA(HasherBase):
|
||||
"""
|
||||
Support for ECDSA signing. This is the preferred algorithm for private/public key
|
||||
verification.
|
||||
|
||||
The ``ecdsa`` package is required. ``pip install ecdsa``
|
||||
"""
|
||||
bits_to_curve = {
|
||||
256: 'NIST256p',
|
||||
384: 'NIST384p',
|
||||
512: 'NIST521p',
|
||||
}
|
||||
def sign(self, msg, key):
|
||||
"""
|
||||
Signs a message with an ECDSA SigningKey and hash method matching the
|
||||
bit depth of curve algorithm.
|
||||
"""
|
||||
import ecdsa
|
||||
## assume the signing key is already a real key
|
||||
# curve = getattr(ecdsa, self.bits_to_curve[self.bits])
|
||||
# signing_key = ecdsa.SigningKey.from_string(key, curve=curve)
|
||||
msg = to_bytes_2and3(msg)
|
||||
return key.sign(msg, hashfunc=self.hasher)
|
||||
|
||||
def verify(self, msg, crypto, key):
|
||||
"""
|
||||
Verifies a message using ECDSA cryptographic signature and key.
|
||||
|
||||
``crypto`` is the cryptographic signature
|
||||
``key`` is the verifying key. Can be a real key object or a string.
|
||||
"""
|
||||
import ecdsa
|
||||
curve = getattr(ecdsa, self.bits_to_curve[self.bits])
|
||||
vk = key
|
||||
if not isinstance(vk, ecdsa.VerifyingKey):
|
||||
vk = ecdsa.VerifyingKey.from_string(key, curve=curve)
|
||||
try:
|
||||
vk.verify(crypto, to_bytes_2and3(msg), hashfunc=self.hasher)
|
||||
except ecdsa.BadSignatureError:
|
||||
raise SignatureError("Could not validate signature")
|
||||
except AssertionError:
|
||||
raise SignatureError("Could not validate signature")
|
||||
return True
|
||||
|
||||
# algorithm routing
|
||||
def route(name):
|
||||
return resolve(*find(name))
|
||||
|
||||
def find(name):
|
||||
# TODO: more error checking around custom algorithms
|
||||
algorithms = CUSTOM + list(DEFAULT)
|
||||
for (route, endpoint) in algorithms:
|
||||
match = re.match(route, name)
|
||||
if match:
|
||||
return (endpoint, match)
|
||||
raise RouteMissingError('endpoint matching %s could not be found' % name)
|
||||
|
||||
def resolve(endpoint, match):
|
||||
if callable(endpoint):
|
||||
# send result back through
|
||||
return resolve(endpoint(**match.groupdict()), match)
|
||||
|
||||
# get the sign and verify methods from dict or obj
|
||||
try:
|
||||
crypt = { 'sign': endpoint['sign'], 'verify': endpoint['verify'] }
|
||||
except TypeError:
|
||||
try:
|
||||
crypt = { 'sign': endpoint.sign, 'verify': endpoint.verify }
|
||||
except AttributeError as e:
|
||||
raise RouteEndpointError('route enpoint must have sign, verify as attributes or items of dict')
|
||||
# verify callability
|
||||
try:
|
||||
assert callable(crypt['sign'])
|
||||
assert callable(crypt['verify'])
|
||||
except AssertionError as e:
|
||||
raise RouteEndpointError('sign, verify of endpoint must be callable')
|
||||
return crypt
|
||||
|
||||
DEFAULT = (
|
||||
(r'^HS(?P<bits>256|384|512)$', HMAC),
|
||||
(r'^RS(?P<bits>256|384|512)$', RSA_PKCS1_5),
|
||||
(r'^PS(?P<bits>256|384|512)$', RSA_PSS),
|
||||
(r'^ES(?P<bits>256|384|512)$', ECDSA),
|
||||
)
|
||||
CUSTOM = []
|
11
venv/Lib/site-packages/jws/exceptions.py
Normal file
11
venv/Lib/site-packages/jws/exceptions.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
class MissingKey(Exception): pass
|
||||
class MissingSigner(Exception): pass
|
||||
class MissingVerifier(Exception): pass
|
||||
|
||||
class SignatureError(Exception): pass
|
||||
class RouteMissingError(Exception): pass
|
||||
class RouteEndpointError(Exception): pass
|
||||
|
||||
class AlgorithmNotImplemented(Exception): pass
|
||||
class ParameterNotImplemented(Exception): pass
|
||||
class ParameterNotUnderstood(Exception): pass
|
70
venv/Lib/site-packages/jws/header.py
Normal file
70
venv/Lib/site-packages/jws/header.py
Normal file
|
@ -0,0 +1,70 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import jws.algos as algos
|
||||
|
||||
from .exceptions import AlgorithmNotImplemented, ParameterNotImplemented, ParameterNotUnderstood, RouteMissingError
|
||||
|
||||
class HeaderBase(object):
|
||||
def __init__(self, name, value, data):
|
||||
self.name = name
|
||||
self.value = self.clean(value)
|
||||
self.data = data
|
||||
def sign(self): return self.value
|
||||
def verify(self): return self.value
|
||||
def clean(self, value): return value
|
||||
|
||||
class GenericString(HeaderBase):
|
||||
def clean(self, value):
|
||||
return str(value)
|
||||
|
||||
class SignNotImplemented(HeaderBase):
|
||||
def sign(self):
|
||||
raise ParameterNotImplemented("Header Parameter %s not implemented in the context of signing" % self.name)
|
||||
|
||||
class VerifyNotImplemented(HeaderBase):
|
||||
def verify(self):
|
||||
raise ParameterNotImplemented("Header Parameter %s not implemented in the context of verifying" % self.name)
|
||||
|
||||
class NotImplemented(HeaderBase):
|
||||
def clean(self, *a):
|
||||
raise ParameterNotUnderstood("Could not find an action for Header Parameter '%s'" % self.name)
|
||||
|
||||
class Algorithm(HeaderBase):
|
||||
def clean(self, value):
|
||||
try:
|
||||
self.methods = algos.route(value)
|
||||
except RouteMissingError as e:
|
||||
raise AlgorithmNotImplemented('"%s" not implemented.' % value)
|
||||
|
||||
def sign(self):
|
||||
self.data['signer'] = self.methods['sign']
|
||||
def verify(self):
|
||||
self.data['verifier'] = self.methods['verify']
|
||||
|
||||
KNOWN_HEADERS = {
|
||||
# REQUIRED, signing algo, see signing_methods
|
||||
'alg': Algorithm,
|
||||
# OPTIONAL, type of signed content
|
||||
'typ': GenericString,
|
||||
# OPTIONAL, JSON Key URL. See http://self-issued.info/docs/draft-jones-json-web-key.html
|
||||
'jku': VerifyNotImplemented,
|
||||
# OPTIONAL, key id, hint for which key to use.
|
||||
'kid': VerifyNotImplemented,
|
||||
# OPTIONAL, x.509 URL pointing to certificate or certificate chain
|
||||
'x5u': VerifyNotImplemented,
|
||||
# OPTIONAL, x.509 certificate thumbprint
|
||||
'x5t': VerifyNotImplemented,
|
||||
}
|
||||
|
||||
# data is by reference
|
||||
def process(data, step):
|
||||
for param in data['header']:
|
||||
# The JWS Header Input MUST be validated to only include parameters
|
||||
# and values whose syntax and semantics are both understood and
|
||||
# supported. --- this is why it defaults to NotImplemented, which
|
||||
# raises an exception
|
||||
cls = KNOWN_HEADERS.get(param, NotImplemented)
|
||||
instance = cls(param, data['header'][param], data)
|
||||
procedure = getattr(instance, step)
|
||||
procedure()
|
||||
return data
|
195
venv/Lib/site-packages/jws/tests.py
Normal file
195
venv/Lib/site-packages/jws/tests.py
Normal file
|
@ -0,0 +1,195 @@
|
|||
import unittest
|
||||
import jws
|
||||
import ecdsa
|
||||
import hashlib
|
||||
import Crypto.PublicKey.RSA as rsa
|
||||
|
||||
class TestJWS_helpers(unittest.TestCase):
|
||||
def test_default_algorithm_finding(self):
|
||||
names = [('ES256', jws.algos.ECDSA), ('ES384', jws.algos.ECDSA), ('ES512', jws.algos.ECDSA),
|
||||
('RS256', jws.algos.RSA_PKCS1_5), ('RS384', jws.algos.RSA_PKCS1_5), ('RS512', jws.algos.RSA_PKCS1_5),
|
||||
('PS256', jws.algos.RSA_PSS), ('PS384', jws.algos.RSA_PSS), ('PS512', jws.algos.RSA_PSS),
|
||||
('HS256', jws.algos.HMAC), ('HS384', jws.algos.HMAC), ('HS512', jws.algos.HMAC)]
|
||||
|
||||
# map(lambda (name, fn): self.assertIn(fn, jws.algos.find(name)), names)
|
||||
# Python 3+ support (no tuple unpacking)
|
||||
map(lambda name_fn: self.assertIn(name_fn[1], jws.algos.find(name_fn[0])), names)
|
||||
|
||||
def test_bad_algorithm_route(self):
|
||||
self.assertRaises(jws.algos.RouteMissingError, jws.algos.route, 'f7u12')
|
||||
|
||||
def test_algorithm_resolve(self):
|
||||
resolved = jws.algos.resolve(*jws.algos.find('ES256'))
|
||||
self.assertTrue(callable(resolved['sign']))
|
||||
self.assertTrue(callable(resolved['verify']))
|
||||
|
||||
def test_header_algo_find(self):
|
||||
data = {'header': {'alg': 'ES256'}}
|
||||
jws.header.process(data, 'sign')
|
||||
self.assertIn('signer', data)
|
||||
self.assertTrue(callable(data['signer']))
|
||||
|
||||
# make sure algo can actually sign
|
||||
sk256 = ecdsa.SigningKey.generate(ecdsa.NIST256p)
|
||||
found = data['signer']
|
||||
self.assertTrue(found('what', sk256))
|
||||
|
||||
def test_header_algo_missing(self):
|
||||
header = {'alg': 'f7u12'}
|
||||
self.assertRaises(jws.header.AlgorithmNotImplemented, jws.header.process, {'header':header}, 'sign')
|
||||
|
||||
def test_header_param_not_implemented(self):
|
||||
header = {'something': "i don't understand"}
|
||||
self.assertRaises(jws.header.ParameterNotUnderstood, jws.header.process, {'header':header}, 'sign')
|
||||
|
||||
def test_custom_header_handler(self):
|
||||
header = {'changekey':'somethingelse'}
|
||||
class ChangeKey(jws.header.HeaderBase):
|
||||
def sign(self): self.data['key'] = self.value
|
||||
jws.header.KNOWN_HEADERS.update({'changekey': ChangeKey})
|
||||
data = {'header': header}
|
||||
jws.header.process(data, 'sign')
|
||||
self.assertEqual(data['key'], 'somethingelse')
|
||||
|
||||
def test_custom_algorithm(self):
|
||||
class F7U12(jws.algos.AlgorithmBase):
|
||||
def __init__(self): pass
|
||||
def sign(self, msg, key):
|
||||
return 'u mad?' + key
|
||||
def verify(self, msg, sig, key):
|
||||
import sys
|
||||
if sys.version < '3':
|
||||
if sig == 'u mad?' + key: return '<trollface>'
|
||||
else:
|
||||
if sig == b'u mad?' + bytes(key, 'UTF-8'): return '<trollface>'
|
||||
raise jws.SignatureError('Y U NO GIVE GOOD SIGNATURE')
|
||||
jws.algos.CUSTOM = [ ('F7U12', F7U12) ]
|
||||
header = {'alg': 'F7U12'}
|
||||
payload = {'some': 'claim'}
|
||||
|
||||
sig = jws.sign(header, payload, 'wutlol')
|
||||
self.assertEqual(jws.verify(header,payload,sig, 'wutlol'), '<trollface>')
|
||||
self.assertRaises(jws.SignatureError, jws.verify, header, payload, sig, 'raaaaage')
|
||||
|
||||
|
||||
class TestJWS_ecdsa(unittest.TestCase):
|
||||
sk256 = ecdsa.SigningKey.generate(ecdsa.NIST256p)
|
||||
sk384 = ecdsa.SigningKey.generate(ecdsa.NIST384p)
|
||||
sk512 = ecdsa.SigningKey.generate(ecdsa.NIST521p) # yes, 521
|
||||
|
||||
def setUp(self):
|
||||
self.payload = {
|
||||
'whine': {'luke': 'But I was going into Tosche station to pick up some power converters!'},
|
||||
'rebuttal': {'owen': "You can waste time with your friends when you're done with your chores."},
|
||||
}
|
||||
|
||||
def test_valid_ecdsa256(self):
|
||||
key = self.sk256
|
||||
header = {'alg': 'ES256'}
|
||||
sig = jws.sign(header, self.payload, key)
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, key.get_verifying_key()))
|
||||
|
||||
def test_valid_ecdsa384(self):
|
||||
key = self.sk384
|
||||
header = {'alg': 'ES384'}
|
||||
sig = jws.sign(header, self.payload, key)
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, key.get_verifying_key()))
|
||||
|
||||
def test_valid_ecdsa512(self):
|
||||
key = self.sk512
|
||||
header = {'alg': 'ES512'}
|
||||
sig = jws.sign(header, self.payload, key)
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, key.get_verifying_key()))
|
||||
|
||||
def test_invalid_ecdsa_decode(self):
|
||||
header = {'alg': 'ES256'}
|
||||
sig = jws.sign(header, self.payload, self.sk256)
|
||||
vk = self.sk256.get_verifying_key()
|
||||
badkey = self.sk384.get_verifying_key()
|
||||
self.assertRaises(jws.SignatureError, jws.verify, header, self.payload, 'not a good sig', vk)
|
||||
self.assertRaises(jws.SignatureError, jws.verify, header, {'bad':1}, sig, vk)
|
||||
self.assertRaises(jws.SignatureError, jws.verify, header, {'bad':1}, sig, badkey)
|
||||
|
||||
|
||||
class TestJWS_hmac(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.payload = {
|
||||
'whine': {'luke': 'But I was going into Tosche station to pick up some power converters!'},
|
||||
'rebuttal': {'owen': "You can waste time with your friends when you're done with your chores."},
|
||||
}
|
||||
|
||||
def test_valid_hmac256(self):
|
||||
header = {'alg': 'HS256'}
|
||||
sig = jws.sign(header, self.payload, 'secret')
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, 'secret'))
|
||||
|
||||
def test_valid_hmac384(self):
|
||||
header = {'alg': 'HS384'}
|
||||
sig = jws.sign(header, self.payload, 'secret')
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, 'secret'))
|
||||
|
||||
def test_valid_hmac512(self):
|
||||
header = {'alg': 'HS512'}
|
||||
sig = jws.sign(header, self.payload, 'secret')
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, 'secret'))
|
||||
|
||||
def test_invalid_hmac(self):
|
||||
header = {'alg': 'HS512'}
|
||||
sig = jws.sign(header, self.payload, 'secret')
|
||||
self.assertRaises(jws.SignatureError(header, self.payload, sig, 'failwhale'))
|
||||
|
||||
class TestJWS_rsa(unittest.TestCase):
|
||||
private = rsa.generate(2048)
|
||||
def setUp(self):
|
||||
self.payload = {
|
||||
'whine': {'luke': 'But I was going into Tosche station to pick up some power converters!'},
|
||||
'rebuttal': {'owen': "You can waste time with your friends when you're done with your chores."},
|
||||
}
|
||||
|
||||
def test_valid_rsa256_pkcs1_5(self):
|
||||
header = {'alg': 'RS256'}
|
||||
sig = jws.sign(header, self.payload, self.private)
|
||||
public = self.private.publickey()
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, public))
|
||||
|
||||
def test_valid_rsa384_pkcs1_5(self):
|
||||
header = {'alg': 'RS384'}
|
||||
sig = jws.sign(header, self.payload, self.private)
|
||||
public = self.private.publickey()
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, public))
|
||||
|
||||
def test_valid_rsa512_pkcs1_5(self):
|
||||
header = {'alg': 'RS512'}
|
||||
sig = jws.sign(header, self.payload, self.private)
|
||||
public = self.private.publickey()
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, public))
|
||||
|
||||
def test_valid_rsa256_pss(self):
|
||||
header = {'alg': 'PS256'}
|
||||
sig = jws.sign(header, self.payload, self.private)
|
||||
public = self.private.publickey()
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, public))
|
||||
|
||||
def test_valid_rsa384_pss(self):
|
||||
header = {'alg': 'PS384'}
|
||||
sig = jws.sign(header, self.payload, self.private)
|
||||
public = self.private.publickey()
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, public))
|
||||
|
||||
def test_valid_rsa512_pss(self):
|
||||
header = {'alg': 'PS512'}
|
||||
sig = jws.sign(header, self.payload, self.private)
|
||||
public = self.private.publickey()
|
||||
self.assertTrue(len(sig) > 0)
|
||||
self.assertTrue(jws.verify(header, self.payload, sig, public))
|
56
venv/Lib/site-packages/jws/utils.py
Normal file
56
venv/Lib/site-packages/jws/utils.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import base64
|
||||
import json
|
||||
|
||||
import sys
|
||||
if sys.version < '3':
|
||||
text_type = unicode
|
||||
binary_type = str
|
||||
else:
|
||||
text_type = str
|
||||
binary_type = bytes
|
||||
|
||||
def to_bytes_2and3(s):
|
||||
if type(s) != binary_type:
|
||||
s = bytes(s, 'UTF-8')
|
||||
return s
|
||||
|
||||
def base64url_decode(input):
|
||||
input = to_bytes_2and3(input)
|
||||
input += b'=' * (4 - (len(input) % 4))
|
||||
return base64.urlsafe_b64decode(input)
|
||||
def base64url_encode(input):
|
||||
return base64.urlsafe_b64encode(to_bytes_2and3(input)).replace(b'=', b'')
|
||||
|
||||
def to_json(a): return json.dumps(a)
|
||||
def from_json(a): return json.loads(a)
|
||||
def to_base64(a): return base64url_encode(a)
|
||||
def from_base64(a): return base64url_decode(a)
|
||||
def encode(a): return to_base64(to_json(a))
|
||||
def decode(a): return from_json(from_base64(a))
|
||||
|
||||
#Taken from Django Source Code
|
||||
|
||||
def _ord(val):
|
||||
if sys.version < '3':
|
||||
return ord(val)
|
||||
else:
|
||||
return val
|
||||
|
||||
def constant_time_compare(val1, val2):
|
||||
"""
|
||||
Returns True if the two strings are equal, False otherwise.
|
||||
|
||||
The time taken is independent of the number of characters that match.
|
||||
|
||||
For the sake of simplicity, this function executes in constant time only
|
||||
when the two strings have the same length. It short-circuits when they
|
||||
have different lengths.
|
||||
"""
|
||||
if len(val1) != len(val2):
|
||||
return False
|
||||
result = 0
|
||||
for x, y in zip(val1, val2):
|
||||
result |= _ord(x) ^ _ord(y)
|
||||
return result == 0
|
Loading…
Add table
Add a link
Reference in a new issue