Add support for utilization complex aggregate exprs inside group by #8107

Closed
wants to merge 14 commits into from
Contributor

@mustafasrepo mustafasrepo commented Nov 9, 2023

Which issue does this PR close?

Closes #8043. Part of #8064

Rationale for this change

Currently, group by cannot reason about the ordering of complex expressions.
Assume that we know that column ts is ordered. When someone writes a query of the form GROUP BY (ts), we can tell that ts is ordered and run AggregateExec in Sorted mode. However, when someone writes date_bin(ts), we cannot tell that date_bin(ts) is also ordered given that ts is ordered. (Please note that DataFusion already has this mechanism; however, we do not use it in group by.)
This PR solves this problem.

The design is as follows: we apply an implicit projection before

  • checking whether an ordering is satisfied, and
  • searching for the longest ordered prefix (the algorithm used in group by).

The existing code then runs as before. This implicit projection lets us treat complex expressions as columns whose results have already been calculated. (Please note that this calculation is done only in terms of ordering properties; nothing is actually computed.)

Assume that the existing ordering is ts ASC, and that someone writes GROUP BY (date_bin(ts)) in the query.
The implicit projection performs the following mapping:
ts -> col(ts), date_bin(ts) -> col(date_bin(ts)).
This projection transforms the existing ordering from ts ASC to [ts ASC], [date_bin(ts) ASC]. The existing algorithms then work on this projected form.
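
A minimal sketch of the idea described above, not the code in this PR. The types and functions (`Expr`, `Mode`, `is_ordered`, `group_by_mode`) are hypothetical and do not exist in DataFusion. The sketch assumes single-column orderings and a caller-supplied list of functions that preserve the order of their argument.

```rust
// Toy model of the implicit projection; all names here are illustrative only.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    // A one-argument function call such as date_bin(ts).
    Func(String, Box<Expr>),
}

#[derive(Debug, PartialEq)]
enum Mode {
    Sorted,
    PartiallySorted,
    Linear,
}

// An expression counts as ordered if it is an ordered column, or an
// order-preserving function applied to an ordered expression.
// Only ordering properties are inspected; nothing is evaluated.
fn is_ordered(expr: &Expr, ordered_cols: &[&str], order_preserving: &[&str]) -> bool {
    match expr {
        Expr::Column(name) => ordered_cols.contains(&name.as_str()),
        Expr::Func(name, arg) => {
            order_preserving.contains(&name.as_str())
                && is_ordered(arg, ordered_cols, order_preserving)
        }
    }
}

// Pick an aggregation mode from how many group-by expressions are ordered.
fn group_by_mode(group_by: &[Expr], ordered_cols: &[&str], order_preserving: &[&str]) -> Mode {
    let ordered = group_by
        .iter()
        .filter(|e| is_ordered(e, ordered_cols, order_preserving))
        .count();
    if !group_by.is_empty() && ordered == group_by.len() {
        Mode::Sorted
    } else if ordered > 0 {
        Mode::PartiallySorted
    } else {
        Mode::Linear
    }
}
```

With `ts` as the only ordered column and `date_bin` listed as order-preserving, `group_by_mode` returns `Mode::Sorted` for a group by on `date_bin(ts)` and `Mode::Linear` for a group by on `abs(ts)`.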

Please note that the changes in the replace_with_order_preserving_variants.rs file come only from schema changes. Previously, some of the plans were invalid (for example, a column that should be at index 1 was at index 2). However, these bugs were silent because the plans were never executed. The new algorithm also considers the existing schema during projection, so these silent bugs surfaced. I fixed them as part of this PR.

What changes are included in this PR?

Are these changes tested?

Yes, new tests are added.

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Nov 9, 2023
@mustafasrepo mustafasrepo changed the title Bug fix/complex aggregate exprs Add support for utilization complex aggregate exprs inside group by Nov 9, 2023
@ozankabak
Contributor

@alamb, this should solve the test that fails on your end. Can you please confirm?

Contributor

@alamb alamb left a comment

Thank you for this @mustafasrepo -- I am a little confused about the algorithm (I left some questions)

@alamb, this should solve the test that fails on your end. Can you please confirm?

I tried @ozankabak and unfortunately it does not solve our issue. I will work tomorrow on getting a datafusion only reproducer for our issue and add it to the ticket.

datafusion/physical-expr/src/equivalence.rs
let projection_mapping = self.implicit_projection_mapping(&exprs);
let projected_eqs =
self.project(&projection_mapping, projection_mapping.output_schema());
if let Some(projected_reqs) = projection_mapping.project_lex_reqs(reqs) {
Contributor

I am not quite sure I follow this logic -- does it say that any expression that can be projected maintains the ordering?

What about non-monotonic expressions like abs(x)? If the input is ordered by x:

-2
-1
0
1
2

The output will not be ordered by abs(x):

2
1
0
1
2

(The same applies to date functions like extract(day from ts).)

Perhaps we need to check FuncMonotonicity
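
The point above can be checked empirically with a small sketch. The helper names (`is_non_decreasing`, `bin`, `map_preserves_order`) are made up for illustration, and `bin` is only a simplified integer stand-in for a date_bin-style function with an origin of 0.

```rust
// Returns true if every element is >= the one before it.
fn is_non_decreasing(values: &[i64]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}

// Floors a value to the start of its bucket (simplified stand-in for date_bin).
fn bin(value: i64, width: i64) -> i64 {
    value.div_euclid(width) * width
}

// Applies `f` to an input and reports whether the output is still ordered.
fn map_preserves_order(input: &[i64], f: impl Fn(i64) -> i64) -> bool {
    let output: Vec<i64> = input.iter().map(|&v| f(v)).collect();
    is_non_decreasing(&output)
}
```

For the input `[-2, -1, 0, 1, 2]`, `map_preserves_order(&input, |v| v.abs())` is false, while `map_preserves_order(&input, |v| bin(v, 2))` is true, because flooring to a bucket never reverses the order of two values.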

Contributor Author

@mustafasrepo mustafasrepo Nov 10, 2023

I am not quite sure I follow this logic -- does it say that any expression that can be projected maintains the ordering?

No, it just projects every expression as if it had been evaluated (in terms of ordering properties only). For instance, if ts is not ordered, we know that date_bin(ts) is not ordered either. Similarly, abs(x) is not ordered regardless of whether x is ordered. update_ordering takes the properties of the functions into account.

Contributor

I guess I was trying to say that in general, for ProjectionExec and any node that computes expressions, I would expect that we would have to use FuncMonotonicity to determine whether the output is ordered by expr(x) when the input is ordered by x

Maybe that would be a good thing to do as a follow on PR
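
One way such a monotonicity check could look is sketched below. This is an assumption-laden illustration, not DataFusion's FuncMonotonicity API: the enums, the `monotonicity_of` lookup table, and the `negate` function name are all hypothetical, and each function is treated as having a single argument.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Monotonicity {
    Increasing,
    Decreasing,
    Unknown,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum SortDir {
    Asc,
    Desc,
}

// Hypothetical lookup table; a real engine would keep this per function signature.
fn monotonicity_of(func: &str) -> Monotonicity {
    match func {
        "date_bin" | "floor" | "ceil" => Monotonicity::Increasing,
        "negate" => Monotonicity::Decreasing,
        _ => Monotonicity::Unknown,
    }
}

// Derives the ordering of func(x) from the ordering of x, if any.
fn output_ordering(func: &str, input: Option<SortDir>) -> Option<SortDir> {
    match (monotonicity_of(func), input?) {
        // An increasing function keeps the input direction.
        (Monotonicity::Increasing, dir) => Some(dir),
        // A decreasing function flips it.
        (Monotonicity::Decreasing, SortDir::Asc) => Some(SortDir::Desc),
        (Monotonicity::Decreasing, SortDir::Desc) => Some(SortDir::Asc),
        // Anything else (for example abs) gives no ordering guarantee.
        (Monotonicity::Unknown, _) => None,
    }
}
```

Under these assumptions, `output_ordering("date_bin", Some(SortDir::Desc))` yields `Some(SortDir::Desc)`, while `output_ordering("abs", Some(SortDir::Asc))` yields `None`.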

datafusion/sqllogictest/test_files/groupby.slt
@ozankabak
Contributor

I tried @ozankabak and unfortunately it does not solve our issue. I will work tomorrow on getting a datafusion only reproducer for our issue and add it to the ticket.

Thanks, it will help us pinpoint exactly what is wrong

@mustafasrepo
Contributor Author

mustafasrepo commented Nov 10, 2023

By the way, I want to stress that this PR is beneficial even if it doesn't solve the issue in #8043. This PR enables us to run some aggregate queries in ordered or partially ordered mode, as can be seen from the test result:

GlobalLimitExec: skip=0, fetch=5
--SortPreservingMergeExec: [time_chunks@0 DESC], fetch=5
----ProjectionExec: expr=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as time_chunks]
------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
--------CoalesceBatchesExec: target_batch_size=2
----------SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0], 8), input_partitions=8, sort_exprs=date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 DESC
------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC], has_header=false

The AggregateExecs now work in Sorted mode: we can determine that, given ts is ordered, date_bin(900000000000, ts@0) is also ordered. Previously, these AggregateExecs did not run in Sorted mode.

Also, after this PR, I would expect that the following plan, mentioned in #8043,

2023-11-02T15:58:06.601675Z TRACE log: Optimized physical plan by CombinePartialFinalAggregate:
OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST]
    CoalescePartitionsExec
      ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1]
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], ordering_mode=Sorted
          SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16, sort_exprs=time@0 ASC NULLS LAST
            AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
              RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                SortExec: expr=[time@0 ASC NULLS LAST]
                  CoalescePartitionsExec
                    ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                      AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                        RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                          AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                              ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                                FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                  ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2

would turn into the following (the AggregateExec marked with an arrow switches to Sorted mode):

2023-11-02T15:58:06.601675Z TRACE log: Optimized physical plan by CombinePartialFinalAggregate:
OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST]
    CoalescePartitionsExec
      ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1]
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], ordering_mode=Sorted
          SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16, sort_exprs=time@0 ASC NULLS LAST
      ----->AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], ordering_mode=Sorted
              RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                SortExec: expr=[time@0 ASC NULLS LAST]
                  CoalescePartitionsExec
                    ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                      AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                        RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                          AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                              ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                                FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                  ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2

Then the EnforceSorting rule wouldn't remove the SortExec at the input of the marked AggregateExec. However, apparently I am missing something.

@mustafasrepo mustafasrepo marked this pull request as draft November 10, 2023 09:52
@mustafasrepo
Contributor Author

I marked this PR as a draft because I realized that we may not need an implicit projection pass to support this. If so, the implicit projection is a bit cumbersome. I will try to implement an alternative algorithm.

@alamb
Contributor

alamb commented Nov 10, 2023

By the way, I want to stress that this PR is beneficial even if it doesn't solve the issue in #8043.

Yes, I 100% agree -- this is a very cool (and important) feature.

BTW I have a fix for #8043 (see #8127)

# Conflicts:
#	datafusion/core/src/physical_optimizer/enforce_distribution.rs
#	datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
#	datafusion/physical-expr/src/equivalence.rs
#	datafusion/sqllogictest/test_files/groupby.slt
@mustafasrepo
Contributor Author

This is completed in PR #8281.

Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Internal error with repartitioning after equivalence consolidation
4 participants