[FEAT] connect: df.join #3354

Draft
wants to merge 1 commit into base: andrew/connect-with-columns-renamed

Conversation

andrewgazelka
Member

@andrewgazelka andrewgazelka commented Nov 20, 2024

  • fix
E           pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
E               status = StatusCode.INTERNAL
E               details = "Error in Daft server: Not Yet Implemented: Joins without join conditions (cross join) are not supported yet
E           
E           Location:
E               src/daft-connect/src/op/execute/root.rs:37:39"
E               debug_error_string = "UNKNOWN:Error received from peer ipv4:127.0.0.1:55241 {created_time:"2024-11-21T09:18:54.065079-08:00", grpc_status:13, grpc_message:"Error in Daft server: Not Yet Implemented: Joins without join conditions (cross join) are not supported yet\n\nLocation:\n    src/daft-connect/src/op/execute/root.rs:37:39"}"
E           >

@andrewgazelka andrewgazelka marked this pull request as ready for review November 20, 2024 09:12
Member Author

andrewgazelka commented Nov 20, 2024

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@andrewgazelka andrewgazelka changed the base branch from andrew/connect-alias to andrew/connect-refactor-to-ref November 20, 2024 09:25
@andrewgazelka andrewgazelka changed the base branch from andrew/connect-refactor-to-ref to graphite-base/3354 November 21, 2024 05:11
@andrewgazelka andrewgazelka changed the base branch from graphite-base/3354 to main November 21, 2024 05:12
@github-actions github-actions bot added the enhancement New feature or request label Nov 21, 2024

graphite-app bot commented Nov 21, 2024

Graphite Automations

"Notify author when CI fails" took an action on this PR • (11/21/24)

1 teammate was notified to this PR based on Andrew Gazelka's automation.


codspeed-hq bot commented Nov 21, 2024

CodSpeed Performance Report

Merging #3354 will degrade performances by 54.24%

Comparing andrew/connect-join (2c4cb41) with main (cbe9d3b)

Summary

❌ 2 regressions
✅ 15 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | main | andrew/connect-join | Change |
|---|---|---|---|
| test_iter_rows_first_row[100 Small Files] | 219.4 ms | 262.8 ms | -16.51% |
| test_show[100 Small Files] | 15 ms | 32.7 ms | -54.24% |


codecov bot commented Nov 21, 2024

Codecov Report

Attention: Patch coverage is 83.01887% with 9 lines in your changes missing coverage. Please review.

Project coverage is 77.35%. Comparing base (3394a66) to head (a56ba92).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../daft-connect/src/translation/logical_plan/join.rs | 82.69% | 9 Missing ⚠️ |
Additional details and impacted files


@@           Coverage Diff           @@
##             main    #3354   +/-   ##
=======================================
  Coverage   77.35%   77.35%           
=======================================
  Files         685      686    +1     
  Lines       83631    83682   +51     
=======================================
+ Hits        64695    64735   +40     
- Misses      18936    18947   +11     
| Files with missing lines | Coverage Δ |
|---|---|
| src/daft-connect/src/translation/logical_plan.rs | 64.28% <100.00%> (+2.74%) ⬆️ |
| .../daft-connect/src/translation/logical_plan/join.rs | 82.69% <82.69%> (ø) |

... and 3 files with indirect coverage changes


None,
None,
None,
false, // todo(correctness): we want join keys or not
Contributor

I'm not sure what Spark expects here. SQL usually retains the join keys, and most DataFrame APIs do not. I'd try a simple join with Spark and see what happens.

Comment on lines +26 to +28
if let Some(join_condition) = join_condition {
    bail!("Join conditions are not yet supported; use using_columns (join keys) instead; got {join_condition:?}");
}
Contributor

We should definitely support both USING joins and regular (condition-based) joins. They are mutually exclusive, though, so we'll need to return an error if both are provided.
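
To make the mutual-exclusivity rule concrete, here is a minimal sketch in plain Python rather than the Rust used in the PR. The function name `resolve_join_spec` and the string representation of the condition are hypothetical and exist only for illustration; the actual translation code in daft-connect may be structured quite differently.

```python
from typing import List, Optional, Sequence, Tuple, Union


def resolve_join_spec(
    join_condition: Optional[str],
    using_columns: Sequence[str],
) -> Tuple[str, Union[str, List[str]]]:
    """Decide which kind of join was requested (hypothetical helper).

    Returns ("on", condition) or ("using", columns) and rejects
    requests that provide both or neither.
    """
    has_condition = join_condition is not None
    has_using = len(using_columns) > 0

    if has_condition and has_using:
        # The two forms are mutually exclusive, so this is a user error.
        raise ValueError("join_condition and using_columns are mutually exclusive")
    if has_condition:
        return ("on", join_condition)
    if has_using:
        return ("using", list(using_columns))
    # Neither form given: this would be a cross join, which the thread
    # says is not supported yet.
    raise NotImplementedError(
        "Joins without join conditions (cross join) are not supported yet"
    )
```

With this shape, `resolve_join_spec(None, ["id"])` takes the USING path, and passing both arguments fails early instead of silently preferring one of them.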

bail!("Right side of join is required");
};

if let Some(join_condition) = join_condition {
Contributor

We'll need some fairly involved logic to split the join condition into left and right halves. Check out the SQL frontend, which does something similar when it splits a join condition.
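
As a rough illustration of what splitting a join condition can look like, the sketch below walks a conjunction of equality predicates and sorts each operand into left keys or right keys. This is not the SQL frontend's implementation. The tuple-based expression encoding, the name `split_join_condition`, and the assumption that column names are unique across the two sides are all hypothetical simplifications.

```python
from typing import List, Set, Tuple


def split_join_condition(
    cond: tuple,
    left_columns: Set[str],
    right_columns: Set[str],
) -> Tuple[List[str], List[str]]:
    """Split `a = b AND c = d ...` into (left_keys, right_keys).

    Expressions are nested tuples (hypothetical encoding):
      ("and", lhs, rhs), ("eq", lhs, rhs), ("col", name)
    """

    def side_of(name: str) -> str:
        in_left, in_right = name in left_columns, name in right_columns
        if in_left and in_right:
            raise ValueError(f"ambiguous column {name!r}")
        if in_left:
            return "left"
        if in_right:
            return "right"
        raise ValueError(f"unknown column {name!r}")

    left_keys: List[str] = []
    right_keys: List[str] = []
    stack = [cond]
    while stack:
        node = stack.pop()
        if node[0] == "and":
            # Push the right conjunct first so the left one is handled first.
            stack.append(node[2])
            stack.append(node[1])
        elif node[0] == "eq":
            a, b = node[1], node[2]
            if a[0] != "col" or b[0] != "col":
                raise ValueError("only column = column predicates are handled")
            sides = (side_of(a[1]), side_of(b[1]))
            if sides == ("left", "right"):
                left_keys.append(a[1])
                right_keys.append(b[1])
            elif sides == ("right", "left"):
                # Operands were written right-to-left; swap them.
                left_keys.append(b[1])
                right_keys.append(a[1])
            else:
                raise ValueError("both operands come from the same side")
        else:
            raise ValueError(f"unsupported operator {node[0]!r}")
    return left_keys, right_keys
```

Anything that is not a conjunction of cross-side equalities is rejected here. A real implementation would also need to handle qualified column references and non-equi predicates, which this sketch deliberately leaves out.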

Comment on lines 6 to 27
def test_join(spark_session):
    # Create two DataFrames with overlapping IDs
    df1 = spark_session.range(5)
    df2 = spark_session.range(3, 7)

    # Perform inner join on 'id' column
    joined_df = df1.join(df2, "id", "inner")

    # Verify join results
    # assert joined_df.count() == 2, "Inner join should only return matching rows"

    # Convert to pandas to verify exact results
    joined_pandas = joined_df.toPandas()
    assert set(joined_pandas["id"].tolist()) == {3, 4}, "Inner join should only contain IDs 3 and 4"

    # Test left outer join
    left_joined_pandas = df1.join(df2, "id", "left").toPandas()
    assert set(left_joined_pandas["id"].tolist()) == {0, 1, 2, 3, 4}, "Left join should keep all rows from left DataFrame"

    # Test right outer join
    right_joined_pandas = df1.join(df2, "id", "right").toPandas()
    assert set(right_joined_pandas["id"].tolist()) == {3, 4, 5, 6}, "Right join should keep all rows from right DataFrame"
Contributor

[Re: lines +6 to +28]

We're going to need a lot more tests for joins. Check out the existing join tests, for both the DataFrame and SQL APIs.
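
One way to widen coverage without hand-writing every expected value is to derive the expected `id` sets from plain set operations and then compare them against the joined result for each join type. The helper below is a hypothetical sketch. It assumes unique keys on both sides and a join on a single shared `id` column, as in the test under review, and the `how` strings are illustrative labels rather than a confirmed list of what the server accepts.

```python
from typing import Iterable, Set


def expected_ids(left_ids: Iterable[int], right_ids: Iterable[int], how: str) -> Set[int]:
    """Expected set of `id` values after joining on `id` (hypothetical helper)."""
    left, right = set(left_ids), set(right_ids)
    if how in ("inner", "semi"):
        return left & right   # only keys present on both sides
    if how == "left":
        return left           # every left key survives
    if how == "right":
        return right          # every right key survives
    if how == "outer":
        return left | right   # keys from either side
    if how == "anti":
        return left - right   # left keys with no match on the right
    raise ValueError(f"unknown join type {how!r}")
```

For the data in the test above, `expected_ids(range(5), range(3, 7), "inner")` gives `{3, 4}`, matching the existing assertion, and the same helper could feed a parametrized test over the remaining join types.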


bail!("Join type must be specified; got Unspecified")
}
JoinType::Inner => Ok(daft_core::join::JoinType::Inner),
JoinType::FullOuter => {
Contributor

FullOuter is just the same as Outer

}
JoinType::LeftOuter => Ok(daft_core::join::JoinType::Left), // todo(correctness): is this correct?
JoinType::RightOuter => Ok(daft_core::join::JoinType::Right),
JoinType::LeftAnti => Ok(daft_core::join::JoinType::Anti), // todo(correctness): is this correct?
Contributor

Remove the todo comment; this mapping is correct.

JoinType::FullOuter => {
bail!("Full outer joins not yet supported") // todo(completeness): add support for full outer joins if it is not already implemented
}
JoinType::LeftOuter => Ok(daft_core::join::JoinType::Left), // todo(correctness): is this correct?
Contributor

Remove the todo comment; this mapping is correct.
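
Putting the review feedback on join types together, the mapping could be summarized as in the sketch below. It is written as a plain Python lookup table for brevity, not as the Rust `match` used in the PR. The dictionary and function names are hypothetical, and only the variants confirmed in this thread are included.

```python
# Spark Connect join type -> Daft join type, limited to the variants
# discussed in this review thread (names as they appear in the diff).
SPARK_TO_DAFT_JOIN_TYPE = {
    "Inner": "Inner",
    "FullOuter": "Outer",   # reviewer: FullOuter is the same as Outer
    "LeftOuter": "Left",    # confirmed correct in review
    "RightOuter": "Right",
    "LeftAnti": "Anti",     # confirmed correct in review
}


def to_daft_join_type(spark_join_type: str) -> str:
    """Translate a Spark Connect join type name (hypothetical helper)."""
    if spark_join_type == "Unspecified":
        raise ValueError("Join type must be specified; got Unspecified")
    try:
        return SPARK_TO_DAFT_JOIN_TYPE[spark_join_type]
    except KeyError:
        # Anything else (for example Cross) is outside this sketch.
        raise NotImplementedError(
            f"join type {spark_join_type!r} is not covered by this sketch"
        ) from None
```

Keeping the mapping in one table makes it easy to see which variants are still rejected, such as cross joins, instead of scattering that decision across match arms.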

Comment on lines 54 to 55
None,
None,
Contributor

Does Spark suffix or prefix the duplicate join keys at all? We should verify that and match its behavior here.

@andrewgazelka andrewgazelka force-pushed the andrew/connect-join branch 3 times, most recently from dd5e05c to 2c4cb41 Compare November 21, 2024 17:19
@andrewgazelka andrewgazelka changed the base branch from main to andrew/connect-with-columns-renamed November 21, 2024 17:19

impl From<daft_logical_plan::JoinType> for JoinTypeInfo {
    fn from(join_type: daft_logical_plan::JoinType) -> Self {
        JoinTypeInfo::Regular(join_type)

Replace JoinTypeInfo::Regular(join_type) with Self::Regular(join_type) to follow Rust style guidelines for impl From blocks

Spotted by Graphite Reviewer (based on CI logs)

Comment on lines +47 to +49
let result = match join_type {
    JoinTypeInfo::Cross => {
        left.cross_join(&right, None, None)? // todo(correctness): is this correct?

Cross joins are not supported in Daft yet. This should be replaced with a bail!() statement that indicates cross joins are not supported.

Spotted by Graphite Reviewer (based on CI logs)


def test_cross_join(spark_session):
Contributor

We don't support cross joins yet.
