diff --git a/CHANGES/4054.feature b/CHANGES/4054.feature new file mode 100644 index 00000000000..436bf352f6d --- /dev/null +++ b/CHANGES/4054.feature @@ -0,0 +1 @@ +Implemented readuntil in StreamResponse diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 5fef9af5ffb..dcfa7fe4bc7 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -310,34 +310,41 @@ async def _wait(self, func_name: str) -> None: self._waiter = None async def readline(self) -> bytes: + return await self.readuntil() + + async def readuntil(self, separator: bytes = b"\n") -> bytes: + seplen = len(separator) + if seplen == 0: + raise ValueError("Separator should be at least one-byte string") + if self._exception is not None: raise self._exception - line = [] - line_size = 0 + chunk = b"" + chunk_size = 0 not_enough = True while not_enough: while self._buffer and not_enough: offset = self._buffer_offset - ichar = self._buffer[0].find(b"\n", offset) + 1 - # Read from current offset to found b'\n' or to the end. + ichar = self._buffer[0].find(separator, offset) + 1 + # Read from current offset to found separator or to the end. data = self._read_nowait_chunk(ichar - offset if ichar else -1) - line.append(data) - line_size += len(data) + chunk += data + chunk_size += len(data) if ichar: not_enough = False - if line_size > self._high_water: - raise ValueError("Line is too long") + if chunk_size > self._high_water: + raise ValueError("Chunk too big") if self._eof: break if not_enough: - await self._wait("readline") + await self._wait("readuntil") - return b"".join(line) + return chunk async def read(self, n: int = -1) -> bytes: if self._exception is not None: @@ -517,6 +524,8 @@ async def readline(self) -> bytes: async def read(self, n: int = -1) -> bytes: return b"" + # TODO add async def readuntil + async def readany(self) -> bytes: return b"" diff --git a/docs/streams.rst b/docs/streams.rst index 69fffe21379..617a1a26f1b 100644 --- a/docs/streams.rst +++ b/docs/streams.rst @@ -70,6 +70,19 @@ Reading Methods :return bytes: the given line +.. comethod:: StreamReader.readuntil(separator="\n") + + Read until separator, where `separator` is a sequence of bytes. + + If EOF is received, and `separator` was not found, the method will + return the partial read bytes. + + If the EOF was received and the internal buffer is empty, return an + empty bytes object. + + .. versionadded:: 3.8 + + :return bytes: the given data .. comethod:: StreamReader.readchunk() diff --git a/tests/test_streams.py b/tests/test_streams.py index 9747f3e7cf7..9341c82cc29 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -376,6 +376,117 @@ async def test_readline_exception(self) -> None: with pytest.raises(ValueError): await stream.readline() + async def test_readuntil(self) -> None: + loop = asyncio.get_event_loop() + # Read one chunk. 'readuntil' will need to wait for the data + # to come from 'cb' + stream = self._make_one() + stream.feed_data(b"chunk1 ") + read_task = loop.create_task(stream.readuntil(b"*")) + + def cb(): + stream.feed_data(b"chunk2 ") + stream.feed_data(b"chunk3 ") + stream.feed_data(b"* chunk4") + + loop.call_soon(cb) + + line = await read_task + assert b"chunk1 chunk2 chunk3 *" == line + + stream.feed_eof() + data = await stream.read() + assert b" chunk4" == data + + async def test_readuntil_limit_with_existing_data(self) -> None: + # Read one chunk. The data is in StreamReader's buffer + # before the event loop is run. + + stream = self._make_one(limit=2) + stream.feed_data(b"li") + stream.feed_data(b"ne1&line2&") + + with pytest.raises(ValueError): + await stream.readuntil(b"&") + # The buffer should contain the remaining data after exception + stream.feed_eof() + data = await stream.read() + assert b"line2&" == data + + async def test_readuntil_limit(self) -> None: + loop = asyncio.get_event_loop() + # Read one chunk. StreamReaders are fed with data after + # their 'readuntil' methods are called. + stream = self._make_one(limit=4) + + def cb(): + stream.feed_data(b"chunk1") + stream.feed_data(b"chunk2$") + stream.feed_data(b"chunk3#") + stream.feed_eof() + + loop.call_soon(cb) + + with pytest.raises(ValueError): + await stream.readuntil(b"$") + data = await stream.read() + assert b"chunk3#" == data + + async def test_readuntil_nolimit_nowait(self) -> None: + # All needed data for the first 'readuntil' call will be + # in the buffer. + stream = self._make_one() + data = b"line1!line2!line3!" + stream.feed_data(data[:6]) + stream.feed_data(data[6:]) + + line = await stream.readuntil(b"!") + assert b"line1!" == line + + stream.feed_eof() + data = await stream.read() + assert b"line2!line3!" == data + + async def test_readuntil_eof(self) -> None: + stream = self._make_one() + stream.feed_data(b"some data") + stream.feed_eof() + + line = await stream.readuntil(b"@") + assert b"some data" == line + + async def test_readuntil_empty_eof(self) -> None: + stream = self._make_one() + stream.feed_eof() + + line = await stream.readuntil(b"@") + assert b"" == line + + async def test_readuntil_read_byte_count(self) -> None: + stream = self._make_one() + data = b"line1!line2!line3!" + stream.feed_data(data) + + await stream.readuntil(b"!") + + data = await stream.read(7) + assert b"line2!l" == data + + stream.feed_eof() + data = await stream.read() + assert b"ine3!" == data + + async def test_readuntil_exception(self) -> None: + stream = self._make_one() + stream.feed_data(b"line#") + + data = await stream.readuntil(b"#") + assert b"line#" == data + + stream.set_exception(ValueError()) + with pytest.raises(ValueError): + await stream.readuntil(b"#") + async def test_readexactly_zero_or_less(self) -> None: # Read exact number of bytes (zero or less). stream = self._make_one()