Skip to content

Commit

Permalink
Avoid deadlock when closing sync connection with unread messages.
Browse files Browse the repository at this point in the history
Fix #1336.
  • Loading branch information
aaugustin committed Apr 18, 2023
1 parent f55c141 commit 0ce16b2
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 9 deletions.
11 changes: 11 additions & 0 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ fixing regressions shortly after a release.
Only documented APIs are public. Undocumented, private APIs may change without
notice.

11.0.2
------

*April 18, 2023*

Bug fixes
.........

* Fixed a deadlock in the :mod:`threading` implementation when closing a
connection without reading all messages.

11.0.1
------

Expand Down
23 changes: 15 additions & 8 deletions src/websockets/sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,9 @@ def close(self, code: int = 1000, reason: str = "") -> None:
"""
Perform the closing handshake.
:meth:`close` waits for the other end to complete the handshake and
for the TCP connection to terminate.
:meth:`close` waits for the other end to complete the handshake, for the
TCP connection to terminate, and for all incoming messages to be read
with :meth:`recv`.
:meth:`close` is idempotent: it doesn't do anything once the
connection is closed.
Expand Down Expand Up @@ -574,9 +575,13 @@ def recv_events(self) -> None:
# Given that automatic responses write small amounts of data,
# this should be uncommon, so we don't handle the edge case.

for event in events:
# This isn't expected to raise an exception.
self.process_event(event)
try:
for event in events:
# This may raise EOFError if the closing handshake
# times out while a message is waiting to be read.
self.process_event(event)
except EOFError:
break

# Breaking out of the while True: ... loop means that we believe
# that the socket doesn't work anymore.
Expand All @@ -600,7 +605,6 @@ def recv_events(self) -> None:
self.protocol.state = CLOSED
finally:
# This isn't expected to raise an exception.
self.recv_messages.close()
self.close_socket()

@contextlib.contextmanager
Expand Down Expand Up @@ -745,13 +749,16 @@ def set_recv_events_exc(self, exc: Optional[BaseException]) -> None:

def close_socket(self) -> None:
"""
Shutdown and close socket.
Shutdown and close socket. Close message assembler.
shutdown() is required to interrupt recv() on Linux.
Calling close_socket() guarantees that recv_events() terminates. Indeed,
recv_events() may block only on socket.recv() or on recv_messages.put().
"""
# shutdown() is required to interrupt recv() on Linux.
try:
self.socket.shutdown(socket.SHUT_RDWR)
except OSError:
pass # socket is already closed
self.socket.close()
self.recv_messages.close()
2 changes: 1 addition & 1 deletion src/websockets/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

released = True

tag = version = commit = "11.0.1"
tag = version = commit = "11.0.2"


if not released: # pragma: no cover
Expand Down
43 changes: 43 additions & 0 deletions tests/sync/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,49 @@ def test_close_timeout_waiting_for_connection_closed(self):
# Remove socket.timeout when dropping Python < 3.10.
self.assertIsInstance(exc.__cause__, (socket.timeout, TimeoutError))

def test_close_waits_for_recv(self):
self.remote_connection.send("😀")

close_thread = threading.Thread(target=self.connection.close)
close_thread.start()

# Let close() initiate the closing handshake and send a close frame.
time.sleep(MS)
self.assertTrue(close_thread.is_alive())

# Connection isn't closed yet.
self.connection.recv()

# Let close() receive a close frame and finish the closing handshake.
time.sleep(MS)
self.assertFalse(close_thread.is_alive())

# Connection is closed now.
with self.assertRaises(ConnectionClosedOK) as raised:
self.connection.recv()

exc = raised.exception
self.assertEqual(str(exc), "sent 1000 (OK); then received 1000 (OK)")
self.assertIsNone(exc.__cause__)

def test_close_timeout_waiting_for_recv(self):
self.remote_connection.send("😀")

close_thread = threading.Thread(target=self.connection.close)
close_thread.start()

# Let close() time out during the closing handshake.
time.sleep(3 * MS)
self.assertFalse(close_thread.is_alive())

# Connection is closed now.
with self.assertRaises(ConnectionClosedError) as raised:
self.connection.recv()

exc = raised.exception
self.assertEqual(str(exc), "sent 1000 (OK); no close frame received")
self.assertIsInstance(exc.__cause__, TimeoutError)

def test_close_idempotency(self):
"""close does nothing if the connection is already closed."""
self.connection.close()
Expand Down

0 comments on commit 0ce16b2

Please sign in to comment.