From eec090755aa5b7e6048fc004264a8f5d3591df1a Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 19 Sep 2023 16:56:38 +0800 Subject: [PATCH] [SPARK-45211][CONNECT] Eliminated ambiguous references in `CloseableIterator#apply` to fix Scala 2.13 daily test ### What changes were proposed in this pull request? This pr eliminated an ambiguous references in `org.apache.spark.sql.connect.client.CloseableIterator#apply` function to make the test case `abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error` can test pass with Scala 2.13. ### Why are the changes needed? `abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error` failed in the daily test of Scala 2.13: - https://github.com/apache/spark/actions/runs/6215331575/job/16868131377 image ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manual check run ``` dev/change-scala-version.sh 2.13 build/sbt "connect/testOnly org.apache.spark.sql.connect.execution.ReattachableExecuteSuite" -Pscala-2.13 ``` **Before** ``` [info] ReattachableExecuteSuite: [info] - reattach after initial RPC ends (2 seconds, 258 milliseconds) [info] - raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error (30 milliseconds) [info] - raw new RPC interrupts previous RPC with INVALID_CURSOR.DISCONNECTED error (21 milliseconds) [info] - client INVALID_CURSOR.DISCONNECTED error is retried when rpc sender gets interrupted (602 milliseconds) [info] - client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one (637 milliseconds) [info] - abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error *** FAILED *** (70 milliseconds) [info] Expected exception org.apache.spark.SparkException to be thrown, but java.lang.StackOverflowError was thrown (ReattachableExecuteSuite.scala:172) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564) [info] at org.scalatest.Assertions.intercept(Assertions.scala:756) [info] at org.scalatest.Assertions.intercept$(Assertions.scala:746) [info] at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1564) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$18(ReattachableExecuteSuite.scala:172) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$18$adapted(ReattachableExecuteSuite.scala:168) [info] at org.apache.spark.sql.connect.SparkConnectServerTest.withCustomBlockingStub(SparkConnectServerTest.scala:222) [info] at org.apache.spark.sql.connect.SparkConnectServerTest.withCustomBlockingStub$(SparkConnectServerTest.scala:216) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.withCustomBlockingStub(ReattachableExecuteSuite.scala:30) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$16(ReattachableExecuteSuite.scala:168) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$16$adapted(ReattachableExecuteSuite.scala:151) [info] at org.apache.spark.sql.connect.SparkConnectServerTest.withClient(SparkConnectServerTest.scala:199) [info] at org.apache.spark.sql.connect.SparkConnectServerTest.withClient$(SparkConnectServerTest.scala:191) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.withClient(ReattachableExecuteSuite.scala:30) [info] at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$15(ReattachableExecuteSuite.scala:151) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) [info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) [info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282) [info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231) [info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230) [info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69) [info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227) [info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) [info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69) [info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) [info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) [info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) [info] at scala.collection.immutable.List.foreach(List.scala:333) [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268) [info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564) [info] at org.scalatest.Suite.run(Suite.scala:1114) [info] at org.scalatest.Suite.run$(Suite.scala:1096) [info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273) [info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535) [info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273) [info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69) [info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) [info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) [info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) [info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69) [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321) [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517) [info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414) [info] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [info] at java.lang.Thread.run(Thread.java:750) [info] Cause: java.lang.StackOverflowError: [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) [info] at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36) ... [info] - client releases responses directly after consuming them (236 milliseconds) [info] - server releases responses automatically when client moves ahead (336 milliseconds) [info] - big query (863 milliseconds) [info] - big query and slow client (7 seconds, 14 milliseconds) [info] - big query with frequent reattach (735 milliseconds) [info] - big query with frequent reattach and slow client (7 seconds, 606 milliseconds) [info] - long sleeping query (10 seconds, 156 milliseconds) [info] Run completed in 34 seconds, 522 milliseconds. [info] Total number of tests run: 13 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 12, failed 1, canceled 0, ignored 0, pending 0 [info] *** 1 TEST FAILED *** [error] Failed tests: [error] org.apache.spark.sql.connect.execution.ReattachableExecuteSuite ``` **After** ``` [info] ReattachableExecuteSuite: [info] - reattach after initial RPC ends (2 seconds, 134 milliseconds) [info] - raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error (26 milliseconds) [info] - raw new RPC interrupts previous RPC with INVALID_CURSOR.DISCONNECTED error (19 milliseconds) [info] - client INVALID_CURSOR.DISCONNECTED error is retried when rpc sender gets interrupted (328 milliseconds) [info] - client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one (562 milliseconds) [info] - abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error (46 milliseconds) [info] - client releases responses directly after consuming them (231 milliseconds) [info] - server releases responses automatically when client moves ahead (359 milliseconds) [info] - big query (978 milliseconds) [info] - big query and slow client (7 seconds, 50 milliseconds) [info] - big query with frequent reattach (703 milliseconds) [info] - big query with frequent reattach and slow client (7 seconds, 626 milliseconds) [info] - long sleeping query (10 seconds, 141 milliseconds) [info] Run completed in 33 seconds, 844 milliseconds. [info] Total number of tests run: 13 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #42981 from LuciferYang/CloseableIterator-apply. Authored-by: yangjie01 Signed-off-by: yangjie01 --- .../apache/spark/sql/connect/client/CloseableIterator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala index d3fc9963edc7a..810158b2ac8b3 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala @@ -48,9 +48,9 @@ private[sql] object CloseableIterator { */ def apply[T](iterator: Iterator[T]): CloseableIterator[T] = iterator match { case closeable: CloseableIterator[T] => closeable - case _ => + case iter => new WrappedCloseableIterator[T] { - override def innerIterator = iterator + override def innerIterator: Iterator[T] = iter } } }