
Execution error when I run TPC-DS query-82 [QST] #459

Closed
chenrui17 opened this issue Jul 29, 2020 · 6 comments
Labels
question Further information is requested

Comments

@chenrui17

Below are the error logs from 2 nodes:
(two screenshots of the executor error logs were attached as images)

I attempted to increase spark.sql.shuffle.partitions up to 8000+, but the same error still occurs. I don't know why; it seems not to be an out-of-memory problem, because I found that GPU memory usage is low via DCGM.

I attempted to set concurrentTask = 1; it did not work.

I attempted to set rapids_batch_size_bytes = 1024G; it did not work.

My configuration is:
spark-shell
--master yarn
--num-executors 3
--conf spark.plugins=com.nvidia.spark.SQLPlugin
--conf spark.executor.cores=48
--conf spark.executor.memory=144g
--conf spark.rapids.sql.concurrentGpuTasks=1
--conf spark.rapids.memory.pinnedPool.size=30G
--conf spark.executor.memoryOverhead=30g
--conf spark.sql.broadcastTimeout=1500
--conf spark.locality.wait=0s
--conf spark.sql.files.maxPartitionBytes=1024m
--conf spark.rapids.sql.explain=ALL
--conf spark.rapids.sql.castFloatToString.enabled=true
--conf spark.sql.shuffle.partitions=8640
--conf spark.rapids.sql.variableFloatAgg.enabled=true
--conf spark.sql.optimizer.inSetConversionThreshold=1000
--conf spark.rapids.memory.gpu.pooling.enabled=false
--conf spark.executor.resource.gpu.amount=1
--conf spark.task.resource.gpu.amount=0.020833
--conf spark.executor.resource.gpu.discoveryScript=/home/hduser/nvidia-spark3.0-rapids/getGpusResources.sh
--files /home/hduser/spark-3.0.1-SNAPSHOT-bin-hadoop3/conf/executor_log4j.properties
--jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR} \

TPC-DS query-82 is:
val rdd_store_sales = spark.read.parquet("/tpcds/9.1T_parquet/store_sales/part*.parquet");
val df_store_sales = rdd_store_sales.toDF("ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit");
df_store_sales.createOrReplaceTempView("store_sales");

val rdd_Inventory = spark.read.parquet("/tpcds/9.1T_parquet/inventory/part*.parquet");
val df_Inventory = rdd_Inventory.toDF("inv_date_sk","inv_item_sk","inv_warehouse_sk","inv_quantity_on_hand");
df_Inventory.createOrReplaceTempView("inventory");

val rdd_date_dim = spark.read.parquet("/tpcds/9.1T_parquet/date_dim/part*.parquet");
val df_date_dim = rdd_date_dim.toDF("d_date_sk", "d_date_id", "d_date", "d_month_seq", "d_week_seq", "d_quarter_seq", "d_year", "d_dow", "d_moy", "d_dom", "d_qoy", "d_fy_year", "d_fy_quarter_seq", "d_fy_week_seq", "d_day_name", "d_quarter_name", "d_holiday", "d_weekend", "d_following_holiday", "d_first_dom", "d_last_dom", "d_same_day_ly", "d_same_day_lq", "d_current_day", "d_current_week", "d_current_month", "d_current_quarter", "d_current_year");
df_date_dim.createOrReplaceTempView("date_dim");

val rdd_item = spark.read.parquet("/tpcds/9.1T_parquet/item/part*.parquet");
val df_item = rdd_item.toDF("i_item_sk", "i_item_id", "i_rec_start_date", "i_rec_end_date", "i_item_desc", "i_current_price", "i_wholesale_cost", "i_brand_id", "i_brand", "i_class_id", "i_class", "i_category_id", "i_category", "i_manufact_id", "i_manufact", "i_size", "i_formulation", "i_color", "i_units", "i_container", "i_manager_id", "i_product_name");
df_item.createOrReplaceTempView("item");

spark.sql("SELECT i_item_id, i_item_desc, i_current_price FROM item, inventory, date_dim, store_sales WHERE i_current_price BETWEEN 62 AND 62 + 30 AND inv_item_sk = i_item_sk AND d_date_sk = inv_date_sk AND d_date BETWEEN cast('2000-05-25' AS DATE) AND (cast('2000-05-25' AS DATE) + INTERVAL 60 days) AND i_manufact_id IN (129, 270, 821, 423) AND inv_quantity_on_hand BETWEEN 100 AND 500 AND ss_item_sk = i_item_sk GROUP BY i_item_id, i_item_desc, i_current_price ORDER BY i_item_id LIMIT 100 ").show;

chenrui17 added the "? - Needs Triage" (Need team to review and classify) and "question" (Further information is requested) labels on Jul 29, 2020
@chenrui17
Author

By the way, I want to know: can the current version run all queries of TPC-DS successfully?

@jlowe
Contributor

jlowe commented Jul 29, 2020

it seems not to be an out-of-memory problem, because I found that GPU memory usage is low via DCGM.

I suspect one of the joins may be exploding, and that can cause an OOM error even when the GPU memory utilization is low because it would be trying to allocate a very large amount of GPU memory all at once.

Normally increasing the partitions would help alleviate this type of problem, but there may be skew in the data where the join is producing a lot of results only for very specific keys. Since the same keys will all hash to the same task, increasing the shuffle partitions won't help in that case. Do any tasks in this stage complete or do they all fail with OOM? If the former, do you have any stats from the Spark metrics on how much input shuffle data the task that failed received vs. tasks that succeeded? That could indicate whether there's a join key skew issue.
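As a rough check for join-key skew (a minimal sketch added here, not from the original thread; it assumes the temp views registered above), you can count rows per join key on each side and look for a handful of keys carrying most of the rows:

import org.apache.spark.sql.functions.desc

// Rows per key on each side of the inv_item_sk = i_item_sk and ss_item_sk = i_item_sk joins.
val invKeyCounts = spark.sql("SELECT inv_item_sk AS key, COUNT(*) AS cnt FROM inventory GROUP BY inv_item_sk")
val ssKeyCounts = spark.sql("SELECT ss_item_sk AS key, COUNT(*) AS cnt FROM store_sales GROUP BY ss_item_sk")

// If a few keys dominate these counts, the join output (and the OOM) is concentrated
// in the tasks that own those keys, and adding shuffle partitions won't help.
invKeyCounts.orderBy(desc("cnt")).show(20)
ssKeyCounts.orderBy(desc("cnt")).show(20)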

I want to know: can the current version run all queries of TPC-DS successfully?

We have not explicitly tested TPC-DS. They all should work at least up to a certain scale factor. Once the scale factor gets high there can be GPU OOM issues due to skew or insufficient partitioning. Spark CPU has the same skew issues, but data is spilled from host memory to disk in that case to handle it. We currently do not support spilling sort/join/groupby inputs to host memory and/or disk if a particular column batch input and output for an operation does not fit in GPU memory.

Another limitation is the 2GB data limit for all the data in an individual column of a columnar batch. This can become a factor for string columns. If the data has a column with very long string values, it can aggravate this issue by pushing the total amount of string data in the column across this limit, as you encountered in #458. This is a current limitation in cudf, and there have been discussions on how to fix this, see rapidsai/cudf#3958.
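As a quick way to gauge how much string data such a column holds (a sketch added here, not from the original thread; it assumes i_item_desc is the long string column of interest), you can sum its byte length with the built-in octet_length SQL function:

// Total UTF-8 bytes held in i_item_desc across the item table; a single columnar
// batch holding a large fraction of this could approach the 2GB-per-column limit.
spark.sql("SELECT SUM(octet_length(i_item_desc)) AS total_desc_bytes FROM item").show()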

jlowe removed the "? - Needs Triage" (Need team to review and classify) label on Jul 29, 2020
@JustPlay

@jlowe

" Since the same keys will all hash to the same task, increasing the shuffle partitions won't help in that case"
In this case, does there exist a way in Spark/RAPIDS to fix this problem?

"We currently do not support spilling sort/join/groupby inputs to host memory and/or disk if a particular column batch input and output for an operation does not fit in GPU memory"
When will RAPIDS/cuDF support this feature?

@jlowe
Contributor

jlowe commented Jul 30, 2020

In this case, does there exist a way in Spark/RAPIDS to fix this problem?

The skew problem could be addressed in a few ways:

  • Using Adaptive Query Execution may help here, since skewed joins are one of the query problems targeted by AQE. Currently the plugin is not fully compatible with AQE, but we are actively working on resolving those issues and plan on supporting it in the 0.2 release (see the config sketch after this list).
  • The user can manually refactor their query to avoid the skewed join. Obviously this is not ideal for the end user.
  • The plugin could add support for spilling GPU memory to host and/or disk to try to avoid OOM situations. This is what Spark CPU does to avoid the issue. Query performance suffers badly, but at least it doesn't crash.
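For reference, these are the stock Spark 3.0 settings that turn on AQE's skew-join handling (a hedged sketch, not a recommendation from this thread; as noted above it only helps once the plugin's AQE support lands, and the tuning values shown are illustrative):

// Enable AQE and its skew-join optimization (stock Spark 3.0 configs).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

// Optional knobs controlling when a partition counts as skewed; values here are illustrative.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")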

When will RAPIDS/cuDF support this feature?

I don't have an answer. Spilling to host memory or disk is a non-trivial task. We currently support spilling shuffle inputs and outputs in the UCX-accelerated shuffle manager because we aggressively cache shuffle outputs in GPU memory to help increase shuffle performance when using GPUs. However this doesn't apply to individual operators like join, groupby, sort, etc. In some cases supporting spill will need to be addressed by the type of operation. For example, supporting spills during sort would require splitting the input into batches, sorting the batches in GPU memory then spilling the output to host memory or disk, then finally performing a streaming merge sort of the sorted batches to produce the final output. Supporting spill during some operations will require additional features in cudf.

The good news is the basic framework for migrating GPU buffers between host memory and disk is there as part of adding support for it during UCX-accelerated shuffle. We should be able to leverage it during these operations to help them handle memory pressure more gracefully. It is likely that some operators will support spilling before others, e.g.: I wouldn't be surprised if we support spill during a join before spill during a sort.

@jlowe
Contributor

jlowe commented Aug 14, 2020

@chenrui17 @JustPlay are there any unresolved questions from this issue or can it be closed?

@chenrui17
Author

@chenrui17 @JustPlay are there any unresolved questions from this issue or can it be closed?

OK, I will close this issue.
