Default datafusion.optimizer.prefer_existing_sort to true #8572

Closed
tustvold opened this issue Dec 17, 2023 · 8 comments
Labels
enhancement New feature or request

Comments

@tustvold
Contributor

Is your feature request related to a problem or challenge?

While working on #8540 I was surprised to see that removing unbounded caused the DataFusion optimizer to no longer remove the SortExec from the plan below:

    "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
    "  SortExec: expr=[a@0 ASC NULLS LAST]",
    "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
    "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",

Doing some spelunking, this appears to be a regression introduced by #7671 (comment).

Describe the solution you'd like

I can't see an obvious reason not to enable this by default; it seems like the more reasonable default, and it is also consistent with how I remember DataFusion historically behaving.
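
In the meantime, the behaviour can be opted into per session. A minimal sketch, assuming a recent DataFusion API (`SessionConfig::set_bool` and `SessionContext::new_with_config`; older releases spell the constructor differently):

```rust
use datafusion::prelude::*;

fn main() {
    // Sketch only: opt into preferring existing sort orders for this session,
    // which is the behaviour this issue proposes making the default.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_existing_sort", true);
    let _ctx = SessionContext::new_with_config(config);
}
```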

Describe alternatives you've considered

No response

Additional context

FYI @alamb @ozankabak

@alamb
Contributor

alamb commented Dec 18, 2023

I believe it was a deliberate change. From the comment you reference, #7671 (comment):

> For some detailed context: The flag basically lets the user choose whether they want SortPreservingMerges, or Repartition/Coalesce+Sort cascades. We ran some benchmarks and there is no clearly dominating strategy, each alternative comes out ahead in certain cases. In non-streaming cases, the first alternative typically came out ahead, so we let the default flag value to be false.

In IOx it is better for our use case to use pre-existing sort orders, but I can see how for other use cases it may not be.

If there is a consensus that changing the default setting would be less surprising, it would be fine with me to change it.

@mustafasrepo
Contributor

At the time of the original PR, we did a benchmark for comparison, with the following results.

PLAN V1

"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
"  SortPreservingRepartitionExec: partitioning=Hash([b@0], 16), input_partitions=16",
"    RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1",
"      MemoryExec: partitions=1, partition_sizes=[<depends_on_batch_size>], output_ordering: [PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }]",

PLAN V2

"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
"  SortExec: expr=[a@0 ASC NULLS LAST]",
"    RepartitionExec: partitioning=Hash([b@0], 16), input_partitions=16",
"      RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1",
"        MemoryExec: partitions=1, partition_sizes=[(<depends_on_batch_size>)], output_ordering: [PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }]",

| n_elem | n_trial | batch_size | elapsed_mean(v1) | elapsed_median(v1) | elapsed_mean(v2) | elapsed_median(v2) |
|--------|---------|------------|------------------|--------------------|------------------|--------------------|
| 100000 | 10 | 25 | 581.791054ms | 581.777708ms | 1.112292862s | 1.117181625s |
| 100000 | 10 | 50 | 373.18927ms | 373.231708ms | 385.533966ms | 385.685291ms |
| 100000 | 10 | 100 | 240.91572ms | 240.570833ms | 239.308708ms | 239.303709ms |
| 100000 | 10 | 1000 | 115.471541ms | 113.817709ms | 100.713324ms | 100.647333ms |

Based on these results, we decided to keep the existing default behavior (where V2 is preferred over V1) so as not to hurt performance for others. Frankly, changing the default would be better for our use cases. As @alamb suggests, if there is consensus we can change the default. I wonder what @ozankabak's opinion is regarding this change.

@ozankabak
Contributor

> I believe it was a deliberate change. From the comment you reference, #7671 (comment):

Right.

> If there is a consensus that changing the default setting would be less surprising, it would be fine with me to change it.

Focusing on our own use cases, we would be fine with changing the default (it fits them much better). However, it may result in a somewhat noticeable OOTB performance hit for some groups of users that use (or will use) DF for batch compute jobs. It may also hurt OOTB performance in certain batch benchmarks.

I think it'd be a good idea to try to discuss this with a wider audience to better understand the implications.

@tustvold
Contributor Author

tustvold commented Dec 18, 2023

Perhaps I am missing something, but in the example plan in the issue, setting this option to true causes the optimiser to remove the SortExec, with no modification to the rest of the plan. I struggle to see how this would lead to a performance regression, and by extension why this would not be the default behaviour? Perhaps the setting is overly restrictive on the optimizer?

@mustafasrepo
Contributor

Actually, it sets the preserve_order flag to true for the second repartition in the plan (what I call SortPreservingRepartitionExec in Plan V1). In this mode, the streaming_merge helper is used during repartitioning, so the input ordering is preserved. This decreases speed compared to direct repartitioning (similar to the tradeoff between CoalescePartitionsExec and SortPreservingMergeExec).
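
For intuition, here is a conceptual sketch (not DataFusion's actual streaming_merge code) of why preserving order costs something: an order-preserving exchange has to perform a k-way merge across its sorted inputs, paying a comparison per output row, whereas a plain repartition can forward whole batches as they arrive.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Conceptual k-way merge over already-sorted inputs. Every output value costs
// a heap push/pop across the inputs; this per-row work is the overhead that an
// order-preserving repartition pays and a plain repartition avoids.
fn merge_sorted(inputs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&v) = input.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(inputs.iter().map(Vec::len).sum());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}
```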

@tustvold
Contributor Author

But it only has one input partition, so why does it need a streaming merge to do an order-preserving repartition?

@mustafasrepo
Contributor

By "second" I mean counting from bottom to top. In your original example,

    "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
    "  SortExec: expr=[a@0 ASC NULLS LAST]",
    "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
    "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",

the RepartitionExec immediately below the SortExec has 8 input partitions.
The other plan would be:

    "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
    "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true",
    "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",

In this case, streaming_merge will be used internally by the second RepartitionExec.
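
One way to see which of the two plan shapes the optimizer picks is to compare EXPLAIN output with the flag off and on. A rough sketch against the public API (the table name, file path, and query are placeholders; the exact plans depend on the query and on the declared ordering of the source, and a tokio runtime is assumed):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    for prefer in [false, true] {
        // Toggle the setting and print the physical plan for a simple ordered query.
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.prefer_existing_sort", prefer);
        let ctx = SessionContext::new_with_config(config);
        ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
        let df = ctx.sql("EXPLAIN SELECT * FROM t ORDER BY a").await?;
        println!("prefer_existing_sort = {prefer}");
        df.show().await?;
    }
    Ok(())
}
```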

@tustvold
Contributor Author

Aah makes sense. 👍

Tbh, in that case I'd hope DF would strip all the repartitioning and merges out; they're all unnecessary and will just make it slower. But I suspect that's a different issue.

Thanks for the discussion, closing this for now

@tustvold closed this as not planned on Dec 19, 2023.