Unbounded SortExec (and Top-K) Implementation When Req's Are Satisfied #12174

Merged: 12 commits, Aug 29, 2024

@berkaysynnada berkaysynnada commented Aug 26, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

SortExec (with or without a fetch) does not need to perform an actual sort if the existing input order already satisfies the required ordering.

What changes are included in this PR?

  1. When we create a new SortExec, its execution mode is set according to its input order.
  2. Adding a fetch can update its execution mode.
  3. During execution, we check whether the input already satisfies the sort requirement and whether a fetch is set. If the requirement is already satisfied, SortExec either short-circuits itself (if it does not have a fetch) or behaves as a limit operator (if it has a fetch).
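
The three items above can be sketched as a small standalone model. This is an illustrative sketch, not the code in sort.rs: the `Mode` and `Strategy` enums and both functions are hypothetical names, and the rule that a fetch over an already-sorted unbounded input yields a bounded node is an assumption about what item 2 means.

```rust
/// Simplified stand-in for the execution mode of a plan node (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

/// What the sort node does at execution time (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Strategy {
    PassThrough,
    Limit(usize),
    TopK(usize),
    FullSort,
}

/// Items 1 and 2: derive the node's mode from its input and its fetch.
fn sort_mode(input: Mode, sort_satisfied: bool, fetch: Option<usize>) -> Mode {
    match (input, sort_satisfied, fetch) {
        // A finite input always gives a finite sort.
        (Mode::Bounded, _, _) => Mode::Bounded,
        // Assumption: a limit over an already-sorted stream terminates.
        (Mode::Unbounded, true, Some(_)) => Mode::Bounded,
        // Already sorted and no fetch: rows simply flow through.
        (Mode::Unbounded, true, None) => Mode::Unbounded,
        // Anything else must consume all input before emitting.
        _ => Mode::PipelineBreaking,
    }
}

/// Item 3: pick the execution path.
fn strategy(sort_satisfied: bool, fetch: Option<usize>) -> Strategy {
    match (sort_satisfied, fetch) {
        (true, None) => Strategy::PassThrough,
        (true, Some(n)) => Strategy::Limit(n),
        (false, Some(n)) => Strategy::TopK(n),
        (false, None) => Strategy::FullSort,
    }
}
```

In this model, `strategy(true, Some(5))` selects `Strategy::Limit(5)`, while an unsatisfied requirement falls back to the ordinary top-K or full sort paths.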

Are these changes tested?

Yes. I cannot exercise the newly added stream types via an .slt test, but a unit test is added.

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Aug 26, 2024

@ozankabak ozankabak left a comment

LGTM thank you


@alamb alamb left a comment

I recommend we add a test for this change (so that we don't accidentally break it in some future refactor).

pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
    let mut cache = self.cache.clone();
    if fetch.is_some() {
        // When a theoretically unnecessary sort becomes a top-K (which

I don't fully understand how a top-k sort would become bounded. I may misunderstand what ExecutionMode means, but it seems that TopK cannot complete until its input completes, so if its input is unbounded the sort itself would also be unbounded.

In general you are right -- however, you are missing that we turn it into Bounded only when the sort requirement is already satisfied. This happens when a sort "becomes" unnecessary during one of the plan optimization steps (and it will eventually get removed).

@berkaysynnada berkaysynnada Aug 27, 2024

> I don't fully understand how a top-k sort would become bounded. I may misunderstand what ExecutionMode means, but it seems that TopK cannot complete until its input completes, so if its input is unbounded the sort itself would also be unbounded.

I made a wrong assumption about the top-k implementation. Could you please take a look at the new idea? I will update the PR title and body.

FWIW I think the comments help explain what is going on here well. Thank you

@berkaysynnada berkaysynnada changed the title from "TopK Should Update ExecutionMode" to "Unbounded SortExec (and Top-K) Implementation When Req's Are Satisfied" on Aug 27, 2024

@ozankabak ozankabak left a comment

Thanks for improving the corner cases to match the theory. IMO this is ready to go -- @alamb a quick look would be appreciated

@alamb alamb left a comment

Thanks @berkaysynnada and @ozankabak

I am sorry, but I am still struggling to understand this PR

It seems like this PR changes the execution plan for a Sort(limit = 5) into a Limit if the data is already sorted correctly (aka doesn't need an additional sort).

But in this case I would have expected the Sort to have been removed by one of the optimizer passes rather than the SortExec implementing a limit. 🤔

execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
let sort_satisfied = self

This is the same calculation as self.execution_mode(), right? Maybe we could call self.execution_mode() here instead, to be more efficient and to ensure the two calculations remain in sync.

It is similar but not exactly the same (i.e. execution mode is derived from sort_satisfied but AFAIK the reverse is not possible). I think @berkaysynnada tried this but it didn't work

I tried that, but it didn't work. Knowing whether the sort is bounded or unbounded does not tell us whether the sort requirement is satisfied.
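
To illustrate why the flag cannot be recovered from the mode, here is a minimal sketch. It assumes a simplified mapping from the input's mode and the `sort_satisfied` flag to the sort's mode; `Mode`, `mode_from`, and `consistent_flags` are hypothetical names, not DataFusion APIs.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

/// Assumed (simplified) derivation of the sort's mode.
fn mode_from(input: Mode, sort_satisfied: bool) -> Mode {
    match input {
        Mode::Bounded => Mode::Bounded,
        Mode::Unbounded if sort_satisfied => Mode::Unbounded,
        _ => Mode::PipelineBreaking,
    }
}

/// Every value of `sort_satisfied` that is consistent with an observed mode.
fn consistent_flags(input: Mode, observed: Mode) -> Vec<bool> {
    [false, true]
        .iter()
        .copied()
        .filter(|&flag| mode_from(input, flag) == observed)
        .collect()
}
```

With a bounded input, `consistent_flags(Mode::Bounded, Mode::Bounded)` contains both `false` and `true`, so under this model a `Bounded` mode says nothing about whether the requirement is satisfied.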

    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}

struct TopKStream {

I think it would help to add documentation to this struct, specifically to explain how it differs from TopK.

    fetch: usize,
}

impl Stream for TopKStream {

This looks very similar to LimitStream -- https://docs.rs/datafusion-physical-plan/41.0.0/src/datafusion_physical_plan/limit.rs.html#434

though limit stream has metrics and some other features

Yes, maybe we can reuse it. We will take a look

You are right -- sent a commit to reuse LimitStream
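
For readers unfamiliar with the idea, limit-style behavior over batches can be sketched as follows. This is not DataFusion's LimitStream (which is asynchronous and tracks metrics); `LimitBatches` and `limit_batches` are hypothetical names, and plain `Vec<T>` batches stand in for record batches.

```rust
/// Yields batches from `input` until `remaining` rows have been emitted.
struct LimitBatches<I> {
    input: I,
    remaining: usize,
}

impl<I, T> Iterator for LimitBatches<I>
where
    I: Iterator<Item = Vec<T>>,
{
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Vec<T>> {
        // Once the fetch is exhausted, stop pulling from the input.
        // This is what lets the operator finish on an endless input.
        if self.remaining == 0 {
            return None;
        }
        let mut batch = self.input.next()?;
        // Trim the last batch so that exactly `fetch` rows are emitted.
        batch.truncate(self.remaining);
        self.remaining -= batch.len();
        Some(batch)
    }
}

fn limit_batches<I, T>(input: I, fetch: usize) -> LimitBatches<I>
where
    I: Iterator<Item = Vec<T>>,
{
    LimitBatches { input, remaining: fetch }
}
```

Because the adapter never reorders rows, it is only a valid replacement for a top-K when the input already arrives in the required order.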

ozankabak commented Aug 28, 2024

> But in this case I would have expected the Sort to have been removed by one of the optimizer passes rather than the SortExec implementing a limit. 🤔

Right. The sort will actually be entirely removed in later passes. However, in the meantime (before its removal), any rule looking at the execution mode will still think this TopK is pipeline-breaking (while it is not), which results in them behaving incorrectly. We caught this behavior downstream in the context of a custom rule.

Basically, this PR does two things:

  1. It makes the execution_mode API give a correct result when data is already sorted, so intermediate rules that look at execution modes do not falsely think that the pipeline breaks somewhere.
  2. It makes TopK's execution logic behave like a limit if it somehow ends up in the plan without being removed. This is not possible in upstream DataFusion (sans bugs), but people using custom optimizers can end up in such a state. In such cases, it will also execute more efficiently.
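
The first point can be illustrated with a toy rule that walks a plan and looks for a pipeline breaker. Everything here is a hypothetical sketch: `Node`, `first_pipeline_breaker`, and `sorted_stream_plan` are invented for illustration and do not correspond to DataFusion types or to the downstream custom rule mentioned above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

/// Toy plan node carrying only a name and a reported execution mode.
struct Node {
    name: &'static str,
    mode: Mode,
    children: Vec<Node>,
}

/// Toy rule: return the name of the first pipeline-breaking node, if any.
fn first_pipeline_breaker(node: &Node) -> Option<&'static str> {
    if node.mode == Mode::PipelineBreaking {
        return Some(node.name);
    }
    node.children
        .iter()
        .find_map(|child| first_pipeline_breaker(child))
}

/// A top-K over an already-sorted, unbounded source. `sort_mode` is the
/// mode the sort node reports, which is what the rule relies on.
fn sorted_stream_plan(sort_mode: Mode) -> Node {
    Node {
        name: "SortExec(fetch=5)",
        mode: sort_mode,
        children: vec![Node {
            name: "SortedUnboundedSource",
            mode: Mode::Unbounded,
            children: vec![],
        }],
    }
}
```

If the sort node reports `PipelineBreaking`, the rule flags it even though no real sort is needed; if it reports `Bounded`, the rule finds nothing to object to.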

@ozankabak

Thanks for taking another look -- incorporated your feedback to leverage LimitStream

ozankabak commented Aug 29, 2024

I will go ahead and merge this soon since it is a small, localized change that improves corner cases without interfering with the main use case of SortExec.

If there are any lingering concerns, we will address them with a quick follow-on PR.

@ozankabak ozankabak merged commit f5dcdf0 into apache:main Aug 29, 2024
24 checks passed
alamb commented Sep 1, 2024

Thank you for responding to the feedback @berkaysynnada and @ozankabak -- sorry for my delay -- I have been out. This PR now looks good to me
