
Improve the performance of Min/Max/Count utilizing Iceberg table metrics #10974

fengguangyuan opened this issue Feb 8, 2022 · 25 comments

fengguangyuan commented Feb 8, 2022

Purpose

This issue proposes a basic optimization rule for min/max/count queries on connectors that have accurate table/partition/column statistics, such as Iceberg tables composed of ORC/Parquet files.

Reason

Nowadays, most storage engines and self-describing file formats store table-level/partition-level/column-level statistics to enable more efficient data retrieval, e.g. Iceberg and Hive.

Iceberg currently supports ORC/Parquet files, and its table metrics are aggregated from each data file, so the table metrics are trustworthy for calculating min(T)/max(T)/count(T)/count(*), no matter whether the stored data was written by Trino or Spark. Hence, for queries containing only min/max/count aggregations, we can construct the results directly from metadata.

For example, for the query select count(x) from test, if column x has precomputed statistics showing 2 total rows, 0 null values and a [0, 9] range, the query could be rewritten to select 2, where 2 is the difference between the total row count and the null count.

Conclusion

Trino should supply an optimization rule that rewrites such queries from metadata, similar to HIVE-2847. Obviously, this rule only applies to simple queries without complex syntax such as GROUP BY, DISTINCT, JOIN, etc.

We already have a basic implementation of this, tested on the Iceberg connector (rather than on all connectors, since statistics may be inaccurate elsewhere). If Trino wants this improvement, please let me know, and it would be my pleasure to open a PR.

findepi added the enhancement and performance labels on Feb 8, 2022
findepi (Member) commented Feb 8, 2022

This shouldn't be implemented as an optimizer rule. Instead, Iceberg connector should support applyAggregation.
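For context, a minimal sketch of the shape of that hook (the signature below approximates ConnectorMetadata#applyAggregation; canAnswerFromMetrics is a hypothetical helper, and building the actual AggregationApplicationResult is omitted):

import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of the pushdown hook a connector (e.g. Iceberg) would implement.
// The engine calls this during planning; returning Optional.empty() declines the pushdown.
public abstract class AggregationPushdownSketch
{
    public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggregation(
            ConnectorSession session,
            ConnectorTableHandle handle,
            List<AggregateFunction> aggregates,
            Map<String, ColumnHandle> assignments,
            List<List<ColumnHandle>> groupingSets)
    {
        // Only consider a global aggregation (a single, empty grouping set).
        if (groupingSets.size() != 1 || !groupingSets.get(0).isEmpty()) {
            return Optional.empty();
        }
        // Hypothetical check: every requested aggregate must be answerable exactly
        // from Iceberg metrics (count(*); min/max only where the bounds are exact),
        // and every data file must actually carry the needed metrics.
        if (!canAnswerFromMetrics(aggregates)) {
            return Optional.empty();
        }
        // Otherwise: build a new table handle that yields the precomputed values and
        // describe the output via AggregationApplicationResult (construction omitted here).
        return Optional.empty();
    }

    protected abstract boolean canAnswerFromMetrics(List<AggregateFunction> aggregates);
}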
Things to keep in mind

  • some files may have stats, but some files may be missing stats for a column (see eg Support Metrics mode when creating/writing Iceberg tables #9938); during planning time we would need to inspect all the files and verify availability of the information
    • ideally we should support "mixed mode" where some files are skipped (e.g. we have max for a given column) and some files are still processed (we don't have max). Today this would require implementing the aggregations on the connector side, but it would be nice if the SPI allowed the connector and the engine to cooperate. For example, a connector would tell the SPI that it's accepting the pushdown but will return a "partial aggregation" of some sort. cc @sopel39 @martint @losipiuk
  • min and max are not guaranteed to be exact.
    • eg varchars can be truncated
    • timestamp values are rounded up or down to millisecond precision (in case of ORC)
    • @rdblue does the spec require min/max to be exact otherwise? or are they defined to be some lower and upper bound only?
  • the count is the easiest, as it doesn't suffer from exactness doubts, but it's also probably least important. count(*) requires reading no columns, so only file metadata is touched. Room for improvement still.

cc @alexjo2144 @homar

hashhar (Member) commented Feb 8, 2022

  • ideally we should support "mixed mode" where some files are skipped (e.g. we have max for a given column) and some files are still processed (we don't have max). Today this would require implementing the aggregations on the connector side, but it would be nice if the SPI allowed the connector and the engine to cooperate. For example, a connector would tell the SPI that it's accepting the pushdown but will return a "partial aggregation" of some sort. cc @sopel39 @martint @losipiuk

See also #10964, which could benefit from a similar concept.

fengguangyuan (Author) commented Feb 8, 2022

@findepi Thanks for your reply. Yes, indeed, the points you mentioned are the key ones, I couldn't agree more, and that's why I mentioned the limitations.

There are two possible ways to do the optimization, I think:

  1. As a rule: same as ShowStatsRewrite, just extract the aggregated values from the stats, provided the stats are not NULL or NaN and the column stats are reliable.
  2. As a workflow: the more complex flow is the mixed mode you described, where the connector is responsible for aggregating data and may need a local aggregation in each split (roughly, a data file), so that whether or not a file has stats, accurate values can be computed or extracted from each data file adaptively. Therefore more work is needed on the SPI side.

But both approaches are based on the same assumption: if the expected stats are not NULL or NaN, they should be correct, at least for the Iceberg connector; otherwise they are not reliable and should be calculated from the real data.

Considering the implementation complexity, we simply implemented the easier approach, which skips rewriting the plan whenever it finds unreliable stats, or min/max on non-numeric columns (except the timestamp type), because Trino only carries double ranges; in those cases the plan still aggregates the real data, so correctness is guaranteed at both the table level and the column level.

After all, the rule approach is the cheapest, while the mixed-mode approach is the ideal one, but it has many more nuts to crack. :)
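For illustration, a minimal sketch of the derivation the rule-based approach performs, using io.trino.spi.statistics (CountFromStatsSketch and countFromStats are illustrative names only; the Estimate values this SPI exposes are not guaranteed to be exact, which is the main caveat of this approach):

import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;

import java.util.OptionalLong;

// Sketch only: derive count(x) from table/column statistics, refusing the rewrite
// whenever any input is unknown. The statistics are estimates, not guaranteed exact.
final class CountFromStatsSketch
{
    static OptionalLong countFromStats(TableStatistics tableStats, ColumnStatistics columnStats)
    {
        Estimate rowCount = tableStats.getRowCount();
        Estimate nullsFraction = columnStats.getNullsFraction();
        if (rowCount.isUnknown() || nullsFraction.isUnknown()) {
            return OptionalLong.empty();
        }
        double rows = rowCount.getValue();
        // count(x) = total rows minus rows where x is NULL
        double nonNullRows = rows * (1.0 - nullsFraction.getValue());
        return OptionalLong.of(Math.round(nonNullRows));
    }
}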

findepi (Member) commented Feb 8, 2022

which skips rewriting the plan whenever it finds unreliable stats, or min/max on non-numeric columns (except the timestamp type), because Trino only carries double ranges

oh, you mean base the logic on io.trino.spi.statistics.ColumnStatistics?
that's not the right API, as statistics are defined to allow them to be inexact, also for numeric types.

per #18, the API to use for aggregation pushdown is the ConnectorMetadata#applyAggregation.

fengguangyuan (Author) commented:

oh, you mean base the logic on io.trino.spi.statistics.ColumnStatistics?
that's not the right API, as statistics are defined to allow them to be inexact, also for numeric types.

Yep, this optimization is only for queries, based on ColumnStatistics.

per #18, the API to use for aggregation pushdown is the ConnectorMetadata#applyAggregation.

Thanks for the tips, but I think the mixed-mode implementation involves far more than this one interface.

So you guys prefer the mixed-mode implementation? :)

osscm (Contributor) commented Sep 29, 2022

@findepi can you please share any updates on this? Are we planning to use the min/max, if present, for query optimization? Thanks.

alexjo2144 (Member) commented Sep 29, 2022

I would probably start with an implementation for applyAggregation for count(*). Min/max are going to have the problem of making sure that Parquet/ORC's sorting is the same as what we want for Trino sorting, but count should be a bit simpler. As long as all files report row count stats in the manifest, we don't have to read any data files. (as long as there are no unenforced predicates)
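Roughly, the metadata-only count looks like this with the Iceberg API (a sketch only, assuming the table has no row-level delete files; countStarFromMetadata is an illustrative name):

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.io.UncheckedIOException;

// Sketch: answer count(*) by summing per-file record counts from the manifests,
// without opening any data files.
final class ManifestCountSketch
{
    static long countStarFromMetadata(Table table)
    {
        long total = 0;
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                // If a file has row-level deletes attached, the manifest count alone
                // is not sufficient; fall back to reading data (not shown here).
                if (!task.deletes().isEmpty()) {
                    throw new IllegalStateException("delete files present; cannot answer from metadata alone");
                }
                total += task.file().recordCount();
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}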

findepi (Member) commented Sep 29, 2022

Min/max are going to have the problem of making sure that Parquet/ORC's sorting is the same as what we want for Trino sorting

These must not be different, otherwise predicate pushdown would be totally wrong.

But min/max can be inaccurate (varchars truncated, timestamps rounded).

As long as all files report row count stats in the manifest, we don't have to read any data files.

When some files have row count in the manifest, but some do not, microplans could be useful -- #13534

alexjo2144 (Member) commented:

These must not be different, otherwise predicate pushdown would be totally wrong.

IIRC in Delta we had to skip using min/max values for pushdown of Double types in certain situations.

findepi (Member) commented Sep 30, 2022

You mean this

if (isNotFinite(minValue, column.getType())) {
    minValue = Optional.empty();
}
Optional<Object> maxValue = stats.getMaxColumnValue(deltaLakeColumnHandle);
if (maxValue.isPresent() && isFloatingPointNaN(column.getType(), maxValue.get())) {
    return allValues(column.getType(), hasNulls);
}
if (isNotFinite(maxValue, column.getType())) {
    maxValue = Optional.empty();
}
if (minValue.isPresent() && maxValue.isPresent()) {
    return Domain.create(
            ofRanges(range(column.getType(), minValue.get(), true, maxValue.get(), true)),
            hasNulls);
}
if (minValue.isPresent()) {
    return Domain.create(ofRanges(greaterThanOrEqual(column.getType(), minValue.get())), hasNulls);
}
return maxValue
        .map(value -> Domain.create(ofRanges(lessThanOrEqual(column.getType(), value)), hasNulls))
        .orElseGet(() -> Domain.all(column.getType()));
}

private static boolean isNotFinite(Optional<Object> value, Type type)
{
    if (type.equals(DOUBLE)) {
        return value
                .map(Double.class::cast)
                .filter(val -> !Double.isFinite(val))
                .isPresent();
    }
    if (type.equals(REAL)) {
        return value
                .map(Long.class::cast)
                .map(Math::toIntExact)
                .map(Float::intBitsToFloat)
                .filter(val -> !Float.isFinite(val))
                .isPresent();
    }
    return false;
}

?

It's not strictly because of ordering (as in ORDER BY). Rather, it's because of the semantics of double comparisons with NaN (both 5 < NaN and 5 >= NaN are false). This is behind a Domain's refusal to handle NaNs (they would need to be handled explicitly, just like NULLs).
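To make that concrete, a tiny self-contained example (plain Java, not Trino code):

public class NanComparisonDemo
{
    public static void main(String[] args)
    {
        double value = 5.0;
        double nan = Double.NaN;
        System.out.println(value < nan);   // prints false
        System.out.println(value >= nan);  // prints false
        // No range of the form [min, max] can contain NaN under these comparison
        // rules, so a min/max-based Domain cannot represent it; NaN has to be
        // tracked explicitly, just like NULL.
    }
}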

osscm (Contributor) commented Oct 11, 2022

@findepi @alexjo2144 even if only an aggregation like count can be pushed down and use the metrics (when available), it will be very useful.

So, shall we start by supporting aggregate pushdown for Iceberg and handling the count case?

osscm (Contributor) commented Nov 6, 2022

I would probably start with an implementation for applyAggregation for count(*). Min/max are going to have the problem of making sure that Parquet/ORC's sorting is the same as what we want for Trino sorting, but count should be a bit simpler. As long as all files report row count stats in the manifest, we don't have to read any data files. (as long as there are no unenforced predicates)

@alexjo2144 I have started working on a PR for the count(*) support. thanks!

ahshahid commented:

Hi. I am new to Iceberg and I was also thinking along similar lines, though from a different perspective.
Currently Spark allows dynamic partition pruning (DPP), and the underlying data sources return the filter columns only if they are partition columns.
If we allow non-partition columns to also participate in DPP, then the DPP query becomes expensive and is not worth it.
I am wondering: if the DPP query were lightweight and approximate (needing only the min/max values for a non-partition column used as the joining key), would we see a benefit?

osscm (Contributor) commented Dec 5, 2022

Quick update:
Refining some of the work locally, then will open a WIP PR and continue to add test cases, etc.

osscm (Contributor) commented Dec 5, 2022

Hi. I am new to Iceberg and I was also thinking along similar lines, though from a different perspective.
Currently Spark allows dynamic partition pruning (DPP), and the underlying data sources return the filter columns only if they are partition columns.
If we allow non-partition columns to also participate in DPP, then the DPP query becomes expensive and is not worth it.
I am wondering: if the DPP query were lightweight and approximate (needing only the min/max values for a non-partition column used as the joining key), would we see a benefit?

@ahshahid
Sorry, I didn't get everything.
Are you asking whether we can use the min/max from Iceberg metrics/metadata for DPP or joins?
Maybe you could share an example here?

thanks

ahshahid commented Dec 6, 2022

@osscm Well, what I meant was that if this PR is able to provide min/max support at the Iceberg level using the stats (at least in simple scenarios), then it may be possible to leverage it to make Spark's dynamic partition pruning (DPP) mechanism work for non-partition columns too. Right now, when I tried to make use of DPP for a non-partition column (by modifying Iceberg code), the performance degraded because the cost of the DPP query is too high. But if the min/max were evaluated using the stats in the manifest files, then possibly the cost of the DPP query for non-partition columns could be brought down.

osscm (Contributor) commented Jan 14, 2023

@ahshahid I'm afraid not; I think you are talking about the output of SHOW STATS FOR <tbl>.
This PR will target queries like
SELECT count(*) FROM <tbl> WHERE part1=1
and subsequent PRs will be for min/max as well.

Right now the stats output looks like this (after running ANALYZE); I am not sure why the low/high values for some of the columns are coming back as NULL.

show stats FOR sample_partitionedv2;
 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value  | high_value 
-------------+-----------+-----------------------+----------------+-----------+------------+------------
 userid      |      NULL |                   3.0 |            0.0 |      NULL | -3         | -1         
 country     |     332.0 |                   2.0 |            0.0 |      NULL | NULL       | NULL       
 event_date  |      NULL |                   2.0 |            0.0 |      NULL | 2022-01-01 | 2022-11-01 
 city        |     332.0 |                   1.0 |            0.0 |      NULL | NULL       | NULL       

osscm (Contributor) commented Jan 17, 2023

Started an issue to target count(*) first:
#15745

fengguangyuan (Author) commented:

Quick update: Refining some of the work locally, then will open a WIP PR and continue to add test cases, etc.

Thanks for taking on the first sub-task of this BIG issue.

Anyway, I will post a rough implementation from my branch soon, for mixed min/max/count queries, and I hope it helps us work out the possibilities and impossibilities of this idea.
:)

osscm (Contributor) commented Feb 17, 2023 via email

Just FYI, I have started a PR for count agg pushdown. Regards, Manish

fengguangyuan (Author) commented:

Just FYI, I have started a PR for count agg pushdown. Regards, Manish


Thanks, that's great. It's my pleasure to know that you have been working on count. I will try to figure out the possibilities for the other aggregations.

atifiu (Contributor) commented Oct 13, 2023

@osscm What is the status of Iceberg aggregate pushdown? Are you still working on count pushdown? Are other aggregate functions like min/max also being worked on by someone?

findepi (Member) commented Oct 13, 2023

@atifiu there is a simpler proposal for count(*) handling here: #19303
min/max has potential correctness issues because the stored bounds are not necessarily exact min/max values.

atifiu (Contributor) commented Oct 13, 2023

Ok, got it. I am aware of that issue. Thanks.

findepi (Member) commented Oct 16, 2023

FWIW #19303 should improve performance for count(*) queries (without a filter, or when filtering over partition values).
