Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Enable Comet shuffle with AQE coalesce partitions #834

Merged
merged 34 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3f9bdb5
chore: Remove COMET_SHUFFLE_ENFORCE_MODE_ENABLED
viirya Jul 10, 2024
246929c
Update plan stability
viirya Jul 18, 2024
95dcdd3
Fix
viirya Jul 24, 2024
2d5a0dc
Fix
viirya Jul 24, 2024
c14037b
Fix
viirya Jul 25, 2024
50361ba
Fix
viirya Jul 29, 2024
e69006b
Fix
viirya Jul 29, 2024
08ddb1b
Fix
viirya Jul 29, 2024
61ac49a
Fix
viirya Jul 30, 2024
1ff2b5d
Fix
viirya Jul 30, 2024
d6fa16b
Fix
viirya Jul 30, 2024
59859c2
Fix
viirya Jul 31, 2024
f2a212e
Fix
viirya Aug 2, 2024
b49ee0f
Remove test
viirya Aug 2, 2024
513a34f
Update
viirya Aug 4, 2024
59a28aa
Use same allocator
viirya Aug 5, 2024
ab457c5
test
viirya Aug 6, 2024
b143e79
Add synchronized
viirya Aug 7, 2024
bb06edb
test
viirya Aug 11, 2024
a0a421c
Revert "test"
viirya Aug 11, 2024
e65a43c
Revert "Add synchronized"
viirya Aug 11, 2024
c801bc2
Fix
viirya Aug 12, 2024
9edc71e
fix
viirya Aug 12, 2024
32f976c
Update diffs
viirya Aug 13, 2024
9e34e82
Update diffs
viirya Aug 13, 2024
0055a09
Add CometColumnarBatch
viirya Aug 14, 2024
0e01818
Change to ubuntu-20.04.
viirya Aug 15, 2024
439cdd0
Change to macos-latest
viirya Aug 15, 2024
65dcc41
Change to ubuntu-24.04
viirya Aug 15, 2024
39a79a3
update 3.4.3..diff
viirya Aug 15, 2024
95615e1
Update to ubuntu-24.04 for Spark 4.0.0 pipeline
viirya Aug 15, 2024
0850404
Revert some changes
viirya Aug 16, 2024
0507dad
Disable Comet shuffle for Spark SQL core-1 test on Spark 3.5 and 4.0.0
viirya Aug 16, 2024
63f58c5
Disable Comet shuffle for Spark SQL core-1 on Spark 3.4.3 too.
viirya Aug 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions .github/workflows/spark_sql_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
spark-sql-catalyst:
strategy:
matrix:
os: [ubuntu-latest]
os: [ubuntu-24.04]
java-version: [11]
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.1'}]
module:
Expand Down Expand Up @@ -76,7 +76,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=${{ matrix.module.name == 'sql/core-1' && 'false' || 'true' }} build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

4 changes: 2 additions & 2 deletions .github/workflows/spark_sql_test_ansi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
spark-sql-catalyst:
strategy:
matrix:
os: [ubuntu-latest]
os: [ubuntu-24.04]
java-version: [17]
spark-version: [{short: '4.0', full: '4.0.0-preview1'}]
module:
Expand Down Expand Up @@ -74,7 +74,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true ENABLE_COMET_SHUFFLE=${{ matrix.module.name == 'sql/core-1' && 'false' || 'true' }} build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

11 changes: 0 additions & 11 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("jvm")

val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.shuffle.enforceMode.enabled")
.doc(
"Comet shuffle doesn't support Spark AQE coalesce partitions. If AQE coalesce " +
"partitions is enabled, Comet shuffle won't be triggered even enabled. This config " +
"is used to enforce Comet to trigger shuffle even if AQE coalesce partitions is " +
"enabled. This is for testing purpose only.")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
Expand Down
426 changes: 338 additions & 88 deletions dev/diffs/3.4.3.diff

Large diffs are not rendered by default.

413 changes: 339 additions & 74 deletions dev/diffs/3.5.1.diff

Large diffs are not rendered by default.

774 changes: 691 additions & 83 deletions dev/diffs/4.0.0-preview1.diff

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.comet.cast.allowIncompatible=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.comet.shuffle.enforceMode.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
tpcbench.py \
--benchmark tpch \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,21 +1069,13 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
// TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
// We should disable Comet shuffle when AQE coalesce partitions is enabled.
(!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)

private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
} else if (!isCometShuffleManagerEnabled(conf)) {
Some(s"spark.shuffle.manager is not set to ${CometShuffleManager.getClass.getName}")
} else if (conf.coalesceShufflePartitionsEnabled && !COMET_SHUFFLE_ENFORCE_MODE_ENABLED
.get()) {
Some(
s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " +
s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled")
} else {
None
}
Expand Down
Loading
Loading