Change display of RepartitionExec from SortPreservingRepartitionExec to RepartitionExec preserve_order=true #8129

Closed
alamb opened this issue Nov 10, 2023 · 6 comments · Fixed by #8521
Labels
enhancement New feature or request good first issue Good for newcomers

Comments

@alamb
Contributor

alamb commented Nov 10, 2023

Is your feature request related to a problem or challenge?

Most (all?) nodes in DataFusion explain plans are displayed with the same name as the struct that implements them, which makes matching explain plans to code easy.

However, a notable exception is RepartitionExec, which is displayed sometimes as RepartitionExec and sometimes as SortPreservingRepartitionExec to signify that it preserves the input sort order. This makes finding the relevant code harder, as there is nothing in the code named SortPreservingRepartitionExec.

For example, in the following plan RepartitionExec and SortPreservingRepartitionExec are the same ExecutionPlan node:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan                                                                                                                                                                                                                                                                                                                                                  |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| SortPreservingMergeExec: [cpu@2 ASC NULLS LAST,time@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                 |
|   SortExec: expr=[cpu@2 ASC NULLS LAST,time@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                         |
|     ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, cpu@1 as cpu, coalesce(P@2 / 99.99, 0) as P]                                                                                                                                                                                                                                       |
|       RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=16                                                                                                                                                                                                                                                                          |
|         ProjectionExec: expr=[time@1 as time, cpu@2 as cpu, (selector_max(cpu.usage_system,cpu.time)@3).[value] as P]                                                                                                                                                                                                                                 |
|           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                 |
|             FilterExec: AVG(cpu.usage_idle)@4 > 0.5 AND AVG(cpu.usage_idle)@4 < 1.5 AND (selector_max(cpu.usage_system,cpu.time))[value]selector_max(cpu.usage_system,cpu.time)@0 > 0 AND (selector_max(cpu.usage_system,cpu.time))[value]selector_max(cpu.usage_system,cpu.time)@0 < 1                                                               |
|               ProjectionExec: expr=[(selector_max(cpu.usage_system,cpu.time)@2).[value] as (selector_max(cpu.usage_system,cpu.time))[value]selector_max(cpu.usage_system,cpu.time), time@0 as time, cpu@1 as cpu, selector_max(cpu.usage_system,cpu.time)@2 as selector_max(cpu.usage_system,cpu.time), AVG(cpu.usage_idle)@3 as AVG(cpu.usage_idle)] |
|                 AggregateExec: mode=FinalPartitioned, gby=[time@0 as time, cpu@1 as cpu], aggr=[selector_max(cpu.usage_system,cpu.time), AVG(cpu.usage_idle)], ordering_mode=PartiallyOrdered                                                                                                                                                         |
|                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                         |
|                     SortPreservingRepartitionExec: partitioning=Hash([time@0, cpu@1], 16), input_partitions=16, sort_exprs=cpu@1 ASC                                                                                                                                                                                                                  |
|                       AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@1, 0) as time, cpu@0 as cpu], aggr=[selector_max(cpu.usage_system,cpu.time), AVG(cpu.usage_idle)], ordering_mode=PartiallyOrdered                                                                                                                                  |
|                         RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1                                                                                                                                                                                                                                                         |
|                           ProjectionExec: expr=[cpu@1 as cpu, time@3 as time, usage_idle@4 as usage_idle, usage_system@5 as usage_system]                                                                                                                                                                                                             |
|                             DeduplicateExec: [cpu@1 ASC,host@2 ASC,time@3 ASC]                                                                                                                                                                                                                                                                        |
|                               SortExec: expr=[cpu@1 ASC,host@2 ASC,time@3 ASC,__chunk_order@0 ASC]                                                                                                                                                                                                                                                    |
|                                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                           |
|                                   FilterExec: time@3 <= 1699620021099501000                                                                                                                                                                                                                                                                           |
|                                     RecordBatchesExec: chunks=1, projection=[__chunk_order, cpu, host, time, usage_idle, usage_system]                                                                                                                                                                                                                |

Describe the solution you'd like

I would like to change the display of RepartitionExec so that the node name is always RepartitionExec, with preserve_order=true added when it preserves the input sort order. This follows the same pattern as the other nodes, where the name in the explain plan matches the name of the code that implements it.

So for example, name should always be RepartitionExec: https://github.com/apache/arrow-datafusion/blob/91c9d6f847eda0b5b1d01257b5c24459651d3926/datafusion/physical-plan/src/repartition/mod.rs#L373-L379

And instead of

 SortPreservingRepartitionExec: partitioning=Hash([time@0, cpu@1], 16), input_partitions=16, sort_exprs=cpu@1 ASC

It should display like

 RepartitionExec: partitioning=Hash([time@0, cpu@1], 16), input_partitions=16, preserve_order=true, sort_exprs=cpu@1 ASC

Describe alternatives you've considered

I recommend displaying preserve_order ONLY when it is true.

So when it is false, display like this:

RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=16                       
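
The proposed display rule can be sketched in a few lines of Rust. This is only an illustration under stated assumptions: `RepartitionDisplay` and its fields are hypothetical stand-ins, not DataFusion's actual `RepartitionExec` struct or its formatting code. It shows the node name staying fixed while `preserve_order=true` and `sort_exprs` appear only when the order is preserved.

```rust
use std::fmt;

/// Hypothetical stand-in for the values that `RepartitionExec` prints.
/// This is NOT DataFusion's real struct; the field names are illustrative.
struct RepartitionDisplay {
    partitioning: String,
    input_partitions: usize,
    /// `Some(exprs)` when the node preserves its input sort order.
    sort_exprs: Option<String>,
}

impl fmt::Display for RepartitionDisplay {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The node name is always the same, matching the struct name.
        write!(
            f,
            "RepartitionExec: partitioning={}, input_partitions={}",
            self.partitioning, self.input_partitions
        )?;
        // Mention `preserve_order` only when it is true.
        if let Some(exprs) = &self.sort_exprs {
            write!(f, ", preserve_order=true, sort_exprs={}", exprs)?;
        }
        Ok(())
    }
}
```

With `sort_exprs: None`, formatting a value yields the plain `RepartitionExec: partitioning=..., input_partitions=...` line; with `Some("cpu@1 ASC".to_string())`, it yields the `preserve_order=true, sort_exprs=cpu@1 ASC` form requested above.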

Additional context

I think this is a good first issue, as it is well specified, is a mechanical change, and would teach the person about DataFusion's tests. Beware, however, that this is likely to require a non-trivial number of changes to expected test output.
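
Since the change to expected test output is mechanical, it could be scripted. The following is a rough sketch, assuming the expected lines look like the examples in this issue; `migrate_expected_line` is a hypothetical helper and not part of DataFusion or its test tooling.

```rust
/// Hypothetical helper (not part of DataFusion): rewrites one line of
/// expected explain output from the old display to the proposed one.
fn migrate_expected_line(line: &str) -> String {
    const OLD_NAME: &str = "SortPreservingRepartitionExec:";
    // Lines that do not mention the old name are returned unchanged.
    if !line.contains(OLD_NAME) {
        return line.to_string();
    }
    // Rename the node; leading indentation is left untouched.
    let renamed = line.replacen(OLD_NAME, "RepartitionExec:", 1);
    // Insert the flag just before `sort_exprs=`, as in the example above.
    renamed.replacen(", sort_exprs=", ", preserve_order=true, sort_exprs=", 1)
}
```

Applied line by line to an expected plan, this leaves every other node as it is and rewrites only the `SortPreservingRepartitionExec` lines. Any output it produces would still need to be checked against the real test results.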

Thanks in advance!

@alamb alamb added enhancement New feature or request good first issue Good for newcomers labels Nov 10, 2023
@alamb
Contributor Author

alamb commented Nov 10, 2023

FYI @ozankabak and @mustafasrepo and @metesynnada as this may impact you

@JacobOgle
Contributor

I wouldn't mind taking a look into this

@ozankabak
Contributor

We are fine with this change

@alamb
Contributor Author

alamb commented Nov 10, 2023

Thanks @JacobOgle !

JacobOgle added a commit to JacobOgle/arrow-datafusion that referenced this issue Dec 12, 2023
metesynnada pushed a commit that referenced this issue Dec 13, 2023
…to RepartitionExec preserve_order=true (#8521)

* Change display of RepartitionExec from SortPreservingRepartitionExec to RepartitionExec preserve_order=true #8129

* fix mod.rs with cargo fmt

* test fix for repartition::test::test_preserve_order
@jeffwidman

jeffwidman commented Dec 15, 2023

Should this be closed now that #8521 is merged?

Looks like the PR description simply mis-formatted the Close #8129 statement so it didn't auto-close.

@Dandandan
Contributor

Closed by #8521
