Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aggregation pushdown support for count using Iceberg Metrics #15832

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

osscm
Copy link
Contributor

@osscm osscm commented Jan 25, 2023

Description

Iceberg maintains metrics of the data files in the Iceberg metadata. So, we can utilize these metrics to optimize the simple aggregate queries.
To achieve this, Query Engine needs to support aggregation pushdown at the connector layer. Trino connector framework supports aggregation pushdown but Iceberg and Hive connectors have not implemented it. Aggregation Pushdown.
If Iceberg connector can support it then we can use the Iceberg metrics to give some of the answers like count, min, and max with the partition filters. Currently, in Iceberg and Hive connectors, aggregations are handled at the engine level.
Important points
This is a single-threaded implementation and generates a single split to populate the page/data for the result.
We might be able to implement it in a distributed manner If an engine can support handling aggregation at the top level while handing the aggregation pushdown. Like how the engine handles count(*) by summing the result/count it gets from different workers.
This implementation will also not handle the situation where the count is not present for some files/partitions so that some metrics can be used partially, and for some data still, the metrics will be read from the actual data files.
My understanding is, that this would require the following feature to be done: #13534
It can handle positional delete but not equality-based deletes.

Additional context and related issues

ConnectorMetadata.applyAggregation(....) can result into projected column per aggregate function, if it needs to pushed down

applyAggregation is called by the QueryEngine in and latter on in the flow, it calls the ConnectorSplitManager to generate the splits.

IcebergSplitManager creates the IcebergSplitSource now it can create AggregateSplitSource as well.

AggregateSplitSource (like IcebergSplitSource) apply the partition filter and get the dataFiles needed, and then use the metrics. Now it can create AggregationSplit with the calculated count/metrics.

IcebergPageSourceProvider.createPageSource gets the split, now here if aggregation pushdown is used then create the AggregatePageSource based on the count/min/max available in the split.

This feature is controlled by session level flag aggregation_pushdown_enabled

Following type of queries can be benefitted

SELECT count(*) FROM tbl ;

SELECT count(*) FROM tbl WHERE part_col = 'foo' ;

SELECT count(*) FROM tbl WHERE part_date =DATE  '2023-01-10' ;
SELECT count(*) FROM tbl WHERE part_date BETWEEN DATE  '2023-01-10'  AND DATE  '2023-01-15' ;

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`15745`)

For #10974

@cla-bot cla-bot bot added the cla-signed label Jan 25, 2023
@osscm osscm marked this pull request as draft January 25, 2023 02:52
@osscm osscm requested a review from alexjo2144 January 25, 2023 02:53
@osscm
Copy link
Contributor Author

osscm commented Jan 25, 2023

To understand some of the class dependencies.

Image

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.iceberg.types.Conversions.fromByteBuffer;

public class AggregateSplitSource
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will/Can refactor to avoid the duplicate code of IcebergSplitSource, as it's based on it.
But first thought of getting feedback on the approach, then will do further refactoring.

@osscm osscm force-pushed the iceberg-agg-pushdown branch 4 times, most recently from 3046d5d to e64a256 Compare January 26, 2023 22:45
@osscm osscm force-pushed the iceberg-agg-pushdown branch from e64a256 to bc0df7e Compare February 8, 2023 17:40
@github-actions
Copy link

github-actions bot commented Mar 1, 2023

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Mar 1, 2023
@mosabua
Copy link
Member

mosabua commented Mar 2, 2023

@colebow should the stalebot ignore drafts maybe?

@osscm are you still working on this?

@bitsondatadev
Copy link
Member

@colebow should the stalebot ignore drafts maybe?

I would argue it should still look at these. Many drafts are started and abandoned. I've closed out some already that weren't going to be worked on.

@mosabua
Copy link
Member

mosabua commented Mar 2, 2023

Fair .. lets keep drafts in..

@github-actions github-actions bot removed the stale label Mar 2, 2023
@osscm
Copy link
Contributor Author

osscm commented Mar 3, 2023

@mosabua @bitsondatadev yes Im working on it.
@alexjo2144 and I had discussed about the initial design.
So, put together this draft, and wanted to get his feedback.

Alex, whenever you get time, can you please check.

@bitsondatadev
Copy link
Member

Thanks for that background @osscm, I'll flag Alex if he doesn't look at this within the next few days!

@osscm
Copy link
Contributor Author

osscm commented Mar 26, 2023

I'll check the conflicts.

@alexjo2144 thanks a lot for the discussions.
I have started the work to support the min/max functions, so it would be great if we can catchup on this one.

@osscm osscm requested a review from Parth-Brahmbhatt April 30, 2023 18:27
@osscm osscm marked this pull request as ready for review June 15, 2023 18:31
@osscm
Copy link
Contributor Author

osscm commented Jun 29, 2023 via email

@osscm osscm force-pushed the iceberg-agg-pushdown branch from bc0df7e to 0b5ccb4 Compare July 3, 2023 08:04
@github-actions github-actions bot added the iceberg Iceberg connector label Jul 3, 2023
@osscm osscm force-pushed the iceberg-agg-pushdown branch from 0b5ccb4 to 16cd7ee Compare July 7, 2023 07:36
@mosabua
Copy link
Member

mosabua commented Jul 7, 2023

Can you fix the checkstyle errors and the look at the test failure @osscm

Also @martint @findepi @alexjo2144 .. can you have a look at this

@vgankidi
Copy link

vgankidi commented Sep 12, 2023

Hi team, could you please confirm if the approach looks good here and you have time to review this. We have added support for additional aggregation pushdowns and would like to contribute that as well once this goes through. Once we have confirmation on the approach we can go ahead and rebase this again if someone can take a look at it. As we progress, constant rebasing takes time too, so would appreciate feedback on the overall approach first. cc @martint @findepi @electrum @alexjo2144 @osscm

@findepi
Copy link
Member

findepi commented Sep 20, 2023

for count(*) pushdown we might be able to do something simpler
for example see the code in Delta connector --

(regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) {
return new DeltaLakePageSource(
deltaLakeColumns,
ImmutableSet.of(),
partitionKeys,
partitionValues,
generatePages(split.getFileRowCount().get(), onlyRowIdColumn(regularColumns)),
Optional.empty(),
split.getPath(),
split.getFileSize(),
split.getFileModifiedTime(),
Optional::empty);
}

Comment on lines +791 to +773
assertQuery(clientSession, format("SELECT COUNT(*) from %s WHERE event_date = DATE '2022-11-02' AND country = 'FRA' AND state = 'Brittany'", tableName), "SELECT 1");
assertQuery(clientSession, format("SELECT COUNT(*) from %s WHERE event_date = DATE '2022-11-02' AND country = 'FRA' AND state = 'Brittany'", tableName), "SELECT 1");

assertQuery(clientSession, format("SELECT COUNT(*) from %s WHERE event_date = DATE '2022-11-02' AND country = 'USA' AND state = 'Corsica'", tableName), "SELECT 1");
assertQuery(format("SELECT COUNT(*) from %s WHERE event_date = DATE '2022-11-02' AND country = 'USA' AND state = 'Corsica'", tableName), "SELECT 1");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need some tests which validate that agg pushdown actually happened by looking at the explain plan?

@osscm osscm force-pushed the iceberg-agg-pushdown branch from f392621 to 72b2cda Compare April 14, 2024 00:53
@osscm osscm force-pushed the iceberg-agg-pushdown branch from c32b0d4 to 38b9fc3 Compare April 14, 2024 18:54
@osscm
Copy link
Contributor Author

osscm commented Apr 19, 2024

I don't think I'd use anything that's not spec'd out as a source of truth for determining if equality deletes exist.
@amogh-jahagirdar thanks for the giving time for the discussion.
it was nice talking to you.

As we discussed, we will go ahead with this property check, and you will also check with the Iceberg's team to add this property to be part of the Spec, as it is already being updated.
Snapshot Summary is already part of the spec.

I have also discussed it with @flyrain (Iceberg team), and he also gave +1 for this.
thanks @flyrain

@mosabua mosabua requested a review from dain May 1, 2024 20:49
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label May 23, 2024
@mosabua
Copy link
Member

mosabua commented May 24, 2024

Please rebase @osscm

Can @electrum and @findepi please figure out next steps.

@github-actions github-actions bot removed the stale label May 27, 2024
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Jun 17, 2024
Copy link

github-actions bot commented Jul 8, 2024

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Jul 8, 2024
@mosabua mosabua reopened this Jul 8, 2024
@mosabua
Copy link
Member

mosabua commented Jul 8, 2024

We really need to get this in .. cc @alexjo2144 @findepi @findinpath @cwsteinbach @dain ..

I am going to add the stale-ignore label so we wont have it closed again.

@mosabua mosabua added stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed. and removed stale labels Jul 8, 2024
@sopel39
Copy link
Member

sopel39 commented Aug 5, 2024

Given discussion in https://trinodb.slack.com/archives/CJ6UC075E/p1722544669964209 I wonder how we can proceed here. cc @dain @findepi @raunaqmorarka
It seems Iceberg spec explicitly allows deviation from accurate https://iceberg.apache.org/spec/#manifests stats. I wonder if we can detect that the stats are in fact correct. Maybe we miss something like https://issues.apache.org/jira/browse/PARQUET-2352?

@sopel39
Copy link
Member

sopel39 commented Aug 7, 2024

@findepi @dain would it be fine to land it behind feature flag (disabled by default)? Additionally, we would like to implement similar logic as for Spark (https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java#L309). It's not perfect, so in the meantime we could work with Iceberg community to have more firm spec for column stats accuracy. Once this is done, we could enable it by default.

@findepi
Copy link
Member

findepi commented Aug 8, 2024

@sopel39 there are basic unresolved comments at this point, so there is probably no point in another review round, least to talk about landing.
Also, it's not clear we've settled on the strategy how this should work on the logical level. For example, do we want to violate iceberg spec and take upper bound as max value? Or do we want to use lower/upper bounds for pruning only. The latter could typically be similarly efficient but without violating iceberg spec, so would pave the way to eventually enable the feature without a flag.

@sopel39
Copy link
Member

sopel39 commented Aug 8, 2024

Also, it's not clear we've settled on the strategy how this should work on the logical level.

@findepi that's exactly what I mean rather than landing without another review round.

For example, do we want to violate iceberg spec and take upper bound as max value?

It seems Iceberg itself is violating it's own spec with Spark unfortunately regarding to min/max stats

Or do we want to use lower/upper bounds for pruning only. The latter could typically be similarly efficient but without violating iceberg spec

Don't we prune already using Iceberg stats? Anyways, pruning and agg pushdown are completely different things.

Also, we talked about min/max but this PR is abount count. According to spec count stats are indeed accurate:
https://iceberg.apache.org/spec/#manifest-lists

513 existing_rows_count

Number of rows in all of files in the manifest that have status EXISTING, when null this is assumed to be non-zero

So we should be good to just have it enabled for counts for the moment

@findepi
Copy link
Member

findepi commented Aug 8, 2024

Don't we prune already using Iceberg stats? Anyways, pruning and agg pushdown are completely different things.

Yes to both

Also, we talked about min/max but this PR is abount count. According to spec count stats are indeed accurate:

You're right. I will copy #15832 (comment) here:

Agreed. However, we need to understand what the approach would look like to support other aggregations than count, since the input information, and guarantees, is so much different.

@sopel39
Copy link
Member

sopel39 commented Aug 8, 2024

Agreed. However, we need to understand what the approach would look like to support other aggregations than count, since the input information, and guarantees, is so much different.

I think to get guarantees for other stats Iceberg spec has to be updated. My suggestion would be some kind of optional field in the metadata itself specifying if stats are estimates or not. cc @osscm

@findepi
Copy link
Member

findepi commented Aug 8, 2024

I think to get guarantees for other stats Iceberg spec has to be updated. My suggestion would be some kind of optional field in the metadata itself specifying if stats are estimates or not

Just like @raunaqmorarka did for Parquet footer. i like the idea.

Note that it's not must-have to do some aggregation pushdown on top of iceberg.
It's just a must-have for a particular approach we seem to be thinking about.
But we can do agg pushdown without changes to Iceberg spec.

@sopel39
Copy link
Member

sopel39 commented Aug 13, 2024

@findepi I've create Iceberg spec proposal: apache/iceberg#10930

@mosabua
Copy link
Member

mosabua commented Aug 20, 2024

@findepi I've create Iceberg spec proposal: apache/iceberg#10930

fyi @cwsteinbach and @alexjo2144

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed iceberg Iceberg connector stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed.
Development

Successfully merging this pull request may close these issues.

8 participants