Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async session cache improvements #2713

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/2713.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
If the loop for a cached async session is closed, or the session itself was closed, create a new session at that cache key and properly close and evict the stale session.
70 changes: 70 additions & 0 deletions tests/core/utilities/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,73 @@ def target_function(endpoint_uri):

# appropriately close the test sessions
[await session.close() for session in test_sessions]


@pytest.mark.asyncio
async def test_async_use_new_session_if_loop_closed_for_cached_session():
# create new loop, cache a session wihin the loop, close the loop
loop1 = asyncio.new_event_loop()

session1 = ClientSession(raise_for_status=True)
session1._loop = loop1

await cache_and_return_async_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")

assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session1

# close loop that was used with session1
loop1.close()

# assert we create a new session when trying to retrieve the session at the
# cache key for TEST_URI
session2 = await cache_and_return_async_session(TEST_URI)
assert not session2._loop.is_closed()
assert session2 != session1

# assert we appropriately closed session1, evicted it from the cache, and cached
# the new session2 at the cache key
assert session1.closed
assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session2

# -- teardown -- #

# appropriately close the new session
await session2.close()


@pytest.mark.asyncio
async def test_async_use_new_session_if_session_closed_for_cached_session():
# create a session, close it, and cache it at the cache key for TEST_URI
session1 = ClientSession(raise_for_status=True)
await session1.close()
await cache_and_return_async_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")

assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session1

# assert we create a new session when trying to retrieve closed session from cache
session2 = await cache_and_return_async_session(TEST_URI)
assert not session2.closed
assert session2 != session1

# assert we evicted session1 from the cache, and cached the new session2
# at the cache key
assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session2

# -- teardown -- #

# appropriately close the new session
await session2.close()
33 changes: 33 additions & 0 deletions web3/_utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,39 @@ async def cache_and_return_async_session(
evicted_items = _async_session_cache.cache(cache_key, session)
logger.debug(f"Async session cached: {endpoint_uri}, {session}")

else:
# get the cached session
cached_session = _async_session_cache.get_cache_entry(cache_key)
session_is_closed = cached_session.closed
session_loop_is_closed = cached_session._loop.is_closed()

warning = (
"Async session was closed"
if session_is_closed
else "Loop was closed for async session"
if session_loop_is_closed
else None
)
if warning:
logger.debug(
f"{warning}: {endpoint_uri}, {cached_session}. "
f"Creating and caching a new async session for uri."
)

_async_session_cache._data.pop(cache_key)
if not session_is_closed:
# if loop was closed but not the session, close the session
await cached_session.close()
logger.debug(
f"Async session closed and evicted from cache: {cached_session}"
)

# replace stale session with a new session at the cache key
_session = ClientSession(raise_for_status=True)
evicted_items = _async_session_cache.cache(cache_key, _session)
logger.debug(f"Async session cached: {endpoint_uri}, {_session}")

# get the cached session
cached_session = _async_session_cache.get_cache_entry(cache_key)

if evicted_items is not None:
Expand Down