Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WS_V2] Improve upon yielding control to the event loop #3135

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/3135.internal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improvements on yielding to the event loop while searching in response caches and calling ``recv()`` on the websocket connection.
12 changes: 8 additions & 4 deletions tests/core/providers/test_wsv2_provider.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import pytest
import sys
Expand Down Expand Up @@ -87,7 +88,7 @@ async def test_async_make_request_returns_cached_response_with_no_recv_if_cached

# cache the response, so we should get it immediately & should never call `recv()`
desired_response = {"jsonrpc": "2.0", "id": 0, "result": "0x1337"}
await provider._request_processor.cache_raw_response(desired_response)
provider._request_processor.cache_raw_response(desired_response)

response = await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
assert response == desired_response
Expand All @@ -104,15 +105,18 @@ async def test_async_make_request_returns_cached_response_with_no_recv_if_cached
reason="Uses AsyncMock, not supported by python 3.7",
)
async def test_async_make_request_times_out_of_while_loop_looking_for_response():
provider = WebsocketProviderV2("ws://mocked", request_timeout=0.1)
timeout = 0.001
provider = WebsocketProviderV2("ws://mocked", request_timeout=timeout)

method_under_test = provider.make_request

_mock_ws(provider)
provider._ws.recv.side_effect = lambda *args, **kwargs: b'{"jsonrpc": "2.0"}'
# mock the websocket to never receive a response & sleep longer than the timeout
provider._ws.recv = lambda *args, **kwargs: asyncio.sleep(1)

with pytest.raises(
TimeExhausted,
match=r"Timed out waiting for response with request id `0` after 0.1 second",
match=r"Timed out waiting for response with request id `0` after "
rf"{timeout} second\(s\)",
):
await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
16 changes: 8 additions & 8 deletions web3/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,13 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]:
)

while True:
# sleep(0) here seems to be the most efficient way to yield control back to
# the event loop while waiting for the response to be cached or received on
# the websocket.
await asyncio.sleep(0)

# look in the cache for a response
response = await self._request_processor.pop_raw_response(subscription=True)
response = self._request_processor.pop_raw_response(subscription=True)
if response is not None:
break
else:
Expand All @@ -380,20 +385,15 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]:
try:
# keep timeout low but reasonable to check both the cache
# and the websocket connection for new responses
response = await self._provider._ws_recv(timeout=2)
response = await self._provider._ws_recv(timeout=0.5)
except asyncio.TimeoutError:
# if no response received, continue to next iteration
continue

if response.get("method") == "eth_subscription":
break
else:
await self._provider._request_processor.cache_raw_response(
response
)

# this is important to let asyncio run other tasks
await asyncio.sleep(0.05)
self._provider._request_processor.cache_raw_response(response)

yield await self._process_ws_response(response)

Expand Down
2 changes: 1 addition & 1 deletion web3/providers/persistent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
RPCResponse,
)

DEFAULT_PERSISTENT_CONNECTION_TIMEOUT = 20
DEFAULT_PERSISTENT_CONNECTION_TIMEOUT = 50


class PersistentConnectionProvider(AsyncJSONBaseProvider, ABC):
Expand Down
6 changes: 2 additions & 4 deletions web3/providers/websocket/request_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ def append_middleware_response_processor(

# raw response cache

async def cache_raw_response(
self, raw_response: Any, subscription: bool = False
) -> None:
def cache_raw_response(self, raw_response: Any, subscription: bool = False) -> None:
if subscription:
self._provider.logger.debug(
f"Caching subscription response:\n response={raw_response}"
Expand All @@ -208,7 +206,7 @@ async def cache_raw_response(
)
self._request_response_cache.cache(cache_key, raw_response)

async def pop_raw_response(
def pop_raw_response(
self, cache_key: str = None, subscription: bool = False
) -> Any:
if subscription:
Expand Down
14 changes: 8 additions & 6 deletions web3/providers/websocket/websocket_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,18 @@ async def _match_response_id_to_request_id() -> RPCResponse:
request_cache_key = generate_cache_key(request_id)

while True:
# sleep(0) here seems to be the most efficient way to yield control
# back to the event loop while waiting for the response to be cached
# or received on the websocket.
await asyncio.sleep(0)

if request_cache_key in self._request_processor._request_response_cache:
# if response is already cached, pop it from cache
self.logger.debug(
f"Response for id {request_id} is already cached, pop it "
"from the cache."
)
return await self._request_processor.pop_raw_response(
return self._request_processor.pop_raw_response(
cache_key=request_cache_key,
)

Expand All @@ -189,7 +194,7 @@ async def _match_response_id_to_request_id() -> RPCResponse:
try:
# keep timeout low but reasonable to check both the
# cache and the websocket connection for new responses
response = await self._ws_recv(timeout=2)
response = await self._ws_recv(timeout=0.5)
except asyncio.TimeoutError:
# keep the request timeout around the whole of this
# while loop in case the response sneaks into the cache
Expand All @@ -209,13 +214,10 @@ async def _match_response_id_to_request_id() -> RPCResponse:
is_subscription = (
response.get("method") == "eth_subscription"
)
await self._request_processor.cache_raw_response(
self._request_processor.cache_raw_response(
response, subscription=is_subscription
)

# this is important to let asyncio run other tasks
await asyncio.sleep(0.05)

try:
# Add the request timeout around the while loop that checks the request
# cache and tried to recv(). If the request is neither in the cache, nor
Expand Down