Add support for partition pruning in Delta checkpoint iterator #19588

Merged: 1 commit, Nov 16, 2023
4 changes: 4 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
@@ -124,6 +124,10 @@ values. Typical usage does not require you to configure them.
* - `delta.checkpoint-row-statistics-writing.enabled`
- Enable writing row statistics to checkpoint files.
- `true`
* - `delta.checkpoint-filtering.enabled`
Contributor:
Could you please add test coverage in TestDeltaLakeFileOperations with the checkpoint_filtering_enabled session property enabled, to make the consequences of this change more transparent?

- Enable partition pruning when reading checkpoint files.
The equivalent catalog session property is `checkpoint_filtering_enabled`.
- `false`
* - `delta.dynamic-filtering.wait-timeout`
- Duration to wait for completion of [dynamic
filtering](/admin/dynamic-filtering) during split generation. The equivalent
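The new `delta.checkpoint-filtering.enabled` property is off by default and can be toggled per query through the `checkpoint_filtering_enabled` catalog session property. Below is a minimal sketch of the kind of coverage the review comment above asks for, assuming a test class that extends AbstractTestQueryFramework with a Delta Lake catalog registered as `delta`; the class, table, and query names are illustrative and not part of this change:

```java
import io.trino.Session;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;

public class TestDeltaLakeCheckpointFiltering
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
    {
        // Query runner setup (a catalog named "delta" backed by a Delta Lake
        // table location) is elided in this sketch.
        throw new UnsupportedOperationException("sketch only");
    }

    @Test
    public void testPartitionPrunedCheckpointRead()
    {
        // Enable checkpoint partition pruning for this session only;
        // the catalog-level default stays false.
        Session session = Session.builder(getSession())
                .setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "true")
                .build();

        // With the property enabled, checkpoint "add" entries that do not match
        // the partition predicate should be skipped during split generation.
        assertQuery(session, "SELECT count(*) FROM partitioned_table WHERE part_key = 'a'");
    }
}
```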
@@ -62,6 +62,7 @@ public class DeltaLakeConfig
private boolean unsafeWritesEnabled;
private boolean checkpointRowStatisticsWritingEnabled = true;
private long defaultCheckpointWritingInterval = 10;
private boolean checkpointFilteringEnabled;
private Duration vacuumMinRetention = new Duration(7, DAYS);
private Optional<String> hiveCatalogName = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
@@ -269,6 +270,18 @@ public long getDefaultCheckpointWritingInterval()
return defaultCheckpointWritingInterval;
}

public boolean isCheckpointPartitionFilterEnabled()
{
return checkpointFilteringEnabled;
}

@Config("delta.checkpoint-filtering.enabled")
public DeltaLakeConfig setCheckpointPartitionFilterEnabled(boolean checkpointFilteringEnabled)
{
this.checkpointFilteringEnabled = checkpointFilteringEnabled;
return this;
}

@NotNull
public Duration getVacuumMinRetention()
{
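For the DeltaLakeConfig change above, a small hedged sketch of how the new flag behaves: the feature is opt-in (default false), and `delta.checkpoint-filtering.enabled` is bound to the setter through airlift's `@Config` annotation. In the Trino codebase this mapping would normally be asserted in the connector's config test with airlift ConfigAssertions; the standalone check below is only illustrative:

```java
import io.trino.plugin.deltalake.DeltaLakeConfig;

import static com.google.common.base.Verify.verify;

public class CheckpointFilteringConfigSketch
{
    public static void main(String[] args)
    {
        DeltaLakeConfig config = new DeltaLakeConfig();

        // Checkpoint partition pruning is opt-in, so the flag defaults to false.
        verify(!config.isCheckpointPartitionFilterEnabled());

        // Setting delta.checkpoint-filtering.enabled=true in the catalog properties
        // invokes this setter through airlift's @Config binding.
        config.setCheckpointPartitionFilterEnabled(true);
        verify(config.isCheckpointPartitionFilterEnabled());
    }
}
```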
@@ -3518,7 +3518,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
private List<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
TableSnapshot tableSnapshot = getSnapshot(session, tableHandle);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), tableHandle.getEnforcedPartitionConstraint(), session);
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
if (enforcedPartitionConstraint.isAll()) {
return validDataFiles;
@@ -70,6 +70,7 @@ public final class DeltaLakeSessionProperties
public static final String LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED = "legacy_create_table_with_existing_location_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String CHECKPOINT_FILTERING_ENABLED = "checkpoint_filtering_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -202,6 +203,11 @@ public DeltaLakeSessionProperties(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
deltaLakeConfig.isQueryPartitionFilterRequired(),
false),
booleanProperty(
CHECKPOINT_FILTERING_ENABLED,
"Use filter in checkpoint reader",
deltaLakeConfig.isCheckpointPartitionFilterEnabled(),
false));
}

@@ -306,4 +312,9 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

public static boolean isCheckpointFilteringEnabled(ConnectorSession session)
{
return session.getProperty(CHECKPOINT_FILTERING_ENABLED, Boolean.class);
}
}
@@ -154,7 +154,7 @@ private Stream<DeltaLakeSplit> getSplits(
{
TableSnapshot tableSnapshot = deltaLakeTransactionManager.get(transaction, session.getIdentity())
.getSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), tableHandle.getReadVersion());
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), tableHandle.getEnforcedPartitionConstraint(), session);
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
Domain pathDomain = getPathDomain(nonPartitionConstraint);
@@ -18,6 +18,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
@@ -26,6 +27,7 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;

import java.io.FileNotFoundException;
@@ -178,7 +180,8 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
TypeManager typeManager,
TrinoFileSystem fileSystem,
FileFormatDataSourceStats stats,
Optional<MetadataAndProtocolEntry> metadataAndProtocol)
Optional<MetadataAndProtocolEntry> metadataAndProtocol,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint)
throws IOException
{
if (lastCheckpoint.isEmpty()) {
@@ -206,7 +209,8 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
typeManager,
stats,
checkpoint,
checkpointFile)));
checkpointFile,
partitionConstraint)));
}
return resultStream;
}
@@ -225,7 +229,8 @@ private Iterator<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntrie
TypeManager typeManager,
FileFormatDataSourceStats stats,
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile)
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint)
throws IOException
{
long fileSize;
@@ -247,7 +252,8 @@ private Iterator<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntrie
stats,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold);
domainCompactionThreshold,
partitionConstraint);
}

public record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
Expand Down
@@ -27,6 +27,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry;
@@ -39,6 +40,7 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.MapType;
@@ -75,6 +77,8 @@
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.cache.CacheUtils.invalidateAllIf;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCheckpointFilteringEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
@@ -219,9 +223,20 @@ public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSess
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}

@Deprecated
public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session)
{
return getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), session);
}

public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TupleDomain<DeltaLakeColumnHandle> partitionConstraint, ConnectorSession session)
{
try {
if (isCheckpointFilteringEnabled(session)) {
return loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, session).stream()
.collect(toImmutableList());
}

TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion());

DeltaLakeDataFileCacheEntry cacheEntry = activeDataFileCache.get(tableVersion, () -> {
@@ -249,7 +264,7 @@ public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, MetadataEn
}
}

List<AddFileEntry> activeFiles = loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, session);
List<AddFileEntry> activeFiles = loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), session);
return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
});
return cacheEntry.getActiveFiles();
@@ -259,7 +274,12 @@ public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, MetadataEn
}
}

private List<AddFileEntry> loadActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session)
private List<AddFileEntry> loadActiveFiles(
TableSnapshot tableSnapshot,
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
ConnectorSession session)
{
List<Transaction> transactions = tableSnapshot.getTransactions();
try (Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
@@ -269,8 +289,12 @@ private List<AddFileEntry> loadActiveFiles(TableSnapshot tableSnapshot, Metadata
typeManager,
fileSystemFactory.create(session),
fileFormatDataSourceStats,
Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) {
Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)),
partitionConstraint)) {
return activeAddEntries(checkpointEntries, transactions)
.filter(partitionConstraint.isAll()
? addAction -> true
: addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow()))
.collect(toImmutableList());
}
catch (IOException e) {
@@ -407,8 +431,9 @@ private <T> Stream<T> getEntries(
{
try {
List<Transaction> transactions = tableSnapshot.getTransactions();
// Passing TupleDomain.all() because this method is used for getting all entries
Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty());
session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty(), TupleDomain.all());
Member:
Maybe worth a code comment why partitionConstraint isn't needed here.


return entryMapper.apply(
checkpointEntries,