
[SPARK-44872][CONNECT] Server testing infra and ReattachableExecuteSuite #42560

Closed

Conversation

@juliuszsompolski (Contributor) commented Aug 18, 2023

What changes were proposed in this pull request?

Add SparkConnectServerTest with infra to test a real server with a real client in the same process, communicating over RPC.
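
A generic sketch of the idea, using plain gRPC rather than the actual SparkConnectServerTest internals (all names below are illustrative): a real server and a real client live in one JVM and talk over a real local port.

```scala
import io.grpc.{BindableService, ManagedChannel, ManagedChannelBuilder, Server, ServerBuilder}

object LocalRpcHarness {
  // Start a real server on an ephemeral port, hand the caller a real channel
  // to it, and tear both down afterwards.
  def withServerAndChannel(service: BindableService)(f: ManagedChannel => Unit): Unit = {
    val server: Server = ServerBuilder.forPort(0).addService(service).build().start()
    val channel = ManagedChannelBuilder
      .forAddress("localhost", server.getPort)
      .usePlaintext()
      .build()
    try f(channel)
    finally {
      channel.shutdownNow()
      server.shutdownNow()
    }
  }
}
```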

Add ReattachableExecuteSuite with some tests for reattachable execute.

Two bugs were found by the tests:

  • Fix a bug in SparkConnectExecutionManager.createExecuteHolder when attempting to resubmit an operation that had been deemed abandoned. This bug is benign in reattachable execute, because a reattachable client would first send a ReattachExecute, which is handled correctly in SparkConnectReattachExecuteHandler. For non-reattachable execute (disabled or an old client) this is also a very unlikely scenario, because the retry mechanism should resubmit before the query is declared abandoned and therefore get an INVALID_HANDLE.OPERATION_ALREADY_EXISTS error. The bug can manifest only if a non-reattachable execution is retried with so much delay that the operation has already been declared abandoned.
  • In ExecuteGrpcResponseSender there was an assertion that assumed that if sendResponse did not send, the deadline must have been reached. But it can also happen because of an interrupt, so in testing an interrupt would have surfaced as an assertion error instead of CURSOR_DISCONNECTED. Assertions are not enabled outside of testing, so this was only a problem in tests. (A sketch of the corrected invariant follows this list.)
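
For the second bug, a minimal sketch of the corrected invariant, assuming hypothetical names (sent, deadlineReached, interrupted are illustrative stand-ins, not the actual fields of ExecuteGrpcResponseSender):

```scala
object ResponseSenderInvariantSketch {
  // If sendResponse returned without sending a response, the only legitimate
  // reasons are that the deadline was reached OR the sender was interrupted.
  // The pre-fix assertion accepted only the deadline case, so an interrupt
  // surfaced as an assertion error in tests instead of CURSOR_DISCONNECTED.
  def checkAfterSend(sent: Boolean, deadlineReached: Boolean, interrupted: Boolean): Unit = {
    if (!sent) {
      assert(deadlineReached || interrupted,
        "sendResponse returned without sending for an unknown reason")
    }
  }

  def main(args: Array[String]): Unit = {
    // Passes with the fixed assertion; the pre-fix version, which asserted
    // only deadlineReached, would have thrown here.
    checkAfterSend(sent = false, deadlineReached = false, interrupted = true)
    println("invariant holds")
  }
}
```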

Why are the changes needed?

Testing of reattachable execute.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tests added.

Comment on lines +46 to +50
// Other suites using mocks leave a mess in the global executionManager,
// shut it down so that it's cleared before starting server.
SparkConnectService.executionManager.shutdown()
@juliuszsompolski (Contributor, Author):

The fact that we use singletons is ugly. I raised https://issues.apache.org/jira/browse/SPARK-44779 for this some time ago.
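
A minimal sketch of where such cleanup could live in a ScalaTest suite (the class name and hook placement below are assumptions, not this PR's actual code; only the shutdown call is from the snippet above):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class ExampleServerSuite extends AnyFunSuite with BeforeAndAfterAll {
  override def beforeAll(): Unit = {
    // Clear state that other suites left in the global singleton before this
    // suite starts its own real server.
    SparkConnectService.executionManager.shutdown()
    super.beforeAll()
  }
}
```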

@juliuszsompolski (Contributor, Author):

@hvanhovell @grundprinzip

HyukjinKwon pushed a commit that referenced this pull request Sep 7, 2023

### What changes were proposed in this pull request?

Make INVALID_CURSOR.DISCONNECTED a retriable error.

### Why are the changes needed?

This error can happen when two RPCs race to reattach to the query and the client is still using the losing one. SPARK-44833 was a bug that exposed such a situation. That was fixed, but to be more robust, we can make this error retriable.
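
A minimal sketch of what making this error retriable means on the client side (isRetriable, withRetry, and the message matching are illustrative stand-ins, not the Spark Connect client's actual retry policy):

```scala
object RetriableErrorSketch {
  // Classify the raced-reattach error by the error class in its message.
  def isRetriable(e: Throwable): Boolean =
    Option(e.getMessage).exists(_.contains("INVALID_CURSOR.DISCONNECTED"))

  // Retry a bounded number of times with doubling backoff; other errors and
  // exhausted retries propagate to the caller.
  def withRetry[T](retriesLeft: Int, backoffMs: Long = 50)(op: () => T): T =
    try op()
    catch {
      case e: Throwable if isRetriable(e) && retriesLeft > 0 =>
        Thread.sleep(backoffMs)
        withRetry(retriesLeft - 1, backoffMs * 2)(op)
    }

  def main(args: Array[String]): Unit = {
    var failuresLeft = 2
    val result = withRetry(retriesLeft = 3) { () =>
      if (failuresLeft > 0) {
        failuresLeft -= 1
        throw new RuntimeException("INVALID_CURSOR.DISCONNECTED: cursor lost to another RPC")
      }
      42
    }
    println(s"succeeded with $result after retries")
  }
}
```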

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests will be added in #42560

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42818 from juliuszsompolski/SPARK-44835.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Sep 7, 2023

(cherry picked from commit f13743d)
@juliuszsompolski (Contributor, Author):

(deleting comment - was supposed to be on another PR)

@juliuszsompolski (Contributor, Author):

https://github.com/juliuszsompolski/apache-spark/actions/runs/6157633940/job/16708789265

[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceWithAdminSuite

Flaky.

@hvanhovell (Contributor):

Alright merging.

hvanhovell pushed a commit that referenced this pull request Sep 12, 2023
### What changes were proposed in this pull request?

Add `SparkConnectServerTest` with infra to test a real server with a real client in the same process, communicating over RPC.

Add `ReattachableExecuteSuite` with some tests for reattachable execute.

Two bugs were found by the tests:
* Fix a bug in `SparkConnectExecutionManager.createExecuteHolder` when attempting to resubmit an operation that had been deemed abandoned (sketched after this list). This bug is benign in reattachable execute, because a reattachable client would first send a ReattachExecute, which is handled correctly in SparkConnectReattachExecuteHandler. For non-reattachable execute (disabled or an old client) this is also a very unlikely scenario, because the retry mechanism should resubmit before the query is declared abandoned and therefore get an INVALID_HANDLE.OPERATION_ALREADY_EXISTS error. The bug can manifest only if a non-reattachable execution is retried with so much delay that the operation has already been declared abandoned.
* In `ExecuteGrpcResponseSender` there was an assertion that assumed that if `sendResponse` did not send, the deadline must have been reached. But it can also happen because of an interrupt, so in testing an interrupt would have surfaced as an assertion error instead of CURSOR_DISCONNECTED. Assertions are not enabled outside of testing, so this was only a problem in tests.
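
A minimal sketch of the `createExecuteHolder` decision the first bullet describes (the result names and collections are illustrative; Spark's real implementation differs):

```scala
// Distinguish a resubmission of a live operation from a resubmission of an
// operation already declared abandoned.
sealed trait SubmitResult
case object Created extends SubmitResult
case object OperationAlreadyExists extends SubmitResult // INVALID_HANDLE.OPERATION_ALREADY_EXISTS
case object OperationAbandoned extends SubmitResult     // INVALID_HANDLE.OPERATION_ABANDONED

class ExecutionManagerSketch {
  private val live = scala.collection.mutable.Set.empty[String]
  private val abandoned = scala.collection.mutable.Set.empty[String]

  def createExecuteHolder(operationId: String): SubmitResult =
    if (live(operationId)) OperationAlreadyExists
    else if (abandoned(operationId)) OperationAbandoned // the path this PR fixes
    else { live += operationId; Created }

  def declareAbandoned(operationId: String): Unit = {
    live -= operationId
    abandoned += operationId
  }
}

object ExecutionManagerSketchDemo {
  def main(args: Array[String]): Unit = {
    val mgr = new ExecutionManagerSketch
    assert(mgr.createExecuteHolder("op-1") == Created)
    assert(mgr.createExecuteHolder("op-1") == OperationAlreadyExists) // fast retry
    mgr.declareAbandoned("op-1")
    assert(mgr.createExecuteHolder("op-1") == OperationAbandoned)     // very delayed retry
    println("all cases distinguished")
  }
}
```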

### Why are the changes needed?

Testing of reattachable execute.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests added.

Closes #42560 from juliuszsompolski/sc-reattachable-tests.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 4b96add)
Signed-off-by: Herman van Hovell <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Sep 13, 2023

### What changes were proposed in this pull request?

Previously, a query would only be posted as FINISHED once all results had been pushed into the output buffers (not necessarily received by the client, but pushed out of the server).

With this change: for LocalTableScanExec, FINISHED is posted before sending result batches, because nothing is executed; only cached local results are returned. For regular execution, FINISHED is posted after all task results have been returned from Spark, not after they have been processed and sent out.
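
A minimal sketch of where the FINISHED event is posted relative to result streaming under this change (the types and helpers below are illustrative stand-ins, not Spark's actual execution code):

```scala
sealed trait Plan
case class LocalTableScan(rows: Seq[String]) extends Plan
case class RegularPlan(query: String) extends Plan

class ExecutionSketch(postFinished: () => Unit, send: String => Unit) {
  def execute(plan: Plan): Unit = plan match {
    case LocalTableScan(rows) =>
      postFinished()          // nothing runs on the cluster; results are already cached
      rows.foreach(send)
    case RegularPlan(q) =>
      val results = runSparkJob(q) // blocks until all task results are returned
      postFinished()               // FINISHED now, before processing/sending results
      results.foreach(send)
  }

  private def runSparkJob(q: String): Seq[String] = Seq(s"result of $q") // stand-in
}

object ExecutionSketchDemo {
  def main(args: Array[String]): Unit = {
    val exec = new ExecutionSketch(() => println("event: FINISHED"), r => println(s"send: $r"))
    exec.execute(LocalTableScan(Seq("a", "b"))) // FINISHED printed before any send
    exec.execute(RegularPlan("select 1"))       // FINISHED printed after the job, before send
  }
}
```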

### Why are the changes needed?

Currently, even if a query has finished running in Spark, it stays RUNNING until all results are sent, which leaves very little difference between FINISHED and CLOSED. This change makes the behavior more similar to, e.g., Thriftserver.

### Does this PR introduce _any_ user-facing change?

Yes. Queries will be posted as FINISHED when they finish executing, not when they finish sending results.

### How was this patch tested?

Will add test in #42560

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42889 from juliuszsompolski/SPARK-45133.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@dongjoon-hyun (Member) left a comment

@juliuszsompolski (Contributor, Author):

Apologies. I ran the final version 10+ times on my machine without flakes, but it may be related to the CI runner having fewer resources. Looking into it.

@juliuszsompolski (Contributor, Author) commented Sep 13, 2023

All the linked runs fail with the same error:

[info] - client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one *** FAILED *** (349 milliseconds)
[info]   org.apache.spark.SparkException: io.grpc.StatusRuntimeException: INTERNAL: [INVALID_CURSOR.POSITION_NOT_AVAILABLE] The cursor is invalid. The cursor position id 3367aa74-828f-4248-8a62-b7d6efe59019 is no longer available at index 2.
[info]   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toThrowable(GrpcExceptionConverter.scala:113)
[info]   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:41)
[info]   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.next(GrpcExceptionConverter.scala:58)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$14(ReattachableExecuteSuite.scala:144)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$14$adapted(ReattachableExecuteSuite.scala:137)

It doesn't reproduce for me, but I know what could be going on, so I will tweak the test.

@dongjoon-hyun (Member):

Thank you for checking, @juliuszsompolski.

@LuciferYang (Contributor):

@juliuszsompolski For the daily tests of Scala 2.13, in addition to `client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one`, there is another unstable test: `abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error`

https://github.com/apache/spark/actions/runs/6163764050/job/16728064919

[info] - abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error *** FAILED *** (133 milliseconds)
[info]   Expected exception org.apache.spark.SparkException to be thrown, but java.lang.StackOverflowError was thrown (ReattachableExecuteSuite.scala:173)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Assertions.intercept(Assertions.scala:756)
[info]   at org.scalatest.Assertions.intercept$(Assertions.scala:746)
[info]   at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1564)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$18(ReattachableExecuteSuite.scala:173)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$18$adapted(ReattachableExecuteSuite.scala:169)
[info]   at org.apache.spark.sql.connect.SparkConnectServerTest.withCustomBlockingStub(SparkConnectServerTest.scala:222)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$16(ReattachableExecuteSuite.scala:169)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$16$adapted(ReattachableExecuteSuite.scala:152)
[info]   at org.apache.spark.sql.connect.SparkConnectServerTest.withClient(SparkConnectServerTest.scala:199)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$15(ReattachableExecuteSuite.scala:152)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
[info]   Cause: java.lang.StackOverflowError:
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
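
The repeated WrappedCloseableIterator.hasNext frames suggest delegation chained deep enough to exhaust the stack. A minimal sketch of how that failure mode arises (illustrative only, not the actual client code):

```scala
// Each wrapper adds one stack frame to every hasNext call; if retries keep
// re-wrapping the same iterator instead of replacing it, the chain grows
// without bound and hasNext eventually throws StackOverflowError.
class WrappedIterator[A](underlying: Iterator[A]) extends Iterator[A] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): A = underlying.next()
}

object StackDepthDemo {
  def main(args: Array[String]): Unit = {
    var it: Iterator[Int] = Iterator(1, 2, 3)
    for (_ <- 1 to 1000000) it = new WrappedIterator(it) // simulate unbounded re-wrapping
    try it.hasNext // one frame per wrapper layer
    catch {
      case _: StackOverflowError => println("StackOverflowError, as in the failing test")
    }
  }
}
```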
