Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support sort merge join with a join condition #553

Merged
merged 19 commits into from
Aug 30, 2024

Conversation

viirya
Copy link
Member

@viirya viirya commented Jun 11, 2024

Which issue does this PR close?

Closes #398.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya viirya marked this pull request as draft June 11, 2024 07:31
@comphead
Copy link
Contributor

Some tests doesn't pass, its good to check them in DF, SMJ joined filter is still in progress

@viirya
Copy link
Member Author

viirya commented Jun 11, 2024

Yea, that's why I marked this as a draft PR now.

@comphead
Copy link
Contributor

just checked DF on one of the failed queries

#[tokio::test]
    async fn test_smj_left_filtered() -> Result<()> {
        let ctx: SessionContext = SessionContext::new();

        let sql = "set datafusion.optimizer.prefer_hash_join = false;";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "set datafusion.execution.batch_size = 100";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "
            select * from (
            with t as (
                select id, id % 5 id1 from (select unnest(range(0,10)) id)
            ), t1 as (
                select id % 10 id, id + 2 id1 from (select unnest(range(0,10)) id)
            )
            select * from  t left join  t1 on t.id1 = t1.id and t.id > t1.id1
            ) order by 1, 2, 3, 4
        ";

        let actual = ctx.sql(sql).await?.collect().await?;

        let expected: Vec<&str> = vec![
            "+----+-----+----+-----+",
            "| id | id1 | id | id1 |",
            "+----+-----+----+-----+",
            "| 0  | 0   |    |     |",
            "| 1  | 1   |    |     |",
            "| 2  | 2   |    |     |",
            "| 3  | 3   |    |     |",
            "| 4  | 4   |    |     |",
            "| 5  | 0   | 0  | 2   |",
            "| 6  | 1   | 1  | 3   |",
            "| 7  | 2   | 2  | 4   |",
            "| 8  | 3   | 3  | 5   |",
            "| 9  | 4   | 4  | 6   |",
            "+----+-----+----+-----+",
        ];
        datafusion_common::assert_batches_eq!(expected, &actual);

        Ok(())
    }

it passes

@comphead
Copy link
Contributor

looks like Comet produces duplicates

@comphead
Copy link
Contributor

Right join fails in DF

#[tokio::test]
    async fn test_smj_left_filtered() -> Result<()> {
        let ctx: SessionContext = SessionContext::new();

        let sql = "set datafusion.optimizer.prefer_hash_join = false;";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "set datafusion.execution.batch_size = 100";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "
            select * from (
            with t as (
                select id, id % 5 id1 from (select unnest(range(0,10)) id)
            ), t1 as (
                select id % 10 id, id + 2 id1 from (select unnest(range(0,10)) id)
            )
            select * from  t right join  t1 on t.id1 = t1.id and t.id > t1.id1
            ) order by 1, 2, 3, 4
        ";

        let actual = ctx.sql(sql).await?.collect().await?;

        let expected: Vec<&str> = vec![
            "+----+-----+----+-----+",
            "| id | id1 | id | id1 |",
            "+----+-----+----+-----+",
            "| 5  | 0   | 0  | 2   |",
            "| 6  | 1   | 1  | 3   |",
            "| 7  | 2   | 2  | 4   |",
            "| 8  | 3   | 3  | 5   |",
            "| 9  | 4   | 4  | 6   |",
            "|    |     | 5  | 7   |",
            "|    |     | 6  | 8   |",
            "|    |     | 7  | 9   |",
            "|    |     | 8  | 10  |",
            "|    |     | 9  | 11  |",
            "+----+-----+----+-----+",
        ];
        datafusion_common::assert_batches_eq!(expected, &actual);

        Ok(())
    }

@comphead
Copy link
Contributor

Filed apache/datafusion#10882

datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e" }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the commit of the PR apache/datafusion#12090. When the PR is merged, we can change back to DataFusion repo.

Comment on lines 1272 to 1277
AggregateExprBuilder::new(sum_udaf(), vec![child])
.schema(schema)
.alias("count")
.with_ignore_nulls(false)
.with_distinct(false)
.build().map_err(|e| ExecutionError::DataFusionError(e.to_string()))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, DataFusion API changes.

@@ -126,7 +126,7 @@ impl FilterExec {
let schema = input.schema();
if !check_support(predicate, &schema) {
let selectivity = default_selectivity as f64 / 100.0;
let mut stats = input_stats.into_inexact();
let mut stats = input_stats.to_inexact();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataFusion API change.

@viirya viirya marked this pull request as ready for review August 21, 2024 06:23
@viirya viirya force-pushed the smj_filter branch 2 times, most recently from c27f1f6 to bb7586f Compare August 21, 2024 06:55
val left = sql("SELECT * FROM tbl_a")
val right = sql("SELECT * FROM tbl_b")

val df8 =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we can also use SQL for Anti, Semi joins?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to SQL syntax.

@@ -2860,7 +2860,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case RightOuter => JoinType.RightOuter
case FullOuter => JoinType.FullOuter
case LeftSemi => JoinType.LeftSemi
case LeftAnti => JoinType.LeftAnti
// TODO: DF SMJ with join condition fails TPCH q21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me have a look on q21. I remember Anti join had issues with TPCH in DF but it was fixed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it might be also related to apache/datafusion#11555

@codecov-commenter
Copy link

codecov-commenter commented Aug 21, 2024

Codecov Report

Attention: Patch coverage is 36.36364% with 7 lines in your changes missing coverage. Please review.

Project coverage is 34.20%. Comparing base (9d8730d) to head (b9cd9da).
Report is 6 commits behind head on main.

Files Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 36.36% 5 Missing and 2 partials ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##               main     #553       +/-   ##
=============================================
- Coverage     55.16%   34.20%   -20.96%     
- Complexity      857      888       +31     
=============================================
  Files           109      112        +3     
  Lines         10542    43071    +32529     
  Branches       2010     9509     +7499     
=============================================
+ Hits           5815    14733     +8918     
- Misses         3714    25352    +21638     
- Partials       1013     2986     +1973     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

}
}

test("full outer join") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails currently. It needs the fix at DataFusion apache/datafusion#12159

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix was merged at DataFusion. I updated Comet to use the latest commit.

@@ -75,7 +75,6 @@ abstract class CometTestBase
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to set SQLConf.COALESCE_PARTITIONS_ENABLED.key false now.

We also need to remove this to trigger test failure https://github.com/apache/datafusion-comet/pull/553/files#r1730694210.

datafusion-physical-plan = { version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { version = "41.0.0", default-features = false }
datafusion-physical-expr = { version = "41.0.0", default-features = false }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "dff590b" }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to latest DataFusion to use the two commits including bug fixes.

@@ -142,6 +142,22 @@ impl AggregateExpr for AvgDecimal {
),
}
}

fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New DataFusion API required for AggregateExpr trait.

withInfo(join, cond)
return None
}
condProto.get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any scenario of None.get?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (condProto.isEmpty) {
  withInfo(join, cond)
   return None
}

If it is None, it will return None earlier.

checkSparkAnswerAndOperator(df7)

val df8 = sql(
"SELECT * FROM tbl_a LEFT SEMI JOIN tbl_b ON tbl_a._2 = tbl_b._1 " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a RIGHT SEMI in Spark? afair there is still no proper support in DF for RightSemi

apache/datafusion#9846

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No

val left = UnresolvedRelation(TableIdentifier("left"))
val right = UnresolvedRelation(TableIdentifier("right"))

checkSparkAnswer(left.join(right, $"left.N" === $"right.N", "full"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to rephrase it in SQL as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is copied from Spark. I think it is good to keep it as the same.

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @viirya

checkSparkAnswerAndOperator(df9)

// TODO: Enable these tests after fixing the issue:
// https://github.com/apache/datafusion-comet/issues/861
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably can create a separate github ticket on this to not forget enabling tests

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #891

@viirya
Copy link
Member Author

viirya commented Aug 29, 2024

Thanks @comphead

@viirya
Copy link
Member Author

viirya commented Aug 29, 2024

I need to update plan stability results...

@viirya viirya merged commit e57ead4 into apache:main Aug 30, 2024
74 checks passed
@viirya viirya deleted the smj_filter branch August 30, 2024 00:22
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* Init

* test

* test

* test

* Use specified commit to test

* Fix format

* fix clippy

* fix

* fix

* Fix

* Change to SQL syntax

* Disable SMJ LeftAnti with join filter

* Fix

* Add test

* Add test

* Update to last DataFusion commit

* fix format

* fix

* Update diffs

(cherry picked from commit e57ead4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support sort merge join with a join condition
3 participants