
Execution error when I run TPC-DS Query-67 [QST] #458

Closed
chenrui17 opened this issue Jul 29, 2020 · 26 comments
Labels
question Further information is requested

Comments

@chenrui17

[screenshot of the execution error stack trace]

My configuration is:
dataset: TPC-DS 2.8T Parquet
spark-rapids version: 0.15
spark-shell
--master yarn
--num-executors 3
--conf spark.plugins=com.nvidia.spark.SQLPlugin
--conf spark.executor.cores=48
--conf spark.executor.memory=144g
--conf spark.rapids.sql.concurrentGpuTasks=1
--conf spark.rapids.memory.pinnedPool.size=30G
--conf spark.executor.memoryOverhead=30g
--conf spark.sql.broadcastTimeout=1500
--conf spark.locality.wait=0s
--conf spark.sql.files.maxPartitionBytes=1024m
--conf spark.rapids.sql.explain=ALL
--conf spark.rapids.sql.castFloatToString.enabled=true
--conf spark.sql.shuffle.partitions=288
--conf spark.rapids.sql.variableFloatAgg.enabled=true
--conf spark.sql.optimizer.inSetConversionThreshold=1000
--conf spark.rapids.memory.gpu.pooling.enabled=false
--conf spark.executor.resource.gpu.amount=1
--conf spark.task.resource.gpu.amount=0.020833
--conf spark.executor.resource.gpu.discoveryScript=/home/hduser/nvidia-spark3.0-rapids/getGpusResources.sh
--files /home/hduser/spark-3.0.1-SNAPSHOT-bin-hadoop3/conf/executor_log4j.properties
--jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR} \

query-67 code:
val rdd_store_sales = spark.read.parquet("/tpcds/9.1T_parquet/store_sales/part*.parquet");
val df_store_sales = rdd_store_sales.toDF("ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit");
df_store_sales.createOrReplaceTempView("store_sales");

val rdd_store = spark.read.parquet("/tpcds/9.1T_parquet/store/part*.parquet");
val df_store = rdd_store.toDF("s_store_sk", "s_store_id", "s_rec_start_date", "s_rec_end_date", "s_closed_date_sk", "s_store_name", "s_number_employees", "s_floor_space", "s_hours", "s_manager", "s_market_id", "s_geography_class", "s_market_desc", "s_market_manager", "s_division_id", "s_division_name", "s_company_id", "s_company_name", "s_street_number", "s_street_name", "s_street_type", "s_suite_number", "s_city", "s_county", "s_state", "s_zip", "s_country", "s_gmt_offset", "s_tax_precentage");
df_store.createOrReplaceTempView("store");

val rdd_date_dim = spark.read.parquet("/tpcds/9.1T_parquet/date_dim/part*.parquet");
val df_date_dim = rdd_date_dim.toDF("d_date_sk", "d_date_id", "d_date", "d_month_seq", "d_week_seq", "d_quarter_seq", "d_year", "d_dow", "d_moy", "d_dom", "d_qoy", "d_fy_year", "d_fy_quarter_seq", "d_fy_week_seq", "d_day_name", "d_quarter_name", "d_holiday", "d_weekend", "d_following_holiday", "d_first_dom", "d_last_dom", "d_same_day_ly", "d_same_day_lq", "d_current_day", "d_current_week", "d_current_month", "d_current_quarter", "d_current_year");
df_date_dim.createOrReplaceTempView("date_dim");

val rdd_item = spark.read.parquet("/tpcds/9.1T_parquet/item/part*.parquet");
val df_item = rdd_item.toDF("i_item_sk", "i_item_id", "i_rec_start_date", "i_rec_end_date", "i_item_desc", "i_current_price", "i_wholesale_cost", "i_brand_id", "i_brand", "i_class_id", "i_class", "i_category_id", "i_category", "i_manufact_id", "i_manufact", "i_size", "i_formulation", "i_color", "i_units", "i_container", "i_manager_id", "i_product_name");
df_item.createOrReplaceTempView("item");

spark.sql("SELECT * FROM (SELECT i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id, sumsales, rank() OVER (PARTITION BY i_category ORDER BY sumsales DESC) rk FROM (SELECT i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id, sum(coalesce(ss_sales_price * ss_quantity, 0)) sumsales FROM store_sales, date_dim, store, item WHERE ss_sold_date_sk = d_date_sk AND ss_item_sk = i_item_sk AND ss_store_sk = s_store_sk AND d_month_seq BETWEEN 1200 AND 1200 + 11 GROUP BY ROLLUP (i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id)) dw1) dw2 WHERE rk <= 100 ORDER BY i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id, sumsales, rk LIMIT 100 ").show;

@chenrui17 chenrui17 added ? - Needs Triage Need team to review and classify question Further information is requested labels Jul 29, 2020
@chenrui17 chenrui17 changed the title Execution ERROR when I run TPC-DS Query-67 [QST] Execution error when I run TPC-DS Query-67 [QST] Jul 29, 2020
@jlowe
Member

jlowe commented Jul 29, 2020

This error indicates a string column is trying to be constructed that contains more than 2GB of total data in the column across all string values. The only fix is to reduce the number of rows in the columnar batch until this limit is not exceeded.

The stacktrace shows this is occurring in a sort which requires the entire partition to fit in a single table. That means we need to reduce the amount of data being processed in this partition. I recommend increasing the value for spark.sql.shuffle.partitions to reduce the amount of data in the average task partition. If there is skew or there are very, very long string values in the dataset then this may not be sufficient to resolve the issue.
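
As a concrete illustration (the 1152 below is only a guess at "several times the original 288 from the launch command above", not a tuned value), the partition count can be raised at runtime in spark-shell before running the query:

// raise the shuffle partition count so the average task partition carries less data (illustrative value)
spark.conf.set("spark.sql.shuffle.partitions", "1152")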

@jlowe jlowe removed the ? - Needs Triage Need team to review and classify label Jul 29, 2020
@JustPlay

JustPlay commented Jul 30, 2020

@jlowe

“ I recommend increasing the value for spark.sql.shuffle.partitions to reduce the amount of data in the average task partition. If there is skew or there are very, very long string values in the dataset then this may not be sufficient to resolve the issue.”

for skew, does the sql.rapids.gpu_bach_size_bytes (default to 2GB) help?

@JustPlay

@jlowe

"This error indicates a string column is trying to be constructed that contains more than 2GB of total data in the column across all string values"

Another question: does the 2GB limit apply only to the input column size for the GPU, or does it also limit the size of the generated output column? (e.g., if I feed 2GB of data into the GPU but it becomes 4GB after processing, is that OK or not?)

@jlowe
Member

jlowe commented Jul 30, 2020

for skew, does the sql.rapids.gpu_bach_size_bytes (default to 2GB) help?

Maybe. spark.rapids.sql.batchSizeBytes could help if the operation being performed does not need all of the partition data at once to complete the operation. For example, currently a GPU sort operation requires the sort to be performed on all of the partition data at once. Attempting to collect all of the data could exceed the 2GB limit on a string column. If the operation does not require all of the data and can be performed in batches then yes, lowering spark.rapids.sql.batchSizeBytes can help avoid exceeding the string data limit.
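
For operations that can work batch-by-batch, a minimal sketch of lowering the batch size at runtime (256 MiB is only an illustrative value, not a recommendation) would be:

// smaller target batch size; only helps operators that do not need the whole partition in one batch
spark.conf.set("spark.rapids.sql.batchSizeBytes", (256L * 1024 * 1024).toString)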

does the 2GB limit apply only to the input column size for the GPU, or does it also limit the size of the generated output column?

Yes, the 2GB data limit on a cudf string column applies whether the column is an input or an output.

@chenrui17
Author

@jlowe I found this query is skewed, and because some operators do not support spill, an OOM occurred. Do you have any suggestions for me? As you said in another issue, do I have to wait for rapids-0.2?
In addition, I want to ask another question about Parquet read performance. I tested many TPC-DS queries and found that some of them are slower than on CPU Spark; these queries generally do not shuffle much data. However, queries with a large shuffle data size perform better than CPU Spark. Looking briefly at the query DAG, I found that the GpuScan parquet operator takes a lot of time, as shown in the figure:
[screenshot of the query DAG showing GpuScan parquet timing]

My questions are:

  1. The execution time of GpuScan is very long. Where is it mainly spent?
  2. For the queries with little shuffle data, I found that GPU SM activity is always relatively high, around 70%. I want to know what the performance bottleneck of these queries is. I guess I/O reads?

@jlowe
Member

jlowe commented Jul 31, 2020

I found this query is skewed, and because some operators do not support spill, an OOM occurred. Do you have any suggestions for me?

Unfortunately there are not a lot of great options for the skewed join scenario in the 0.1 release:

  • If either side of the join is small enough, you could try forcing Spark to turn this join into a broadcast join. If neither table is small then this too will lead to another GPU OOM because the broadcasted table will not fit.
  • You could prevent the plugin from replacing joins in the query. I assume these are all SortMergeJoinExec in the CPU plan. If so you would set spark.rapids.sql.exec.SortMergeJoinExec=false (see the config sketch after this list). This will likely negatively impact the plugin's performance on the query, of course, but it increases the chance of the query succeeding.
  • You could modify the query to mitigate the join skew. Given this is part of the TPC-DS benchmark, I assume that's not really an option.
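
A hedged sketch of the first two options as runtime settings in spark-shell (the 10g threshold is just a placeholder; a broadcast is only safe if one side of the join genuinely fits in memory):

// option 1: encourage Spark to plan a broadcast join (placeholder threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10g")
// option 2: prevent the plugin from replacing sort-merge joins, leaving them on the CPU
spark.conf.set("spark.rapids.sql.exec.SortMergeJoinExec", "false")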

The execution time of GpuScan is very long. Where is it mainly spent?

Typically these are the places where GPU scans spend their time:

  • buffering data from the distributed filesystem into host memory
  • waiting for the GPU semaphore because other tasks are busy using the GPU
  • parsing the data on the GPU

We do have metrics to track some of these things (buffer time, for example) but I'm not seeing them on your UI screenshot. Can you post the query plan to help debug this? I'm guessing this is an artifact between what GpuBatchScanExec and GpuFileSourceScanExec are doing with respect to metrics handling. We should be showing at least the buffer time and GPU parse time there which are easy to measure.

I found that GPU SM activity is always relatively high, around 70%

Parsing Parquet data is relatively expensive, even for the GPU. The unsnap kernel is one of the single most expensive kernels the plugin invokes from libcudf, so I suspect that's where most of the time is being spent on the GPU. If you want to get a lot more insight into what the GPU is doing, I highly recommend hooking up the Nsight Systems profiler to one of the executors and getting a trace. This will show you the ranges of time on a graph where the plugin is buffering data, waiting for the semaphore, and waiting for the parquet data parse to complete. See the NVTX profiling docs for more information if you're interested in pursuing that option.
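
For reference, a rough local-mode sketch of what an Nsight Systems capture could look like (the output path and local master are placeholders; wiring nsys into a YARN executor launch is environment-specific and covered in the profiling docs mentioned above):

nsys profile --trace=cuda,nvtx -o /tmp/rapids-trace \
  spark-shell --master local[*] \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}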

@chenrui17
Author

chenrui17 commented Aug 6, 2020

Can you post the query plan to help debug this?

This is the TPC-DS query-2 physical plan, thanks~
Q2-queryPlan.txt

@jlowe
Member

jlowe commented Aug 7, 2020

I filed #524 to track adding GPU metrics to GpuFileSourceScanExec.

@jlowe
Member

jlowe commented Aug 14, 2020

The lack of GPU-level metrics in scans has been fixed, will be available in the 0.2 release. The lack of out-of-core computation is a known limitation, some initial work on that is being tracked by #19 but it will be some time before it will be available. In the meantime OOMs due to large column batches will have to be mitigated by the user by either tuning configs as discussed in our tuning guide or by modifying the query to work around severe data skew.

@chenrui17 is there anything else that needs answering for this question?

@chenrui17
Author

The lack of GPU-level metrics in scans has been fixed, will be available in the 0.2 release. The lack of out-of-core computation is a known limitation, some initial work on that is being tracked by #19 but it will be some time before it will be available. In the meantime OOMs due to large column batches will have to be mitigated by the user by either tuning configs as discussed in our tuning guide or by modifying the query to work around severe data skew.

@chenrui17 is there anything else that needs answering for this question?

Thanks a lot, nothing more on this.

@chenrui17
Author

chenrui17 commented Sep 1, 2020

This error indicates a string column is trying to be constructed that contains more than 2GB of total data in the column across all string values. The only fix is to reduce the number of rows in the columnar batch until this limit is not exceeded.

The stacktrace shows this is occurring in a sort which requires the entire partition to fit in a single table. That means we need to reduce the amount of data being processed in this partition. I recommend increasing the value for spark.sql.shuffle.partitions to reduce the amount of data in the average task partition. If there is skew or there are very, very long string values in the dataset then this may not be sufficient to resolve the issue.

About this query: I hit the same problem on a 10T data set, so I need to reopen this issue and ask for help.
Before, you said: "That means we need to reduce the amount of data being processed in this partition. I recommend increasing the value for spark.sql.shuffle.partitions to reduce the amount of data in the average task partition."
So I set spark.sql.shuffle.partitions = 3000, but it does not work, because in the last stage all the data maps to only 10 tasks. I also turned spark.rapids.sql.batchSizeBytes down to 32m, but it still errors: "java.lang.IllegalStateException: A single batch is required for this operation, but cuDF only supports 2147483647 bytes in a single string column. At least 2147521504 are in a single column in this partition. Please try increasing your partition count." What should I do?

@chenrui17 chenrui17 reopened this Sep 1, 2020
@jlowe
Member

jlowe commented Sep 1, 2020

The problem is data skew.

Note this part of the error message:

A single batch is required for this operation

That means the plugin needs to collect all of the batches in the task partition into a single batch (e.g.: before a sort or trying to setup the build-table of a hash join) but ran into the int32 string column data limit in cudf which is tracked by rapidsai/cudf#3958. Decreasing the batch size bytes setting won't help in this case because it needs to collect all of the batches together into a single batch to perform the operation.
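
For concreteness, a small sketch of the arithmetic behind the error message quoted above (the variable names are only illustrative):

// cudf string columns index their character data with int32 offsets, so total bytes per column must stay below Int.MaxValue
val cudfStringByteLimit = Int.MaxValue.toLong        // 2147483647 bytes (~2 GiB)
val reportedBytes       = 2147521504L                // total string bytes the operation tried to collect (from the error)
val overage             = reportedBytes - cudfStringByteLimit  // 37857 bytes over; the whole partition must shrink because the operation needs it in one batch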

Increasing the partition count may not help due to the severe data skew (i.e.: you mentioned only 10 out of 3000 partitions received any data).

As I mentioned in this earlier comment there aren't a lot of great options in the short-term. Long-term we know the plugin needs to support spilling to host memory and ultimately disk during sorts, joins, etc. to handle very large task partitions that can occur due to data skew.

@JustPlay

JustPlay commented Sep 2, 2020

Yes, the 2GB data limit on a cudf string column applies whether the column is an input or an output.

Why? Is it because cudf uses int32 (or int) instead of size_t (or int64_t / uint64_t)?

@jlowe
Member

jlowe commented Sep 2, 2020

Yes, cudf uses int32 for compatibility with Arrow and also for space savings. Using a full 64-bit offset vector for a column with short strings (e.g.: less than 8 characters) inflates the memory footprint of the column by over 2x.

Updating cudf to allow larger strings is one way to solve the problem. Another is to support chunking/spilling of sorts and joins, which currently require the operation to be performed with a single batch on the GPU. Supporting spill in these operations is not easy, but we have discussed ways to implement it.

@JustPlay

JustPlay commented Sep 2, 2020

Is there a way to fall back this sort op to the CPU (while other sorts stay on the GPU)?

@chenrui17
Author

@jlowe It's said that adaptive query execution (AQE) can not only merge partitions but also split partitions, so if I turn on AQE, can it resolve my problem with TPC-DS q67?

@jlowe
Member

jlowe commented Sep 2, 2020

Is there a way to fall back this sort op to the CPU (while other sorts stay on the GPU)?

No, there's currently no way to exempt individual query plan nodes from translation to the GPU without exempting all nodes of the same type. If you can think of a good way for the user to identify the instance, please feel free to file a feature request.

if I turn on AQE, can it resolve my problem with TPC-DS q67?

Maybe? I personally haven't tried it, but yes it may resolve the issue since this appears to be a case of severe skew. We don't support AQE in the 0.1 release, but you can run with Spark 3.0.1, cudf-0.15-SNAPSHOT, and a build of the latest plugin code on branch-0.2 (i.e.: 0.2.0-SNAPSHOT) to try it out.
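
If you do experiment with it, a minimal sketch of turning AQE on at runtime (both are standard Spark 3.0 SQL configs; whether they actually rescue q67 is the open question above):

// enable adaptive query execution and its skew-join handling
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")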

@JustPlay

JustPlay commented Sep 5, 2020

The problem is data skew.

Note this part of the error message:

A single batch is required for this operation

That means the plugin needs to collect all of the batches in the task partition into a single batch (e.g.: before a sort or trying to setup the build-table of a hash join) but ran into the int32 string column data limit in cudf which is tracked by rapidsai/cudf#3958. Decreasing the batch size bytes setting won't help in this case because it needs to collect all of the batches together into a single batch to perform the operation.

Increasing the partition count may not help due to the severe data skew (i.e.: you mentioned only 10 out of 3000 partitions received any data).

As I mentioned in this earlier comment there aren't a lot of great options in the short-term. Long-term we know the plugin needs to support spilling to host memory and ultimately disk during sorts, joins, etc. to handle very large task partitions that can occur due to data skew.

I think it is not only skew; it is a stranger problem.

We have set shuffle_partition=64 and num_executors=8, and we can see there are 64 tasks in stage 6, but stage 6 only uses 4 executors, not 8, and so does stage 7;
e.g., stage 6 uses executors 0-3 on node0, and stage 6 (retry 1) uses executors 4-7 on node1.

Pic for all stages: [screenshot]

Pic for stage 6 (all tasks use executors 0-3): [screenshots]

The shuffle write by the upstream stage is around 90 GB.

Pic for executors: [screenshot]

Other info: the "A single batch is required for this operation" error occurred in stage 11, according to the pic provided by chenrui17 above.

Thanks @jlowe

@JustPlay

JustPlay commented Sep 5, 2020

What is the partition method when doing the shuffle partitioning? Can we use a round-robin partition method (or any other method) to generate more, smaller partitions?

So I want to know which file, which class, and which function need to be changed (we are using spark-3.0.1-rc3). I need some help.
@jlowe

@jlowe
Member

jlowe commented Oct 21, 2020

I apologize for the late response; this was somehow dropped.

We have set shuffle_partition=64 and num_executors=8, and we can see there are 64 tasks in stage 6, but stage 6 only uses 4 executors, not 8, and so does stage 7;
e.g., stage 6 uses executors 0-3 on node0, and stage 6 (retry 1) uses executors 4-7 on node1.

This behavior should be unrelated to the reported execution error. Which executor a task runs on does not impact how much data that task receives as input. If it did, task retries would not be possible once a node went down. This odd scheduling behavior is likely caused by Spark trying to schedule for shuffle locality. Please verify you have spark.locality.wait=0s and spark.shuffle.reduceLocality.enabled=false so Spark doesn't try to wait for locality and spreads tasks across executors.
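
Concretely, on the spark-shell command line shown at the top of this issue (spark.locality.wait=0s is already set there; the reduceLocality flag is the addition), that looks like:

--conf spark.locality.wait=0s
--conf spark.shuffle.reduceLocality.enabled=false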

What is the partition method when doing the shuffle partitioning? Can we use a round-robin partition method (or any other method) to generate more, smaller partitions?

Shuffle partitioning can be performed with a number of partitioner algorithms -- it depends upon the nature of why the data is being shuffled. For example, for a join or hash aggregate, the data is hash partitioned on the join or groupby keys so that all the values associated with a particular key end up in the same task so it's able to properly perform the join/aggregate. That's what can lead to the skew issue I mentioned earlier, as datasets with a large disparity in the cardinality of values associated with keys can end up with just a few tasks receiving almost all of the data, because the data is associated with very few join or groupby keys.

We can't simply change the hash partition for the shuffle into a round-robin partition, as that would break the semantics of the join or groupby operation. For example, if a task received only some of the values for key K in the right table of a left join then it cannot properly generate the output of the join because another task received the other values for K in the right table but not the other left table values for K.
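
To make the constraint concrete, here is a simplified sketch of how hash partitioning routes rows (it mirrors the idea, not Spark's exact implementation):

// every row with the same join/groupby key hashes to the same partition
def partitionFor(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod   // keep the result non-negative
}
// one extremely frequent key therefore produces one extremely large task -- the skew described above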

@revans2
Collaborator

revans2 commented Oct 22, 2020

We can't simply change the hash partition for the shuffle into a round-robin partition

That is true but AQE does offer a feature where you can detect a skewed join and convert the skewed portion of it into broadcast join. @andygrove do you know if we have done any tests with this? Looking at the code it appears that it is explicitly looking for a SortMergeJoin so it might not work for us out of the box.

@andygrove
Contributor

@revans2 AQE will be looking at the part of the plan that has not yet been converted to GPU, so this should work for us. We do have unit tests for this scenario so I believe it is working. I haven't yet analyzed the TPC-DS query plans to confirm if we are seeing this behavior in practice though.

@andygrove
Contributor

I just finished comparing TPC-DS benchmark results with AQE off versus on, and I consistently see GpuShuffledHashJoin operators being replaced with GpuBroadcastHashJoin when AQE is enabled.
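
For anyone wanting to check this on their own run, a sketch (query67 here stands for the SQL text shown earlier in this issue; with AQE the final plan is only determined after execution):

val df = spark.sql(query67)
df.collect()                                   // trigger execution so AQE finalizes the plan
println(df.queryExecution.executedPlan)        // look for GpuBroadcastHashJoin vs GpuShuffledHashJoin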

@revans2
Collaborator

revans2 commented Oct 23, 2020

Thanks @andygrove. Then it looks like if we want to run this at scale, we have to get out-of-core joins working properly. Thinking about it more: because it is requesting a single batch, it has to be a join on the build side, so we might need to create a sort-merge join implementation. But even then, if a single join key is too large, we might not be able to make it work without some big changes to cudf.

@sameerz
Collaborator

sameerz commented Feb 18, 2021

The work to address this question is in issue #1642

@revans2
Collaborator

revans2 commented May 18, 2021

I believe that this should be fixed now in the current SNAPSHOT branch. Please reopen if you run into more issues.

@revans2 revans2 closed this as completed May 18, 2021
pxLi pushed a commit to pxLi/spark-rapids that referenced this issue May 12, 2022
Add API docs

Add overwrite

Python module of api-docs
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
Signed-off-by: spark-rapids automation <[email protected]>