diff --git a/PyPDF2/_reader.py b/PyPDF2/_reader.py index 0d6d75b9f6..9babe7195f 100644 --- a/PyPDF2/_reader.py +++ b/PyPDF2/_reader.py @@ -27,11 +27,11 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import os import re import struct import warnings from hashlib import md5 -import os from io import BytesIO from typing import ( Any, @@ -55,8 +55,8 @@ deprecate_with_replacement, ord_, read_non_whitespace, - read_until_whitespace, read_previous_line, + read_until_whitespace, skip_over_comment, skip_over_whitespace, ) @@ -1554,6 +1554,44 @@ def _pairs(self, array: List[int]) -> Iterable[Tuple[int, int]]: if (i + 1) >= len(array): break + def read_next_end_line(self, stream: StreamType, limit_offset: int = 0) -> bytes: + """.. deprecated:: 2.1.0""" + deprecate_no_replacement("read_next_end_line", removed_in="4.0.0") + line_parts = [] + while True: + # Prevent infinite loops in malformed PDFs + if stream.tell() == 0 or stream.tell() == limit_offset: + raise PdfReadError("Could not read malformed PDF file") + x = stream.read(1) + if stream.tell() < 2: + raise PdfReadError("EOL marker not found") + stream.seek(-2, 1) + if x == b_("\n") or x == b_("\r"): # \n = LF; \r = CR + crlf = False + while x == b_("\n") or x == b_("\r"): + x = stream.read(1) + if x == b_("\n") or x == b_("\r"): # account for CR+LF + stream.seek(-1, 1) + crlf = True + if stream.tell() < 2: + raise PdfReadError("EOL marker not found") + stream.seek(-2, 1) + stream.seek( + 2 if crlf else 1, 1 + ) # if using CR+LF, go back 2 bytes, else 1 + break + else: + line_parts.append(x) + line_parts.reverse() + return b"".join(line_parts) + + def readNextEndLine( + self, stream: StreamType, limit_offset: int = 0 + ) -> bytes: # pragma: no cover + """.. deprecated:: 1.28.0""" + deprecate_no_replacement("readNextEndLine") + return self.read_next_end_line(stream, limit_offset) + def decrypt(self, password: Union[str, bytes]) -> int: """ When using an encrypted / secured PDF file with the PDF Standard diff --git a/PyPDF2/_utils.py b/PyPDF2/_utils.py index 9f9afda433..bfc460d8ab 100644 --- a/PyPDF2/_utils.py +++ b/PyPDF2/_utils.py @@ -31,10 +31,16 @@ __author__ = "Mathieu Fenniak" __author_email__ = "biziqe@mathieu.fenniak.net" +import os import warnings from codecs import getencoder -from io import BufferedReader, BufferedWriter, BytesIO, FileIO, DEFAULT_BUFFER_SIZE -import os +from io import ( + DEFAULT_BUFFER_SIZE, + BufferedReader, + BufferedWriter, + BytesIO, + FileIO, +) from typing import Any, Dict, Optional, Tuple, Union, overload try: @@ -56,7 +62,7 @@ StreamType = Union[BytesIO, BufferedReader, BufferedWriter, FileIO] StrByteType = Union[str, StreamType] -DEPR_MSG_NO_REPLACEMENT = "{} is deprecated and will be removed in PyPDF2 3.0.0." +DEPR_MSG_NO_REPLACEMENT = "{} is deprecated and will be removed in PyPDF2 {}." DEPR_MSG = "{} is deprecated and will be removed in PyPDF2 3.0.0. Use {} instead." @@ -132,7 +138,7 @@ def read_until_regex(stream: StreamType, regex: Any, ignore_eof: bool = False) - return name -CRLF = b'\r\n' +CRLF = b"\r\n" def read_block_backwards(stream: StreamType, to_read: int) -> bytes: @@ -141,14 +147,14 @@ def read_block_backwards(stream: StreamType, to_read: int) -> bytes: The stream's position should be unchanged. """ if stream.tell() < to_read: - raise PdfStreamError('Could not read malformed PDF file') + raise PdfStreamError("Could not read malformed PDF file") # Seek to the start of the block we want to read. stream.seek(-to_read, os.SEEK_CUR) read = stream.read(to_read) # Seek to the start of the block we read after reading it. stream.seek(-to_read, os.SEEK_CUR) if len(read) != to_read: - raise PdfStreamError('EOF: read %s, expected %s?' % (len(read), to_read)) + raise PdfStreamError(f"EOF: read {len(read)}, expected {to_read}?") return read @@ -184,7 +190,7 @@ def read_previous_line(stream: StreamType) -> bytes: # a previous one). # Our combined line is the remainder of the block # plus any previously read blocks. - line_content.append(block[idx + 1:]) + line_content.append(block[idx + 1 :]) # Continue to read off any more CRLF characters. while idx >= 0 and block[idx] in CRLF: idx -= 1 @@ -198,7 +204,7 @@ def read_previous_line(stream: StreamType) -> bytes: stream.seek(idx + 1, os.SEEK_CUR) break # Join all the blocks in the line (which are in reverse order) - return b''.join(line_content[::-1]) + return b"".join(line_content[::-1]) def matrix_multiply( @@ -315,9 +321,11 @@ def deprecate(msg: str, stacklevel: int = 3) -> None: warnings.warn(msg, PendingDeprecationWarning, stacklevel=stacklevel) -def deprecate_with_replacement(old_name: str, new_name: str) -> None: - deprecate(DEPR_MSG.format(old_name, new_name), 4) +def deprecate_with_replacement( + old_name: str, new_name: str, removed_in="3.0.0" +) -> None: + deprecate(DEPR_MSG.format(old_name, new_name, removed_in), 4) -def deprecate_no_replacement(name: str) -> None: - deprecate(DEPR_MSG_NO_REPLACEMENT.format(name), 4) +def deprecate_no_replacement(name: str, removed_in="3.0.0") -> None: + deprecate(DEPR_MSG_NO_REPLACEMENT.format(name, removed_in), 4) diff --git a/tests/test_utils.py b/tests/test_utils.py index 3e29963c2d..627ee9fbbf 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -7,12 +7,12 @@ from PyPDF2._utils import ( mark_location, matrix_multiply, + read_block_backwards, + read_previous_line, read_until_regex, read_until_whitespace, skip_over_comment, skip_over_whitespace, - read_block_backwards, - read_previous_line ) from PyPDF2.errors import PdfStreamError @@ -128,9 +128,9 @@ def test_paeth_predictor(left, up, upleft, expected): @pytest.mark.parametrize( ("dat", "pos", "to_read"), [ - (b'', 0, 1), - (b'a', 0, 1), - (b'abc', 0, 10), + (b"", 0, 1), + (b"a", 0, 1), + (b"abc", 0, 10), ], ) def test_read_block_backwards_errs(dat, pos, to_read): @@ -143,13 +143,13 @@ def test_read_block_backwards_errs(dat, pos, to_read): @pytest.mark.parametrize( ("dat", "pos", "to_read", "expected", "expected_pos"), [ - (b'abc', 1, 0, b'', 1), - (b'abc', 1, 1, b'a', 0), - (b'abc', 2, 1, b'b', 1), - (b'abc', 2, 2, b'ab', 0), - (b'abc', 3, 1, b'c', 2), - (b'abc', 3, 2, b'bc', 1), - (b'abc', 3, 3, b'abc', 0), + (b"abc", 1, 0, b"", 1), + (b"abc", 1, 1, b"a", 0), + (b"abc", 2, 1, b"b", 1), + (b"abc", 2, 2, b"ab", 0), + (b"abc", 3, 1, b"c", 2), + (b"abc", 3, 2, b"bc", 1), + (b"abc", 3, 3, b"abc", 0), ], ) def test_read_block_backwards(dat, pos, to_read, expected, expected_pos): @@ -160,7 +160,7 @@ def test_read_block_backwards(dat, pos, to_read, expected, expected_pos): def test_read_block_backwards_at_start(): - s = io.BytesIO(b'abc') + s = io.BytesIO(b"abc") with pytest.raises(PdfStreamError) as _: read_previous_line(s) @@ -168,22 +168,38 @@ def test_read_block_backwards_at_start(): @pytest.mark.parametrize( ("dat", "pos", "expected", "expected_pos"), [ - (b'abc', 1, b'a', 0), - (b'abc', 2, b'ab', 0), - (b'abc', 3, b'abc', 0), - (b'abc\n', 3, b'abc', 0), - (b'abc\n', 4, b'', 3), - (b'abc\n\r', 4, b'', 3), - (b'abc\nd', 5, b'd', 3), + (b"abc", 1, b"a", 0), + (b"abc", 2, b"ab", 0), + (b"abc", 3, b"abc", 0), + (b"abc\n", 3, b"abc", 0), + (b"abc\n", 4, b"", 3), + (b"abc\n\r", 4, b"", 3), + (b"abc\nd", 5, b"d", 3), # Skip over multiple CR/LF bytes - (b'abc\n\r\ndef', 9, b'def', 3), + (b"abc\n\r\ndef", 9, b"def", 3), # Include a block full of newlines... - (b'abc' + b'\n' * (2 * io.DEFAULT_BUFFER_SIZE) + b'd', 2 * io.DEFAULT_BUFFER_SIZE + 4, b'd', 3), + ( + b"abc" + b"\n" * (2 * io.DEFAULT_BUFFER_SIZE) + b"d", + 2 * io.DEFAULT_BUFFER_SIZE + 4, + b"d", + 3, + ), # Include a block full of non-newline characters - (b'abc\n' + b'd' * (2 * io.DEFAULT_BUFFER_SIZE), 2 * io.DEFAULT_BUFFER_SIZE + 4, b'd' * (2 * io.DEFAULT_BUFFER_SIZE), 3), + ( + b"abc\n" + b"d" * (2 * io.DEFAULT_BUFFER_SIZE), + 2 * io.DEFAULT_BUFFER_SIZE + 4, + b"d" * (2 * io.DEFAULT_BUFFER_SIZE), + 3, + ), # Both - (b'abcxyz' + b'\n' * (2 * io.DEFAULT_BUFFER_SIZE) + b'd' * (2 * io.DEFAULT_BUFFER_SIZE),\ - 4 * io.DEFAULT_BUFFER_SIZE + 6, b'd' * (2 * io.DEFAULT_BUFFER_SIZE), 6), + ( + b"abcxyz" + + b"\n" * (2 * io.DEFAULT_BUFFER_SIZE) + + b"d" * (2 * io.DEFAULT_BUFFER_SIZE), + 4 * io.DEFAULT_BUFFER_SIZE + 6, + b"d" * (2 * io.DEFAULT_BUFFER_SIZE), + 6, + ), ], ) def test_read_previous_line(dat, pos, expected, expected_pos):