Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for partition pruning in Delta checkpoint iterator #19588

Merged
merged 1 commit into from
Nov 16, 2023

Conversation

ebyhr
Copy link
Member

@ebyhr ebyhr commented Oct 31, 2023

Release notes

(x) Release notes are required, with the following suggested text:

# Delta Lake
* Improve performance when reading large checkpoint files on partitioned tables. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Oct 31, 2023
@github-actions github-actions bot added docs delta-lake Delta Lake connector labels Oct 31, 2023
@ebyhr ebyhr self-assigned this Oct 31, 2023
@ebyhr ebyhr force-pushed the ebi/delta-part-values-parsed branch 3 times, most recently from a4106b2 to 7c9ac69 Compare October 31, 2023 21:39
TupleDomain.withColumnDomains(ImmutableMap.of(intPartField, singleValue(BIGINT, 10L), stringPartField, singleValue(VARCHAR, utf8Slice("part1")))));
List<DeltaLakeTransactionLogEntry> entries = ImmutableList.copyOf(checkpointEntryIterator);

assertThat(entries).hasSize(2);
Copy link
Contributor

@findinpath findinpath Nov 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we have here only 1 entry?
Probably this relates to https://github.com/trinodb/trino/pull/19588/files/7c9ac692875bdb08827aa1dc9f7beac63a9874d4#r1383331077
We should have also the check to see that a reduced amount of entries has been actually read from the parquet file

        assertThat(checkpointEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(....);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When doing buildAddEntry check whether the partitionValues / partitionValues_parsed match the partitionConstraint and return null if not matching.

Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedPartitionConstraint.getDomains().orElseThrow();
if (!partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), enforcedDomains)) {

@ebyhr
Copy link
Member Author

ebyhr commented Nov 8, 2023

Just rebased on master.

{
try {
if (isCheckpointPartitionFilterEnabled(session) && !partitionConstraint.isAll()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps remove && !partitionConstraint.isAll()

i think the new code path should eventually replace the old cache-based approach, so we can use isCheckpointPartitionFilterEnabled as a algorithm-selecting toggle

@ebyhr
Copy link
Member Author

ebyhr commented Nov 9, 2023

CI hit #19602

@ebyhr ebyhr marked this pull request as ready for review November 9, 2023 08:11
@@ -431,13 +458,16 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo
statsFieldIndex = 5;
}

Optional<DeltaLakeParquetFileStatistics> parsedStats = Optional.ofNullable(getRowField(addEntryRow, statsFieldIndex + 1)).map(this::parseStatisticsFromParquet);
boolean partitionValuesParsedExists = addEntryRow.getUnderlyingFieldBlock(statsFieldIndex + 1) instanceof RowBlock && // partitionValues_parsed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check this for every position ? Seems like we should know this per file based on parquet file metadata (maybe it's possible to use io.trino.plugin.hive.ReaderPageSource#getReaderColumns).

Copy link
Member Author

@ebyhr ebyhr Nov 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with using Parquet metadata though getReaderColumns returns an empty list in this case. Sent another PR #19727

nextEntries.add(entry);
if (entry.getAdd() != null) {
if (partitionConstraint.isAll() ||
partitionMatchesPredicate(entry.getAdd().getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this may help in reducing the number of DeltaLakeTransactionLogEntry, doing the filtering after materialising all channels on each position of a page means that we can't benefit from lazy loading of blocks.
Ideally we should filter directly on the relevant block channels and skip to next position without decoding the remaining channels when the predicate does not match. But this can be looked at as a follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.
The partition matching check should be done directly in io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator#buildAddEntry

If we know that we have the field partitionValues_parsed (see https://github.com/trinodb/trino/pull/19588/files#r1389691135) , maybe we should do this check right away after doing

log.debug("Building add entry from %s pagePosition %d", block, pagePosition);
if (block.isNull(pagePosition)) {
return null;
}

optional: One word concerning using entry.getAdd().getCanonicalPartitionValues().
We have at hand the partitionValues_parsed. We could avoid deserializing the stringified partition values and use the "parsed" values directly. OTOH, we don't actually use the parsed partition values otherwise anywhere else. Did you intentionally restrain from reading the parsed partition values in favor of the stringified partition values?

nextEntries.add(entry);
if (entry.getAdd() != null) {
if (partitionConstraint.isAll() ||
partitionMatchesPredicate(entry.getAdd().getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.
The partition matching check should be done directly in io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator#buildAddEntry

If we know that we have the field partitionValues_parsed (see https://github.com/trinodb/trino/pull/19588/files#r1389691135) , maybe we should do this check right away after doing

log.debug("Building add entry from %s pagePosition %d", block, pagePosition);
if (block.isNull(pagePosition)) {
return null;
}

optional: One word concerning using entry.getAdd().getCanonicalPartitionValues().
We have at hand the partitionValues_parsed. We could avoid deserializing the stringified partition values and use the "parsed" values directly. OTOH, we don't actually use the parsed partition values otherwise anywhere else. Did you intentionally restrain from reading the parsed partition values in favor of the stringified partition values?

@ebyhr ebyhr force-pushed the ebi/delta-part-values-parsed branch from e73edbc to c0494e2 Compare November 13, 2023 02:05
@@ -3518,7 +3518,8 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
private List<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
TableSnapshot tableSnapshot = getSnapshot(session, tableHandle);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session);
// TODO Consider passing DeltaLakeTableHandle.getEnforcedPartitionConstraint to getActiveFiles method
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, TupleDomain.all(), tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why TODO? why not do it right away?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to focus on SELECT path in this PR. Going to handle in this PR.

return addFileEntryStream.collect(toImmutableList());
}
return addFileEntryStream
.filter(addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callers (eg split source) will likely repeat this work, so it's partially wasted.
Still useful because this allows us to materialize a shorter list.

I think this wouldn't be needed here if we could return a Stream/Iterator instead of a List.

@@ -112,7 +114,7 @@ public RowType getMetadataEntryType()
return metadataEntryType;
}

public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean requireWriteStatsAsJson, boolean requireWriteStatsAsStruct)
public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean requireWriteStatsAsJson, boolean requireWriteStatsAsStruct, boolean requirePartitionValuesParsed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require... or use... ?

we wnt to use use partitionvalues_parsed field if it is present, but we don't require that it exists (we don't fail when it doesn't), right?

@@ -156,6 +158,15 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protoc
if (requireWriteStatsAsJson) {
addFields.add(RowType.field("stats", VARCHAR));
}
if (requirePartitionValuesParsed) {
List<DeltaLakeColumnHandle> partitionColumns = extractPartitionColumns(metadataEntry, protocolEntry, typeManager);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The set of partitioning columns may change in the meantime probably only through the CREATE OR REPLACE TABLE operation. In such case, we shouldn't need to read the old checkpoint file at all, but I don't know whether this is the case.

@@ -111,7 +111,8 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile)
RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType();
RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent());
RowType txnEntryType = checkpointSchemaManager.getTxnEntryType();
RowType addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), entries.getProtocolEntry(), writeStatsAsJson, writeStatsAsStruct);
// TODO https://github.com/trinodb/trino/issues/19586 Add support for writing 'partitionValues_parsed' field
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

( #19586 )

@@ -124,6 +124,10 @@ values. Typical usage does not require you to configure them.
* - `delta.checkpoint-row-statistics-writing.enabled`
- Enable writing row statistics to checkpoint files.
- `true`
* - ``delta.checkpoint-filtering.enabled``
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you pls test coverage into TestDeltaLakeFileOperations with checkpoint_filtering_enabled session property enabled to add more transparence in regards to the consequences coming with this change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

4 participants