Disable zero copy writes in the StreamWriter (#10125)
(cherry picked from commit d58d2c3)
bdraco authored and patchback[bot] committed Dec 5, 2024
1 parent da4e00b commit 78cddd6
Showing 3 changed files with 15 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES/10125.bugfix.rst
@@ -0,0 +1 @@
+Disabled zero copy writes in the ``StreamWriter`` -- by :user:`bdraco`.
2 changes: 1 addition & 1 deletion aiohttp/http_writer.py
@@ -90,7 +90,7 @@ def _writelines(self, chunks: Iterable[bytes]) -> None:
         transport = self._protocol.transport
         if transport is None or transport.is_closing():
             raise ClientConnectionResetError("Cannot write to closing transport")
-        transport.writelines(chunks)
+        transport.write(b"".join(chunks))
 
     async def write(
         self, chunk: bytes, *, drain: bool = True, LIMIT: int = 0x10000
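
The substance of the fix is the two-line swap above: instead of handing the list of buffers to transport.writelines(), which the commit title identifies as the zero copy write path on supporting event loops, the chunks are joined and sent through a single ordinary transport.write() call. Below is a minimal, self-contained sketch of the patched behaviour; the SketchWriter class and the stand-in exception are illustrative only (not aiohttp's actual classes), and the byte-count bookkeeping that precedes these lines in aiohttp/http_writer.py is omitted.

import asyncio
from typing import Iterable


class ClientConnectionResetError(ConnectionResetError):
    """Stand-in for aiohttp's ClientConnectionResetError."""


class SketchWriter:
    """Illustrative sketch only -- not aiohttp's StreamWriter."""

    def __init__(self, transport: asyncio.Transport) -> None:
        self._transport = transport

    def _writelines(self, chunks: Iterable[bytes]) -> None:
        transport = self._transport
        if transport is None or transport.is_closing():
            raise ClientConnectionResetError("Cannot write to closing transport")
        # Before this commit: transport.writelines(chunks)
        # After: join the buffers and issue one ordinary write() call.
        transport.write(b"".join(chunks))
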
27 changes: 13 additions & 14 deletions tests/test_http_writer.py
@@ -104,16 +104,15 @@ async def test_write_large_payload_deflate_compression_data_in_eof(
     assert transport.write.called  # type: ignore[attr-defined]
     chunks = [c[1][0] for c in list(transport.write.mock_calls)]  # type: ignore[attr-defined]
     transport.write.reset_mock()  # type: ignore[attr-defined]
+    assert not transport.writelines.called  # type: ignore[attr-defined]
 
     # This payload compresses to 20447 bytes
     payload = b"".join(
         [bytes((*range(0, i), *range(i, 0, -1))) for i in range(255) for _ in range(64)]
     )
     await msg.write_eof(payload)
-    assert not transport.write.called  # type: ignore[attr-defined]
-    assert transport.writelines.called  # type: ignore[attr-defined]
-    chunks.extend(transport.writelines.mock_calls[0][1][0])  # type: ignore[attr-defined]
+    chunks.extend([c[1][0] for c in list(transport.write.mock_calls)])  # type: ignore[attr-defined]
 
     assert all(chunks)
     content = b"".join(chunks)
     assert zlib.decompress(content) == (b"data" * 4096) + payload
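
The tests above (and below) repeatedly use c[1][0] to pull the payload out of each recorded transport.write() call. A small standalone illustration of that unittest.mock idiom -- the mock transport and byte strings here are made up for the example:

from unittest import mock

transport = mock.Mock()
transport.write(b"hello")
transport.write(b"world")

# Each mock_calls entry unpacks as (name, args, kwargs), so c[1][0] is the
# first positional argument, i.e. the bytes handed to transport.write().
chunks = [c[1][0] for c in transport.write.mock_calls]
assert chunks == [b"hello", b"world"]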

@@ -180,7 +179,7 @@ async def test_write_payload_deflate_compression_chunked(
     await msg.write(b"data")
     await msg.write_eof()
 
-    chunks = [b"".join(c[1][0]) for c in list(transport.writelines.mock_calls)]  # type: ignore[attr-defined]
+    chunks = [c[1][0] for c in list(transport.write.mock_calls)]  # type: ignore[attr-defined]
     assert all(chunks)
     content = b"".join(chunks)
     assert content == expected
@@ -216,7 +215,7 @@ async def test_write_payload_deflate_compression_chunked_data_in_eof(
     await msg.write(b"data")
     await msg.write_eof(b"end")
 
-    chunks = [b"".join(c[1][0]) for c in list(transport.writelines.mock_calls)]  # type: ignore[attr-defined]
+    chunks = [c[1][0] for c in list(transport.write.mock_calls)]  # type: ignore[attr-defined]
     assert all(chunks)
     content = b"".join(chunks)
     assert content == expected
@@ -235,16 +234,16 @@ async def test_write_large_payload_deflate_compression_chunked_data_in_eof(
     # This payload compresses to 1111 bytes
     payload = b"".join([bytes((*range(0, i), *range(i, 0, -1))) for i in range(255)])
     await msg.write_eof(payload)
-    assert not transport.write.called  # type: ignore[attr-defined]
 
-    chunks = []
-    for write_lines_call in transport.writelines.mock_calls:  # type: ignore[attr-defined]
-        chunked_payload = list(write_lines_call[1][0])[1:]
-        chunked_payload.pop()
-        chunks.extend(chunked_payload)
+    compressed = []
+    chunks = [c[1][0] for c in list(transport.write.mock_calls)]  # type: ignore[attr-defined]
+    chunked_body = b"".join(chunks)
+    split_body = chunked_body.split(b"\r\n")
+    while split_body:
+        if split_body.pop(0):
+            compressed.append(split_body.pop(0))
 
     assert all(chunks)
-    content = b"".join(chunks)
+    content = b"".join(compressed)
     assert zlib.decompress(content) == (b"data" * 4096) + payload
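
Because the data no longer arrives as pre-split writelines() buffer lists, this last test reassembles the chunked body itself and strips the chunked transfer-encoding framing by splitting on CRLF: every non-empty piece is a hex size line, and the piece after it is chunk data. A tiny standalone illustration of that loop, using a made-up (uncompressed) body:

# Hypothetical chunked body: two 3-byte chunks plus the 0-size terminator,
# each framed as <size-in-hex>\r\n<data>\r\n.
chunked_body = b"3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n"

compressed = []  # mirrors the test's variable name; no real compression here
split_body = chunked_body.split(b"\r\n")
while split_body:
    if split_body.pop(0):                     # non-empty piece: a size line
        compressed.append(split_body.pop(0))  # next piece: the chunk data

assert b"".join(compressed) == b"foobar"      # the 0-size terminator adds b""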

