Refactor PdfReader setup into _initialize_stream/_handle_encryption, extract
_process_xref_stream, and type is_null_or_none as a TypeGuard.

Review note: _process_xref_stream takes the stream as a parameter instead of
reading self.stream, which _initialize_stream only assigns after read().

diff --git a/pypdf/_reader.py b/pypdf/_reader.py
index 08437476c..d1515bb28 100644
--- a/pypdf/_reader.py
+++ b/pypdf/_reader.py
@@ -124,9 +124,19 @@ def __init__(
         self.xref_objStm: Dict[int, Tuple[Any, Any]] = {}
         self.trailer = DictionaryObject()
 
-        self._page_id2num: Optional[
-            Dict[Any, Any]
-        ] = None  # map page indirect_reference number to Page Number
+        # map page indirect_reference number to page number
+        self._page_id2num: Optional[Dict[Any, Any]] = None
+
+        self._initialize_stream(stream)
+
+        self._override_encryption = False
+        self._encryption: Optional[Encryption] = None
+        if self.is_encrypted:
+            self._handle_encryption(password)
+        elif password is not None:
+            raise PdfReadError("Not an encrypted file")
+
+    def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None:
         if hasattr(stream, "mode") and "b" not in stream.mode:
             logger_warning(
                 "PdfReader stream/file object is not in binary mode. "
@@ -142,31 +152,25 @@ def __init__(
         self.read(stream)
         self.stream = stream
 
+    def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None:
+        self._override_encryption = True
+        # Some documents may not have a /ID, use two empty
+        # byte strings instead. Solves
+        # https://github.com/py-pdf/pypdf/issues/608
+        id_entry = self.trailer.get(TK.ID)
+        id1_entry = id_entry[0].get_object().original_bytes if id_entry else b""
+        encrypt_entry = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object())
+        self._encryption = Encryption.read(encrypt_entry, id1_entry)
+
+        # try empty password if no password provided
+        pwd = password if password is not None else b""
+        if (
+            self._encryption.verify(pwd) == PasswordType.NOT_DECRYPTED
+            and password is not None
+        ):
+            # raise if password provided
+            raise WrongPasswordError("Wrong password")
         self._override_encryption = False
-        self._encryption: Optional[Encryption] = None
-        if self.is_encrypted:
-            self._override_encryption = True
-            # Some documents may not have a /ID, use two empty
-            # byte strings instead. Solves
-            # https://github.com/py-pdf/pypdf/issues/608
-            id_entry = self.trailer.get(TK.ID)
-            id1_entry = id_entry[0].get_object().original_bytes if id_entry else b""
-            encrypt_entry = cast(
-                DictionaryObject, self.trailer[TK.ENCRYPT].get_object()
-            )
-            self._encryption = Encryption.read(encrypt_entry, id1_entry)
-
-            # try empty password if no password provided
-            pwd = password if password is not None else b""
-            if (
-                self._encryption.verify(pwd) == PasswordType.NOT_DECRYPTED
-                and password is not None
-            ):
-                # raise if password provided
-                raise WrongPasswordError("Wrong password")
-            self._override_encryption = False
-        elif password is not None:
-            raise PdfReadError("Not encrypted file")
 
     def __enter__(self) -> "PdfReader":
         return self
@@ -286,13 +290,13 @@ def _get_page_number_by_indirect(
         self, indirect_reference: Union[None, int, NullObject, IndirectObject]
     ) -> Optional[int]:
         """
-        Generate _page_id2num.
+        Retrieve the page number from an indirect reference.
 
         Args:
-            indirect_reference:
+            indirect_reference: The indirect reference to locate.
 
         Returns:
-            The page number or None
+            Page number or None.
         """
         if self._page_id2num is None:
             self._page_id2num = {
@@ -562,6 +566,12 @@ def _replace_object(self, indirect: IndirectObject, obj: PdfObject) -> PdfObject
         return obj
 
     def read(self, stream: StreamType) -> None:
+        """
+        Read and process the PDF stream, extracting necessary data.
+
+        Args:
+            stream: The PDF file stream.
+        """
         self._basic_validation(stream)
         self._find_eof_marker(stream)
         startxref = self._find_startxref_pos(stream)
@@ -621,7 +631,7 @@ def read(self, stream: StreamType) -> None:
             stream.seek(loc, 0)  # return to where it was
 
     def _basic_validation(self, stream: StreamType) -> None:
-        """Ensure file is not empty. Read at most 5 bytes."""
+        """Ensure the stream is valid and not empty."""
         stream.seek(0, os.SEEK_SET)
         try:
             header_byte = stream.read(5)
@@ -819,6 +829,7 @@ def _read_standard_xref_table(self, stream: StreamType) -> None:
     def _read_xref_tables_and_trailers(
         self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int
     ) -> None:
+        """Read the cross-reference tables and trailers in the PDF stream."""
         self.xref = {}
         self.xref_free_entry = {}
         self.xref_objStm = {}
@@ -843,21 +854,12 @@ def _read_xref_tables_and_trailers(
                 except Exception as e:
                     if TK.ROOT in self.trailer:
                         logger_warning(
-                            f"Previous trailer can not be read {e.args}",
-                            __name__,
+                            f"Previous trailer cannot be read: {e.args}", __name__
                         )
                         break
                     else:
-                        raise PdfReadError(f"trailer can not be read {e.args}")
-                trailer_keys = TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE
-                for key in trailer_keys:
-                    if key in xrefstream and key not in self.trailer:
-                        self.trailer[NameObject(key)] = xrefstream.raw_get(key)
-                if "/XRefStm" in xrefstream:
-                    p = stream.tell()
-                    stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
-                    self._read_pdf15_xref_stream(stream)
-                    stream.seek(p, 0)
+                        raise PdfReadError(f"Trailer cannot be read: {e.args}")
+                self._process_xref_stream(stream, xrefstream)
                 if "/Prev" in xrefstream:
                     startxref = cast(int, xrefstream["/Prev"])
                 else:
@@ -865,6 +867,21 @@ def _read_xref_tables_and_trailers(
             else:
                 startxref = self._read_xref_other_error(stream, startxref)
 
+    def _process_xref_stream(
+        self, stream: StreamType, xrefstream: DictionaryObject
+    ) -> None:
+        """Merge trailer keys from xrefstream and read any /XRefStm it names."""
+        trailer_keys = TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE
+        for key in trailer_keys:
+            if key in xrefstream and key not in self.trailer:
+                self.trailer[NameObject(key)] = xrefstream.raw_get(key)
+        if "/XRefStm" in xrefstream:
+            # self.stream is only assigned after the initial read(); use the argument.
+            p = stream.tell()
+            stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
+            self._read_pdf15_xref_stream(stream)
+            stream.seek(p, 0)
+
     def _read_xref(self, stream: StreamType) -> Optional[int]:
         self._read_standard_xref_table(stream)
         if stream.read(1) == b"":
@@ -937,7 +954,7 @@ def _read_xref_other_error(
     def _read_pdf15_xref_stream(
         self, stream: StreamType
     ) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]:
-        # PDF 1.5+ Cross-Reference Stream
+        """Read the cross-reference stream for PDF 1.5+."""
         stream.seek(-1, 1)
         idnum, generation = self.read_object_header(stream)
         xrefstream = cast(ContentStream, read_object(stream, self))
@@ -1065,6 +1082,7 @@ def _read_xref_subsections(
         get_entry: Callable[[int], Union[int, Tuple[int, ...]]],
         used_before: Callable[[int, Union[int, Tuple[int, ...]]], bool],
     ) -> None:
+        """Read and process the subsections of the xref."""
         for start, size in self._pairs(idx_pairs):
             # The subsections must increase
             for num in range(start, start + size):
@@ -1094,12 +1112,11 @@ def _read_xref_subsections(
                     raise PdfReadError(f"Unknown xref type: {xref_type}")
 
     def _pairs(self, array: List[int]) -> Iterable[Tuple[int, int]]:
+        """Iterate over pairs in the array."""
         i = 0
-        while True:
+        while i + 1 < len(array):
             yield array[i], array[i + 1]
             i += 2
-            if (i + 1) >= len(array):
-                break
 
     def decrypt(self, password: Union[str, bytes]) -> PasswordType:
         """
diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py
index fd7d1a8ff..e05e00b39 100644
--- a/pypdf/generic/_base.py
+++ b/pypdf/generic/_base.py
@@ -28,11 +28,17 @@
 import codecs
 import hashlib
 import re
+import sys
 from binascii import unhexlify
 from math import log10
 from struct import iter_unpack
 from typing import Any, Callable, ClassVar, Dict, Optional, Sequence, Union, cast
 
+if sys.version_info[:2] >= (3, 10):
+    from typing import TypeGuard
+else:
+    from typing_extensions import TypeGuard  # PEP 647
+
 from .._codecs import _pdfdoc_encoding_rev
 from .._protocols import PdfObjectProtocol, PdfWriterProtocol
 from .._utils import (
@@ -214,16 +220,6 @@ def __repr__(self) -> str:
         return "NullObject"
 
 
-def is_null_or_none(x: Any) -> bool:
-    """
-    Returns:
-        True if x is None or NullObject.
-    """
-    return x is None or (
-        isinstance(x, PdfObject) and isinstance(x.get_object(), NullObject)
-    )
-
-
 class BooleanObject(PdfObject):
     def __init__(self, value: Any) -> None:
         self.value = value
@@ -853,3 +849,13 @@ def encode_pdfdocencoding(unicode_string: str) -> bytes:
             -1,
             "does not exist in translation table",
         )
+
+
+def is_null_or_none(x: Any) -> TypeGuard[Union[None, NullObject, IndirectObject]]:
+    """
+    Returns:
+        True if x is None or NullObject.
+    """
+    return x is None or (
+        isinstance(x, PdfObject) and isinstance(x.get_object(), NullObject)
+    )
diff --git a/tests/test_encryption.py b/tests/test_encryption.py
index f5c494cb9..be92e40a9 100644
--- a/tests/test_encryption.py
+++ b/tests/test_encryption.py
@@ -205,7 +205,7 @@ def test_attempt_decrypt_unencrypted_pdf():
     path = RESOURCE_ROOT / "crazyones.pdf"
     with pytest.raises(PdfReadError) as exc:
         PdfReader(path, password="nonexistent")
-    assert exc.value.args[0] == "Not encrypted file"
+    assert exc.value.args[0] == "Not an encrypted file"
 
 
 @pytest.mark.skipif(not HAS_AES, reason="No AES implementation")
diff --git a/tests/test_reader.py b/tests/test_reader.py
index 8d6bc2d6b..898a86a77 100644
--- a/tests/test_reader.py
+++ b/tests/test_reader.py
@@ -1294,7 +1294,7 @@ def test_reader(caplog):
     url = "https://github.com/py-pdf/pypdf/files/9464742/shiv_resume.pdf"
     name = "shiv_resume.pdf"
     reader = PdfReader(BytesIO(get_data_from_url(url, name=name)))
-    assert "Previous trailer can not be read" in caplog.text
+    assert "Previous trailer cannot be read" in caplog.text
     caplog.clear()
     # first call requires some reparations...
     reader.pages[0].extract_text()