
[BUG] GpuCompressedColumnVector cannot be cast to GpuColumnVector with AQE #2901

Closed
abellina opened this issue Jul 9, 2021 · 3 comments · Fixed by #2924
Labels: bug (Something isn't working), P0 (Must have for release)

abellina (Collaborator) commented Jul 9, 2021

For a query like:

val df = sc.parallelize(0 until 100000).toDF
df.createOrReplaceTempView("foo")
spark.sql("select sum(value) as s from foo group by value").repartition(1000).collect

With a plan:

== Physical Plan ==
AdaptiveSparkPlan (13)
+- == Final Plan ==
   GpuColumnarToRow (12)
   +- ShuffleQueryStage (11)
      +- GpuColumnarExchange (10)
         +- GpuHashAggregate (9)
            +- GpuCoalesceBatches (8)
               +- GpuCustomShuffleReader (7)
                  +- ShuffleQueryStage (6)
                     +- GpuColumnarExchange (5)
                        +- GpuHashAggregate (4)
                           +- GpuRowToColumnar (3)
                              +- * SerializeFromObject (2)
                                 +- Scan (1)

With AQE enabled, the query fails with the following exception:

Caused by: java.lang.ClassCastException: com.nvidia.spark.rapids.GpuCompressedColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector
  at com.nvidia.spark.rapids.GpuColumnVector.extractColumns(GpuColumnVector.java:976)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$loadNextBatch$2(GpuColumnarToRowExec.scala:203)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$loadNextBatch$2$adapted(GpuColumnarToRowExec.scala:201)
  at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
  at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:177)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$loadNextBatch$1(GpuColumnarToRowExec.scala:201)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$loadNextBatch$1$adapted(GpuColumnarToRowExec.scala:200)
  at scala.Option.foreach(Option.scala:407)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:200)
  at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:238)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
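
For context on the cast: GpuCompressedColumnVector holds still-compressed shuffle data and is not a subclass of GpuColumnVector, so the per-column cast in GpuColumnVector.extractColumns throws as soon as a compressed batch reaches GpuColumnarToRow. A minimal sketch of the failing pattern (simplified and illustrative, not the plugin's actual code):

import org.apache.spark.sql.vectorized.ColumnarBatch
import com.nvidia.spark.rapids.GpuColumnVector

// Simplified sketch of what extractColumns does: it assumes every column in
// the batch is a GpuColumnVector. A compressed shuffle instead produces
// GpuCompressedColumnVector columns, and this cast throws ClassCastException.
def extractColumns(batch: ColumnarBatch): Array[GpuColumnVector] =
  Array.tabulate(batch.numCols()) { i =>
    batch.column(i).asInstanceOf[GpuColumnVector]
  }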

Without AQE, the query works, and the plan is:

== Physical Plan ==
GpuColumnarToRow false
+- GpuCoalesceBatches targetsize(2147483647)
   +- GpuColumnarExchange gpuroundrobinpartitioning(1000), REPARTITION_WITH_NUM, [id=#230]
      +- GpuHashAggregate(keys=[value#2], functions=[gpusum(cast(value#2 as bigint), LongType)], output=[s#34L])
         +- GpuCoalesceBatches targetsize(2147483647)
            +- GpuColumnarExchange gpuhashpartitioning(value#2, 10), ENSURE_REQUIREMENTS, [id=#227]
               +- GpuHashAggregate(keys=[value#2], functions=[partial_gpusum(cast(value#2 as bigint), LongType)], output=[value#2, sum#38L])
                  +- GpuRowToColumnar targetsize(2147483647)
                     +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
                        +- Scan[obj#1]

It looks like with AQE we are missing a coalesce after the shuffle in this case. This is another instance of #2378.
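
A fix in the spirit of #2378 would be for the GPU AQE overrides to plan a shuffle-aware coalesce above the shuffle stage. A hedged sketch of the shape such a rule could take, assuming the exec classes behind the GpuCustomShuffleReader and GpuShuffleCoalesce plan nodes (illustrative only, not the actual fix in #2924):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
// Assumed imports for the exec nodes behind the GpuCustomShuffleReader and
// GpuShuffleCoalesce plan nodes shown above:
import com.nvidia.spark.rapids.{GpuCustomShuffleReaderExec, GpuShuffleCoalesceExec}

// Illustrative rule: wrap each AQE shuffle reader in a coalesce node that
// knows how to deserialize/decompress shuffle batches before downstream
// operators cast columns to GpuColumnVector. Names and signatures here are
// assumptions based on the plan node names, not the plugin's real API.
case class InsertShuffleCoalesce(targetBatchSize: Long) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case reader: GpuCustomShuffleReaderExec =>
      GpuShuffleCoalesceExec(reader, targetBatchSize)
  }
}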

@abellina abellina added bug Something isn't working ? - Needs Triage Need team to review and classify labels Jul 9, 2021
@abellina abellina added the P0 Must have for release label Jul 9, 2021
@Salonijain27 Salonijain27 removed the ? - Needs Triage Need team to review and classify label Jul 12, 2021
@andygrove andygrove self-assigned this Jul 12, 2021
@andygrove andygrove added this to the July 5 - July 16 milestone Jul 12, 2021
andygrove (Contributor) commented Jul 12, 2021

I haven't been able to reproduce this yet. I see the following plan on the GPU, and it contains GpuShuffleCoalesce, which is missing from the plan in the issue description.

AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   GpuColumnarToRow false
   +- GpuShuffleCoalesce 2147483647
      +- ShuffleQueryStage 1
         +- GpuColumnarExchange gpuroundrobinpartitioning(1000), REPARTITION_WITH_NUM, [id=#109]
            +- GpuHashAggregate(keys=[value#2], functions=[gpusum(cast(value#2 as bigint), LongType)], output=[s#6L])
               +- GpuShuffleCoalesce 2147483647
                  +- GpuCustomShuffleReader coalesced
                     +- ShuffleQueryStage 0
                        +- GpuColumnarExchange gpuhashpartitioning(value#2, 200), ENSURE_REQUIREMENTS, [id=#70]
                           +- GpuHashAggregate(keys=[value#2], functions=[partial_gpusum(cast(value#2 as bigint), LongType)], output=[value#2, sum#10L])
                              +- GpuRowToColumnar targetsize(2147483647)
                                 +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
                                    +- Scan[obj#1]

@abellina Can you provide more information such as Spark version and full configs that were in use?

abellina (Collaborator, Author) commented:
I am using Spark 3.1.1. Here is a minimal set of options that reproduces the issue:

$SPARK_HOME/bin/spark-shell \
--master $SPARK_MASTER \
--conf spark.sql.adaptive.enabled=true \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.executor.extraClassPath=$RAPIDS_CUDF_JAR:$RAPIDS_PLUGIN_JAR:$RAPIDS_SHUFFLE_JAR:$BENCH_JAR \
--conf spark.driver.extraClassPath=$RAPIDS_CUDF_JAR:$RAPIDS_PLUGIN_JAR:$RAPIDS_SHUFFLE_JAR:$BENCH_JAR \
--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
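
(The $RAPIDS_*_JAR and $BENCH_JAR variables are placeholders for local jar paths. The ingredients that matter for the repro are spark.sql.adaptive.enabled=true combined with the RapidsShuffleManager, which compresses shuffle batches (lz4 here), producing the GpuCompressedColumnVector columns seen in the exception.)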

This seems specific to a rule around GpuCoalesceBatches, not GpuShuffleCoalesce. If I disable the RapidsShuffleManager, I get:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   GpuColumnarToRow false
   +- GpuShuffleCoalesce 2147483647
      +- ShuffleQueryStage 1
         +- GpuColumnarExchange gpuroundrobinpartitioning(1000), REPARTITION_WITH_NUM, [id=#100]
            +- GpuHashAggregate(keys=[value#2], functions=[gpusum(cast(value#2 as bigint), LongType)], output=[s#6L])
               +- GpuShuffleCoalesce 2147483647
                  +- GpuCustomShuffleReader coalesced
                     +- ShuffleQueryStage 0
                        +- GpuColumnarExchange gpuhashpartitioning(value#2, 200), ENSURE_REQUIREMENTS, [id=#61]
                           +- GpuHashAggregate(keys=[value#2], functions=[partial_gpusum(cast(value#2 as bigint), LongType)], output=[value#2, sum#10L])
                              +- GpuRowToColumnar targetsize(2147483647)
                                 +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
                                    +- Scan[obj#1]

If I enable it, I get the plan below (it shows isFinalPlan=true because I disabled lz4 compression, so the query was able to finish):

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   GpuColumnarToRow false
   +- ShuffleQueryStage 1
      +- GpuColumnarExchange gpuroundrobinpartitioning(1000), REPARTITION_WITH_NUM, [id=#100]
         +- GpuHashAggregate(keys=[value#2], functions=[gpusum(cast(value#2 as bigint), LongType)], output=[s#6L])
            +- GpuCoalesceBatches targetsize(2147483647)
               +- GpuCustomShuffleReader coalesced
                  +- ShuffleQueryStage 0
                     +- GpuColumnarExchange gpuhashpartitioning(value#2, 200), ENSURE_REQUIREMENTS, [id=#61]
                        +- GpuHashAggregate(keys=[value#2], functions=[partial_gpusum(cast(value#2 as bigint), LongType)], output=[value#2, sum#10L])
                           +- GpuRowToColumnar targetsize(2147483647)
                              +- *(1) SerializeFromObject [input[0, int, false] AS value#2]
                                 +- Scan[obj#1]
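
Comparing the two final plans: without the RapidsShuffleManager, a GpuShuffleCoalesce (which deserializes and, if needed, decompresses shuffle batches) sits above each ShuffleQueryStage. With it, the custom shuffle reader is followed by a plain GpuCoalesceBatches, and ShuffleQueryStage 1 feeds GpuColumnarToRow directly with no coalesce at all, so still-compressed batches reach an operator that expects GpuColumnVector columns, matching the stack trace above.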

andygrove (Contributor) commented:
I have been able to reproduce the issue locally now.
