From 22f6fcf1268233470029b5febdf0aa9497cb9898 Mon Sep 17 00:00:00 2001 From: fselmo Date: Wed, 25 Oct 2023 11:37:27 -0600 Subject: [PATCH 1/2] Improve upon yielding control to the event loop - In order to guarantee that the event loop can run other tasks more efficiently, asyncio.sleep(0) seems to be the most efficient way to yield control back to the event loop in a way that many tasks can still run concurrently without quickly timing out. - Increase the default timeout to look for a response to a request from 20 seconds to 50 seconds. - Make the caching methods in the request processor synchronous since they don't need to be async. --- tests/core/providers/test_wsv2_provider.py | 12 ++++++++---- web3/manager.py | 16 ++++++++-------- web3/providers/persistent.py | 2 +- web3/providers/websocket/request_processor.py | 6 ++---- web3/providers/websocket/websocket_v2.py | 14 ++++++++------ 5 files changed, 27 insertions(+), 23 deletions(-) diff --git a/tests/core/providers/test_wsv2_provider.py b/tests/core/providers/test_wsv2_provider.py index 313ee148f4..14216ebc15 100644 --- a/tests/core/providers/test_wsv2_provider.py +++ b/tests/core/providers/test_wsv2_provider.py @@ -1,3 +1,4 @@ +import asyncio import json import pytest import sys @@ -87,7 +88,7 @@ async def test_async_make_request_returns_cached_response_with_no_recv_if_cached # cache the response, so we should get it immediately & should never call `recv()` desired_response = {"jsonrpc": "2.0", "id": 0, "result": "0x1337"} - await provider._request_processor.cache_raw_response(desired_response) + provider._request_processor.cache_raw_response(desired_response) response = await method_under_test(RPCEndpoint("some_method"), ["desired_params"]) assert response == desired_response @@ -104,15 +105,18 @@ async def test_async_make_request_returns_cached_response_with_no_recv_if_cached reason="Uses AsyncMock, not supported by python 3.7", ) async def test_async_make_request_times_out_of_while_loop_looking_for_response(): - provider = WebsocketProviderV2("ws://mocked", request_timeout=0.1) + timeout = 0.001 + provider = WebsocketProviderV2("ws://mocked", request_timeout=timeout) method_under_test = provider.make_request _mock_ws(provider) - provider._ws.recv.side_effect = lambda *args, **kwargs: b'{"jsonrpc": "2.0"}' + # mock the websocket to never receive a response & sleep longer than the timeout + provider._ws.recv = lambda *args, **kwargs: asyncio.sleep(1) with pytest.raises( TimeExhausted, - match=r"Timed out waiting for response with request id `0` after 0.1 second", + match=r"Timed out waiting for response with request id `0` after " + rf"{timeout} second\(s\)", ): await method_under_test(RPCEndpoint("some_method"), ["desired_params"]) diff --git a/web3/manager.py b/web3/manager.py index 624b6ed6be..c0adfcca99 100644 --- a/web3/manager.py +++ b/web3/manager.py @@ -369,8 +369,13 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]: ) while True: + # sleep(0) here seems to be the most efficient way to yield control back to + # the event loop while waiting for the response to be cached or received on + # the websocket. + await asyncio.sleep(0) + # look in the cache for a response - response = await self._request_processor.pop_raw_response(subscription=True) + response = self._request_processor.pop_raw_response(subscription=True) if response is not None: break else: @@ -380,7 +385,7 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]: try: # keep timeout low but reasonable to check both the cache # and the websocket connection for new responses - response = await self._provider._ws_recv(timeout=2) + response = await self._provider._ws_recv(timeout=0.5) except asyncio.TimeoutError: # if no response received, continue to next iteration continue @@ -388,12 +393,7 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]: if response.get("method") == "eth_subscription": break else: - await self._provider._request_processor.cache_raw_response( - response - ) - - # this is important to let asyncio run other tasks - await asyncio.sleep(0.05) + self._provider._request_processor.cache_raw_response(response) yield await self._process_ws_response(response) diff --git a/web3/providers/persistent.py b/web3/providers/persistent.py index 9aed7a0c5b..bc23ae3bbc 100644 --- a/web3/providers/persistent.py +++ b/web3/providers/persistent.py @@ -21,7 +21,7 @@ RPCResponse, ) -DEFAULT_PERSISTENT_CONNECTION_TIMEOUT = 20 +DEFAULT_PERSISTENT_CONNECTION_TIMEOUT = 50 class PersistentConnectionProvider(AsyncJSONBaseProvider, ABC): diff --git a/web3/providers/websocket/request_processor.py b/web3/providers/websocket/request_processor.py index a94b2f350e..e57fb09a61 100644 --- a/web3/providers/websocket/request_processor.py +++ b/web3/providers/websocket/request_processor.py @@ -191,9 +191,7 @@ def append_middleware_response_processor( # raw response cache - async def cache_raw_response( - self, raw_response: Any, subscription: bool = False - ) -> None: + def cache_raw_response(self, raw_response: Any, subscription: bool = False) -> None: if subscription: self._provider.logger.debug( f"Caching subscription response:\n response={raw_response}" @@ -208,7 +206,7 @@ async def cache_raw_response( ) self._request_response_cache.cache(cache_key, raw_response) - async def pop_raw_response( + def pop_raw_response( self, cache_key: str = None, subscription: bool = False ) -> Any: if subscription: diff --git a/web3/providers/websocket/websocket_v2.py b/web3/providers/websocket/websocket_v2.py index c03c9bdf58..949c6b0c68 100644 --- a/web3/providers/websocket/websocket_v2.py +++ b/web3/providers/websocket/websocket_v2.py @@ -169,13 +169,18 @@ async def _match_response_id_to_request_id() -> RPCResponse: request_cache_key = generate_cache_key(request_id) while True: + # sleep(0) here seems to be the most efficient way to yield control + # back to the event loop while waiting for the response to be cached + # or received on the websocket. + await asyncio.sleep(0) + if request_cache_key in self._request_processor._request_response_cache: # if response is already cached, pop it from cache self.logger.debug( f"Response for id {request_id} is already cached, pop it " "from the cache." ) - return await self._request_processor.pop_raw_response( + return self._request_processor.pop_raw_response( cache_key=request_cache_key, ) @@ -189,7 +194,7 @@ async def _match_response_id_to_request_id() -> RPCResponse: try: # keep timeout low but reasonable to check both the # cache and the websocket connection for new responses - response = await self._ws_recv(timeout=2) + response = await self._ws_recv(timeout=0.5) except asyncio.TimeoutError: # keep the request timeout around the whole of this # while loop in case the response sneaks into the cache @@ -209,13 +214,10 @@ async def _match_response_id_to_request_id() -> RPCResponse: is_subscription = ( response.get("method") == "eth_subscription" ) - await self._request_processor.cache_raw_response( + self._request_processor.cache_raw_response( response, subscription=is_subscription ) - # this is important to let asyncio run other tasks - await asyncio.sleep(0.05) - try: # Add the request timeout around the while loop that checks the request # cache and tried to recv(). If the request is neither in the cache, nor From ef90f39a8959fdd2674fc73ca85e29b633ad545b Mon Sep 17 00:00:00 2001 From: fselmo Date: Thu, 26 Oct 2023 11:09:15 -0600 Subject: [PATCH 2/2] relevant newsfragments for #3135 --- newsfragments/3135.internal.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/3135.internal.rst diff --git a/newsfragments/3135.internal.rst b/newsfragments/3135.internal.rst new file mode 100644 index 0000000000..45c91ad3af --- /dev/null +++ b/newsfragments/3135.internal.rst @@ -0,0 +1 @@ +Improvements on yielding to the event loop while searching in response caches and calling ``recv()`` on the websocket connection.