Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune unused stats columns when reading Delta checkpoint #19848

Merged

Conversation

findinpath
Copy link
Contributor

@findinpath findinpath commented Nov 21, 2023

Description

Follow-up work from #19588 used to retrieve from the checkpoint file only the add file statistics for the columns projected in the query.

This change can save up CPU and IO time from deserializing unnecessary add.stats_parsed.... columns in a Delta Lake query.

Used for testing a multi-part checkpoint file (25 parts , each around 12MB ~ 300MB in total) for testing this feature while storing the checkpoint in local MinIO and came up with the following results:

ADD without any checkpoint filtering
number of add entries: Optional[1235155]
checkpoint iterator completed positions: Optional[1235157]
checkpoint iterator completed bytes: Optional[323227977]
Elapsed Time in milliseconds: 20031

ADD with checkpoint filtering (no partition pruning) stats Projection
number of add entries: Optional[1235155]
checkpoint iterator completed positions : Optional[1235157]
checkpoint iterator completed bytes: Optional[290305358]
Elapsed Time in milliseconds: 12899

ADD with checkpoint filtering with partition pruning without stats projection
number of add entries: Optional[18578]
checkpoint iterator completed positions: Optional[49290]
checkpoint iterator completed bytes: Optional[14099674]
Elapsed Time in milliseconds: 1011

ADD with checkpoint filtering with partition pruning with stats projection
number of add entries: Optional[18578]
checkpoint iterator completed positions: Optional[49290]
checkpoint iterator completed bytes: Optional[13106732]
Elapsed Time in milliseconds: 802

As can be seen on the listing above, compared to the partition pruning change from #19588 , the change here does not provide an explosive improvement in efficiency, but it manages to shave off a noticeable wait time in processing the checkpoint.

Fixes #19733

Additional context and related issues

Same as in #19588 , the add stats projection functioanlity is effective only when the session setting checkpoint_filtering_enabled is set to true

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Delta Lake
* Improve performance when reading large checkpoint files on partitioned tables. ({issue}`issuenumber`)

@findinpath findinpath self-assigned this Nov 21, 2023
@findinpath findinpath added the delta-lake Delta Lake connector label Nov 21, 2023
@cla-bot cla-bot bot added the cla-signed label Nov 21, 2023
@findinpath findinpath force-pushed the findinpath/filter-checkpoint-add-stats branch 4 times, most recently from a934893 to cd7a3a3 Compare November 22, 2023 10:49
@findinpath findinpath marked this pull request as ready for review November 22, 2023 10:49
@@ -600,10 +699,14 @@ private CheckpointEntryIterator createCheckpointEntryIterator(
metadataEntry,
protocolEntry,
new FileFormatDataSourceStats(),
new ParquetReaderConfig().toParquetReaderOptions(),
new ParquetReaderConfig()
.setMaxBufferSize(DataSize.ofBytes(500))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes were necessary to be able to showcase the effectiveness of the stats_parsed projection functionality.

Copy link
Contributor

@jkylling jkylling left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding this!

I mostly have nitpicks for you :)

We could perhaps just pass the underlying set for addStatsColumnFilter around, instead of a function. Then it could be renamed projectedColumns. Maybe we could even pass the projectedColumns to the CheckpointEntryIterator. That would set us up for supporting statistics projection of non-base columns in the future.

Perhaps unrelated to this PR, but it might be good to have a test case which tests performance of checkpoint reads from tables with a very wide schema (e.g., internally we have a table with 276 columns).

@findinpath findinpath force-pushed the findinpath/filter-checkpoint-add-stats branch from cd7a3a3 to ac0d251 Compare November 23, 2023 10:24
@findinpath
Copy link
Contributor Author

We could perhaps just pass the underlying set for addStatsColumnFilter around, instead of a function. Then it could be renamed projectedColumns

My initial version of the code contained projectedColumns all the way down to CheckpointEntryIterator. However semantically it looked weird to deal with Optional.empty() for the situation where all the columns are to be retrieved.

See in TransactionLogAccess

    @Deprecated
    public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session)
    {
        return retrieveActiveFiles(tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), Optional.empty(), session);
    }

It seems weird to pass Optional.empty() when we intend to select all the columns.

if (projectedColumns.isPresent()) {
Set<String> baseColumnNames = projectedColumns.get().stream()
.filter(DeltaLakeColumnHandle::isBaseColumn) // Only base column stats are supported
.map(DeltaLakeColumnHandle::getColumnName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normalized (lowercased) or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using here the original column names - e.g a_NuMbEr instead of a_number.

See corresponding test io.trino.plugin.deltalake.TestDeltaLakeBasic#testCheckpointFilteringForParsedStatsWithCaseSensitiveColumnNames and test resource: databricks133/parsed_stats_case_sensitive/_delta_log/00000000000000000002.checkpoint.parquet

    optional group stats_parsed {
      optional int64 numRecords;
      optional group minValues {
        optional int32 a_NuMbEr;
        optional binary a_StRiNg (STRING);
      }

@findinpath findinpath force-pushed the findinpath/filter-checkpoint-add-stats branch 2 times, most recently from e27c336 to b425db6 Compare November 26, 2023 03:33
@findepi findepi changed the title Add support for stats projection in Delta checkpoint iterator Prune unused stats columns when reading Delta checkpoint Nov 27, 2023
Add support for stats projection in Delta checkpoint iterator
@findepi findepi force-pushed the findinpath/filter-checkpoint-add-stats branch from b425db6 to 5478483 Compare November 27, 2023 16:32
@findepi findepi merged commit f6f7646 into trinodb:master Nov 27, 2023
3 of 10 checks passed
@github-actions github-actions bot added this to the 434 milestone Nov 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector
Development

Successfully merging this pull request may close these issues.

Delta Lake: Read from the checkpoint only the statistics relevant to the query
4 participants