diff --git a/.gitignore b/.gitignore index 669c805..41079f8 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ cscope.out .tox .coverage *.egg-info +env diff --git a/jwcrypto/jwa.py b/jwcrypto/jwa.py index 5f0d642..a3812f3 100644 --- a/jwcrypto/jwa.py +++ b/jwcrypto/jwa.py @@ -870,10 +870,10 @@ def verify(self, key, payload, signature): class _RawJWE: - def encrypt(self, k, a, m): + def encrypt(self, k, aad, m): raise NotImplementedError - def decrypt(self, k, a, iv, e, t): + def decrypt(self, k, aad, iv, e, t): raise NotImplementedError @@ -887,10 +887,10 @@ def __init__(self, hashfn): self.blocksize = algorithms.AES.block_size self.wrap_key_size = self.keysize * 2 - def _mac(self, k, a, iv, e): - al = _encode_int(_bitsize(a), 64) + def _mac(self, k, ai, iv, e): + al = _encode_int(_bitsize(ai), 64) h = hmac.HMAC(k, self.hashfn, backend=self.backend) - h.update(a) + h.update(ai) h.update(iv) h.update(e) h.update(al) @@ -898,7 +898,7 @@ def _mac(self, k, a, iv, e): return m[:_inbytes(self.keysize)] # RFC 7518 - 5.2.2 - def encrypt(self, k, a, m): + def encrypt(self, k, ai, m): """ Encrypt according to the selected encryption and hashing functions. @@ -924,11 +924,11 @@ def encrypt(self, k, a, m): e = encryptor.update(padded_data) + encryptor.finalize() # mac - t = self._mac(hkey, a, iv, e) + t = self._mac(hkey, ai, iv, e) return (iv, e, t) - def decrypt(self, k, a, iv, e, t): + def decrypt(self, k, ai, iv, e, t): """ Decrypt according to the selected encryption and hashing functions. :param k: Encryption key @@ -946,7 +946,7 @@ def decrypt(self, k, a, iv, e, t): dkey = k[_inbytes(self.keysize):] # verify mac - if not constant_time.bytes_eq(t, self._mac(hkey, a, iv, e)): + if not constant_time.bytes_eq(t, self._mac(hkey, ai, iv, e)): raise InvalidSignature('Failed to verify MAC') # decrypt @@ -1003,7 +1003,7 @@ def __init__(self): self.wrap_key_size = self.keysize # RFC 7518 - 5.3 - def encrypt(self, k, a, m): + def encrypt(self, k, ai, m): """ Encrypt according to the selected encryption and hashing functions. @@ -1017,16 +1017,16 @@ def encrypt(self, k, a, m): cipher = Cipher(algorithms.AES(k), modes.GCM(iv), backend=self.backend) encryptor = cipher.encryptor() - encryptor.authenticate_additional_data(a) + encryptor.authenticate_additional_data(ai) e = encryptor.update(m) + encryptor.finalize() return (iv, e, encryptor.tag) - def decrypt(self, k, a, iv, e, t): + def decrypt(self, k, ai, iv, e, t): """ Decrypt according to the selected encryption and hashing functions. :param k: Encryption key - :param a: Additional Authenticated Data + :param ai: Additional Authenticated Data :param iv: Initialization Vector :param e: Ciphertext :param t: Authentication Tag @@ -1036,7 +1036,7 @@ def decrypt(self, k, a, iv, e, t): cipher = Cipher(algorithms.AES(k), modes.GCM(iv, t), backend=self.backend) decryptor = cipher.decryptor() - decryptor.authenticate_additional_data(a) + decryptor.authenticate_additional_data(ai) return decryptor.update(e) + decryptor.finalize() diff --git a/jwcrypto/jwe.py b/jwcrypto/jwe.py index 0567810..9412881 100644 --- a/jwcrypto/jwe.py +++ b/jwcrypto/jwe.py @@ -525,17 +525,17 @@ def deserialize(self, raw_jwe, key=None): o['header'] = json_encode(djwe['header']) except ValueError as e: - c = raw_jwe.split('.') - if len(c) != 5: + data = raw_jwe.split('.') + if len(data) != 5: raise InvalidJWEData() from e - p = base64url_decode(c[0]) + p = base64url_decode(data[0]) o['protected'] = p.decode('utf-8') - ekey = base64url_decode(c[1]) + ekey = base64url_decode(data[1]) if ekey != b'': - o['encrypted_key'] = base64url_decode(c[1]) - o['iv'] = base64url_decode(c[2]) - o['ciphertext'] = base64url_decode(c[3]) - o['tag'] = base64url_decode(c[4]) + o['encrypted_key'] = base64url_decode(data[1]) + o['iv'] = base64url_decode(data[2]) + o['ciphertext'] = base64url_decode(data[3]) + o['tag'] = base64url_decode(data[4]) self.objects = o @@ -581,11 +581,11 @@ def __eq__(self, other): try: return self.serialize() == other.serialize() except Exception: # pylint: disable=broad-except - a = {'plaintext': self.plaintext} - a.update(self.objects) - b = {'plaintext': other.plaintext} - b.update(other.objects) - return a == b + data1 = {'plaintext': self.plaintext} + data1.update(self.objects) + data2 = {'plaintext': other.plaintext} + data2.update(other.objects) + return data1 == data2 def __str__(self): try: diff --git a/jwcrypto/jwk.py b/jwcrypto/jwk.py index d25c6b5..43e8b67 100644 --- a/jwcrypto/jwk.py +++ b/jwcrypto/jwk.py @@ -606,11 +606,11 @@ def import_key(self, **kwargs): # check key_ops if 'key_ops' in newkey: for ko in newkey['key_ops']: - c = 0 + cnt = 0 for cko in newkey['key_ops']: if ko == cko: - c += 1 - if c != 1: + cnt += 1 + if cnt != 1: raise InvalidJWKValue('Duplicate values in "key_ops"') # check use/key_ops consistency @@ -1032,26 +1032,26 @@ def export_to_pem(self, private_key=False, password=False): :return: A serialized bytes buffer containing a PEM formatted key. :rtype: `bytes` """ - e = serialization.Encoding.PEM + enc = serialization.Encoding.PEM if private_key: if not self.has_private: raise InvalidJWKType("No private key available") f = serialization.PrivateFormat.PKCS8 if password is None: - a = serialization.NoEncryption() + enc_alg = serialization.NoEncryption() elif isinstance(password, bytes): - a = serialization.BestAvailableEncryption(password) + enc_alg = serialization.BestAvailableEncryption(password) elif password is False: raise ValueError("The password must be None or a bytes string") else: raise TypeError("The password string must be bytes") return self._get_private_key().private_bytes( - encoding=e, format=f, encryption_algorithm=a) + encoding=enc, format=f, encryption_algorithm=enc_alg) else: if not self.has_public: raise InvalidJWKType("No public key available") f = serialization.PublicFormat.SubjectPublicKeyInfo - return self._get_public_key().public_bytes(encoding=e, format=f) + return self._get_public_key().public_bytes(encoding=enc, format=f) @classmethod def from_pyca(cls, key): @@ -1284,6 +1284,7 @@ class JWKSet(dict): Creates a special key 'keys' that is of a type derived from 'set' The 'keys' attribute accepts only :class:`jwcrypto.jwk.JWK` elements. """ + def __init__(self, *args, **kwargs): super(JWKSet, self).__init__() super(JWKSet, self).__setitem__('keys', _JWKkeys()) diff --git a/jwcrypto/jws.py b/jwcrypto/jws.py index a8284d9..4beb6d9 100644 --- a/jwcrypto/jws.py +++ b/jwcrypto/jws.py @@ -279,18 +279,20 @@ def _verify(self, alg, key, payload, signature, protected, header=None): raise InvalidJWSSignature('No "alg" in headers') if alg: if 'alg' in p and alg != p['alg']: - raise InvalidJWSSignature('"alg" mismatch, requested ' - '"%s", found "%s"' % (alg, - p['alg'])) - a = alg + raise InvalidJWSSignature( + '"alg" mismatch, requested' + f''' "{alg}", found "{p['alg']}"''' + ) + resulting_alg = alg else: - a = p['alg'] + resulting_alg = p['alg'] # the following will verify the "alg" is supported and the signature # verifies if isinstance(key, JWK): - c = JWSCore(a, key, protected, payload, self._allowed_algs) - c.verify(signature) + signer = JWSCore(resulting_alg, key, protected, + payload, self._allowed_algs) + signer.verify(signature) self.verifylog.append("Success") elif isinstance(key, JWKSet): keys = key @@ -303,8 +305,11 @@ def _verify(self, alg, key, payload, signature, protected, header=None): for k in keys: try: - c = JWSCore(a, k, protected, payload, self._allowed_algs) - c.verify(signature) + signer2 = JWSCore( + resulting_alg, k, protected, + payload, self._allowed_algs + ) + signer2.verify(signature) self.verifylog.append("Success") break except Exception as e: # pylint: disable=broad-except @@ -455,16 +460,16 @@ def deserialize(self, raw_jws, key=None, alg=None): o['payload'] = djws['payload'] except ValueError: - c = raw_jws.split('.') - if len(c) != 3: + data = raw_jws.split('.') + if len(data) != 3: raise InvalidJWSObject('Unrecognized' ' representation') from None - p = base64url_decode(str(c[0])) + p = base64url_decode(str(data[0])) if len(p) > 0: o['protected'] = p.decode('utf-8') self._deserialize_b64(o, o['protected']) - o['payload'] = base64url_decode(str(c[1])) - o['signature'] = base64url_decode(str(c[2])) + o['payload'] = base64url_decode(str(data[1])) + o['signature'] = base64url_decode(str(data[2])) self.objects = o diff --git a/jwcrypto/jwt.py b/jwcrypto/jwt.py index b7a29b8..a30804b 100644 --- a/jwcrypto/jwt.py +++ b/jwcrypto/jwt.py @@ -256,20 +256,20 @@ def claims(self): return self._claims @claims.setter - def claims(self, c): - if not isinstance(c, dict): + def claims(self, data): + if not isinstance(data, dict): if not self._reg_claims: # no default_claims, can return immediately - self._claims = c + self._claims = data return - c = json_decode(c) + data = json_decode(data) else: # _add_default_claims modifies its argument # so we must always copy it. - c = copy.deepcopy(c) + data = copy.deepcopy(data) - self._add_default_claims(c) - self._claims = json_encode(c) + self._add_default_claims(data) + self._claims = json_encode(data) @property def token(self): @@ -661,10 +661,10 @@ def deserialize(self, jwt, key=None): decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed by the 'kid' header. """ - c = jwt.count('.') - if c == 2: + data = jwt.count('.') + if data == 2: self.token = JWS() - elif c == 4: + elif data == 4: self.token = JWE() else: raise ValueError("Token format unrecognized") diff --git a/jwcrypto/tests.py b/jwcrypto/tests.py index 988a33d..6069fab 100644 --- a/jwcrypto/tests.py +++ b/jwcrypto/tests.py @@ -703,10 +703,10 @@ def test_pem_okp(self): pubkey = jwk.JWK.from_pem(Ed25519PublicPEM) self.assertTrue(pubkey.has_public) self.assertFalse(pubkey.has_private) - c = jws.JWS() - c.deserialize(sig, pubkey, alg="EdDSA") - self.assertTrue(c.objects['valid']) - self.assertEqual(c.payload, payload) + jws_token = jws.JWS() + jws_token.deserialize(sig, pubkey, alg="EdDSA") + self.assertTrue(jws_token.objects['valid']) + self.assertEqual(jws_token.payload, payload) def test_jwk_as_dict(self): key = jwk.JWK(**PublicKeys['keys'][0]) @@ -1818,9 +1818,9 @@ def test_empty_claims(self): t.make_signed_token(key) token = t.serialize() - c = jwt.JWT() - c.deserialize(token, key) - self.assertEqual('{}', c.claims) + _jwt = jwt.JWT() + _jwt.deserialize(token, key) + self.assertEqual('{}', _jwt.claims) # empty string is also valid t = jwt.JWT('{"alg":"HS256"}', '') @@ -1833,9 +1833,9 @@ def test_empty_claims(self): t.make_signed_token(key) token = t.serialize() - c = jwt.JWT() - c.deserialize(token, key) - self.assertEqual(' ', c.claims) + _jwt = jwt.JWT() + _jwt.deserialize(token, key) + self.assertEqual(' ', _jwt.claims) def test_Issue_209(self): key = jwk.JWK(**A3_key) @@ -2211,26 +2211,26 @@ class TestOverloadedOperators(unittest.TestCase): def test_jws_equality(self): key = jwk.JWK.generate(kty='oct', size=256) payload = "My Integrity protected message" - a = jws.JWS(payload.encode('utf-8')) - b = jws.JWS(payload.encode('utf-8')) - self.assertEqual(a, b) + signer_a = jws.JWS(payload.encode('utf-8')) + signer_b = jws.JWS(payload.encode('utf-8')) + self.assertEqual(signer_a, signer_b) - a.add_signature(key, None, - json_encode({"alg": "HS256"}), - json_encode({"kid": key.thumbprint()})) + signer_a.add_signature(key, None, + json_encode({"alg": "HS256"}), + json_encode({"kid": key.thumbprint()})) # One is signed, the other is not - self.assertNotEqual(a, b) + self.assertNotEqual(signer_a, signer_b) - b.add_signature(key, None, - json_encode({"alg": "HS256"}), - json_encode({"kid": key.thumbprint()})) + signer_b.add_signature(key, None, + json_encode({"alg": "HS256"}), + json_encode({"kid": key.thumbprint()})) # This kind of signature is deterministic so they should be equal - self.assertEqual(a, b) + self.assertEqual(signer_a, signer_b) - c = jws.JWS.from_jose_token(a.serialize()) - self.assertNotEqual(a, c) - c.verify(key) - self.assertEqual(a, c) + signer_c = jws.JWS.from_jose_token(signer_a.serialize()) + self.assertNotEqual(signer_a, signer_c) + signer_c.verify(key) + self.assertEqual(signer_a, signer_c) def test_jws_representations(self): key = jwk.JWK.generate(kty='oct', size=256) @@ -2250,24 +2250,24 @@ def test_jws_representations(self): def test_jwe_equality(self): key = jwk.JWK.generate(kty='oct', size=256) payload = "My Encrypted message" - a = jwe.JWE(payload.encode('utf-8'), - json_encode({"alg": "A256KW", - "enc": "A256CBC-HS512"})) - b = jwe.JWE(payload.encode('utf-8'), - json_encode({"alg": "A256KW", - "enc": "A256CBC-HS512"})) - self.assertEqual(a, b) - - a.add_recipient(key) + signer_a = jwe.JWE(payload.encode('utf-8'), + json_encode({"alg": "A256KW", + "enc": "A256CBC-HS512"})) + signer_b = jwe.JWE(payload.encode('utf-8'), + json_encode({"alg": "A256KW", + "enc": "A256CBC-HS512"})) + self.assertEqual(signer_a, signer_b) + + signer_a.add_recipient(key) # One is encrypted, the other is not - self.assertNotEqual(a, b) + self.assertNotEqual(signer_a, signer_b) - b.add_recipient(key) + signer_b.add_recipient(key) # Encryption generates a random CEK so tokens will always differ - self.assertNotEqual(a, b) + self.assertNotEqual(signer_a, signer_b) - c = jwe.JWE.from_jose_token(a.serialize()) - self.assertEqual(a, c) + signer_c = jwe.JWE.from_jose_token(signer_a.serialize()) + self.assertEqual(signer_a, signer_c) def test_jwe_representations(self): key = jwk.JWK.generate(kty='oct', size=256) @@ -2289,29 +2289,29 @@ def test_jwe_representations(self): def test_jwt_equality(self): key = jwk.JWK.generate(kty='oct', size=256) - a = jwt.JWT(header={"alg": "HS256"}, - claims={"info": "I'm a signed token"}) - b = jwt.JWT(header={"alg": "HS256"}, - claims={"info": "I'm a signed token"}) - self.assertEqual(a, b) + signer_a = jwt.JWT(header={"alg": "HS256"}, + claims={"info": "I'm a signed token"}) + signer_b = jwt.JWT(header={"alg": "HS256"}, + claims={"info": "I'm a signed token"}) + self.assertEqual(signer_a, signer_b) - a.make_signed_token(key) + signer_a.make_signed_token(key) # One is signed, the other is not - self.assertNotEqual(a, b) + self.assertNotEqual(signer_a, signer_b) - b.make_signed_token(key) + signer_b.make_signed_token(key) # This kind of signature is deterministic so they should be equal - self.assertEqual(a, b) + self.assertEqual(signer_a, signer_b) - c = jwt.JWT.from_jose_token(a.serialize()) - self.assertNotEqual(a, c) - c.validate(key) - self.assertEqual(a, c) + signer_c = jwt.JWT.from_jose_token(signer_a.serialize()) + self.assertNotEqual(signer_a, signer_c) + signer_c.validate(key) + self.assertEqual(signer_a, signer_c) ea = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"}, - claims=a.serialize()) + claims=signer_a.serialize()) eb = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"}, - claims=b.serialize()) + claims=signer_b.serialize()) self.assertEqual(ea, eb) ea.make_encrypted_token(key)