
feat: avoid converting union into interleave #4

Closed

Conversation

@NGA-TRAN NGA-TRAN commented Apr 3, 2024

This is a temporary PR just for @alamb and me to discuss our approach. We do not plan to merge this into any branch/repo (yet).

This is related to the IOx WIP PR https://github.com/influxdata/influxdb_iox/pull/10540

Which issue does this PR close?

Currently, DataFusion (DF) always replaces a Union with an Interleave if the operators under the union can interleave, which means they have the same hash partitioning. This happens when they are aggregates, as in the plan below.
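For context, this conversion happens in DataFusion's EnforceDistribution pass; a simplified sketch of the existing logic (mirroring the snippet quoted in the review below, not the exact upstream code):

// Simplified sketch: when all children of a UnionExec share the same hash
// partitioning, the union is rewritten into an InterleaveExec, which pulls
// from matching partitions directly instead of concatenating them.
let new_plan: Arc<dyn ExecutionPlan> =
    if plan.as_any().downcast_ref::<UnionExec>().is_some()
        && can_interleave(children_plans.iter())
    {
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };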

Logical plan of SHOW TAG VALUES WITH KEY = "tag0"; with aggregation under the union:

 Sort: iox::measurement ASC NULLS LAST, key ASC NULLS LAST, value ASC NULLS LAST
   Union
     Projection: Dictionary(Int32, Utf8("m0")) AS iox::measurement, Dictionary(Int32, Utf8("tag0")) AS key, m0.tag0 AS value
       Aggregate: groupBy=[[m0.tag0]], aggr=[[]]
         TableScan: m0 projection=[tag0], full_filters=[m0.time >= TimestampNanosecond(631152000000000000, Some("UTC"))]
     Projection: Dictionary(Int32, Utf8("m1")) AS iox::measurement, Dictionary(Int32, Utf8("tag0")) AS key, m1.tag0 AS value
       Aggregate: groupBy=[[m1.tag0]], aggr=[[]]
          TableScan: m1 projection=[tag0], full_filters=[m1.time >= TimestampNanosecond(631152000000000000, Some("UTC"))]

Physical plan where the Union was replaced with Interleave and, as a consequence, the Sort was not pushed down:

SortPreservingMergeExec: [iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
   SortExec: expr=[iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
     InterleaveExec   -- Union is now Interleave and thus the Sort is not pushed down (which makes sense in the case of interleave but is not what we want)
       ProjectionExec: expr=[m0 as iox::measurement, tag0 as key, tag0@0 as value]
         AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[]
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4
               AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[]
                 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                   ProjectionExec: expr=[tag0@0 as tag0]
                     CoalesceBatchesExec: target_batch_size=8192
                       FilterExec: time@1 >= 631152000000000000
                         ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[tag0, time], predicate=time@5 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, required_guarantees=[]
       ProjectionExec: expr=[m1 as iox::measurement, tag0 as key, tag0@0 as value]
         AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[], ordering_mode=Sorted
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4, preserve_order=true, sort_exprs=tag0@0 ASC
               AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[], ordering_mode=Sorted
                 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                   ProjectionExec: expr=[tag0@0 as tag0]
                     CoalesceBatchesExec: target_batch_size=8192
                       FilterExec: time@1 >= 631152000000000000
                         ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[tag0, time], output_ordering=[tag0@0 ASC, time@1 ASC], predicate=time@4 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, required_guarantees=[]

If we do not let Union be replaced with Interleave, the physical plan will have the Sort pushed down:

 SortPreservingMergeExec: [iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
   UnionExec  -- Union is still a union
     SortExec: expr=[iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
       ProjectionExec: expr=[m0 as iox::measurement, tag0 as key, tag0@0 as value]
         AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[]
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4
               AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[]
                 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                   ProjectionExec: expr=[tag0@0 as tag0]
                     CoalesceBatchesExec: target_batch_size=8192
                       FilterExec: time@1 >= 631152000000000000
                         ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[tag0, time], predicate=time@5 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, required_guarantees=[]
     SortExec: expr=[iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
       ProjectionExec: expr=[m1 as iox::measurement, tag0 as key, tag0@0 as value]
         AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[], ordering_mode=Sorted
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4, preserve_order=true, sort_exprs=tag0@0 ASC
               AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[], ordering_mode=Sorted
                 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                   ProjectionExec: expr=[tag0@0 as tag0]
                     CoalesceBatchesExec: target_batch_size=8192
                       FilterExec: time@1 >= 631152000000000000
                         ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[tag0, time], output_ordering=[tag0@0 ASC, time@1 ASC], predicate=time@4 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, required_guarantees=[]

And we will be able to replace SortPreservingMerge with ProgressiveEval:

ProgressiveEvalExec: input_ranges=[(Utf8("m0"), Utf8("m0")), (Utf8("m1"), Utf8("m1")), (Utf8("m2"), Utf8("m2")), (Utf8("m3"), Utf8("m3")), (Utf8("select_test"), Utf8("select_test"))]
   UnionExec
     SortPreservingMergeExec: [iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
       SortExec: expr=[iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
         ProjectionExec: expr=[m0 as iox::measurement, tag0 as key, tag0@0 as value]
           AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[]
             CoalesceBatchesExec: target_batch_size=8192
               RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4
                 AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[]
                   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                     ProjectionExec: expr=[tag0@0 as tag0]
                       CoalesceBatchesExec: target_batch_size=8192
                         FilterExec: time@1 >= 631152000000000000
                           ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[tag0, time], predicate=time@5 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, required_guarantees=[]
     SortPreservingMergeExec: [iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
       SortExec: expr=[iox::measurement@0 ASC NULLS LAST,key@1 ASC NULLS LAST,value@2 ASC NULLS LAST]
         ProjectionExec: expr=[m1 as iox::measurement, tag0 as key, tag0@0 as value]
           AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[], ordering_mode=Sorted
             CoalesceBatchesExec: target_batch_size=8192
               RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4, preserve_order=true, sort_exprs=tag0@0 ASC
                 AggregateExec: mode=Partial, gby=[tag0@0 as tag0], aggr=[], ordering_mode=Sorted
                   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                     ProjectionExec: expr=[tag0@0 as tag0]
                       CoalesceBatchesExec: target_batch_size=8192
                         FilterExec: time@1 >= 631152000000000000
                           ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[tag0, time], output_ordering=[tag0@0 ASC, time@1 ASC], predicate=time@4 >= 631152000000000000, pruning_predicate=time_max@0 >= 631152000000000000, required_guarantees=[]

@NGA-TRAN NGA-TRAN marked this pull request as draft April 3, 2024 15:52
@alamb alamb left a comment


Thanks @NGA-TRAN -- I think the basic idea of this PR makes sense.

I think I might suggest we call this something different (like maybe avoid_repartitioning or something) that is connected more to the effect than to a specific code change.

Looking forward to our discussion later today

// Data
Arc::new(InterleaveExec::try_new(children_plans)?)
if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
if !union_exec.skip_interleave() && can_interleave(children_plans.iter()) {

@alamb alamb commented

I had to remind myself what Interleave did:

https://github.com/apache/arrow-datafusion/blob/63888e853b7b094f2f47f53192a94f38327f5f5a/datafusion/physical-plan/src/union.rs#L286-L317

Do I understand correctly that the problem with switching to InterleaveExec is that it has more than one output partition and thus can't be converted to ProgressiveEval (due to this code: https://github.com/influxdata/influxdb_iox/blob/124f8b481179fd9f6d03f40cfecf40c326ac3127/iox_query/src/physical_optimizer/sort/order_union_sorted_inputs.rs#L153)?

If that is the case, I wonder whether it would be possible to make ProgressiveEval work with multiple partitions 🤔

@NGA-TRAN NGA-TRAN Apr 4, 2024

From the definition of Interleave (https://github.com/apache/arrow-datafusion/blob/63888e853b7b094f2f47f53192a94f38327f5f5a/datafusion/physical-plan/src/union.rs#L286-L317), data is combined by hash partitioning, which means the data is grouped but not kept in any order. ProgressiveEval needs its inputs to be in order for it to work.

So if we can solve the problem of keeping the output of Interleave sorted, it will work with ProgressiveEval. However, if I understand correctly, the whole point of Interleave is that we do not have to worry about keeping data sorted. That is why using Interleave is not what we want in the first place.

@alamb alamb commented

I am bothered by the fact that the flag is on the UnionExec because:

  1. that isn't logically a property of the UnionExec but rather a behavior we want to change in one of the optimizer passes. This seems to couple the plan to the optimizer, which I think makes the overall codebase harder to understand
  2. There aren't existing examples of this kind of "change the optimizer behavior" flag on plan nodes (ConfigOptions is used for this purpose elsewhere)

Thus, I think we should add a flag to ConfigOptions that will affect the behavior of the EnforceDistribution pass, similar to how repartition_file_scans is handled. This would also give us a good place to document and explain what the option does.

So I think that instead of checking a property of the union_exec we should check a property of the options, i.e. the ConfigOptions passed here:

fn ensure_distribution(
    dist_context: DistributionContext,
    config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {

This would be consistent with how other settings are done and would make it clear when this flag gets set.

The downside is that it would potentially affect all UnionExecs in the plan, not just specific ones, but I actually think that is easier to understand and will not cause any performance issues.
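A sketch of how the check inside ensure_distribution could be gated on a ConfigOptions field instead of a flag on UnionExec (the field name prefer_existing_union below is purely illustrative; this PR does not define it):

// Same conversion as today, but consulting the ConfigOptions passed into
// ensure_distribution rather than a per-node flag on UnionExec.
// `prefer_existing_union` is a hypothetical option name.
let new_plan: Arc<dyn ExecutionPlan> =
    if plan.as_any().downcast_ref::<UnionExec>().is_some()
        && !config.optimizer.prefer_existing_union
        && can_interleave(children_plans.iter())
    {
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };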

@NGA-TRAN NGA-TRAN commented

@alamb Here is the DF ticket apache#10257. Can you have a look? If I understand correctly, I just need to use that new flag to avoid Interleave, and I will find a good place to set that config flag. I will create a different branch that can be merged directly into the DF main branch for this work.
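On the IOx side, enabling such an option would presumably look like any other DataFusion setting; a minimal sketch, assuming the upstream flag lands as a boolean under the optimizer namespace (the exact key name is an assumption):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Hypothetical config key -- use whatever name the upstream ticket settles on.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_existing_union", true);
    let _ctx = SessionContext::new_with_config(config);
}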

@alamb alamb left a comment

Thank you @NGA-TRAN -- I think the code is well commented and makes sense

I am concerned about two things:

  1. The overhead of maintaining a patched version of DataFusion long term (it adds overhead to each new upgrade). This can be addressed by filing an upstream ticket / PR in DataFusion so we don't have to maintain the fork.
  2. That this code will be accidentally removed / broken in the future, as it is quite subtle and would only be used in InfluxDB. This can be addressed by moving to a config option, documentation, and tests (I left some suggestions).

So all in all I think this is pretty close. Thank you for bearing with me


} else {
plan.with_new_children(children_plans)?
plan = plan.with_new_children(children_plans)?
};

Ok(Transformed::yes(DistributionContext::new(

@alamb alamb commented

I think we need tests for this behavior in enforce_distribution. Can you please add some? Perhaps modeled after the existing union test https://github.com/influxdata/arrow-datafusion/blob/main/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L3064-L3103?

This would also ensure we had the behavior change documented in code and would hopefully make it hard to break in the future
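A very rough sketch of what such a test could look like, modeled loosely on the linked union test (helper names like parquet_exec, aggregate_exec_with_alias, and the assert_optimized! macro are assumed from that test module, and the expected plan lines are elided; the option that disables the conversion is hypothetical and would need to be enabled for this test, e.g. via a config-aware variant of assert_optimized!):

#[test]
fn union_not_converted_to_interleave() -> Result<()> {
    // two aggregations grouped on the same key, so both union children end up
    // with the same hash partitioning and would normally be interleavable
    let left = aggregate_exec_with_alias(parquet_exec(), vec![("a".to_string(), "a1".to_string())]);
    let right = aggregate_exec_with_alias(parquet_exec(), vec![("a".to_string(), "a1".to_string())]);
    let plan: Arc<dyn ExecutionPlan> = Arc::new(UnionExec::new(vec![left, right]));

    // with the conversion disabled, the optimized plan should still contain
    // "UnionExec" and no "InterleaveExec"
    let expected = &[
        "UnionExec",
        // ... child plan lines elided ...
    ];
    assert_optimized!(expected, plan, true);
    Ok(())
}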

@NGA-TRAN

Closing this in favor of apache#10259.

@NGA-TRAN NGA-TRAN closed this Apr 29, 2024
wiedld pushed a commit that referenced this pull request Aug 9, 2024
* Make `CommonSubexprEliminate` top-down like

* fix top-down recursion, fix unit tests to use a real Optimizer to verify behavior on plans

* Extract result of `find_common_exprs` into a struct (#4)

* Extract the result of find_common_exprs into a struct

* Make naming consistent

---------

Co-authored-by: Andrew Lamb <[email protected]>