From 62a99dea9ed3fd627987188cccc4383008d22cfa Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:01:05 -0500 Subject: [PATCH 01/18] Fix WebSocket ping tasks being prematurely garbage collected The event loop only keeps weak references to tasks, we need to hold a strong reference to ensure that the ping task is not prematurely garbage collected. https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task In almost all cases the ping can be done synchronously if the task is created eagerly which avoids scheduling the ping task on the event loop. fixes #8614 --- aiohttp/client_reqrep.py | 14 +++++--------- aiohttp/client_ws.py | 21 +++++++++++++++------ aiohttp/helpers.py | 15 +++++++++++++++ aiohttp/web_ws.py | 27 +++++++++++++++++++++------ 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index e7b9d382794..2744ca50757 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -48,6 +48,7 @@ HeadersMixin, TimerNoop, basicauth_from_netrc, + create_eager_task, is_expected_content_type, netrc_from_env, parse_mimetype, @@ -668,15 +669,10 @@ async def send(self, conn: "Connection") -> "ClientResponse": await writer.write_headers(status_line, self.headers) coro = self.write_bytes(writer, conn) - if sys.version_info >= (3, 12): - # Optimization for Python 3.12, try to write - # bytes immediately to avoid having to schedule - # the task on the event loop. - task = asyncio.Task(coro, loop=self.loop, eager_start=True) - else: - task = self.loop.create_task(coro) - - self._writer = task + # Optimization for Python 3.12+, try to write + # bytes immediately to avoid having to schedule + # the task on the event loop. + self._writer = create_eager_task(coro, self.loop) response_class = self.response_class assert response_class is not None self.response = response_class( diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 5008c0bc336..8fd30a1954e 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -7,7 +7,7 @@ from .client_exceptions import ClientError, ServerTimeoutError from .client_reqrep import ClientResponse -from .helpers import calculate_timeout_when, set_result +from .helpers import calculate_timeout_when, create_eager_task, set_result from .http import ( WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, @@ -82,6 +82,7 @@ def __init__( self._exception: Optional[BaseException] = None self._compress = compress self._client_notakeover = client_notakeover + self._ping_task: Optional[asyncio.Task[None]] = None self._reset_heartbeat() @@ -90,6 +91,9 @@ def _cancel_heartbeat(self) -> None: if self._heartbeat_cb is not None: self._heartbeat_cb.cancel() self._heartbeat_cb = None + if self._ping_task is not None: + self._ping_task.cancel() + self._ping_task = None def _cancel_pong_response_cb(self) -> None: if self._pong_response_cb is not None: @@ -128,11 +132,6 @@ def _send_heartbeat(self) -> None: ) return - # fire-and-forget a task is not perfect but maybe ok for - # sending ping. Otherwise we need a long-living heartbeat - # task in the class. - loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] - conn = self._conn timeout_ceil_threshold = ( conn._connector._timeout_ceil_threshold if conn is not None else 5 @@ -141,6 +140,16 @@ def _send_heartbeat(self) -> None: self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) + self._ping_task = create_eager_task(self._writer.ping(), loop) + if self._ping_task.done(): + self._ping_task = None + else: + self._ping_task.add_done_callback(self._ping_task_done) + + def _ping_task_done(self, task: asyncio.Task[None]) -> None: + """Callback for when the ping task completes.""" + self._ping_task = None + def _pong_not_received(self) -> None: if not self._closed: self._set_closed() diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 496f3ad1d05..0830398385d 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -592,6 +592,21 @@ def weakref_handle( return None +def create_eager_task( + coro: Callable[..., Any], + loop: asyncio.AbstractEventLoop, +) -> asyncio.Task: + """Create a task that will be scheduled immediately if possible.""" + if sys.version_info >= (3, 12): + # Optimization for Python 3.12, try to write + # bytes immediately to avoid having to schedule + # the task on the event loop. + return asyncio.Task(coro, loop=loop, eager_start=True) + # For older python versions, we need to schedule the task + # on the event loop as eager_start is not available. + return loop.create_task(coro) + + def call_later( cb: Callable[[], Any], timeout: Optional[float], diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 7cc206b4ac1..9e82ce02801 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -11,7 +11,12 @@ from . import hdrs from .abc import AbstractStreamWriter -from .helpers import calculate_timeout_when, set_exception, set_result +from .helpers import ( + calculate_timeout_when, + create_eager_task, + set_exception, + set_result, +) from .http import ( WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, @@ -80,6 +85,7 @@ class WebSocketResponse(StreamResponse): "_pong_response_cb", "_compress", "_max_msg_size", + "_ping_task", ) def __init__( @@ -120,12 +126,16 @@ def __init__( self._pong_response_cb: Optional[asyncio.TimerHandle] = None self._compress = compress self._max_msg_size = max_msg_size + self._ping_task: Optional[asyncio.Task[None]] = None def _cancel_heartbeat(self) -> None: self._cancel_pong_response_cb() if self._heartbeat_cb is not None: self._heartbeat_cb.cancel() self._heartbeat_cb = None + if self._ping_task is not None: + self._ping_task.cancel() + self._ping_task = None def _cancel_pong_response_cb(self) -> None: if self._pong_response_cb is not None: @@ -165,11 +175,6 @@ def _send_heartbeat(self) -> None: ) return - # fire-and-forget a task is not perfect but maybe ok for - # sending ping. Otherwise we need a long-living heartbeat - # task in the class. - loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] - req = self._req timeout_ceil_threshold = ( req._protocol._timeout_ceil_threshold if req is not None else 5 @@ -178,6 +183,16 @@ def _send_heartbeat(self) -> None: self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) + self._ping_task = create_eager_task(self._writer.ping(), loop) + if self._ping_task.done(): + self._ping_task = None + else: + self._ping_task.add_done_callback(self._ping_task_done) + + def _ping_task_done(self, task: asyncio.Task[None]) -> None: + """Callback for when the ping task completes.""" + self._ping_task = None + def _pong_not_received(self) -> None: if self._req is not None and self._req.transport is not None: self._set_closed() From 0fbcdc2c6690777151d7c2d2906eef870d63cb51 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:04:36 -0500 Subject: [PATCH 02/18] typing --- aiohttp/helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 0830398385d..740ec5455dd 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -29,6 +29,7 @@ Any, Callable, ContextManager, + Coroutine, Dict, Generic, Iterable, @@ -593,7 +594,7 @@ def weakref_handle( def create_eager_task( - coro: Callable[..., Any], + coro: Coroutine[Any, Any, None], loop: asyncio.AbstractEventLoop, ) -> asyncio.Task: """Create a task that will be scheduled immediately if possible.""" From 604f1ba2e28a1e6459b82303d1010b3ee098a62e Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:05:14 -0500 Subject: [PATCH 03/18] typing --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 740ec5455dd..665469c5598 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -596,7 +596,7 @@ def weakref_handle( def create_eager_task( coro: Coroutine[Any, Any, None], loop: asyncio.AbstractEventLoop, -) -> asyncio.Task: +) -> asyncio.Task[Any]: """Create a task that will be scheduled immediately if possible.""" if sys.version_info >= (3, 12): # Optimization for Python 3.12, try to write From 443e182b3376b4c9ef08c0aea2bd61cafe0ae54e Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:05:35 -0500 Subject: [PATCH 04/18] typing --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 665469c5598..eb2225f1300 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -596,7 +596,7 @@ def weakref_handle( def create_eager_task( coro: Coroutine[Any, Any, None], loop: asyncio.AbstractEventLoop, -) -> asyncio.Task[Any]: +) -> asyncio.Task[None]: """Create a task that will be scheduled immediately if possible.""" if sys.version_info >= (3, 12): # Optimization for Python 3.12, try to write From 2d175bb996daf0f0bde629ef55a7aef2f9ca92f3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:11:25 -0500 Subject: [PATCH 05/18] py3.8 --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index eb2225f1300..cd9d0c72853 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -596,7 +596,7 @@ def weakref_handle( def create_eager_task( coro: Coroutine[Any, Any, None], loop: asyncio.AbstractEventLoop, -) -> asyncio.Task[None]: +) -> "asyncio.Task[None]": """Create a task that will be scheduled immediately if possible.""" if sys.version_info >= (3, 12): # Optimization for Python 3.12, try to write From 196955a1a9203413e04607cc1ff154e55a1ed7fd Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:12:52 -0500 Subject: [PATCH 06/18] Update aiohttp/helpers.py --- aiohttp/helpers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index cd9d0c72853..6fd7e1f956b 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -599,9 +599,8 @@ def create_eager_task( ) -> "asyncio.Task[None]": """Create a task that will be scheduled immediately if possible.""" if sys.version_info >= (3, 12): - # Optimization for Python 3.12, try to write - # bytes immediately to avoid having to schedule - # the task on the event loop. + # Optimization for Python 3.12, try start eagerly + # to avoid being scheduled on the event loop. return asyncio.Task(coro, loop=loop, eager_start=True) # For older python versions, we need to schedule the task # on the event loop as eager_start is not available. From f3c730360f719714312ede4fbfcd6b1d0fa4b41a Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:13:03 -0500 Subject: [PATCH 07/18] Update aiohttp/helpers.py --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 6fd7e1f956b..f7db843eca5 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -599,7 +599,7 @@ def create_eager_task( ) -> "asyncio.Task[None]": """Create a task that will be scheduled immediately if possible.""" if sys.version_info >= (3, 12): - # Optimization for Python 3.12, try start eagerly + # Optimization for Python 3.12+, try start eagerly # to avoid being scheduled on the event loop. return asyncio.Task(coro, loop=loop, eager_start=True) # For older python versions, we need to schedule the task From 6c8f7e7724ec91fefd20e319226c9751bb16d629 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:13:13 -0500 Subject: [PATCH 08/18] Update aiohttp/helpers.py --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index f7db843eca5..a82c65edf8a 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -597,7 +597,7 @@ def create_eager_task( coro: Coroutine[Any, Any, None], loop: asyncio.AbstractEventLoop, ) -> "asyncio.Task[None]": - """Create a task that will be scheduled immediately if possible.""" + """Create a task that will be run immediately if possible.""" if sys.version_info >= (3, 12): # Optimization for Python 3.12+, try start eagerly # to avoid being scheduled on the event loop. From 25616f6509bdde474bb5bdb8b4c7fba2bd4226d0 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:18:05 -0500 Subject: [PATCH 09/18] fix py38 --- aiohttp/client_ws.py | 2 +- aiohttp/web_ws.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 8fd30a1954e..18d75409479 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -146,7 +146,7 @@ def _send_heartbeat(self) -> None: else: self._ping_task.add_done_callback(self._ping_task_done) - def _ping_task_done(self, task: asyncio.Task[None]) -> None: + def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" self._ping_task = None diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 9e82ce02801..ea91187ad5a 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -189,7 +189,7 @@ def _send_heartbeat(self) -> None: else: self._ping_task.add_done_callback(self._ping_task_done) - def _ping_task_done(self, task: asyncio.Task[None]) -> None: + def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" self._ping_task = None From 7e61dddabd7490602dc85657ae5815fa9e81e5f1 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:25:04 -0500 Subject: [PATCH 10/18] changelog --- CHANGES/8641.bugfix.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 CHANGES/8641.bugfix.rst diff --git a/CHANGES/8641.bugfix.rst b/CHANGES/8641.bugfix.rst new file mode 100644 index 00000000000..9ab9af537a3 --- /dev/null +++ b/CHANGES/8641.bugfix.rst @@ -0,0 +1,3 @@ +Fixed WebSocket ping tasks being prematurely garbage collected -- by :user:`bdraco`. + +There was a small risk that WebSocket ping tasks would be prematurely garbage collected because the event loop only holds a weak reference to the task. The garbage collection risk has been fixed by holding a strong reference to the task. Additionally, the task is not scheduled eagerly with Python 3.12+ to increase the chance it can be completed immediately and avoid having to hold any references to the task. From f027134576750ebdf6346d6671ce874c0824ec7f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:25:37 -0500 Subject: [PATCH 11/18] Update CHANGES/8641.bugfix.rst --- CHANGES/8641.bugfix.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES/8641.bugfix.rst b/CHANGES/8641.bugfix.rst index 9ab9af537a3..9c85ac04419 100644 --- a/CHANGES/8641.bugfix.rst +++ b/CHANGES/8641.bugfix.rst @@ -1,3 +1,3 @@ Fixed WebSocket ping tasks being prematurely garbage collected -- by :user:`bdraco`. -There was a small risk that WebSocket ping tasks would be prematurely garbage collected because the event loop only holds a weak reference to the task. The garbage collection risk has been fixed by holding a strong reference to the task. Additionally, the task is not scheduled eagerly with Python 3.12+ to increase the chance it can be completed immediately and avoid having to hold any references to the task. +There was a small risk that WebSocket ping tasks would be prematurely garbage collected because the event loop only holds a weak reference to the task. The garbage collection risk has been fixed by holding a strong reference to the task. Additionally, the task is now scheduled eagerly with Python 3.12+ to increase the chance it can be completed immediately and avoid having to hold any references to the task. From 63f0e19c39432796a24c4b03d84024bc0661e805 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:35:12 -0500 Subject: [PATCH 12/18] add coverage --- tests/test_client_ws_functional.py | 47 ++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 81bc6e6c7c8..9410dba1b3f 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -793,6 +793,53 @@ async def handler(request: web.Request) -> NoReturn: assert isinstance(msg.data, ServerTimeoutError) +async def test_heartbeat_cancel_while_ping_inflight( + aiohttp_client: AiohttpClient, +) -> None: + """Test canceling the websocket while a ping is in-flight.""" + ping_received = False + + async def handler(request: web.Request) -> NoReturn: + nonlocal ping_received + ws = web.WebSocketResponse(autoping=False) + await ws.prepare(request) + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.BINARY + msg = await ws.receive() + ping_received = msg.type is aiohttp.WSMsgType.PING + await ws.receive() + assert False + + app = web.Application() + app.router.add_route("GET", "/", handler) + + client = await aiohttp_client(app) + resp = await client.ws_connect("/", heartbeat=0.1) + await resp.send_bytes(b"ask") + + original_ping = resp.ping + cancelled = False + ping_stated = False + + async def delayed_ping() -> None: + nonlocal cancelled, ping_stated + ping_stated = True + try: + await asyncio.sleep(0.2) + await original_ping() + except asyncio.CancelledError: + cancelled = True + raise + + with mock.patch.object(resp._writer, "ping", delayed_ping): + await asyncio.sleep(0.1) + + await resp.close() + await asyncio.sleep(0) + assert ping_stated is True + assert cancelled is True + + async def test_send_recv_compress(aiohttp_client: AiohttpClient) -> None: async def handler(request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() From 62106a994403dd4385f7897f53c468e956c668ee Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:36:37 -0500 Subject: [PATCH 13/18] add coverage --- tests/test_client_ws_functional.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 9410dba1b3f..c6ff09c0e88 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -793,10 +793,10 @@ async def handler(request: web.Request) -> NoReturn: assert isinstance(msg.data, ServerTimeoutError) -async def test_heartbeat_cancel_while_ping_inflight( +async def test_close_websocket_while_ping_inflight( aiohttp_client: AiohttpClient, ) -> None: - """Test canceling the websocket while a ping is in-flight.""" + """Test closing the websocket while a ping is in-flight.""" ping_received = False async def handler(request: web.Request) -> NoReturn: From eefe7c167dde1b8c36f87ca6de808d0abf6d88cd Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:44:19 -0500 Subject: [PATCH 14/18] reduce --- aiohttp/client_ws.py | 7 +++---- aiohttp/web_ws.py | 9 ++++----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 18d75409479..a3b0f47ad6c 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -140,10 +140,9 @@ def _send_heartbeat(self) -> None: self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) - self._ping_task = create_eager_task(self._writer.ping(), loop) - if self._ping_task.done(): - self._ping_task = None - else: + ping_task = create_eager_task(self._writer.ping(), loop) + if not ping_task.done(): + self._ping_task = ping_task self._ping_task.add_done_callback(self._ping_task_done) def _ping_task_done(self, task: "asyncio.Task[None]") -> None: diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index ea91187ad5a..1e5616469cf 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -183,11 +183,10 @@ def _send_heartbeat(self) -> None: self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) - self._ping_task = create_eager_task(self._writer.ping(), loop) - if self._ping_task.done(): - self._ping_task = None - else: - self._ping_task.add_done_callback(self._ping_task_done) + ping_task = create_eager_task(self._writer.ping(), loop) + if not ping_task.done(): + self._ping_task = ping_task + ping_task.add_done_callback(self._ping_task_done) def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" From e43674e696c6c083d87384c367a08cc2cfb39319 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:45:05 -0500 Subject: [PATCH 15/18] reduce --- aiohttp/client_ws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index a3b0f47ad6c..9ac0f8d87e8 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -143,7 +143,7 @@ def _send_heartbeat(self) -> None: ping_task = create_eager_task(self._writer.ping(), loop) if not ping_task.done(): self._ping_task = ping_task - self._ping_task.add_done_callback(self._ping_task_done) + ping_task.add_done_callback(self._ping_task_done) def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" From db8ad265eecf04b6d89b5c9e8cae384aaeba6e47 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 19:46:08 -0500 Subject: [PATCH 16/18] reduce --- tests/test_client_ws_functional.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index c6ff09c0e88..a63d0d57979 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -817,7 +817,6 @@ async def handler(request: web.Request) -> NoReturn: resp = await client.ws_connect("/", heartbeat=0.1) await resp.send_bytes(b"ask") - original_ping = resp.ping cancelled = False ping_stated = False @@ -825,8 +824,7 @@ async def delayed_ping() -> None: nonlocal cancelled, ping_stated ping_stated = True try: - await asyncio.sleep(0.2) - await original_ping() + await asyncio.sleep(1) except asyncio.CancelledError: cancelled = True raise From 713c04318a5a87ce77d4b05954fd476d69114319 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 8 Aug 2024 09:03:10 -0500 Subject: [PATCH 17/18] undo dry --- aiohttp/client_reqrep.py | 14 +++++++++----- aiohttp/client_ws.py | 11 +++++++++-- aiohttp/helpers.py | 15 --------------- aiohttp/web_ws.py | 16 +++++++++------- 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 2744ca50757..e7b9d382794 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -48,7 +48,6 @@ HeadersMixin, TimerNoop, basicauth_from_netrc, - create_eager_task, is_expected_content_type, netrc_from_env, parse_mimetype, @@ -669,10 +668,15 @@ async def send(self, conn: "Connection") -> "ClientResponse": await writer.write_headers(status_line, self.headers) coro = self.write_bytes(writer, conn) - # Optimization for Python 3.12+, try to write - # bytes immediately to avoid having to schedule - # the task on the event loop. - self._writer = create_eager_task(coro, self.loop) + if sys.version_info >= (3, 12): + # Optimization for Python 3.12, try to write + # bytes immediately to avoid having to schedule + # the task on the event loop. + task = asyncio.Task(coro, loop=self.loop, eager_start=True) + else: + task = self.loop.create_task(coro) + + self._writer = task response_class = self.response_class assert response_class is not None self.response = response_class( diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 9ac0f8d87e8..fafa99adb7b 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -7,7 +7,7 @@ from .client_exceptions import ClientError, ServerTimeoutError from .client_reqrep import ClientResponse -from .helpers import calculate_timeout_when, create_eager_task, set_result +from .helpers import calculate_timeout_when, set_result from .http import ( WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, @@ -140,7 +140,14 @@ def _send_heartbeat(self) -> None: self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) - ping_task = create_eager_task(self._writer.ping(), loop) + if sys.version_info >= (3, 12): + # Optimization for Python 3.12, try to send the ping + # immediately to avoid having to schedule + # the task on the event loop. + ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True) + else: + ping_task = self.loop.create_task(self._writer.ping()) + if not ping_task.done(): self._ping_task = ping_task ping_task.add_done_callback(self._ping_task_done) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index a82c65edf8a..496f3ad1d05 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -29,7 +29,6 @@ Any, Callable, ContextManager, - Coroutine, Dict, Generic, Iterable, @@ -593,20 +592,6 @@ def weakref_handle( return None -def create_eager_task( - coro: Coroutine[Any, Any, None], - loop: asyncio.AbstractEventLoop, -) -> "asyncio.Task[None]": - """Create a task that will be run immediately if possible.""" - if sys.version_info >= (3, 12): - # Optimization for Python 3.12+, try start eagerly - # to avoid being scheduled on the event loop. - return asyncio.Task(coro, loop=loop, eager_start=True) - # For older python versions, we need to schedule the task - # on the event loop as eager_start is not available. - return loop.create_task(coro) - - def call_later( cb: Callable[[], Any], timeout: Optional[float], diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 1e5616469cf..5990ca08141 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -11,12 +11,7 @@ from . import hdrs from .abc import AbstractStreamWriter -from .helpers import ( - calculate_timeout_when, - create_eager_task, - set_exception, - set_result, -) +from .helpers import calculate_timeout_when, set_exception, set_result from .http import ( WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, @@ -183,7 +178,14 @@ def _send_heartbeat(self) -> None: self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) - ping_task = create_eager_task(self._writer.ping(), loop) + if sys.version_info >= (3, 12): + # Optimization for Python 3.12, try to send the ping + # immediately to avoid having to schedule + # the task on the event loop. + ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True) + else: + ping_task = self.loop.create_task(self._writer.ping()) + if not ping_task.done(): self._ping_task = ping_task ping_task.add_done_callback(self._ping_task_done) From ce73fd6585a4db446e72f5152d90decc07411714 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 8 Aug 2024 09:13:26 -0500 Subject: [PATCH 18/18] fix moved code --- aiohttp/client_ws.py | 2 +- aiohttp/web_ws.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index fafa99adb7b..0ce8e7e8f8e 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -146,7 +146,7 @@ def _send_heartbeat(self) -> None: # the task on the event loop. ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True) else: - ping_task = self.loop.create_task(self._writer.ping()) + ping_task = loop.create_task(self._writer.ping()) if not ping_task.done(): self._ping_task = ping_task diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 5990ca08141..e76716e6ae7 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -184,7 +184,7 @@ def _send_heartbeat(self) -> None: # the task on the event loop. ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True) else: - ping_task = self.loop.create_task(self._writer.ping()) + ping_task = loop.create_task(self._writer.ping()) if not ping_task.done(): self._ping_task = ping_task