Skip to content

Commit

Permalink
Set an explicit timeout when polling websocket connections (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
ojarjur authored Oct 15, 2024
2 parents a3596dd + 93928e3 commit ec326d5
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion google/cloud/dataproc_spark_connect/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ def __init__(self, websocket_conn):
self._conn = websocket_conn

def recv(self, buff_size):
msg = self._conn.recv()
# N.B. The websockets [recv method](https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.recv)
# does not support the buff_size parameter, but it does add a `timeout` keyword parameter not supported by normal
# socket objects.
#
# We set that timeout to 60 seconds to prevent any scenarios where we wind up stuck waiting for a message from a websocket connection
# that never comes.
msg = self._conn.recv(timeout=60)
return bytes.fromhex(msg)

def send(self, msg_bytes):
Expand Down Expand Up @@ -97,6 +103,9 @@ def forward_bytes(name, from_sock, to_sock):
if not bs:
return
to_sock.send(bs)
except TimeoutError:
# On timeouts we simply want to poll again.
pass
except Exception as ex:
logger.debug(f"[{name}] Exception forwarding bytes: {ex}")
to_sock.close()
Expand Down

0 comments on commit ec326d5

Please sign in to comment.