diff --git a/CHANGES/3361.bugfix b/CHANGES/3361.bugfix new file mode 100644 index 00000000000..4153f6bc709 --- /dev/null +++ b/CHANGES/3361.bugfix @@ -0,0 +1 @@ +fix false-negative indicator end_of_HTTP_chunk in StreamReader.readchunk function diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index e5d63bac572..6a1d792342f 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -149,6 +149,7 @@ Matthieu Rigal Michael Ihnatenko Mikhail Kashkin Mikhail Lukyanchenko +Mikhail Nacharov Misha Behersky Mitchell Ferree Morgan Delahaye-Prat diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 060e0990eae..ba5cdd497f5 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -254,6 +254,12 @@ def end_http_chunk_receiving(self) -> None: self._http_chunk_splits[-1] != self.total_bytes: self._http_chunk_splits.append(self.total_bytes) + # wake up readchunk when end of http chunk received + waiter = self._waiter + if waiter is not None: + self._waiter = None + set_result(waiter, False) + async def _wait(self, func_name: str) -> None: # StreamReader uses a future to link the protocol feed_data() method # to a read coroutine. Running two read coroutines at the same time @@ -366,13 +372,15 @@ async def readchunk(self) -> Tuple[bytes, bool]: return (b"", True) await self._wait('readchunk') - if not self._buffer: + if not self._buffer and not self._http_chunk_splits: # end of file return (b"", False) elif self._http_chunk_splits is not None: while self._http_chunk_splits: pos = self._http_chunk_splits[0] self._http_chunk_splits = self._http_chunk_splits[1:] + if pos == self._cursor: + return (b"", True) if pos > self._cursor: return (self._read_nowait(pos-self._cursor), True) return (self._read_nowait(-1), False) diff --git a/tests/test_streams.py b/tests/test_streams.py index 39fcea6fce6..da911168a7a 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -712,6 +712,68 @@ async def test_readchunk_with_other_read_calls(self) -> None: assert b'' == data assert not end_of_chunk + async def test_readchunk_separate_http_chunk_tail(self) -> None: + """Test that stream.readchunk returns (b'', True) when end of + http chunk received after body + """ + loop = asyncio.get_event_loop() + stream = self._make_one() + + stream.begin_http_chunk_receiving() + stream.feed_data(b'part1') + + data, end_of_chunk = await stream.readchunk() + assert b'part1' == data + assert not end_of_chunk + + async def cb(): + await asyncio.sleep(0.1) + stream.end_http_chunk_receiving() + + loop.create_task(cb()) + data, end_of_chunk = await stream.readchunk() + assert b'' == data + assert end_of_chunk + + stream.begin_http_chunk_receiving() + stream.feed_data(b'part2') + data, end_of_chunk = await stream.readchunk() + assert b'part2' == data + assert not end_of_chunk + + stream.end_http_chunk_receiving() + stream.begin_http_chunk_receiving() + stream.feed_data(b'part3') + stream.end_http_chunk_receiving() + + data, end_of_chunk = await stream.readchunk() + assert b'' == data + assert end_of_chunk + + data, end_of_chunk = await stream.readchunk() + assert b'part3' == data + assert end_of_chunk + + stream.begin_http_chunk_receiving() + stream.feed_data(b'part4') + data, end_of_chunk = await stream.readchunk() + assert b'part4' == data + assert not end_of_chunk + + async def cb(): + await asyncio.sleep(0.1) + stream.end_http_chunk_receiving() + stream.feed_eof() + + loop.create_task(cb()) + data, end_of_chunk = await stream.readchunk() + assert b'' == data + assert end_of_chunk + + data, end_of_chunk = await stream.readchunk() + assert b'' == data + assert not end_of_chunk + async def test___repr__(self) -> None: stream = self._make_one() assert "" == repr(stream)