Skip to content

Commit

Permalink
Async session cache improvements
Browse files Browse the repository at this point in the history
- Start and cache a new ``ClientSession`` for a cache key for async if the event loop for the cached session was already closed or if the session itself is closed.
  • Loading branch information
fselmo committed Nov 9, 2022
1 parent 0207dd5 commit 2cc692f
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 0 deletions.
70 changes: 70 additions & 0 deletions tests/core/utilities/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,73 @@ def target_function(endpoint_uri):

# appropriately close the test sessions
[await session.close() for session in test_sessions]


@pytest.mark.asyncio
async def test_async_use_new_session_if_loop_closed_for_cached_session():
# create new loop, cache a session wihin the loop, close the loop
loop1 = asyncio.new_event_loop()

session1 = ClientSession(raise_for_status=True)
session1._loop = loop1

await cache_and_return_async_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")

assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session1

# close loop that was used with session1
loop1.close()

# assert we create a new session when trying to retrieve the session at the
# cache key for TEST_URI
session2 = await cache_and_return_async_session(TEST_URI)
assert not session2._loop.is_closed()
assert session2 != session1

# assert we appropriately closed session1, evicted it from the cache, and cached
# the new session2 at the cache key
assert session1.closed
assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session2

# -- teardown -- #

# appropriately close the new session
await session2.close()


@pytest.mark.asyncio
async def test_async_use_new_session_if_session_closed_for_cached_session():
# create a session, close it, and cache it at the cache key for TEST_URI
session1 = ClientSession(raise_for_status=True)
await session1.close()
await cache_and_return_async_session(TEST_URI, session=session1)

# assert session1 was cached
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")

assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session1

# assert we create a new session when trying to retrieve closed session from cache
session2 = await cache_and_return_async_session(TEST_URI)
assert not session2.closed
assert session2 != session1

# assert we evicted session1 from the cache, and cached the new session2
# at the cache key
assert len(request._async_session_cache) == 1
cached_session = request._async_session_cache.get_cache_entry(cache_key)
assert cached_session == session2

# -- teardown -- #

# appropriately close the new session
await session2.close()
33 changes: 33 additions & 0 deletions web3/_utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,39 @@ async def cache_and_return_async_session(
evicted_items = _async_session_cache.cache(cache_key, session)
logger.debug(f"Async session cached: {endpoint_uri}, {session}")

else:
# get the cached session
cached_session = _async_session_cache.get_cache_entry(cache_key)
session_is_closed = cached_session.closed
session_loop_is_closed = cached_session._loop.is_closed()

warning = (
"Async session was closed"
if session_is_closed
else "Loop was closed for async session"
if session_loop_is_closed
else None
)
if warning:
logger.debug(
f"{warning}: {endpoint_uri}, {cached_session}. "
f"Creating and caching a new async session for uri."
)

_async_session_cache._data.pop(cache_key)
if not session_is_closed:
# if loop was closed but not the session, close the session
await cached_session.close()
logger.debug(
f"Async session closed and evicted from cache: {cached_session}"
)

# replace stale session with a new session at the cache key
_session = ClientSession(raise_for_status=True)
evicted_items = _async_session_cache.cache(cache_key, _session)
logger.debug(f"Async session cached: {endpoint_uri}, {_session}")

# get the cached session
cached_session = _async_session_cache.get_cache_entry(cache_key)

if evicted_items is not None:
Expand Down

0 comments on commit 2cc692f

Please sign in to comment.