Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support sort merge join with a join condition #553

Merged
merged 19 commits into from
Aug 30, 2024
19 changes: 15 additions & 4 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index d3544881af1..47382e29b5a 100644
index d3544881af1..bf0e2b53c70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
Expand Down Expand Up @@ -679,7 +679,7 @@ index 1792b4c32eb..1616e6f39bd 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7f062bfb899..3acf697df9a 100644
index 7f062bfb899..b347ef905d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -863,7 +863,18 @@ index 7f062bfb899..3acf697df9a 100644
}.size == 1)
}
}
@@ -1489,7 +1517,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 1)
}
dupStreamSideColTest("MERGE", check)
}

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SHJ)") {
def check(plan: SparkPlan): Unit = {
Expand All @@ -875,7 +886,7 @@ index 7f062bfb899..3acf697df9a 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
@@ -1605,7 +1636,8 @@ class ThreadLeakInSortMergeJoinSuite
@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

Expand Down
17 changes: 14 additions & 3 deletions dev/diffs/3.5.1.diff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index 0f504dbee85..71fd49a3744 100644
index 0f504dbee85..f6019da888a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
Expand Down Expand Up @@ -658,7 +658,7 @@ index 7af826583bd..3c3def1eb67 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 9dcf7ec2904..94a171d1aad 100644
index 9dcf7ec2904..d8b014a4eb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
Expand Down Expand Up @@ -865,7 +865,18 @@ index 9dcf7ec2904..94a171d1aad 100644
}.size == 1)
}
}
@@ -1633,7 +1664,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1626,14 +1657,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 1)
}
dupStreamSideColTest("MERGE", check)
}

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SHJ)") {
def check(plan: SparkPlan): Unit = {
Expand Down
19 changes: 15 additions & 4 deletions dev/diffs/4.0.0-preview1.diff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index a4b1b2c3c9f..a2315d2a95b 100644
index a4b1b2c3c9f..db50bdb0d3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,8 @@
Expand Down Expand Up @@ -756,7 +756,7 @@ index 53e47f428c3..a55d8f0c161 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index fcb937d82ba..df79db88fed 100644
index fcb937d82ba..fafe8e8d08b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
Expand Down Expand Up @@ -963,7 +963,18 @@ index fcb937d82ba..df79db88fed 100644
}.size == 1)
}
}
@@ -1637,7 +1668,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1630,14 +1661,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 1)
}
dupStreamSideColTest("MERGE", check)
}

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SHJ)") {
def check(plan: SparkPlan): Unit = {
Expand All @@ -975,7 +986,7 @@ index fcb937d82ba..df79db88fed 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
@@ -1773,7 +1807,8 @@ class ThreadLeakInSortMergeJoinSuite
@@ -1773,7 +1810,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

Expand Down
Loading
Loading