From 3ceec3b9c9502ba8ed5d83b45a3e33ab814409bb Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski
Date: Thu, 7 Sep 2023 10:48:28 +0900
Subject: [PATCH] [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error

### What changes were proposed in this pull request?
Make INVALID_CURSOR.DISCONNECTED a retriable error.

### Why are the changes needed?
This error can happen if two RPCs are racing to reattach to the query, and the client is still using the losing one. SPARK-44833 was a bug that exposed such a situation. That was fixed, but to be more robust, we can make this error retryable.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tests will be added in https://github.com/apache/spark/pull/42560

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #42818 from juliuszsompolski/SPARK-44835.

Authored-by: Juliusz Sompolski
Signed-off-by: Hyukjin Kwon
(cherry picked from commit f13743de04e430e59c4eaeca464447608bd32b1d)
Signed-off-by: Hyukjin Kwon
---
 .../sql/connect/client/GrpcRetryHandler.scala | 17 +++++++++-
 python/pyspark/sql/connect/client/core.py     | 31 +++++++++++++++++--
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index a6841e7f1182e..8791530607c3a 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -217,7 +217,22 @@ private[sql] object GrpcRetryHandler extends Logging {
    */
  private[client] def retryException(e: Throwable): Boolean = {
    e match {
-      case e: StatusRuntimeException => e.getStatus.getCode == Status.Code.UNAVAILABLE
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
      case _ => false
    }
  }
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 4b8a2348adc9d..7b3299d123b97 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -585,11 +585,36 @@ class SparkConnectClient(object):

     @classmethod
     def retry_exception(cls, e: Exception) -> bool:
-        if isinstance(e, grpc.RpcError):
-            return e.code() == grpc.StatusCode.UNAVAILABLE
-        else:
+        """
+        Helper function that is used to identify if an exception thrown by the server
+        can be retried or not.
+
+        Parameters
+        ----------
+        e : Exception
+            The GRPC error as received from the server. Typed as Exception, because other exception
+            thrown during client processing can be passed here as well.
+
+        Returns
+        -------
+        True if the exception can be retried, False otherwise.
+
+        """
+        if not isinstance(e, grpc.RpcError):
             return False

+        if e.code() in [grpc.StatusCode.INTERNAL]:
+            msg = str(e)
+
+            # This error happens if another RPC preempts this RPC.
+            if "INVALID_CURSOR.DISCONNECTED" in msg:
+                return True
+
+        if e.code() == grpc.StatusCode.UNAVAILABLE:
+            return True
+
+        return False
+
     def __init__(
         self,
         connection: Union[str, ChannelBuilder],
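
Note (not part of the patch): a minimal sketch of how the new Python-side classification behaves, using a hypothetical `FakeRpcError` stand-in for errors that would normally come from gRPC calls against a Spark Connect server. The import path is taken from the diff above; running this assumes `pyspark` (with the Connect dependencies) and `grpcio` are installed, and the assertions simply mirror the cases handled in the changed `retry_exception`.

```python
import grpc

from pyspark.sql.connect.client.core import SparkConnectClient


class FakeRpcError(grpc.RpcError):
    """Hypothetical stand-in for a gRPC error, used only for this illustration."""

    def __init__(self, code: grpc.StatusCode, details: str) -> None:
        self._code = code
        self._details = details

    def code(self) -> grpc.StatusCode:
        return self._code

    def __str__(self) -> str:
        return self._details


# UNAVAILABLE was already retriable before this change.
assert SparkConnectClient.retry_exception(
    FakeRpcError(grpc.StatusCode.UNAVAILABLE, "connection reset")
)

# INTERNAL is now retried, but only when the message carries INVALID_CURSOR.DISCONNECTED.
assert SparkConnectClient.retry_exception(
    FakeRpcError(grpc.StatusCode.INTERNAL, "INVALID_CURSOR.DISCONNECTED: cursor was released")
)
assert not SparkConnectClient.retry_exception(
    FakeRpcError(grpc.StatusCode.INTERNAL, "some other internal error")
)

# Non-gRPC exceptions are never retried by this classifier.
assert not SparkConnectClient.retry_exception(ValueError("client-side error"))
```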