
[BUG] Extra GpuColumnarToRow when using ParquetCachedBatchSerializer on databricks #2896

Closed
viadea opened this issue Jul 9, 2021 · 5 comments
Labels: bug Something isn't working, P1 Nice to have for release
@viadea
Collaborator

viadea commented Jul 9, 2021

Describe the bug

This is a follow-up issue related to #2880.
When using ParquetCachedBatchSerializer on Databricks 8.2 ML GPU, I found an extra GpuColumnarToRow right before InMemoryRelation.

For example:

  1. Databricks plan:
== Physical Plan ==
GpuColumnarToRow false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_217#217)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#963]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_217#217)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_217#217], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_217#217, 200), ENSURE_REQUIREMENTS, [id=#952]
                     +- GpuHashAggregate(keys=[_gen_alias_217#217], functions=[]), filters=List())
                        +- GpuProject [col#152.name.firstname AS _gen_alias_217#217]
                           +- GpuInMemoryTableScan [col#152]
                                 +- InMemoryRelation [col#152], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- GpuColumnarToRow false
                                          +- GpuProject [named_struct(name, name#57, newname, named_struct(firstname, name#57.firstname, lastname, name#57.lastname)) AS col#152]
                                              +- GpuFileGpuScan parquet [name#57] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>>
  2. Standalone cluster's plan:
== Physical Plan ==
GpuColumnarToRowTransition false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_117#117)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#266]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_117#117)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_117#117], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_117#117, 200), ENSURE_REQUIREMENTS, [id=#255]
                     +- GpuHashAggregate(keys=[_gen_alias_117#117], functions=[]), filters=List())
                        +- GpuProject [col#62.name.firstname AS _gen_alias_117#117]
                           +- GpuInMemoryTableScan [col#62]
                                 +- InMemoryRelation [col#62], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- GpuProject [named_struct(name, name#16, newname, named_struct(firstname, name#16.firstname, lastname, name#16.lastname)) AS col#62]
                                          +- GpuFileGpuScan parquet [name#16] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>>

Steps/Code to reproduce bug

Same reproduce code as #2856

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
    Row(Row("Adam ","","Green"),"1","M",1000),
    Row(Row("Bob ","Middle","Green"),"2","M",2000),
    Row(Row("Cathy ","","Green"),"3","F",3000)
)

val schema = (new StructType()
  .add("name",new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType)) 
  .add("id",StringType)
  .add("gender",StringType)
  .add("salary",IntegerType))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.write.format("parquet").mode("overwrite").save("/tmp/testparquet")
val df2 = spark.read.parquet("/tmp/testparquet")
df2.createOrReplaceTempView("df2")
val df3 = spark.sql("select struct(name, struct(name.firstname, name.lastname) as newname) as col from df2").cache
df3.createOrReplaceTempView("df3")

spark.sql("select count(distinct col.name.firstname) from df3").show
spark.sql("select count(distinct col.name.firstname) from df3").explain

Expected behavior

The expectation is that the Databricks plan should not contain the extra GpuColumnarToRow.

Environment details (please complete the following information)


Databricks 8.2ML GPU with spark 3.1.1
Using the 21.08 snapshot jar with the fix from #2880.


@viadea viadea added bug Something isn't working ? - Needs Triage Need team to review and classify labels Jul 9, 2021
@Salonijain27 Salonijain27 removed the ? - Needs Triage Need team to review and classify label Jul 13, 2021
@sperlingxx sperlingxx self-assigned this Sep 15, 2021
@sperlingxx
Collaborator

sperlingxx commented Sep 17, 2021

Hi @viadea, after some investigation, I believe the extra GpuColumnarToRow stems from an extra ColumnarToRow inserted by Spark.

According to ApplyColumnarRulesAndInsertTransitions, a ColumnarToRow is inserted at the top of the plan if the root node supports columnar output:

  private def insertTransitions(plan: SparkPlan): SparkPlan = {
    if (plan.supportsColumnar) {
      // The tree feels kind of backwards
      // This is the end of the columnar processing so go back to rows
      ColumnarToRowExec(insertRowToColumnar(plan))
    } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
      plan.withNewChildren(plan.children.map(insertTransitions))
    } else {
      plan
    }
  }

In the current case, the GpuColumnarToRow does not remain at the top of the cached plan because it is removed when building the InMemoryRelation from the plan to cache:

  def convertToColumnarIfPossible(plan: SparkPlan): SparkPlan = plan match {
    case gen: WholeStageCodegenExec => gen.child match {
      case c2r: ColumnarToRowTransition => c2r.child match {
        case ia: InputAdapter => ia.child
        case _ => plan
      }
      case _ => plan
    }
    case c2r: ColumnarToRowTransition => // This matches when whole stage code gen is disabled.
      c2r.child
    case _ => plan
  }

Therefore, the GpuColumnarToRow seen on the Databricks runtime may be caused by internal Databricks modifications to InMemoryRelation. If that is the case, I think there is little we can do about the issue.
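The interaction between the two Spark rules above can be modeled with a toy plan tree (a hypothetical sketch using made-up Plan objects, not Spark's actual classes): insertTransitions caps a columnar root with a ColumnarToRow, and convertToColumnarIfPossible strips that cap before the plan is cached, so in the open-source path no transition is left underneath InMemoryRelation.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Plan:
    name: str
    children: List["Plan"] = field(default_factory=list)
    supports_columnar: bool = False

def insert_transitions(plan: Plan) -> Plan:
    # Models ApplyColumnarRulesAndInsertTransitions.insertTransitions:
    # if the root produces columnar batches, cap it with a ColumnarToRow.
    if plan.supports_columnar:
        return Plan("ColumnarToRow", [plan])
    return plan

def convert_to_columnar_if_possible(plan: Plan) -> Plan:
    # Models the whole-stage-codegen-disabled case of
    # InMemoryRelation's convertToColumnarIfPossible:
    # strip a top-level ColumnarToRow before caching the plan.
    if plan.name == "ColumnarToRow":
        return plan.children[0]
    return plan

scan = Plan("GpuFileGpuScan", supports_columnar=True)
planned = insert_transitions(scan)            # capped with ColumnarToRow
cached = convert_to_columnar_if_possible(planned)  # cap stripped again
```

Under this model, `cached` is the bare columnar scan, which matches the standalone plan above where nothing sits between InMemoryRelation and the GpuProject/GpuFileGpuScan subtree.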

@jlowe
Contributor

jlowe commented Sep 21, 2021

If the GpuColumnarToRow is indeed extraneous, we should be able to remove it, since we are the ones placing it in the plan. There have been other plugin rules to catch similar redundant transitions. We could check for a sequence of ColumnarToRow -> InMemoryRelation -> InMemoryTableScan and replace it with InMemoryRelation -> GpuInMemoryTableScan, or alternatively do a post-process after replacing with GPU nodes to detect the problematic sequence and remove the extraneous transition at that point.
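The post-process idea could be sketched on the same kind of toy plan tree (hypothetical Plan objects and pass name, not actual plugin code): after GPU replacement, walk the plan and splice out a GpuColumnarToRow that sits directly under an InMemoryRelation, since the GPU cache serializer keeps the data columnar and the row transition does no useful work there.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Plan:
    name: str
    children: List["Plan"] = field(default_factory=list)

def remove_redundant_transitions(plan: Plan) -> Plan:
    # Hypothetical post-process pass: fix children first, then this node.
    plan.children = [remove_redundant_transitions(c) for c in plan.children]
    # A GpuColumnarToRow directly under an InMemoryRelation is extraneous
    # when the cached batches are columnar anyway: splice it out.
    if (plan.name == "InMemoryRelation" and plan.children
            and plan.children[0].name == "GpuColumnarToRow"):
        plan.children[0] = plan.children[0].children[0]
    return plan

# Toy version of the problematic Databricks subtree from the report.
plan = Plan("GpuInMemoryTableScan", [
    Plan("InMemoryRelation", [
        Plan("GpuColumnarToRow", [
            Plan("GpuProject", [Plan("GpuFileGpuScan")])])])])
fixed = remove_redundant_transitions(plan)
# After the pass, InMemoryRelation's child is GpuProject directly.
```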

@Salonijain27 Salonijain27 added the P1 Nice to have for release label Sep 21, 2021
@sperlingxx
Collaborator

sperlingxx commented Sep 24, 2021

Hi @viadea @jlowe, I ran the same query on a Databricks cluster with Spark 3.1.1 and RAPIDS 21.08, but I did not see the extra GpuColumnarToRow beneath the InMemoryRelation.

GpuColumnarToRowTransition false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_88#88)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#235]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_88#88)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_88#88], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_88#88, 200), ENSURE_REQUIREMENTS, [id=#224]
                     +- GpuHashAggregate(keys=[_gen_alias_88#88], functions=[]), filters=List())
                        +- GpuProject [col#23.name.firstname AS _gen_alias_88#88]
                           +- GpuInMemoryTableScan [col#23]
                                 +- InMemoryRelation [col#23], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- GpuProject [named_struct(name, name#14, newname, named_struct(firstname, name#14.firstname, lastname, name#14.lastname)) AS col#23]
                                          +- GpuFileGpuScan parquet [name#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>>

Here is the driver log: https://dbc-9ff9942e-a9c4.cloud.databricks.com/?o=8721196619973675#setting/sparkui/0924-064308-stem303/driver-logs
Here is the Python script I ran:

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("debug123")\
    .config("spark.sql.cache.serializer", "com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer")\
    .getOrCreate()
data = [
    Row(Row("Adam ", "", "Green"), "1", "M", 1000),
    Row(Row("Bob ", "Middle", "Green"), "2", "M", 2000),
    Row(Row("Cathy ", "", "Green"), "3", "F", 3000)]
schema = StructType() \
    .add("name", StructType()
         .add("firstname", StringType())
         .add("middlename", StringType())
         .add("lastname", StringType())) \
    .add("id", StringType()) \
    .add("gender", StringType()) \
    .add("salary", IntegerType())
df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.write.format("parquet").mode("overwrite").save("/tmp/testparquet")
df2 = spark.read.parquet("/tmp/testparquet")
df2.createOrReplaceTempView("df2")
df3 = spark.sql("select struct(name, struct(name.firstname, name.lastname) as newname) as col from df2").cache()
df3.createOrReplaceTempView("df3")
spark.sql("select count(distinct col.name.firstname) from df3").show()
spark.sql("select count(distinct col.name.firstname) from df3").explain()

@viadea
Collaborator Author

viadea commented Oct 14, 2021

I also tested the 21.10 snapshot and found the same thing: the extra GpuColumnarToRow is gone.
I do not know which change/PR fixed this, but I am assuming this issue is fixed, right?

@jlowe
Contributor

jlowe commented Oct 22, 2021

I do not know which change/PR fixed this, but I am assuming this issue is fixed, right?

I don't know of anything that would have explicitly fixed this. Pinging @razajafri in case he knows of a change that could be related.

Closing this as it's not reproducible on recent builds. We can reopen if it appears again.

@jlowe jlowe closed this as completed Oct 22, 2021