[SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error

### What changes were proposed in this pull request?

Make INVALID_CURSOR.DISCONNECTED a retriable error.

### Why are the changes needed?

This error can occur when two RPCs race to reattach to the same query and the client is still using the one that lost. SPARK-44833 was a bug that exposed this situation. That bug has been fixed, but making this error retriable makes the client more robust.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests will be added in #42560

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42818 from juliuszsompolski/SPARK-44835.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
juliuszsompolski authored and HyukjinKwon committed Sep 7, 2023
1 parent a0a78b6 commit f13743d
Showing 2 changed files with 44 additions and 4 deletions.
@@ -217,7 +217,22 @@ private[sql] object GrpcRetryHandler extends Logging {
    */
   private[client] def retryException(e: Throwable): Boolean = {
     e match {
-      case e: StatusRuntimeException => e.getStatus.getCode == Status.Code.UNAVAILABLE
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
       case _ => false
     }
   }
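For readers who want to see how a predicate like `retryException` is typically consumed, here is a minimal sketch of a retry loop driven by such a predicate. It is not Spark's actual `GrpcRetryHandler`: the names `call_with_retries`, `can_retry`, `max_attempts`, and `initial_backoff_s` are hypothetical, and the doubling backoff is an assumption chosen only for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    can_retry: Callable[[Exception], bool],
    max_attempts: int = 3,
    initial_backoff_s: float = 0.0,
) -> T:
    """Call fn, retrying only while can_retry(e) classifies the failure as transient.

    Hypothetical helper for illustration; not part of Spark Connect.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    backoff = initial_backoff_s
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            # Re-raise on the final attempt or when the error is not retriable.
            if attempt == max_attempts or not can_retry(e):
                raise
            time.sleep(backoff)
            backoff *= 2  # assumed policy: double the wait between attempts

    raise AssertionError("unreachable: the loop always returns or raises")
```

With a predicate that accepts `INVALID_CURSOR.DISCONNECTED`, a call that loses the reattach race would be attempted again instead of surfacing the error to the user, which is the robustness gain the description refers to.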
31 changes: 28 additions & 3 deletions python/pyspark/sql/connect/client/core.py
@@ -585,11 +585,36 @@ class SparkConnectClient(object):

     @classmethod
     def retry_exception(cls, e: Exception) -> bool:
-        if isinstance(e, grpc.RpcError):
-            return e.code() == grpc.StatusCode.UNAVAILABLE
-        else:
+        """
+        Helper function that is used to identify if an exception thrown by the server
+        can be retried or not.
+
+        Parameters
+        ----------
+        e : Exception
+            The GRPC error as received from the server. Typed as Exception, because other exception
+            thrown during client processing can be passed here as well.
+
+        Returns
+        -------
+        True if the exception can be retried, False otherwise.
+        """
+        if not isinstance(e, grpc.RpcError):
             return False
+
+        if e.code() in [grpc.StatusCode.INTERNAL]:
+            msg = str(e)
+
+            # This error happens if another RPC preempts this RPC.
+            if "INVALID_CURSOR.DISCONNECTED" in msg:
+                return True
+
+        if e.code() == grpc.StatusCode.UNAVAILABLE:
+            return True
+
+        return False

     def __init__(
         self,
         connection: Union[str, ChannelBuilder],
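
The classification in the hunk above can be tried without a gRPC installation. The sketch below mirrors the same decision order against stand-in types: `StatusCode` and `FakeRpcError` are hypothetical substitutes for `grpc.StatusCode` and `grpc.RpcError`, and the module-level `retry_exception` is a simplified copy for illustration, not the method from `core.py`.

```python
import enum


class StatusCode(enum.Enum):
    # Stand-in for grpc.StatusCode; only the members needed here.
    OK = 0
    INTERNAL = 13
    UNAVAILABLE = 14


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError that carries a status code."""

    def __init__(self, code: StatusCode, details: str) -> None:
        super().__init__(details)
        self._code = code

    def code(self) -> StatusCode:
        return self._code


def retry_exception(e: Exception) -> bool:
    # Anything that is not an RPC error is never retried.
    if not isinstance(e, FakeRpcError):
        return False

    # INTERNAL is retried only when the message names the preempted-cursor error.
    if e.code() == StatusCode.INTERNAL and "INVALID_CURSOR.DISCONNECTED" in str(e):
        return True

    # UNAVAILABLE is always retried, as before the change.
    return e.code() == StatusCode.UNAVAILABLE


preempted = FakeRpcError(StatusCode.INTERNAL, "[INVALID_CURSOR.DISCONNECTED] cursor was disconnected")
other_internal = FakeRpcError(StatusCode.INTERNAL, "some other internal failure")
unavailable = FakeRpcError(StatusCode.UNAVAILABLE, "connection refused")

print(retry_exception(preempted))         # True
print(retry_exception(other_internal))    # False
print(retry_exception(unavailable))       # True
print(retry_exception(ValueError("x")))   # False
```

Matching on the message text keeps other `INTERNAL` failures non-retriable, so only the specific preempted-RPC case gains a retry.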