Do not repartition inputs whose sort order is required #4885

Merged
merged 1 commit into from
Jan 17, 2023

alamb
Contributor

@alamb alamb commented Jan 12, 2023

Which issue does this PR close?

Fixes #4883

The fix is a small logic change to a condition. The rest of this PR is documentation and tests.

Rationale for this change

The repartition optimizer pass destroys a pre-existing sort order by repartitioning the data. The plan still produces correct answers (which is good) because the sort is added back afterwards, but it does so by re-sorting the data :(

There is much more backstory on #4883
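
To illustrate the rationale above, here is a toy model of how round-robin repartitioning can lose a sort order. It is only a sketch: `round_robin`, `concat`, and `is_sorted` are hypothetical helpers, not DataFusion APIs, and a real RepartitionExec hands batches to concurrent consumers rather than concatenating partitions. The sketch assumes `n > 0`.

```rust
/// Toy model only (not DataFusion's RepartitionExec): hand out batches
/// to `n` output partitions in round-robin order. Assumes `n > 0`.
fn round_robin(batches: Vec<Vec<i32>>, n: usize) -> Vec<Vec<Vec<i32>>> {
    let mut partitions = vec![Vec::new(); n];
    for (i, batch) in batches.into_iter().enumerate() {
        partitions[i % n].push(batch);
    }
    partitions
}

/// Read the partitions back one after another, the way a consumer that
/// does not care about ordering might.
fn concat(partitions: &[Vec<Vec<i32>>]) -> Vec<i32> {
    partitions.iter().flatten().flatten().copied().collect()
}

/// True if the rows are in non-decreasing order.
fn is_sorted(rows: &[i32]) -> bool {
    rows.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    // The input is globally sorted across its three batches.
    let batches = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    let rows = concat(&round_robin(batches, 2));
    println!("rows after repartition: {:?}, sorted: {}", rows, is_sorted(&rows));
}
```

In this model the rows come back as 1, 2, 5, 6, 3, 4, so an operator that needs sorted input would have to sort again, which is the extra work described above.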

What changes are included in this PR?

Are these changes tested?

Yes, new tests.

Without this change, the tests fail as follows (a repartition is added above the sorted ParquetExec):


---- physical_optimizer::repartition::tests::repartition_does_not_destroy_sort_more_complex stdout ----
thread 'physical_optimizer::repartition::tests::repartition_does_not_destroy_sort_more_complex' panicked at 'assertion failed: `(left == right)`
  left: `["UnionExec", "SortRequiredExec", "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]"]`,
 right: `["UnionExec", "SortRequiredExec", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]"]`:

expected:

[
    "UnionExec",
    "SortRequiredExec",
    "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
    "FilterExec: c1@0",
    "RepartitionExec: partitioning=RoundRobinBatch(10)",
    "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
]
actual:

[
    "UnionExec",
    "SortRequiredExec",
    "RepartitionExec: partitioning=RoundRobinBatch(10)",
    "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
    "FilterExec: c1@0",
    "RepartitionExec: partitioning=RoundRobinBatch(10)",
    "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
]
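
The expected and actual plans above differ by a single operator. A small sketch of how to locate it, using a hypothetical `first_difference` helper (not part of the DataFusion test suite) and plan lines abbreviated for readability:

```rust
/// Return the index of the first line at which two plans differ, if any.
/// Hypothetical helper for illustration only.
fn first_difference(expected: &[&str], actual: &[&str]) -> Option<usize> {
    let common = expected.len().min(actual.len());
    (0..common)
        .find(|&i| expected[i] != actual[i])
        // Same prefix but different lengths: they differ where the shorter one ends.
        .or(if expected.len() != actual.len() { Some(common) } else { None })
}

fn main() {
    // Plan lines abbreviated from the failure output; "(sorted)" stands in
    // for the ParquetExec that reports output_ordering=[c1@0 ASC].
    let expected = ["UnionExec", "SortRequiredExec", "ParquetExec (sorted)"];
    let actual = [
        "UnionExec",
        "SortRequiredExec",
        "RepartitionExec: partitioning=RoundRobinBatch(10)",
        "ParquetExec (sorted)",
    ];
    if let Some(i) = first_difference(&expected, &actual) {
        println!("plans diverge at line {}: {}", i, actual[i]);
    }
}
```

The first mismatch is the RepartitionExec placed between SortRequiredExec and the sorted ParquetExec, which is the operator this PR stops the optimizer from adding.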


Are there any user-facing changes?

I am not sure whether other users can hit this bug. We hit it in IOx, and I think UnboundedWindowExec and MergeJoin are susceptible to the same problem, but I am not sure how widely used they are.

@github-actions github-actions bot added the core Core DataFusion crate label Jan 12, 2023
Comment on lines 169 to 170
let can_reorder_child = (can_reorder
    && plan.required_input_ordering().is_empty())
Contributor Author

The change is to add && plan.required_input_ordering().is_empty() to the check of whether the child can be reordered.

fn repartition_does_not_destroy_sort_more_complex() -> Result<()> {
    // model a more complicated scenario where one child of a union can be repartitioned for performance
    // but the other can not be
    //
Contributor Author

This plan is close to what we have in IOx

datafusion/core/tests/sql/window.rs (outdated, resolved)
@alamb alamb marked this pull request as draft January 12, 2023 14:19
@ozankabak
Contributor

FYI, we were aware of this and actually are currently exploring sort-preserving repartitions -- if we are successful, we will have both ways soon 🙂 This fix LGTM in the meantime (or if we fail).

@alamb
Contributor Author

alamb commented Jan 12, 2023

> FYI, we were aware of this and actually are currently exploring sort-preserving repartitions -- if we are successful, we will have both ways soon 🙂 This fix LGTM in the meantime (or if we fail).

Thank you @ozankabak -- you may be interested in #4867 from @crepererum which proposes some other improvements to partitioning

fix case where repartitioning destroys output order
add tests coverage
@alamb alamb force-pushed the alamb/repartition_past_union branch from d9b1601 to eaa8349 Compare January 13, 2023 21:02
@alamb alamb marked this pull request as ready for review January 13, 2023 21:03
Comment on lines +183 to +202
let required_input_ordering =
    plan_has_required_input_ordering(plan.as_ref());

let can_reorder_child = if can_reorder {
    // parent of `plan` will not use any particular order

    // if `plan` itself doesn't need order OR
    !required_input_ordering ||
        // child has no order to preserve
        child.output_ordering().is_none()
} else {
    // parent would like to use the `plan`'s output
    // order.

    // if `plan` doesn't maintain the input order and
    // doesn't need the child's output order itself
    (!plan.maintains_input_order() && !required_input_ordering) ||
        // child has no ordering to preserve
        child.output_ordering().is_none()
};
Contributor Author

This is the logic I eventually arrived at after a lot of thought. I tried to document the rationale as best I could -- perhaps @ozankabak and @mustafasrepo might have a few minutes to double check this. I believe this was first introduced by #4691
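
For readers who want to try the condition in isolation, the snippet above can be restated as a pure function over four booleans. This is a sketch under assumptions: `PlanFacts` and its field names are hypothetical stand-ins for `can_reorder`, `plan_has_required_input_ordering(..)`, `plan.maintains_input_order()`, and `child.output_ordering().is_some()`, and no real ExecutionPlan is involved.

```rust
/// Hypothetical summary of the facts the condition consults.
#[derive(Debug, Clone, Copy)]
struct PlanFacts {
    /// `can_reorder`: the parent of `plan` will not use any particular order.
    parent_ignores_order: bool,
    /// `plan` requires a particular order from at least one input.
    plan_requires_input_order: bool,
    /// `plan` passes its input order through to its output.
    plan_maintains_input_order: bool,
    /// The child reports an output ordering that could be destroyed.
    child_has_output_order: bool,
}

/// Mirrors the branch structure of the snippet above.
fn can_reorder_child(f: PlanFacts) -> bool {
    if f.parent_ignores_order {
        // Safe if `plan` itself needs no order, or the child has none to lose.
        !f.plan_requires_input_order || !f.child_has_output_order
    } else {
        // The parent wants `plan`'s output order: safe only if `plan` neither
        // forwards nor needs the child's order, or the child has none to lose.
        (!f.plan_maintains_input_order && !f.plan_requires_input_order)
            || !f.child_has_output_order
    }
}

fn main() {
    // Loosely modelled on the test: an operator that requires sorted input
    // sitting above a scan that reports an output ordering.
    let sort_required_over_sorted_scan = PlanFacts {
        parent_ignores_order: true,
        plan_requires_input_order: true,
        plan_maintains_input_order: false,
        child_has_output_order: true,
    };
    // An operator with no ordering requirement above an unsorted scan.
    let filter_over_unsorted_scan = PlanFacts {
        parent_ignores_order: true,
        plan_requires_input_order: false,
        plan_maintains_input_order: true,
        child_has_output_order: false,
    };
    println!("sorted scan: {}", can_reorder_child(sort_required_over_sorted_scan));
    println!("unsorted scan: {}", can_reorder_child(filter_over_unsorted_scan));
}
```

In both branches the child can always be reordered when it has no output ordering; the branches differ only in how much of the child's order `plan` or its parent might still depend on.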

Contributor

Sure; will take a look, discuss with the team and get back to you.

Contributor

@mustafasrepo mustafasrepo Jan 14, 2023

@alamb I checked the snippet, and tried a couple of alternatives with no success :). I think this logic is correct.

Contributor

Looks good to me as well

/// way for correctness. If this is true, its output should not be
/// repartitioned if it would destroy the required order.
fn plan_has_required_input_ordering(plan: &dyn ExecutionPlan) -> bool {
    // NB: checking `is_empty()` is not the right check!
Contributor Author

I easily lost an hour trying to figure this out.
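
One plausible reading of why `is_empty()` is the wrong check, offered as an assumption rather than a statement of the merged code: if the requirement list holds one optional entry per child, a plan with a single child and no requirement yields a non-empty list containing only `None`. The sketch below uses a hypothetical `Requirement` alias in place of DataFusion's sort expression types.

```rust
/// Hypothetical stand-in for a per-child requirement: `None` means "no
/// ordering required for this child", `Some(cols)` names the sort columns.
type Requirement = Option<Vec<&'static str>>;

/// The tempting check: any non-empty list counts as "has a requirement".
fn naive_has_required_ordering(reqs: &[Requirement]) -> bool {
    !reqs.is_empty()
}

/// The stricter check: at least one child actually has a requirement.
fn has_required_ordering(reqs: &[Requirement]) -> bool {
    reqs.iter().any(Option::is_some)
}

fn main() {
    // One child, no ordering requirement (for example, a filter).
    let filter_like: Vec<Requirement> = vec![None];
    // One child that must arrive sorted on c1.
    let sort_required: Vec<Requirement> = vec![Some(vec!["c1"])];

    for (name, reqs) in [("filter-like", &filter_like), ("sort-required", &sort_required)] {
        println!(
            "{}: naive={}, strict={}",
            name,
            naive_has_required_ordering(reqs),
            has_required_ordering(reqs)
        );
    }
}
```

Under that assumption the naive check reports a requirement for every plan that has a child, which would block repartitioning far more often than intended.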

@alamb alamb merged commit 9301133 into apache:master Jan 17, 2023
@ursabot

ursabot commented Jan 17, 2023

Benchmark runs are scheduled for baseline = 8ab3a91 and contender = 9301133. 9301133 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

alamb added a commit to alamb/datafusion that referenced this pull request Jan 17, 2023
fix case where repartitioning destroys output order
add tests coverage
@alamb alamb mentioned this pull request Jan 17, 2023
2 tasks
@alamb alamb deleted the alamb/repartition_past_union branch August 8, 2023 20:14
Labels
core Core DataFusion crate
Development

Successfully merging this pull request may close these issues.

Repartition is being added incorrectly in some cases
4 participants