Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of CancelledError by async client #2607

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,12 @@ async def execute_command(self, *args, **options):
),
lambda error: self._disconnect_raise(conn, error),
)
except asyncio.CancelledError:
# If we're cancelled and cancel happened after we sent the command
# but before we read the response, we need to read the response
# to avoid reading responses out of order.
await conn.read_response(timeout=0.001)
raise
finally:
if self.single_connection_client:
self._single_conn_lock.release()
Expand Down
45 changes: 45 additions & 0 deletions tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,48 @@ async def open_connection(*args, **kwargs):

vals = await asyncio.gather(do_read(), do_close())
assert vals == [b"Hello, World!", None]


@pytest.mark.onlynoncluster
async def test_client_handle_canceled_error(create_redis):
"""
This test reproduces the case in issue #2539
where asyncio.CancelledError is raised when the parser is reading to feed the
internal buffer. The stream `readline()` will be interrupted by the CancelledError,
which will result in not reading the response after executing the command. This will
cause responses to be mixed up between commands. In this test, we execute a command
after the CancelledError is raised, and verify that the response is correct.
"""
r = await create_redis(single_connection_client=True)

async def do_pings():
while True:
await r.ping()

future = asyncio.ensure_future(do_pings())
await asyncio.sleep(0.01)
future.cancel()
with pytest.raises(asyncio.CancelledError):
await future
# To reproduce the issue, we need to execute a command which returns a different
# response type than PING. In this case, we use EXISTS because it should return an
# integer.
assert await r.exists("foo") == 0

await r.sadd("set", "one")
await r.sadd("set", "two")
await r.sadd("set", "three")

async def do_smembers():
while True:
await r.smembers("set")

future = asyncio.ensure_future(do_smembers())
await asyncio.sleep(0.01)
future.cancel()
with pytest.raises(asyncio.CancelledError):
await future

assert await r.exists("foo") == 0

await r.connection.disconnect()