From ab46dc048baf2a864c1cc3ff1211fb5363c95af3 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Fri, 15 Sep 2023 19:06:58 -0700 Subject: [PATCH] [SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and increase retry buffer ### What changes were proposed in this pull request? Deflake tests in ReattachableExecuteSuite and increase CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE. ### Why are the changes needed? Two tests could be flaky with errors `INVALID_CURSOR.POSITION_NOT_AVAILABLE`. This happens when the server releases a response once it falls more than CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE behind the latest response it sent. However, because of HTTP2 flow control, the responses could still be in transit. In the test suite, we were explicitly disconnecting the iterators and reconnecting later. In some cases they could not reconnect, because the response they had last seen had fallen too far behind. This not only changes the suite, but also adjusts the default config. This potentially makes the reconnecting more robust. In a normal situation, it should not lead to increased memory pressure, because the clients also release the responses using ReleaseExecute as soon as they are received. Normally, buffered responses should be freed by ReleaseExecute and this retry buffer is only a fallback mechanism. Therefore, it is safe to increase the default. In practice, this would only have an effect in cases where there are actual network errors, and the increased buffer size should make the reconnects more robust in these cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ReattachableExecuteSuite. Did more manual experiments of how far the response sent by client can be behind the response sent by server (because of HTTP2 flow control window). ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42908 from juliuszsompolski/SPARK-44872-followup. 
Authored-by: Juliusz Sompolski Signed-off-by: Dongjoon Hyun --- .../spark/sql/connect/config/Connect.scala | 2 +- .../sql/connect/SparkConnectServerTest.scala | 2 +- .../execution/ReattachableExecuteSuite.scala | 26 ++++++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index f7daca8542d6a..dfd6008ac09a1 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -139,7 +139,7 @@ object Connect { "With any value greater than 0, the last sent response will always be buffered.") .version("3.5.0") .bytesConf(ByteUnit.BYTE) - .createWithDefaultString("1m") + .createWithDefaultString("10m") val CONNECT_EXTENSIONS_RELATION_CLASSES = buildStaticConf("spark.connect.extensions.relation.classes") diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala index 488858d33ea12..eddd1c6be72b1 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.test.SharedSparkSession * Base class and utilities for a test suite that starts and tests the real SparkConnectService * with a real SparkConnectClient, communicating over RPC, but both in-process. 
*/ -class SparkConnectServerTest extends SharedSparkSession { +trait SparkConnectServerTest extends SharedSparkSession { // Server port val serverPort: Int = diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala index 169b15582b698..0e29a07b719af 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala @@ -22,7 +22,7 @@ import io.grpc.StatusRuntimeException import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkException +import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.sql.connect.SparkConnectServerTest import org.apache.spark.sql.connect.config.Connect import org.apache.spark.sql.connect.service.SparkConnectService @@ -32,7 +32,7 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { // Tests assume that this query will result in at least a couple ExecutePlanResponses on the // stream. If this is no longer the case because of changes in how much is returned in a single // ExecutePlanResponse, it may need to be adjusted. 
- val MEDIUM_RESULTS_QUERY = "select * from range(1000000)" + val MEDIUM_RESULTS_QUERY = "select * from range(10000000)" test("reattach after initial RPC ends") { withClient { client => @@ -138,13 +138,12 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { val reattachIter = stub.reattachExecute( buildReattachExecuteRequest(operationId, Some(response.getResponseId))) assert(reattachIter.hasNext) - reattachIter.next() - - // Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error - iter.next() - // iterator changed because it had to reconnect - assert(reattachableIter.innerIterator ne initialInnerIter) } + + // Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error + iter.next() + // iterator changed because it had to reconnect + assert(reattachableIter.innerIterator ne initialInnerIter) } } @@ -246,19 +245,26 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { val iter = stub.executePlan( buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId)) var lastSeenResponse: String = null + val serverRetryBuffer = SparkEnv.get.conf + .get(Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE) + .toLong iter.hasNext // open iterator val execution = getExecutionHolder // after consuming enough from the iterator, server should automatically start releasing var lastSeenIndex = 0 - while (iter.hasNext && execution.responseObserver.releasedUntilIndex == 0) { + var totalSizeSeen = 0 + while (iter.hasNext && totalSizeSeen <= 1.1 * serverRetryBuffer) { val r = iter.next() lastSeenResponse = r.getResponseId() + totalSizeSeen += r.getSerializedSize lastSeenIndex += 1 } assert(iter.hasNext) - assert(execution.responseObserver.releasedUntilIndex > 0) + Eventually.eventually(timeout(eventuallyTimeout)) { + assert(execution.responseObserver.releasedUntilIndex > 0) + } // Reattach from the beginning is not available. 
val reattach = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))