From 93928e36a053ee93317650dd8e04b42aa9e6e399 Mon Sep 17 00:00:00 2001 From: Omar Jarjur Date: Tue, 15 Oct 2024 11:11:36 -0700 Subject: [PATCH] Set an explicit timeout when polling websocket connections --- google/cloud/dataproc_spark_connect/client/proxy.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/google/cloud/dataproc_spark_connect/client/proxy.py b/google/cloud/dataproc_spark_connect/client/proxy.py index e5e3bbb..14b6c90 100755 --- a/google/cloud/dataproc_spark_connect/client/proxy.py +++ b/google/cloud/dataproc_spark_connect/client/proxy.py @@ -43,7 +43,13 @@ def __init__(self, websocket_conn): self._conn = websocket_conn def recv(self, buff_size): - msg = self._conn.recv() + # N.B. The websockets [recv method](https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.recv) + # does not support the buff_size parameter, but it does add a `timeout` keyword parameter not supported by normal + # socket objects. + # + # We set that timeout to 60 seconds to prevent any scenarios where we wind up stuck waiting for a message from a websocket connection + # that never comes. + msg = self._conn.recv(timeout=60) return bytes.fromhex(msg) def send(self, msg_bytes): @@ -97,6 +103,9 @@ def forward_bytes(name, from_sock, to_sock): if not bs: return to_sock.send(bs) + except TimeoutError: + # On timeouts we simply want to poll again. + pass except Exception as ex: logger.debug(f"[{name}] Exception forwarding bytes: {ex}") to_sock.close()