[SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and increase retry buffer

### What changes were proposed in this pull request?

Deflake tests in ReattachableExecuteSuite and increase CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE.

### Why are the changes needed?

Two tests could be flaky, failing with `INVALID_CURSOR.POSITION_NOT_AVAILABLE` errors.
This error occurs when the server releases a response once it falls more than CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE behind the latest response it has sent. However, because of HTTP/2 flow control, responses that the server has already sent may still be in transit to the client. In the test suite we were explicitly disconnecting the iterators and reconnecting them later, and in some cases they could not reconnect because the response they had last seen had fallen too far behind.
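
For illustration, here is a minimal sketch of the release policy described above. The class, field, and method names are invented and this is not the actual `ExecuteResponseObserver` code; it only shows when a buffered response becomes unavailable as a reattach point:

```scala
// Sketch only: invented names, not the Spark Connect server implementation.
case class SentResponse(responseId: String, serializedSize: Long)

final class RetryBufferSketch(retryBufferSize: Long) {
  private var buffered = Vector.empty[SentResponse]

  /** Record a response handed to gRPC and release those that fell too far behind. */
  def onResponseSent(response: SentResponse): Unit = {
    buffered :+= response
    var bytesBehindLatest = 0L
    // Keep the latest response and everything within retryBufferSize bytes behind it;
    // older responses are released and can no longer serve as a reattach point.
    buffered = buffered.reverse.takeWhile { r =>
      val keep = bytesBehindLatest <= retryBufferSize
      bytesBehindLatest += r.serializedSize
      keep
    }.reverse
  }

  /** Reattaching after `responseId` only works while that response is still buffered;
   *  otherwise the server answers with INVALID_CURSOR.POSITION_NOT_AVAILABLE. */
  def canReattachAfter(responseId: String): Boolean =
    buffered.exists(_.responseId == responseId)
}
```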

This changes not only the suite but also adjusts the default config, which potentially makes reconnecting more robust. In normal situations it should not lead to increased memory pressure, because clients also release responses using ReleaseExecute as soon as they are received. Normally, buffered responses are freed by ReleaseExecute, and this retry buffer is only a fallback mechanism, so it is safe to increase the default.
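
As an illustration of that interplay, here is a rough client-side sketch; the trait and helper below are invented for this description and are not the Spark Connect client API:

```scala
// Sketch only: models the behaviour described above with invented names.
trait ResponseStream {
  def hasNext: Boolean
  def next(): (String, Array[Byte])           // (responseId, payload)
  def releaseUntil(responseId: String): Unit  // stands in for the ReleaseExecute RPC
}

def consumeAll(stream: ResponseStream)(handle: Array[Byte] => Unit): Unit = {
  while (stream.hasNext) {
    val (responseId, payload) = stream.next()
    handle(payload)
    // In the normal case every response is released as soon as it is processed,
    // so a larger server-side retry buffer does not add steady-state memory use;
    // the buffer only matters when a reconnect has to resume from an older response.
    stream.releaseUntil(responseId)
  }
}
```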

In practice, this only has an effect when there are actual network errors, and the increased buffer size should make reconnects more robust in those cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

ReattachableExecuteSuite.
Also did additional manual experiments to see how far the last response seen by the client can fall behind the latest response sent by the server (because of the HTTP/2 flow control window).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42908 from juliuszsompolski/SPARK-44872-followup.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
juliuszsompolski authored and hvanhovell committed Sep 19, 2023
1 parent f357f93 commit b2aead9
Showing 3 changed files with 18 additions and 12 deletions.
@@ -133,7 +133,7 @@ object Connect {
         "With any value greater than 0, the last sent response will always be buffered.")
       .version("3.5.0")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("1m")
+      .createWithDefaultString("10m")
 
   val CONNECT_EXTENSIONS_RELATION_CLASSES =
     buildStaticConf("spark.connect.extensions.relation.classes")

@@ -35,7 +35,7 @@ import org.apache.spark.sql.test.SharedSparkSession
  * Base class and utilities for a test suite that starts and tests the real SparkConnectService
  * with a real SparkConnectClient, communicating over RPC, but both in-process.
  */
-class SparkConnectServerTest extends SharedSparkSession {
+trait SparkConnectServerTest extends SharedSparkSession {
 
   // Server port
   val serverPort: Int =

@@ -22,7 +22,7 @@ import io.grpc.StatusRuntimeException
 import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.sql.connect.SparkConnectServerTest
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.service.SparkConnectService
@@ -32,7 +32,7 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
   // Tests assume that this query will result in at least a couple ExecutePlanResponses on the
   // stream. If this is no longer the case because of changes in how much is returned in a single
   // ExecutePlanResponse, it may need to be adjusted.
-  val MEDIUM_RESULTS_QUERY = "select * from range(1000000)"
+  val MEDIUM_RESULTS_QUERY = "select * from range(10000000)"
 
   test("reattach after initial RPC ends") {
     withClient { client =>
@@ -138,13 +138,12 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
       val reattachIter = stub.reattachExecute(
         buildReattachExecuteRequest(operationId, Some(response.getResponseId)))
       assert(reattachIter.hasNext)
+      reattachIter.next()
+
+      // Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error
+      iter.next()
+      // iterator changed because it had to reconnect
+      assert(reattachableIter.innerIterator ne initialInnerIter)
     }
-
-    // Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error
-    iter.next()
-    // iterator changed because it had to reconnect
-    assert(reattachableIter.innerIterator ne initialInnerIter)
   }
 }

@@ -246,19 +245,26 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
       val iter = stub.executePlan(
         buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
       var lastSeenResponse: String = null
+      val serverRetryBuffer = SparkEnv.get.conf
+        .get(Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
+        .toLong
 
       iter.hasNext // open iterator
       val execution = getExecutionHolder
 
       // after consuming enough from the iterator, server should automatically start releasing
       var lastSeenIndex = 0
-      while (iter.hasNext && execution.responseObserver.releasedUntilIndex == 0) {
+      var totalSizeSeen = 0
+      while (iter.hasNext && totalSizeSeen <= 1.1 * serverRetryBuffer) {
         val r = iter.next()
         lastSeenResponse = r.getResponseId()
+        totalSizeSeen += r.getSerializedSize
         lastSeenIndex += 1
       }
       assert(iter.hasNext)
-      assert(execution.responseObserver.releasedUntilIndex > 0)
+      Eventually.eventually(timeout(eventuallyTimeout)) {
+        assert(execution.responseObserver.releasedUntilIndex > 0)
+      }
 
       // Reattach from the beginning is not available.
       val reattach = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
