Skip to content

Commit

Permalink
STY: Make encryption module private, apply pre-commit (#1010)
Browse files Browse the repository at this point in the history
Related to #749
  • Loading branch information
MartinThoma authored Jun 19, 2022
1 parent 868f977 commit 797963a
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 74 deletions.
188 changes: 136 additions & 52 deletions PyPDF2/encryption.py → PyPDF2/_encryption.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# coding: utf-8
#
# Copyright (c) 2022, exiledkingcc
# All rights reserved.
#
Expand Down Expand Up @@ -30,24 +28,17 @@
import hashlib
import random
import struct
from typing import (
Dict,
Optional,
Tuple,
Union,
cast,
)
from typing import Dict, Optional, Tuple, Union, cast

from PyPDF2.errors import DependencyError
from PyPDF2.generic import (
PdfObject,
ArrayObject,
ByteStringObject,
DictionaryObject,
PdfObject,
StreamObject,
TextStringObject,
createStringObject,

)


Expand All @@ -64,7 +55,7 @@ class CryptIdentity(CryptBase):


try:
from Crypto.Cipher import ARC4, AES
from Crypto.Cipher import AES, ARC4

class CryptRC4(CryptBase):
def __init__(self, key: bytes) -> None:
Expand Down Expand Up @@ -92,7 +83,7 @@ def decrypt(self, data: bytes) -> bytes:
data = data[16:]
aes = AES.new(self.key, AES.MODE_CBC, iv)
d = aes.decrypt(data)
return d[:-d[-1]]
return d[: -d[-1]]

def RC4_encrypt(key: bytes, data: bytes) -> bytes:
return ARC4.ARC4Cipher(key).encrypt(data)
Expand Down Expand Up @@ -167,7 +158,9 @@ def AES_CBC_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes:


class CryptFilter:
def __init__(self, stmCrypt: CryptBase, strCrypt: CryptBase, efCrypt: CryptBase) -> None:
def __init__(
self, stmCrypt: CryptBase, strCrypt: CryptBase, efCrypt: CryptBase
) -> None:
self.stmCrypt = stmCrypt
self.strCrypt = strCrypt
self.efCrypt = efCrypt
Expand All @@ -191,12 +184,42 @@ def decrypt_object(self, obj: PdfObject) -> PdfObject:
return obj


_PADDING = bytes([
0x28, 0xBF, 0x4E, 0x5E, 0x4E, 0x75, 0x8A, 0x41,
0x64, 0x00, 0x4E, 0x56, 0xFF, 0xFA, 0x01, 0x08,
0x2E, 0x2E, 0x00, 0xB6, 0xD0, 0x68, 0x3E, 0x80,
0x2F, 0x0C, 0xA9, 0xFE, 0x64, 0x53, 0x69, 0x7A
])
_PADDING = bytes(
[
0x28,
0xBF,
0x4E,
0x5E,
0x4E,
0x75,
0x8A,
0x41,
0x64,
0x00,
0x4E,
0x56,
0xFF,
0xFA,
0x01,
0x08,
0x2E,
0x2E,
0x00,
0xB6,
0xD0,
0x68,
0x3E,
0x80,
0x2F,
0x0C,
0xA9,
0xFE,
0x64,
0x53,
0x69,
0x7A,
]
)


def _padding(data: bytes) -> bytes:
Expand All @@ -207,14 +230,22 @@ def _bytes(value: Union[bytes, str]) -> bytes:
if isinstance(value, bytes):
return value
try:
return value.encode('latin-1')
return value.encode("latin-1")
except Exception: # noqa
return value.encode('utf-8')
return value.encode("utf-8")


class AlgR4:
@staticmethod
def compute_key(password: bytes, rev: int, key_size: int, o_entry: bytes, P: int, id1_entry: bytes, metadata_encrypted: bool) -> bytes:
def compute_key(
password: bytes,
rev: int,
key_size: int,
o_entry: bytes,
P: int,
id1_entry: bytes,
metadata_encrypted: bool,
) -> bytes:
"""
Algorithm 2: Computing an encryption key
Expand Down Expand Up @@ -247,7 +278,7 @@ def compute_key(password: bytes, rev: int, key_size: int, o_entry: bytes, P: int
a = _padding(password)
u_hash = hashlib.md5(a)
u_hash.update(o_entry)
u_hash.update(struct.pack('<I', P))
u_hash.update(struct.pack("<I", P))
u_hash.update(id1_entry)
if rev >= 3 and not metadata_encrypted:
u_hash.update(b"\xff\xff\xff\xff")
Expand Down Expand Up @@ -288,7 +319,7 @@ def compute_O_value_key(owner_pwd: bytes, rev: int, key_size: int) -> bytes:
for _ in range(50):
o_hash_digest = hashlib.md5(o_hash_digest).digest()

rc4_key = o_hash_digest[:key_size // 8]
rc4_key = o_hash_digest[: key_size // 8]
return rc4_key

@staticmethod
Expand All @@ -298,7 +329,7 @@ def compute_O_value(rc4_key: bytes, user_pwd: bytes, rev: int) -> bytes:
rc4_enc = RC4_encrypt(rc4_key, a)
if rev >= 3:
for i in range(1, 20):
key = bytes(bytearray([x ^ i for x in rc4_key]))
key = bytes(bytearray(x ^ i for x in rc4_key))
rc4_enc = RC4_encrypt(key, rc4_enc)
return rc4_enc

Expand Down Expand Up @@ -338,14 +369,20 @@ def compute_U_value(key: bytes, rev: int, id1_entry: bytes) -> bytes:
u_hash.update(id1_entry)
rc4_enc = RC4_encrypt(key, u_hash.digest())
for i in range(1, 20):
rc4_key = bytes(bytearray([x ^ i for x in key]))
rc4_key = bytes(bytearray(x ^ i for x in key))
rc4_enc = RC4_encrypt(rc4_key, rc4_enc)
return _padding(rc4_enc)

@staticmethod
def verify_user_password(
user_pwd: bytes, rev: int, key_size: int, o_entry: bytes, u_entry: bytes,
P: int, id1_entry: bytes, metadata_encrypted: bool
user_pwd: bytes,
rev: int,
key_size: int,
o_entry: bytes,
u_entry: bytes,
P: int,
id1_entry: bytes,
metadata_encrypted: bool,
) -> bytes:
"""
Algorithm 6: Authenticating the user password
Expand All @@ -360,7 +397,9 @@ def verify_user_password(
encryption dictionary’s U (user password) value (Security handlers of revision 3 or greater)") shall be used
to decrypt the document.
"""
key = AlgR4.compute_key(user_pwd, rev, key_size, o_entry, P, id1_entry, metadata_encrypted)
key = AlgR4.compute_key(
user_pwd, rev, key_size, o_entry, P, id1_entry, metadata_encrypted
)
u_value = AlgR4.compute_U_value(key, rev, id1_entry)
if rev >= 3:
u_value = u_value[:16]
Expand All @@ -371,8 +410,14 @@ def verify_user_password(

@staticmethod
def verify_owner_password(
owner_pwd: bytes, rev: int, key_size: int, o_entry: bytes, u_entry: bytes,
P: int, id1_entry: bytes, metadata_encrypted: bool
owner_pwd: bytes,
rev: int,
key_size: int,
o_entry: bytes,
u_entry: bytes,
P: int,
id1_entry: bytes,
metadata_encrypted: bool,
) -> bytes:
"""
Algorithm 7: Authenticating the owner password
Expand All @@ -396,14 +441,18 @@ def verify_owner_password(
else:
u_pwd = o_entry
for i in range(19, -1, -1):
key = bytes(bytearray([x ^ i for x in rc4_key]))
key = bytes(bytearray(x ^ i for x in rc4_key))
u_pwd = RC4_decrypt(key, u_pwd)
return AlgR4.verify_user_password(u_pwd, rev, key_size, o_entry, u_entry, P, id1_entry, metadata_encrypted)
return AlgR4.verify_user_password(
u_pwd, rev, key_size, o_entry, u_entry, P, id1_entry, metadata_encrypted
)


class AlgR5:
@staticmethod
def verify_owner_password(password: bytes, o_value: bytes, oe_value: bytes, u_value: bytes) -> bytes:
def verify_owner_password(
password: bytes, o_value: bytes, oe_value: bytes, u_value: bytes
) -> bytes:
"""
Algorithm 3.2a Computing an encryption key
Expand Down Expand Up @@ -452,15 +501,19 @@ def verify_user_password(password: bytes, u_value: bytes, ue_value: bytes) -> by
return AES_CBC_decrypt(tmp_key, iv, ue_value)

@staticmethod
def verify_perms(key: bytes, perms: bytes, p: int, metadata_encrypted: bool) -> bool:
def verify_perms(
key: bytes, perms: bytes, p: int, metadata_encrypted: bool
) -> bool:
"""see :func:`verify_owner_password` and :func:`compute_Perms_value`"""
b8 = b'T' if metadata_encrypted else b'F'
b8 = b"T" if metadata_encrypted else b"F"
p1 = struct.pack("<I", p) + b"\xff\xff\xff\xff" + b8 + b"adb"
p2 = AES_ECB_decrypt(key, perms)
return p1 == p2[:12]

@staticmethod
def generate_values(user_pwd: bytes, owner_pwd: bytes, key: bytes, p: int, metadata_encrypted: bool) -> dict:
def generate_values(
user_pwd: bytes, owner_pwd: bytes, key: bytes, p: int, metadata_encrypted: bool
) -> dict:
u_value, ue_value = AlgR5.compute_U_value(user_pwd, key)
o_value, oe_value = AlgR5.compute_O_value(owner_pwd, key, u_value)
perms = AlgR5.compute_Perms_value(key, p, metadata_encrypted)
Expand Down Expand Up @@ -496,7 +549,9 @@ def compute_U_value(password: bytes, key: bytes) -> Tuple[bytes, bytes]:
return u_value, ue_value

@staticmethod
def compute_O_value(password: bytes, key: bytes, u_value: bytes) -> Tuple[bytes, bytes]:
def compute_O_value(
password: bytes, key: bytes, u_value: bytes
) -> Tuple[bytes, bytes]:
"""
Algorithm 3.9 Computing the encryption dictionary’s O (owner password) and OE (owner encryption key) values
Expand All @@ -513,7 +568,9 @@ def compute_O_value(password: bytes, key: bytes, u_value: bytes) -> Tuple[bytes,
random_bytes = bytes(random.randrange(0, 256) for _ in range(16))
val_salt = random_bytes[:8]
key_salt = random_bytes[8:]
o_value = hashlib.sha256(password + val_salt + u_value).digest() + val_salt + key_salt
o_value = (
hashlib.sha256(password + val_salt + u_value).digest() + val_salt + key_salt
)

tmp_key = hashlib.sha256(password + key_salt + u_value).digest()
iv = bytes(0 for _ in range(16))
Expand All @@ -535,15 +592,24 @@ def compute_Perms_value(key: bytes, p: int, metadata_encrypted: bool) -> bytes:
encryption key as the key. The result (16 bytes) is stored as the Perms string, and checked for validity
when the file is opened.
"""
b8 = b'T' if metadata_encrypted else b'F'
b8 = b"T" if metadata_encrypted else b"F"
rr = bytes(random.randrange(0, 256) for _ in range(4))
data = struct.pack("<I", p) + b"\xff\xff\xff\xff" + b8 + b"adb" + rr
perms = AES_ECB_encrypt(key, data)
return perms


class Encryption:
def __init__(self, algV: int, entry: DictionaryObject, first_id_entry: bytes, StmF: str, StrF: str, EFF: str) -> None:
def __init__(
self,
algV: int,
entry: DictionaryObject,
first_id_entry: bytes,
StmF: str,
StrF: str,
EFF: str,
) -> None:
# See TABLE 3.18 Entries common to all encryption dictionaries
self.algV = algV
self.entry = entry
self.key_size = entry.get("/Length", 40)
Expand All @@ -558,8 +624,6 @@ def __init__(self, algV: int, entry: DictionaryObject, first_id_entry: bytes, St
self._owner_keys: Dict = {}

def decrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObject:
"""decrypt object"""

"""
Algorithm 1: Encryption of data using the RC4 or AES algorithms
Expand All @@ -580,9 +644,7 @@ def decrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObje
If using the AES algorithm, the Cipher Block Chaining (CBC) mode, which requires an initialization vector,
is used. The block size parameter is set to 16 bytes, and the initialization vector is a 16-byte random
number that is stored as the first 16 bytes of the encrypted stream or string.
"""
"""
Algorithm 3.1a Encryption of data using the AES algorithm
1. Use the 32-byte file encryption key for the AES-256 symmetric key algorithm, along with the string or
stream data to be encrypted.
Expand All @@ -599,10 +661,10 @@ def decrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObje
n = 5 if self.algV == 1 else self.key_size // 8
key_data = key[:n] + pack1 + pack2
key_hash = hashlib.md5(key_data)
rc4_key = key_hash.digest()[:min(n + 5, 16)]
rc4_key = key_hash.digest()[: min(n + 5, 16)]
# for AES-128
key_hash.update(b"sAlT")
aes128_key = key_hash.digest()[:min(n + 5, 16)]
aes128_key = key_hash.digest()[: min(n + 5, 16)]

# for V=5 use AES-256
aes256_key = key
Expand All @@ -615,7 +677,9 @@ def decrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObje
return cf.decrypt_object(obj)

@staticmethod
def _get_crypt(method: str, rc4_key: bytes, aes128_key: bytes, aes256_key: bytes) -> CryptBase:
def _get_crypt(
method: str, rc4_key: bytes, aes128_key: bytes, aes256_key: bytes
) -> CryptBase:
if method == "/AESV3":
return CryptAES(aes256_key)
if method == "/AESV2":
Expand Down Expand Up @@ -662,10 +726,28 @@ def verify_r4(self, user_pwd: bytes, owner_pwd: bytes) -> Tuple[bytes, int]:
o_entry = cast(ByteStringObject, self.entry["/O"].get_object()).original_bytes
u_entry = cast(ByteStringObject, self.entry["/U"].get_object()).original_bytes

key = AlgR4.verify_user_password(user_pwd, R, self.key_size, o_entry, u_entry, P, self.id1_entry, metadata_encrypted)
key = AlgR4.verify_user_password(
user_pwd,
R,
self.key_size,
o_entry,
u_entry,
P,
self.id1_entry,
metadata_encrypted,
)
if key:
return key, 1
key = AlgR4.verify_owner_password(owner_pwd, R, self.key_size, o_entry, u_entry, P, self.id1_entry, metadata_encrypted)
key = AlgR4.verify_owner_password(
owner_pwd,
R,
self.key_size,
o_entry,
u_entry,
P,
self.id1_entry,
metadata_encrypted,
)
if key:
return key, 2
return b"", 0
Expand Down Expand Up @@ -700,7 +782,9 @@ def verify_r5(self, user_pwd: bytes, owner_pwd: bytes) -> Tuple[bytes, int]:
def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encryption":
filter = encryption_entry.get("/Filter")
if filter != "/Standard":
raise NotImplementedError("only Standard PDF encryption handler is available")
raise NotImplementedError(
"only Standard PDF encryption handler is available"
)
if "/SubFilter" in encryption_entry:
raise NotImplementedError("/SubFilter NOT supported")

Expand Down
Loading

0 comments on commit 797963a

Please sign in to comment.