Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for configuring the request information cache size on wsV2 #3226

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/providers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ Persistent Connection Providers
connection and waiting for a response to be received from the listener task.
Defaults to ``50.0``.

* ``request_information_cache_size`` is the size of the cache used to store
  request information so that when a response is received, the provider knows
  how to process it based on the original request. Defaults to ``500``.

* ``subscription_response_queue_size`` is the size of the queue used to store
subscription responses, defaults to ``500``. While messages are being consumed,
this queue should never fill up as it is a transient queue and meant to handle
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3226.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow for configuring the ``request_information_cache_size`` for ``PersistentConnectionProvider`` classes. Issue a warning when the cache is full and unexpected behavior may occur.
22 changes: 22 additions & 0 deletions tests/core/providers/test_wsv2_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,25 @@ async def test_listen_event_awaits_msg_processing_when_subscription_queue_is_ful

# proper cleanup
await async_w3.provider.disconnect()


@pytest.mark.asyncio
async def test_wsv2_req_info_cache_size_configuration_and_warns_when_full(caplog):
    # Patch the websocket ``connect`` used by the provider so no real network
    # connection is attempted; the replacement returns ``_coro()`` because the
    # call site awaits the result.
    with patch(
        "web3.providers.websocket.websocket_v2.connect", new=lambda *_1, **_2: _coro()
    ):
        # ``request_information_cache_size=1`` means a single cached request
        # fills the cache, so the "cache is full" warning should fire right away.
        async_w3 = await AsyncWeb3.persistent_websocket(
            WebsocketProviderV2("ws://mocked", request_information_cache_size=1)
        )

        # fill the cache + check warning
        async_w3.provider._request_processor.cache_request_information(
            RPCEndpoint("eth_getBlockByNumber"),
            ["latest"],
            tuple(),
        )
        # Cache holds exactly the one entry we just stored (its configured max).
        assert len(async_w3.provider._request_processor._request_information_cache) == 1
        # The warning is emitted through the provider's logger; ``caplog``
        # captures the rendered message text.
        assert (
            "Request information cache is full. This may result in unexpected behavior. "
            "Consider increasing the ``request_information_cache_size`` on the provider."
        ) in caplog.text
        # NOTE(review): the sibling test above ends with
        # ``await async_w3.provider.disconnect()`` as "proper cleanup" — this
        # test does not; presumably safe because the connection is mocked, but
        # confirm no task/resource is left dangling.
2 changes: 2 additions & 0 deletions web3/providers/persistent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ class PersistentConnectionProvider(AsyncJSONBaseProvider, ABC):
def __init__(
self,
request_timeout: float = DEFAULT_PERSISTENT_CONNECTION_TIMEOUT,
request_information_cache_size: int = 500,
Reviewer comment (Collaborator): It probably doesn't matter much, but the order
of the kwargs may be confusing if anyone was passing
``subscription_response_queue_size`` positionally. Consider adding the new
parameter after ``subscription_response_queue_size`` instead.
subscription_response_queue_size: int = 500,
) -> None:
super().__init__()
self._request_processor = RequestProcessor(
self,
request_information_cache_size=request_information_cache_size,
subscription_response_queue_size=subscription_response_queue_size,
)
self.request_timeout = request_timeout
Expand Down
11 changes: 10 additions & 1 deletion web3/providers/websocket/request_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ class RequestProcessor:
def __init__(
self,
provider: "PersistentConnectionProvider",
request_information_cache_size: int = 500,
subscription_response_queue_size: int = 500,
) -> None:
self._provider = provider

self._request_information_cache: SimpleCache = SimpleCache(500)
self._request_information_cache: SimpleCache = SimpleCache(
request_information_cache_size
)
self._request_response_cache: SimpleCache = SimpleCache(500)
self._subscription_response_queue: asyncio.Queue[RPCResponse] = asyncio.Queue(
maxsize=subscription_response_queue_size
Expand Down Expand Up @@ -81,6 +84,12 @@ def cache_request_information(
cache_key,
request_info,
)
if self._request_information_cache.is_full():
self._provider.logger.warning(
"Request information cache is full. This may result in unexpected "
"behavior. Consider increasing the ``request_information_cache_size`` "
"on the provider."
)
return cache_key

def _bump_cache_if_key_present(self, cache_key: str, request_id: int) -> None:
Expand Down
15 changes: 9 additions & 6 deletions web3/utils/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ def __init__(self, size: int = 100):
self._size = size
self._data: OrderedDict[str, Any] = OrderedDict()

def __contains__(self, key: str) -> bool:
    """Support ``key in cache`` membership tests against the backing mapping."""
    return self._data.__contains__(key)

def __len__(self) -> int:
    """Return the number of entries currently held in the cache."""
    entries = self._data
    return len(entries)

def is_full(self) -> bool:
    """Return ``True`` once the entry count has reached the configured size."""
    return self._size <= len(self._data)

def cache(self, key: str, value: Any) -> Tuple[Any, Dict[str, Any]]:
evicted_items = None
# If the key is already in the OrderedDict just update it
Expand Down Expand Up @@ -47,9 +56,3 @@ def pop(self, key: str) -> Optional[Any]:
return None

return self._data.pop(key)

def __contains__(self, key: str) -> bool:
return key in self._data

def __len__(self) -> int:
return len(self._data)