
[BUG] Input partition differences using show() #882

Closed
jlowe opened this issue Sep 29, 2020 · 9 comments
Labels: P1 Nice to have for release, performance (A performance related task/issue)

@jlowe
Contributor

jlowe commented Sep 29, 2020

Describe the bug
While working with AWS EMR Spark, I noticed a difference in the number of input partitions for a simple query. I'm not sure this is a bug per se, as both the CPU and GPU queries appeared to execute correctly, but I thought it might be worth investigating to see if there's a potential performance problem/opportunity for the plugin case.

Steps/Code to reproduce bug
Execute the following query and note the difference in the number of tasks in the first stage between a run on the CPU and a run on the GPU. Replace some_bucket_path with an appropriate writable bucket path.

val data = 1 to 10000
val d1 = sc.parallelize(data).toDF()
d1.write.mode("overwrite").parquet("s3://some_bucket_path/df1.parquet")
spark.read.parquet("s3://some_bucket_path/df1.parquet").createOrReplaceTempView("test")
spark.sql("SELECT * FROM test").show()

Expected behavior
The same number of input partitions (i.e., tasks in the first stage) appears regardless of whether the run is on the CPU or the GPU.

Environment details
AWS EMR w/ Spark 3.0

@jlowe jlowe added bug Something isn't working performance A performance related task/issue labels Sep 29, 2020
@tgravescs tgravescs added this to the Sep 28 - Oct 9 milestone Sep 30, 2020
@tgravescs
Collaborator

I ran the query above and am seeing the same number of partitions in the stages.

@tgravescs
Collaborator

OK, I tried reading a large file and I'm seeing 1 task on the CPU side and 71 tasks on the GPU side just when I do a show().

@tgravescs
Collaborator

tgravescs commented Sep 30, 2020

This actually happens in local mode as well. It looks like it's related to the way we are handling limit. Plan on GPU:

*(1) GpuColumnarToRow false
+- GpuGlobalLimit 21
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuColumnarExchange gpusinglepartitioning(), [id=#82]
         +- GpuLocalLimit 21
            +- GpuProject [cast(RECORD_ID#80L as string) AS RECORD_ID#184, RECIPIENT_ID#81, MESSAGE_ID#82, CHANNEL#83, DELIVERY_ID#84, cast(DELIVERY_TIME#85 as string) AS DELIVERY_TIME#203, cast(DOW#86 as string) AS DOW#189, cast(HOD#87 as string) AS HOD#190, cast(OPEN_TIME#88 as string) AS OPEN_TIME#204, cast(CLICK_TIME#89 as string) AS CLICK_TIME#205, cast(OPTOUT_TIME#90 as string) AS OPTOUT_TIME#206, cast(OUTCOME_END_TIME#91 as string) AS OUTCOME_END_TIME#207, cast(LABEL_SENT#92 as string) AS LABEL_SENT#191, cast(LABEL_OPEN#93 as string) AS LABEL_OPEN#192, cast(LABEL_CLICK#94 as string) AS LABEL_CLICK#193, cast(LABEL_OPTOUT#95 as string) AS LABEL_OPTOUT#194, cast(SENT_COUNT#96 as string) AS SENT_COUNT#195, cast(OPEN_COUNT#97 as string) AS OPEN_COUNT#196, cast(CLICK_COUNT#98 as string) AS CLICK_COUNT#197, cast(SENT_DISTINCT_COUNT_BY_DAY#99 as string) AS SENT_DISTINCT_COUNT_BY_DAY#198, cast(OPEN_DISTINCT_COUNT_BY_DAY#100 as string) AS OPEN_DISTINCT_COUNT_BY_DAY#199, cast(CLICK_DISTINCT_COUNT_BY_DAY#101 as string) AS CLICK_DISTINCT_COUNT_BY_DAY#200, cast(OPEN_DELAY#102 as string) AS OPEN_DELAY#201, cast(CLICK_DELAY#103 as string) AS CLICK_DELAY#202]
               +- GpuBatchScan[RECORD_ID#80L, RECIPIENT_ID#81, MESSAGE_ID#82, CHANNEL#83, DELIVERY_ID#84, DELIVERY_TIME#85, DOW#86, HOD#87, OPEN_TIME#88, CLICK_TIME#89, OPTOUT_TIME#90, OUTCOME_END_TIME#91, LABEL_SENT#92, LABEL_OPEN#93, LABEL_CLICK#94, LABEL_OPTOUT#95, SENT_COUNT#96, OPEN_COUNT#97, CLICK_COUNT#98, SENT_DISTINCT_COUNT_BY_DAY#99, OPEN_DISTINCT_COUNT_BY_DAY#100, CLICK_DISTINCT_COUNT_BY_DAY#101, OPEN_DELAY#102, CLICK_DELAY#103] GpuParquetScan DataFilters: [], Location: InMemoryFileIndex[s3://journeyai/tgraves_all_labels_out], PartitionFilters: [], ReadSchema: struct<RECORD_ID:bigint,RECIPIENT_ID:string,MESSAGE_ID:string,CHANNEL:string,DELIVERY_ID:string,D..., PushedFilters: []

Plan on CPU:

CollectLimit 21
+- *(1) Project [cast(RECORD_ID#281L as string) AS RECORD_ID#353, RECIPIENT_ID#282, MESSAGE_ID#283, CHANNEL#284, DELIVERY_ID#285, cast(DELIVERY_TIME#286 as string) AS DELIVERY_TIME#372, cast(DOW#287 as string) AS DOW#358, cast(HOD#288 as string) AS HOD#359, cast(OPEN_TIME#289 as string) AS OPEN_TIME#373, cast(CLICK_TIME#290 as string) AS CLICK_TIME#374, cast(OPTOUT_TIME#291 as string) AS OPTOUT_TIME#375, cast(OUTCOME_END_TIME#292 as string) AS OUTCOME_END_TIME#376, cast(LABEL_SENT#293 as string) AS LABEL_SENT#360, cast(LABEL_OPEN#294 as string) AS LABEL_OPEN#361, cast(LABEL_CLICK#295 as string) AS LABEL_CLICK#362, cast(LABEL_OPTOUT#296 as string) AS LABEL_OPTOUT#363, cast(SENT_COUNT#297 as string) AS SENT_COUNT#364, cast(OPEN_COUNT#298 as string) AS OPEN_COUNT#365, cast(CLICK_COUNT#299 as string) AS CLICK_COUNT#366, cast(SENT_DISTINCT_COUNT_BY_DAY#300 as string) AS SENT_DISTINCT_COUNT_BY_DAY#367, cast(OPEN_DISTINCT_COUNT_BY_DAY#301 as string) AS OPEN_DISTINCT_COUNT_BY_DAY#368, cast(CLICK_DISTINCT_COUNT_BY_DAY#302 as string) AS CLICK_DISTINCT_COUNT_BY_DAY#369, cast(OPEN_DELAY#303 as string) AS OPEN_DELAY#370, cast(CLICK_DELAY#304 as string) AS CLICK_DELAY#371]
   +- *(1) ColumnarToRow
      +- BatchScan[RECORD_ID#281L, RECIPIENT_ID#282, MESSAGE_ID#283, CHANNEL#284, DELIVERY_ID#285, DELIVERY_TIME#286, DOW#287, HOD#288, OPEN_TIME#289, CLICK_TIME#290, OPTOUT_TIME#291, OUTCOME_END_TIME#292, LABEL_SENT#293, LABEL_OPEN#294, LABEL_CLICK#295, LABEL_OPTOUT#296, SENT_COUNT#297, OPEN_COUNT#298, CLICK_COUNT#299, SENT_DISTINCT_COUNT_BY_DAY#300, OPEN_DISTINCT_COUNT_BY_DAY#301, CLICK_DISTINCT_COUNT_BY_DAY#302, OPEN_DELAY#303, CLICK_DELAY#304] ParquetScan DataFilters: [], Location: InMemoryFileIndex[s3://journeyai/tgraves_all_labels_out], PartitionFilters: [], ReadSchema: struct<RECORD_ID:bigint,RECIPIENT_ID:string,MESSAGE_ID:string,CHANNEL:string,DELIVERY_ID:string,D..., PushedFilters: []

@tgravescs tgravescs changed the title [BUG] Input partition differences in EMR [BUG] Input partition differences using show() Sep 30, 2020
@revans2
Collaborator

revans2 commented Sep 30, 2020

We have an explicit shuffle in the GPU plan, whereas GlobalLimit on the CPU hides the shuffle (it does it all internally with an RDD). I don't see how that would impact the input partitioning of the batch scan.

@tgravescs
Collaborator

This is because we do a local limit on all the partitions first, then shuffle, then a global limit, so we naturally run tasks over every partition to do the local limit. Spark itself does a special executeTake that starts with 1 partition, executes a job on just that partition, and then scales up the number of partitions it reads each iteration if it doesn't get the number of rows the limit needs.
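For illustration only, here is a minimal, self-contained sketch of that incremental strategy. This is not Spark's actual executeTake code; the function name, the default scale-up factor of 4, and the in-memory Seq-of-Seq stand-in for RDD partitions are all assumptions made for the example.

// Hypothetical sketch: scan a few partitions, and if the limit isn't satisfied,
// grow the number of partitions scanned on each round.
def incrementalTake[T](partitions: Seq[Seq[T]], limit: Int, scaleUpFactor: Int = 4): Seq[T] = {
  val buf = scala.collection.mutable.ArrayBuffer.empty[T]
  var partsScanned = 0
  var numPartsToTry = 1
  while (buf.size < limit && partsScanned < partitions.size) {
    val upTo = math.min(partsScanned + numPartsToTry, partitions.size)
    // "Run a job" over just this slice of partitions and keep only what is still needed.
    partitions.slice(partsScanned, upTo).foreach { part =>
      if (buf.size < limit) buf ++= part.take(limit - buf.size)
    }
    partsScanned = upTo
    numPartsToTry *= scaleUpFactor // widen the net each iteration if the limit wasn't reached
  }
  buf.take(limit).toSeq
}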

@tgravescs
Collaborator

tgravescs commented Oct 1, 2020

So running this on a bunch of small files really shows a performance hit on the GPU side. If we just do a read and then a show(), my little test took 25 seconds on the GPU versus less than 1 second (0.3 sec) on the CPU.

CollectLimitExec is a bit tricky because there are 2 paths it can take. The first is when you actually pull data back to the driver, e.g. show(), limit().collect(), take(), etc.; these call CollectLimitExec.executeCollect(). The second is when the limit is not the last operation. I'm not sure of all the cases, but one I found is .limit(20).cache(); in that case it actually calls doExecute(), and the first time you execute after the cache (e.g. df.count()) it runs over all partitions on both CPU and GPU.
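As a usage sketch of the two paths described above (df is a hypothetical DataFrame; the comments simply restate which code path the description says each call takes):

// Path 1: results are pulled back to the driver, so CollectLimitExec.executeCollect()
// can stop early (show(), take(), limit().collect(), ...).
df.limit(20).collect()
df.show()

// Path 2: the limit is not the terminal operation, e.g. limit + cache, which goes through
// doExecute(); per the comment above, the first action after the cache then runs over
// all partitions on both CPU and GPU.
val cached = df.limit(20).cache()
cached.count()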

The problem with executeCollect() is that our node has to be the first thing in the plan for that path to be taken; otherwise Spark falls back to its default executeCollect(), which runs over all partitions. Since we have a WholeStageCodegen node doing the columnar-to-row conversion as the first thing in the plan, our executeCollect() never gets called.

I did a few tests:

  1. Just do CollectLimitExec on the CPU - took about 1 second.
  2. Hacked it so that instead of replacing CollectLimitExec we leave it in place but insert a GPU limit in front of it, so the limit happens on the GPU first. This, though, ends up reading all the data and then limiting afterwards, whereas leaving it on the CPU only ended up reading just above the limit number of rows (1 batch) and then stopped.

Doing some more testing beyond just a read-and-show, I haven't seen any worse performance from not replacing CollectLimitExec. Spark is smart and just stops calling next() after it reaches the limit. For us that granularity is at least 1 batch, which could potentially be millions of rows. The test I ran pulled back 130000 rows and I didn't see any performance difference.
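A minimal sketch of that granularity difference, using hypothetical helper functions rather than plugin code: a row iterator can stop as soon as it has n rows, while a batch iterator pays for a whole batch on the first next().

def rowLimit(rows: Iterator[Long], n: Int): Seq[Long] =
  rows.take(n).toSeq // stops pulling rows from upstream once n rows have been produced

def batchLimit(batches: Iterator[Array[Long]], n: Int): Seq[Long] = {
  // Even if n is tiny, the first next() materializes an entire batch, which for the
  // plugin could be millions of rows before the limit is applied.
  if (batches.hasNext) batches.next().take(n).toSeq else Seq.empty
}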

Another thing: reading, then sorting, then show() takes another path through TakeOrderedAndProject, which we don't replace either.

I'm tempted to just shut this off for now so we don't replace CollectLimitExec. There might be some cases where our batches are huge - millions of rows - where there is some difference, but I've never seen a case where this hurts our performance.

@tgravescs tgravescs removed the bug Something isn't working label Oct 1, 2020
@tgravescs
Collaborator

Put up PR #900 to turn this off for now; we can leave this issue open to investigate better solutions.
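For anyone experimenting with this, the plugin normally exposes a per-exec enable flag; assuming the usual naming convention (the exact key is not confirmed in this thread), the replacement could be toggled back on for testing like so:

// Assumption: the standard per-exec config name generated by the plugin. After the change
// described above, the CollectLimitExec replacement would be off by default, and this
// setting would re-enable it for experimentation.
spark.conf.set("spark.rapids.sql.exec.CollectLimitExec", "true")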

@tgravescs
Collaborator

So I was actually able to reproduce the different number of partitions on EMR on TPC-DS q38 with the CollectLimitExec replacement turned off on the GPU side. If we use the EMR file reader, the CPU gets a lot fewer partitions. If we change the reader back to the v2 reader, then we get the same number on the CPU as on the GPU.
This somewhat seems broken to me, as I assume it's not honoring the Spark config, but I'll have to investigate further.
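For reference, in stock Spark 3.0 the v1-vs-v2 source choice is controlled by spark.sql.sources.useV1SourceList; removing parquet from that list forces the v2 reader. Whether the EMR-specific reader honors this setting is exactly the open question above.

// Force the DataSource v2 readers (including Parquet) by clearing the v1 source list.
spark.conf.set("spark.sql.sources.useV1SourceList", "")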

@tgravescs tgravescs removed this from the Sep 28 - Oct 9 milestone Oct 12, 2020
@sameerz sameerz added the P1 Nice to have for release label Nov 17, 2020
@mattahrens
Collaborator

Closing as won't fix for now

tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
…IDIA#882)

Signed-off-by: spark-rapids automation <[email protected]>