From 7f87123104666b9ffd33a06cf5e7c622fa6ed0be Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 26 Oct 2023 11:00:50 +0900 Subject: [PATCH] Add support for partition pruning in Delta checkpoint iterator --- docs/src/main/sphinx/connector/delta-lake.md | 4 + .../plugin/deltalake/DeltaLakeConfig.java | 13 ++ .../plugin/deltalake/DeltaLakeMetadata.java | 2 +- .../deltalake/DeltaLakeSessionProperties.java | 11 + .../deltalake/DeltaLakeSplitManager.java | 2 +- .../transactionlog/TableSnapshot.java | 14 +- .../transactionlog/TransactionLogAccess.java | 33 ++- .../checkpoint/CheckpointEntryIterator.java | 51 ++++- .../checkpoint/CheckpointSchemaManager.java | 13 +- .../checkpoint/CheckpointWriter.java | 3 +- .../checkpoint/CheckpointWriterManager.java | 7 +- .../plugin/deltalake/TestDeltaLakeBasic.java | 105 +++++++++ .../plugin/deltalake/TestDeltaLakeConfig.java | 3 + .../TestDeltaLakeFileOperations.java | 31 +++ .../deltalake/TestDeltaLakeSplitManager.java | 2 +- .../deltalake/TestTransactionLogAccess.java | 2 +- .../deltalake/TestingDeltaLakeUtils.java | 2 +- .../transactionlog/TestTableSnapshot.java | 5 +- .../TestCheckpointEntryIterator.java | 210 ++++++++++++++++-- .../checkpoint/TestCheckpointWriter.java | 4 +- .../TestDeltaLakeFileStatistics.java | 10 +- .../partition_values_parsed/README.md | 21 ++ .../_delta_log/00000000000000000000.json | 3 + .../00000000000000000001.checkpoint.parquet | Bin 0 -> 16816 bytes .../_delta_log/00000000000000000001.json | 2 + .../00000000000000000002.checkpoint.parquet | Bin 0 -> 16791 bytes .../_delta_log/00000000000000000002.json | 2 + .../00000000000000000003.checkpoint.parquet | Bin 0 -> 16925 bytes .../_delta_log/00000000000000000003.json | 2 + .../_delta_log/_last_checkpoint | 1 + ...4e70-86ab-c21ae44c7f3f.c000.snappy.parquet | Bin 0 -> 452 bytes ...4ce1-b96c-32c5cf472476.c000.snappy.parquet | Bin 0 -> 452 bytes ...4fa6-a8bf-860da0131a5c.c000.snappy.parquet | Bin 0 -> 452 bytes .../README.md | 77 +++++++ 
.../_delta_log/00000000000000000000.json | 3 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 2 + .../00000000000000000003.checkpoint.parquet | Bin 0 -> 25294 bytes .../_delta_log/00000000000000000003.json | 2 + .../_delta_log/_last_checkpoint | 1 + ...4363-81b6-aeaea00fff6d.c000.snappy.parquet | Bin 0 -> 452 bytes ...4c22-ad51-909b9600e221.c000.snappy.parquet | Bin 0 -> 452 bytes ...4156-8560-bd819942f7fd.c000.snappy.parquet | Bin 0 -> 452 bytes .../partition_values_parsed/README.md | 17 ++ .../_delta_log/00000000000000000000.json | 3 + .../00000000000000000001.checkpoint.parquet | Bin 0 -> 7055 bytes .../_delta_log/00000000000000000001.json | 2 + .../00000000000000000002.checkpoint.parquet | Bin 0 -> 6920 bytes .../_delta_log/00000000000000000002.json | 2 + .../00000000000000000003.checkpoint.parquet | Bin 0 -> 7077 bytes .../_delta_log/00000000000000000003.json | 2 + .../_delta_log/_last_checkpoint | 1 + .../_trino_meta/extended_stats.json | 1 + ...9eakg_302b745a-59c0-4fce-8ca7-fed724196b93 | Bin 0 -> 199 bytes ...9eakg_e6448c08-9b43-4fa1-8288-823fd3d692b9 | Bin 0 -> 199 bytes ...9eakg_2a008dd8-da7f-496a-b404-ca455732578e | Bin 0 -> 199 bytes 56 files changed, 630 insertions(+), 43 deletions(-) create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/README.md create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000001.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000002.checkpoint.parquet create mode 100644 
plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000002.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000003.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/_last_checkpoint create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/int_part=10/string_part=part1/part-00000-383afb1a-87de-4e70-86ab-c21ae44c7f3f.c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/int_part=20/string_part=part2/part-00000-e0b4887e-95f6-4ce1-b96c-32c5cf472476.c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/int_part=__HIVE_DEFAULT_PARTITION__/string_part=__HIVE_DEFAULT_PARTITION__/part-00000-dcb29d13-eeca-4fa6-a8bf-860da0131a5c.c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/README.md create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/_delta_log/00000000000000000002.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/_delta_log/00000000000000000003.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/_delta_log/00000000000000000003.json create 
mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/_delta_log/_last_checkpoint create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/part_boolean=__HIVE_DEFAULT_PARTITION__/part_tinyint=__HIVE_DEFAULT_PARTITION__/part_smallint=__HIVE_DEFAULT_PARTITION__/part_int=__HIVE_DEFAULT_PARTITION__/part_bigint=__HIVE_DEFAULT_PARTITION__/part_short_decimal=__HIVE_DEFAULT_PARTITION__/part_long_decimal=__HIVE_DEFAULT_PARTITION__/part_double=__HIVE_DEFAULT_PARTITION__/part_float=__HIVE_DEFAULT_PARTITION__/part_varchar=__HIVE_DEFAULT_PARTITION__/part_date=__HIVE_DEFAULT_PARTITION__/part_timestamp=__HIVE_DEFAULT_PARTITION__/part_timestamp_ntz=__HIVE_DEFAULT_PARTITION__/part-00000-194b12f4-b133-4363-81b6-aeaea00fff6d.c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/part_boolean=false/part_tinyint=2/part_smallint=20/part_int=200/part_bigint=2000/part_short_decimal=223.12/part_long_decimal=223456789012345678.123/part_double=10.2/part_float=30.4/part_varchar=b/part_date=2020-08-22/part_timestamp=2020-10-22 01%3A00%3A00.123/part_timestamp_ntz=2023-01-03 01%3A02%3A03.456/part-00000-b0b98900-4fe4-4c22-ad51-909b9600e221.c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed_all_types/part_boolean=true/part_tinyint=1/part_smallint=10/part_int=100/part_bigint=1000/part_short_decimal=123.12/part_long_decimal=123456789012345678.123/part_double=1.2/part_float=3.4/part_varchar=a/part_date=2020-08-21/part_timestamp=2020-10-21 01%3A00%3A00.123/part_timestamp_ntz=2023-01-02 01%3A02%3A03.456/part-00000-0c74f71e-bb8b-4156-8560-bd819942f7fd.c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/README.md create mode 100644 
plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000001.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000002.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000002.json create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000003.json create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/_last_checkpoint create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/_trino_meta/extended_stats.json create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=10/string_part=part1/20231109_020343_00032_9eakg_302b745a-59c0-4fce-8ca7-fed724196b93 create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=20/string_part=part2/20231109_020344_00033_9eakg_e6448c08-9b43-4fa1-8288-823fd3d692b9 create mode 100644 plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=__HIVE_DEFAULT_PARTITION__/string_part=__HIVE_DEFAULT_PARTITION__/20231109_020350_00034_9eakg_2a008dd8-da7f-496a-b404-ca455732578e diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md index 1bbd1bb95e9c..1432e4571076 100644 --- 
a/docs/src/main/sphinx/connector/delta-lake.md +++ b/docs/src/main/sphinx/connector/delta-lake.md @@ -124,6 +124,10 @@ values. Typical usage does not require you to configure them. * - `delta.checkpoint-row-statistics-writing.enabled` - Enable writing row statistics to checkpoint files. - `true` +* - `delta.checkpoint-filtering.enabled` + - Enable partition pruning when reading checkpoint files. + The equivalent catalog session property is `checkpoint_filtering_enabled`. + - `false` * - `delta.dynamic-filtering.wait-timeout` - Duration to wait for completion of [dynamic filtering](/admin/dynamic-filtering) during split generation. The equivalent diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 9dd03f686b18..f92a5b893b11 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -62,6 +62,7 @@ public class DeltaLakeConfig private boolean unsafeWritesEnabled; private boolean checkpointRowStatisticsWritingEnabled = true; private long defaultCheckpointWritingInterval = 10; + private boolean checkpointFilteringEnabled; private Duration vacuumMinRetention = new Duration(7, DAYS); private Optional hiveCatalogName = Optional.empty(); private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); @@ -269,6 +270,18 @@ public long getDefaultCheckpointWritingInterval() return defaultCheckpointWritingInterval; } + public boolean isCheckpointPartitionFilterEnabled() + { + return checkpointFilteringEnabled; + } + + @Config("delta.checkpoint-filtering.enabled") + public DeltaLakeConfig setCheckpointPartitionFilterEnabled(boolean checkpointFilteringEnabled) + { + this.checkpointFilteringEnabled = checkpointFilteringEnabled; + return this; + } + @NotNull public Duration getVacuumMinRetention() { 
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index fc337cdbeb1c..90dd538310f6 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -3518,7 +3518,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl private List getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle) { TableSnapshot tableSnapshot = getSnapshot(session, tableHandle); - List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session); + List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), tableHandle.getEnforcedPartitionConstraint(), session); TupleDomain enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint(); if (enforcedPartitionConstraint.isAll()) { return validDataFiles; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 0ca4012c16e6..1179ccec7cb7 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -70,6 +70,7 @@ public final class DeltaLakeSessionProperties public static final String LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED = "legacy_create_table_with_existing_location_enabled"; private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private static final String 
QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; + private static final String CHECKPOINT_FILTERING_ENABLED = "checkpoint_filtering_enabled"; private final List> sessionProperties; @@ -202,6 +203,11 @@ public DeltaLakeSessionProperties( QUERY_PARTITION_FILTER_REQUIRED, "Require filter on partition column", deltaLakeConfig.isQueryPartitionFilterRequired(), + false), + booleanProperty( + CHECKPOINT_FILTERING_ENABLED, + "Use filter in checkpoint reader", + deltaLakeConfig.isCheckpointPartitionFilterEnabled(), false)); } @@ -306,4 +312,9 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session) { return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); } + + public static boolean isCheckpointFilteringEnabled(ConnectorSession session) + { + return session.getProperty(CHECKPOINT_FILTERING_ENABLED, Boolean.class); + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 84b5e71caf1f..606ab5d55ce1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -154,7 +154,7 @@ private Stream getSplits( { TableSnapshot tableSnapshot = deltaLakeTransactionManager.get(transaction, session.getIdentity()) .getSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), tableHandle.getReadVersion()); - List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session); + List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), tableHandle.getEnforcedPartitionConstraint(), session); TupleDomain enforcedPartitionConstraint = 
tableHandle.getEnforcedPartitionConstraint(); TupleDomain nonPartitionConstraint = tableHandle.getNonPartitionConstraint(); Domain pathDomain = getPathDomain(nonPartitionConstraint); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 5069517239a8..ba4fc8c37a40 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -18,6 +18,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.ParquetReaderOptions; +import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; @@ -26,6 +27,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; import java.io.FileNotFoundException; @@ -178,7 +180,8 @@ public Stream getCheckpointTransactionLogEntries( TypeManager typeManager, TrinoFileSystem fileSystem, FileFormatDataSourceStats stats, - Optional metadataAndProtocol) + Optional metadataAndProtocol, + TupleDomain partitionConstraint) throws IOException { if (lastCheckpoint.isEmpty()) { @@ -206,7 +209,8 @@ public Stream getCheckpointTransactionLogEntries( typeManager, stats, checkpoint, - checkpointFile))); + checkpointFile, + partitionConstraint))); } return resultStream; } @@ -225,7 +229,8 @@ private Iterator getCheckpointTransactionLogEntrie TypeManager typeManager, FileFormatDataSourceStats stats, 
LastCheckpoint checkpoint, - TrinoInputFile checkpointFile) + TrinoInputFile checkpointFile, + TupleDomain partitionConstraint) throws IOException { long fileSize; @@ -247,7 +252,8 @@ private Iterator getCheckpointTransactionLogEntrie stats, parquetReaderOptions, checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold); + domainCompactionThreshold, + partitionConstraint); } public record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 52ff743b8968..b503a27c28cb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -27,6 +27,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.ParquetReaderOptions; +import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry; @@ -39,6 +40,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; import io.trino.spi.type.BooleanType; import io.trino.spi.type.MapType; @@ -75,6 +77,8 @@ import static io.airlift.slice.SizeOf.instanceSize; import static io.trino.cache.CacheUtils.invalidateAllIf; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCheckpointFilteringEnabled; 
+import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; @@ -219,9 +223,20 @@ public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSess .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); } + @Deprecated public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session) + { + return getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), session); + } + + public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TupleDomain partitionConstraint, ConnectorSession session) { try { + if (isCheckpointFilteringEnabled(session)) { + return loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, session).stream() + .collect(toImmutableList()); + } + TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion()); DeltaLakeDataFileCacheEntry cacheEntry = activeDataFileCache.get(tableVersion, () -> { @@ -249,7 +264,7 @@ public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEn } } - List activeFiles = loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, session); + List activeFiles = loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), session); return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles); }); return cacheEntry.getActiveFiles(); @@ -259,7 +274,12 @@ public List getActiveFiles(TableSnapshot 
tableSnapshot, MetadataEn } } - private List loadActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session) + private List loadActiveFiles( + TableSnapshot tableSnapshot, + MetadataEntry metadataEntry, + ProtocolEntry protocolEntry, + TupleDomain partitionConstraint, + ConnectorSession session) { List transactions = tableSnapshot.getTransactions(); try (Stream checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries( @@ -269,8 +289,12 @@ private List loadActiveFiles(TableSnapshot tableSnapshot, Metadata typeManager, fileSystemFactory.create(session), fileFormatDataSourceStats, - Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) { + Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)), + partitionConstraint)) { return activeAddEntries(checkpointEntries, transactions) + .filter(partitionConstraint.isAll() + ? addAction -> true + : addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow())) .collect(toImmutableList()); } catch (IOException e) { @@ -407,8 +431,9 @@ private Stream getEntries( { try { List transactions = tableSnapshot.getTransactions(); + // Passing TupleDomain.all() because this method is used for getting all entries Stream checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries( - session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty()); + session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty(), TupleDomain.all()); return entryMapper.apply( checkpointEntries, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index 1892b44643dd..c2dca1f4e295 100644 --- 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -22,6 +22,7 @@ import io.airlift.log.Logger; import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.ParquetReaderOptions; +import io.trino.plugin.deltalake.DeltaHiveTypeTranslator; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; @@ -82,6 +83,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; +import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats; @@ -140,6 +142,7 @@ public String getColumnName() private final Queue nextEntries; private final List extractors; private final boolean checkpointRowStatisticsWritingEnabled; + private final TupleDomain partitionConstraint; private MetadataEntry metadataEntry; private ProtocolEntry protocolEntry; private List schema; @@ -160,13 +163,15 @@ public CheckpointEntryIterator( FileFormatDataSourceStats stats, ParquetReaderOptions parquetReaderOptions, boolean checkpointRowStatisticsWritingEnabled, - int domainCompactionThreshold) + int domainCompactionThreshold, + TupleDomain partitionConstraint) { this.checkpointPath = checkpoint.location().toString(); this.session = requireNonNull(session, "session is null"); this.stringList = (ArrayType) 
typeManager.getType(TypeSignature.arrayType(VARCHAR.getTypeSignature())); this.stringMap = (MapType) typeManager.getType(TypeSignature.mapType(VARCHAR.getTypeSignature(), VARCHAR.getTypeSignature())); this.checkpointRowStatisticsWritingEnabled = checkpointRowStatisticsWritingEnabled; + this.partitionConstraint = requireNonNull(partitionConstraint, "partitionConstraint is null"); checkArgument(!fields.isEmpty(), "fields is empty"); Map extractors = ImmutableMap.builder() .put(TRANSACTION, this::buildTxnEntry) @@ -221,7 +226,7 @@ private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointS { Type type = switch (entryType) { case TRANSACTION -> schemaManager.getTxnEntryType(); - case ADD -> schemaManager.getAddEntryType(metadataEntry, protocolEntry, true, true); + case ADD -> schemaManager.getAddEntryType(metadataEntry, protocolEntry, true, true, true); case REMOVE -> schemaManager.getRemoveEntryType(); case METADATA -> schemaManager.getMetadataEntryType(); case PROTOCOL -> schemaManager.getProtocolEntryType(true, true); @@ -272,7 +277,30 @@ private TupleDomain buildTupleDomainColumnHandle(EntryType ent type)), ColumnType.REGULAR, column.getComment()); - return TupleDomain.withColumnDomains(ImmutableMap.of(handle, Domain.notNull(handle.getType()))); + + ImmutableMap.Builder domains = ImmutableMap.builder() + .put(handle, Domain.notNull(handle.getType())); + if (entryType == ADD) { + partitionConstraint.getDomains().orElseThrow().forEach((key, value) -> domains.put(toPartitionValuesParsedField(column, key), value)); + } + + return TupleDomain.withColumnDomains(domains.buildOrThrow()); + } + + private static HiveColumnHandle toPartitionValuesParsedField(HiveColumnHandle addColumn, DeltaLakeColumnHandle partitionColumn) + { + return new HiveColumnHandle( + addColumn.getBaseColumnName(), + addColumn.getBaseHiveColumnIndex(), + addColumn.getBaseHiveType(), + addColumn.getBaseType(), + Optional.of(new HiveColumnProjectionInfo( + ImmutableList.of(0, 0), 
// hiveColumnIndex; we provide fake value because we always find columns by name + ImmutableList.of("partitionvalues_parsed", partitionColumn.getColumnName()), + DeltaHiveTypeTranslator.toHiveType(partitionColumn.getType()), + partitionColumn.getType())), + HiveColumnHandle.ColumnType.REGULAR, + addColumn.getComment()); } private DeltaLakeTransactionLogEntry buildCommitInfoEntry(ConnectorSession session, Block block, int pagePosition) @@ -431,13 +459,16 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo statsFieldIndex = 5; } - Optional parsedStats = Optional.ofNullable(getRowField(addEntryRow, statsFieldIndex + 1)).map(this::parseStatisticsFromParquet); + boolean partitionValuesParsedExists = addEntryRow.getUnderlyingFieldBlock(statsFieldIndex + 1) instanceof RowBlock && // partitionValues_parsed + addEntryRow.getUnderlyingFieldBlock(statsFieldIndex + 2) instanceof RowBlock; // stats_parsed + int parsedStatsIndex = partitionValuesParsedExists ? statsFieldIndex + 1 : statsFieldIndex; + Optional parsedStats = Optional.ofNullable(getRowField(addEntryRow, parsedStatsIndex + 1)).map(this::parseStatisticsFromParquet); Optional stats = Optional.empty(); if (parsedStats.isEmpty()) { stats = Optional.ofNullable(getStringField(addEntryRow, statsFieldIndex)); } - Map tags = getMapField(addEntryRow, statsFieldIndex + 2); + Map tags = getMapField(addEntryRow, parsedStatsIndex + 2); AddFileEntry result = new AddFileEntry( path, partitionValues, @@ -709,7 +740,15 @@ private void fillNextEntries() for (int i = 0; i < extractors.size(); ++i) { DeltaLakeTransactionLogEntry entry = extractors.get(i).getEntry(session, page.getBlock(i).getLoadedBlock(), pagePosition); if (entry != null) { - nextEntries.add(entry); + if (entry.getAdd() != null) { + if (partitionConstraint.isAll() || + partitionMatchesPredicate(entry.getAdd().getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow())) { + nextEntries.add(entry); + } + } + else { + 
nextEntries.add(entry); + } } } pagePosition++; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java index 290d66081fc0..3fb8df2d5b69 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -30,6 +31,7 @@ import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats; @@ -112,7 +114,7 @@ public RowType getMetadataEntryType() return metadataEntryType; } - public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean requireWriteStatsAsJson, boolean requireWriteStatsAsStruct) + public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean requireWriteStatsAsJson, boolean requireWriteStatsAsStruct, boolean usePartitionValuesParsed) { List allColumns = extractSchema(metadataEntry, protocolEntry, typeManager); List minMaxColumns = 
columnsWithStats(metadataEntry, protocolEntry, typeManager); @@ -156,6 +158,15 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protoc if (requireWriteStatsAsJson) { addFields.add(RowType.field("stats", VARCHAR)); } + if (usePartitionValuesParsed) { + List partitionColumns = extractPartitionColumns(metadataEntry, protocolEntry, typeManager); + if (!partitionColumns.isEmpty()) { + List partitionValuesParsed = partitionColumns.stream() + .map(column -> RowType.field(column.getColumnName(), column.getType())) + .collect(toImmutableList()); + addFields.add(RowType.field("partitionValues_parsed", RowType.from(partitionValuesParsed))); + } + } if (requireWriteStatsAsStruct) { addFields.add(RowType.field("stats_parsed", RowType.from(statsColumns.build()))); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index 4f2c94a46abd..b8d0c1e38c74 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -111,7 +111,8 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile) RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType(); RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent()); RowType txnEntryType = checkpointSchemaManager.getTxnEntryType(); - RowType addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), entries.getProtocolEntry(), writeStatsAsJson, writeStatsAsStruct); + // TODO https://github.com/trinodb/trino/issues/19586 Add support for writing 'partitionValues_parsed' field + RowType 
addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), entries.getProtocolEntry(), writeStatsAsJson, writeStatsAsStruct, false); RowType removeEntryType = checkpointSchemaManager.getRemoveEntryType(); List columnNames = ImmutableList.of( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java index 44ebc5d96aed..0c797dc87c22 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java @@ -29,6 +29,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; import java.io.IOException; @@ -103,7 +104,8 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) typeManager, fileSystem, fileFormatDataSourceStats, - Optional.empty()) + Optional.empty(), + TupleDomain.all()) .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null) .collect(toImmutableList()); @@ -135,7 +137,8 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) typeManager, fileSystem, fileFormatDataSourceStats, - Optional.of(new MetadataAndProtocolEntry(metadataLogEntry.getMetaData(), protocolLogEntry.getProtocol()))) + Optional.of(new MetadataAndProtocolEntry(metadataLogEntry.getMetaData(), protocolLogEntry.getProtocol())), + TupleDomain.all()) .forEach(checkpointBuilder::addLogEntry); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 6ce5295fd046..f9ea4fc8d336 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -43,6 +43,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.PrimitiveType; +import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -1032,6 +1033,110 @@ public void testReadMultipartCheckpoint() assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1, 2, 3, 4, 5, 6, 7"); } + /** + * @see deltalake.partition_values_parsed + */ + @Test + public void testDeltaLakeWithPartitionValuesParsed() + throws Exception + { + testPartitionValuesParsed("deltalake/partition_values_parsed"); + } + + /** + * @see trino432.partition_values_parsed + */ + @Test + public void testTrinoWithoutPartitionValuesParsed() + throws Exception + { + testPartitionValuesParsed("trino432/partition_values_parsed"); + } + + private void testPartitionValuesParsed(String resourceName) + throws Exception + { + String tableName = "test_partition_values_parsed_checkpoint_" + randomNameSuffix(); + Path tableLocation = Files.createTempFile(tableName, null); + copyDirectoryContents(new File(Resources.getResource(resourceName).toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "true") + .build(); + + assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part = 10 AND string_part = 
'part1'"))
+                .matches("VALUES 1");
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part != 10"))
+                .matches("VALUES 2");
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part > 10"))
+                .matches("VALUES 2");
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part >= 10"))
+                .matches("VALUES 1, 2");
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part IN (10, 20)"))
+                .matches("VALUES 1, 2");
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part IS NULL AND string_part IS NULL")) // NULL-partition cases must also run with checkpoint filtering enabled
+                .matches("VALUES 3");
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part IS NOT NULL AND string_part IS NOT NULL"))
+                .matches("VALUES 1, 2");
+
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part = 10 AND string_part = 'unmatched partition condition'")) // predicates matching no partition must yield no rows
+                .returnsEmptyResult();
+        assertThat(query(session, "SELECT id FROM " + tableName + " WHERE int_part IS NULL AND string_part IS NOT NULL"))
+                .returnsEmptyResult();
+    }
+
+    /**
+     * @see deltalake.partition_values_parsed_all_types
+     */
+    @Test
+    public void testDeltaLakeWithPartitionValuesParsedAllTypes()
+            throws Exception
+    {
+        String tableName = "test_partition_values_parsed_checkpoint_" + randomNameSuffix();
+        Path tableLocation = Files.createTempFile(tableName, null);
+        copyDirectoryContents(new File(Resources.getResource("deltalake/partition_values_parsed_all_types").toURI()).toPath(), tableLocation);
+        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
+
+        assertPartitionValuesParsedCondition(tableName, 1, "part_boolean = true");
+        assertPartitionValuesParsedCondition(tableName, 1, "part_tinyint = 1");
+        assertPartitionValuesParsedCondition(tableName, 1, "part_smallint = 10");
+        assertPartitionValuesParsedCondition(tableName, 1, "part_int = 100");
+        assertPartitionValuesParsedCondition(tableName, 1, "part_bigint 
= 1000"); + assertPartitionValuesParsedCondition(tableName, 1, "part_short_decimal = CAST('123.12' AS DECIMAL(5,2))"); + assertPartitionValuesParsedCondition(tableName, 1, "part_long_decimal = CAST('123456789012345678.123' AS DECIMAL(21,3))"); + assertPartitionValuesParsedCondition(tableName, 1, "part_double = 1.2"); + assertPartitionValuesParsedCondition(tableName, 1, "part_float = 3.4"); + assertPartitionValuesParsedCondition(tableName, 1, "part_varchar = 'a'"); + assertPartitionValuesParsedCondition(tableName, 1, "part_date = DATE '2020-08-21'"); + assertPartitionValuesParsedCondition(tableName, 1, "part_timestamp = TIMESTAMP '2020-10-21 01:00:00.123 UTC'"); + assertPartitionValuesParsedCondition(tableName, 1, "part_timestamp_ntz = TIMESTAMP '2023-01-02 01:02:03.456'"); + + assertPartitionValuesParsedCondition(tableName, 3, "part_boolean IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_tinyint IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_smallint IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_int IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_bigint IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_short_decimal IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_long_decimal IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_double IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_float IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_varchar IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_date IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_timestamp IS NULL"); + assertPartitionValuesParsedCondition(tableName, 3, "part_timestamp_ntz IS NULL"); + } + + private void assertPartitionValuesParsedCondition(String tableName, int id, @Language("SQL") String condition) + { + Session session = 
Session.builder(getQueryRunner().getDefaultSession()) + .setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "true") + .build(); + + assertThat(query(session, "SELECT id FROM " + tableName + " WHERE " + condition)) + .matches("VALUES " + id); + } + private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation) throws IOException { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index f374ebb6c605..c2ddffcc048f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -53,6 +53,7 @@ public void testDefaults() .setMaxPartitionsPerWriter(100) .setUnsafeWritesEnabled(false) .setDefaultCheckpointWritingInterval(10) + .setCheckpointPartitionFilterEnabled(false) .setCheckpointRowStatisticsWritingEnabled(true) .setVacuumMinRetention(new Duration(7, DAYS)) .setHiveCatalogName(null) @@ -90,6 +91,7 @@ public void testExplicitPropertyMappings() .put("delta.max-partitions-per-writer", "200") .put("delta.enable-non-concurrent-writes", "true") .put("delta.default-checkpoint-writing-interval", "15") + .put("delta.checkpoint-filtering.enabled", "true") .put("delta.checkpoint-row-statistics-writing.enabled", "false") .put("delta.vacuum.min-retention", "13h") .put("delta.hive-catalog-name", "hive") @@ -125,6 +127,7 @@ public void testExplicitPropertyMappings() .setUnsafeWritesEnabled(true) .setDefaultCheckpointWritingInterval(15) .setCheckpointRowStatisticsWritingEnabled(false) + .setCheckpointPartitionFilterEnabled(true) .setVacuumMinRetention(new Duration(13, HOURS)) .setHiveCatalogName("hive") .setDynamicFilteringWaitTimeout(new Duration(30, MINUTES)) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 0e4f043f94f0..6470b9978c2f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -204,6 +204,37 @@ public void testReadTableCheckpointInterval() assertUpdate("DROP TABLE test_read_checkpoint"); } + @Test + public void testReadPartitionTableWithCheckpointFiltering() + { + String catalog = getSession().getCatalog().orElseThrow(); + + assertUpdate("DROP TABLE IF EXISTS test_checkpoint_filtering"); + + assertUpdate("CREATE TABLE test_checkpoint_filtering(key varchar, data varchar) WITH (partitioned_by = ARRAY['key'], checkpoint_interval = 2)"); + assertUpdate("INSERT INTO test_checkpoint_filtering(key, data) VALUES ('p1', '1-abc'), ('p1', '1-def'), ('p2', '2-abc'), ('p2', '2-def')", 4); + assertUpdate("INSERT INTO test_checkpoint_filtering(key, data) VALUES ('p1', '1-baz'), ('p2', '2-baz')", 2); + + Session session = Session.builder(getSession()) + .setCatalogSessionProperty(catalog, "checkpoint_filtering_enabled", "true") + .build(); + + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_checkpoint_filtering')"); + assertFileSystemAccesses( + session, + "TABLE test_checkpoint_filtering", + ImmutableMultiset.builder() + .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .add(new FileOperation(TRANSACTION_LOG_JSON, 
"00000000000000000003.json", INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 2) + .build()); + + assertUpdate("DROP TABLE test_checkpoint_filtering"); + } + @Test public void testReadWholePartition() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index ff5ea4be8ac6..9e0367c8b9f0 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -185,7 +185,7 @@ private DeltaLakeSplitManager setupSplitManager(List addFileEntrie new ParquetReaderConfig()) { @Override - public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session) + public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TupleDomain partitionConstraint, ConnectorSession session) { return addFileEntries; } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 281804f7bec3..fc1d17dc475d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -66,6 +66,7 @@ import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_GET_LENGTH; import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static 
io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.LAST_CHECKPOINT_FILENAME; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; @@ -73,7 +74,6 @@ import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; -import static io.trino.testing.TestingConnectorSession.SESSION; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static java.lang.String.format; import static java.time.ZoneOffset.UTC; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java index 618c953171eb..251985eb1afe 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java @@ -28,7 +28,7 @@ import java.util.stream.Stream; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; -import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; public final class TestingDeltaLakeUtils diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index 3b6186fe7654..fd953a86f30d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -28,6 +28,7 @@ import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; import io.trino.testing.TestingConnectorContext; import org.junit.jupiter.api.BeforeEach; @@ -142,7 +143,7 @@ public void readsCheckpointFile() ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); tableSnapshot.setCachedMetadata(Optional.of(metadataEntry)); try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( - SESSION, ImmutableSet.of(ADD), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) { + SESSION, ImmutableSet.of(ADD), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)), TupleDomain.all())) { List entries = stream.collect(toImmutableList()); assertThat(entries).hasSize(9); @@ -184,7 +185,7 @@ public void readsCheckpointFile() // lets read two entry types in one call; add and protocol try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( - SESSION, ImmutableSet.of(ADD, PROTOCOL), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) { + SESSION, ImmutableSet.of(ADD, PROTOCOL), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)), TupleDomain.all())) { List entries = stream.collect(toImmutableList()); assertThat(entries).hasSize(10); diff --git 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index d31ca20fe438..245b602ad40b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -24,6 +24,7 @@ import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.parquet.writer.ParquetWriterOptions; +import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; @@ -32,6 +33,9 @@ import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.parquet.ParquetReaderConfig; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.Int128; +import io.trino.spi.type.Type; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,16 +45,23 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.UUID; import java.util.stream.IntStream; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.io.Resources.getResource; +import static com.google.common.math.LongMath.divide; +import static 
io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.COMMIT; @@ -60,7 +71,29 @@ import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; +import static io.trino.spi.predicate.Domain.notNull; +import static io.trino.spi.predicate.Domain.onlyNull; +import static io.trino.spi.predicate.Domain.singleValue; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DecimalType.createDecimalType; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND; +import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; +import static java.lang.Float.floatToIntBits; +import static java.math.RoundingMode.UNNECESSARY; +import static java.time.ZoneOffset.UTC; import 
static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @@ -91,7 +124,7 @@ public void testReadNoEntries() throws Exception { URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); - assertThatThrownBy(() -> createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(), Optional.empty(), Optional.empty())) + assertThatThrownBy(() -> createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(), Optional.empty(), Optional.empty(), TupleDomain.all())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("fields is empty"); } @@ -135,7 +168,7 @@ public void testReadProtocolEntries() throws Exception { URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); - CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty()); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty(), TupleDomain.all()); List entries = ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(1); @@ -153,7 +186,7 @@ public void testReadMetadataAndProtocolEntry() throws Exception { URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); - CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty()); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty(), TupleDomain.all()); List entries = ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(2); @@ -196,7 +229,7 @@ public void testReadAddEntries() throws Exception { URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); - CheckpointEntryIterator 
checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(ADD), Optional.of(readMetadataEntry(checkpointUri)), Optional.of(readProtocolEntry(checkpointUri))); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(ADD), Optional.of(readMetadataEntry(checkpointUri)), Optional.of(readProtocolEntry(checkpointUri)), TupleDomain.all()); List entries = ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(9); @@ -236,6 +269,146 @@ public void testReadAddEntries() Optional.empty())); } + @Test + public void testReadAddEntriesPartitionPruning() + throws Exception + { + String checkpoint = "deltalake/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet"; + URI checkpointUri = getResource(checkpoint).toURI(); + + DeltaLakeColumnHandle stringPartField = new DeltaLakeColumnHandle( + "string_part", + VARCHAR, + OptionalInt.empty(), + "string_part", + VARCHAR, + REGULAR, + Optional.empty()); + + DeltaLakeColumnHandle intPartField = new DeltaLakeColumnHandle( + "int_part", + BIGINT, + OptionalInt.empty(), + "int_part", + BIGINT, + REGULAR, + Optional.empty()); + + // The domain specifies all partition columns + CheckpointEntryIterator partitionsEntryIterator = createCheckpointEntryIterator( + checkpointUri, + ImmutableSet.of(ADD), + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.withColumnDomains(ImmutableMap.of(intPartField, singleValue(BIGINT, 10L), stringPartField, singleValue(VARCHAR, utf8Slice("part1"))))); + List partitionsEntries = ImmutableList.copyOf(partitionsEntryIterator); + + assertThat(partitionsEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(5); + assertThat(partitionsEntries) + .hasSize(1) + .extracting(entry -> entry.getAdd().getPath()) + .containsExactly("int_part=10/string_part=part1/part-00000-383afb1a-87de-4e70-86ab-c21ae44c7f3f.c000.snappy.parquet"); + + 
// The domain specifies a part of partition columns + CheckpointEntryIterator partitionEntryIterator = createCheckpointEntryIterator( + checkpointUri, + ImmutableSet.of(ADD), + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.withColumnDomains(ImmutableMap.of(intPartField, singleValue(BIGINT, 10L)))); + List partitionEntries = ImmutableList.copyOf(partitionEntryIterator); + + assertThat(partitionEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(5); + assertThat(partitionEntries) + .hasSize(1) + .extracting(entry -> entry.getAdd().getPath()) + .containsExactly("int_part=10/string_part=part1/part-00000-383afb1a-87de-4e70-86ab-c21ae44c7f3f.c000.snappy.parquet"); + + // Verify empty iterator when the condition doesn't match + CheckpointEntryIterator emptyIterator = createCheckpointEntryIterator( + checkpointUri, + ImmutableSet.of(ADD), + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.withColumnDomains(ImmutableMap.of( + intPartField, singleValue(BIGINT, 10L), + stringPartField, singleValue(VARCHAR, utf8Slice("unmatched partition condition"))))); + assertThat(ImmutableList.copyOf(emptyIterator)).isEmpty(); + + // Verify IS NULL condition + CheckpointEntryIterator isNullIterator = createCheckpointEntryIterator( + checkpointUri, + ImmutableSet.of(ADD), + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.withColumnDomains(ImmutableMap.of( + intPartField, onlyNull(BIGINT), + stringPartField, onlyNull(VARCHAR)))); + assertThat(ImmutableList.copyOf(isNullIterator)) + .hasSize(1) + .extracting(entry -> entry.getAdd().getPath()) + .containsExactly("int_part=__HIVE_DEFAULT_PARTITION__/string_part=__HIVE_DEFAULT_PARTITION__/part-00000-dcb29d13-eeca-4fa6-a8bf-860da0131a5c.c000.snappy.parquet"); + + // Verify IS NOT NULL condition + CheckpointEntryIterator isNotNullIterator = 
createCheckpointEntryIterator( + checkpointUri, + ImmutableSet.of(ADD), + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.withColumnDomains(ImmutableMap.of( + intPartField, notNull(BIGINT), + stringPartField, notNull(VARCHAR)))); + assertThat(ImmutableList.copyOf(isNotNullIterator)) + .hasSize(2) + .extracting(entry -> entry.getAdd().getPath()) + .containsExactly( + "int_part=10/string_part=part1/part-00000-383afb1a-87de-4e70-86ab-c21ae44c7f3f.c000.snappy.parquet", + "int_part=20/string_part=part2/part-00000-e0b4887e-95f6-4ce1-b96c-32c5cf472476.c000.snappy.parquet"); + } + + @Test + public void testReadAddEntriesPartitionPruningAllTypes() + throws Exception + { + String checkpoint = "deltalake/partition_values_parsed_all_types/_delta_log/00000000000000000003.checkpoint.parquet"; + URI checkpointUri = getResource(checkpoint).toURI(); + + assertPartitionValuesParsedCondition(checkpointUri, "part_boolean", BOOLEAN, true); + assertPartitionValuesParsedCondition(checkpointUri, "part_tinyint", TINYINT, 1L); + assertPartitionValuesParsedCondition(checkpointUri, "part_smallint", SMALLINT, 10L); + assertPartitionValuesParsedCondition(checkpointUri, "part_int", INTEGER, 100L); + assertPartitionValuesParsedCondition(checkpointUri, "part_bigint", BIGINT, 1000L); + assertPartitionValuesParsedCondition(checkpointUri, "part_short_decimal", createDecimalType(5, 2), 12312L); + assertPartitionValuesParsedCondition(checkpointUri, "part_long_decimal", createDecimalType(21, 3), Int128.valueOf("123456789012345678123")); + assertPartitionValuesParsedCondition(checkpointUri, "part_double", DOUBLE, 1.2); + assertPartitionValuesParsedCondition(checkpointUri, "part_float", REAL, (long) floatToIntBits(3.4f)); + assertPartitionValuesParsedCondition(checkpointUri, "part_varchar", VARCHAR, utf8Slice("a")); + assertPartitionValuesParsedCondition(checkpointUri, "part_date", DATE, LocalDate.parse("2020-08-21").toEpochDay()); + 
ZonedDateTime zonedDateTime = LocalDateTime.parse("2020-10-21T01:00:00.123").atZone(UTC); + long timestampValue = packDateTimeWithZone(zonedDateTime.toInstant().toEpochMilli(), UTC_KEY); + assertPartitionValuesParsedCondition(checkpointUri, "part_timestamp", TIMESTAMP_TZ_MILLIS, timestampValue); + LocalDateTime timestampNtz = LocalDateTime.parse("2023-01-02T01:02:03.456"); + long timestampNtzValue = timestampNtz.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND + divide(timestampNtz.getNano(), NANOSECONDS_PER_MICROSECOND, UNNECESSARY); + assertPartitionValuesParsedCondition(checkpointUri, "part_timestamp_ntz", TIMESTAMP_MICROS, timestampNtzValue); + } + + private void assertPartitionValuesParsedCondition(URI checkpointUri, String columnName, Type type, Object value) + throws IOException + { + DeltaLakeColumnHandle intPartField = new DeltaLakeColumnHandle(columnName, type, OptionalInt.empty(), columnName, type, REGULAR, Optional.empty()); + + CheckpointEntryIterator partitionEntryIterator = createCheckpointEntryIterator( + checkpointUri, + ImmutableSet.of(ADD), + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.withColumnDomains(ImmutableMap.of(intPartField, singleValue(type, value)))); + List partitionEntries = ImmutableList.copyOf(partitionEntryIterator); + + assertThat(partitionEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(5); + assertThat(partitionEntries).hasSize(1); + } + @Test public void testReadAllEntries() throws Exception @@ -246,7 +419,8 @@ public void testReadAllEntries() checkpointUri, ImmutableSet.of(METADATA, PROTOCOL, TRANSACTION, ADD, REMOVE, COMMIT), Optional.of(readMetadataEntry(checkpointUri)), - Optional.of(readProtocolEntry(checkpointUri))); + Optional.of(readProtocolEntry(checkpointUri)), + TupleDomain.all()); List entries = ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(17); @@ -309,7 +483,8 @@ public void testSkipRemoveEntries() 
"metadataFormatProvider", ImmutableMap.of()), "{\"type\":\"struct\",\"fields\":" + - "[{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}", + "[{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"part_key\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}", ImmutableList.of("part_key"), ImmutableMap.of(), 1000); @@ -367,16 +542,17 @@ public void testSkipRemoveEntries() writer.write(entries, createOutputFile(targetPath)); CheckpointEntryIterator metadataAndProtocolEntryIterator = - createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty()); + createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty(), TupleDomain.all()); CheckpointEntryIterator addEntryIterator = createCheckpointEntryIterator( URI.create(targetPath), ImmutableSet.of(ADD), Optional.of(metadataEntry), - Optional.of(protocolEntry)); + Optional.of(protocolEntry), + TupleDomain.all()); CheckpointEntryIterator removeEntryIterator = - createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(REMOVE), Optional.empty(), Optional.empty()); + createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(REMOVE), Optional.empty(), Optional.empty(), TupleDomain.all()); CheckpointEntryIterator txnEntryIterator = - createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(TRANSACTION), Optional.empty(), Optional.empty()); + createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(TRANSACTION), Optional.empty(), Optional.empty(), TupleDomain.all()); assertThat(Iterators.size(metadataAndProtocolEntryIterator)).isEqualTo(2); assertThat(Iterators.size(addEntryIterator)).isEqualTo(1); @@ -392,18 +568,23 @@ public void testSkipRemoveEntries() private MetadataEntry readMetadataEntry(URI checkpointUri) throws IOException { - CheckpointEntryIterator 
checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA), Optional.empty(), Optional.empty()); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA), Optional.empty(), Optional.empty(), TupleDomain.all()); return Iterators.getOnlyElement(checkpointEntryIterator).getMetaData(); } private ProtocolEntry readProtocolEntry(URI checkpointUri) throws IOException { - CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty()); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty(), TupleDomain.all()); return Iterators.getOnlyElement(checkpointEntryIterator).getProtocol(); } - private CheckpointEntryIterator createCheckpointEntryIterator(URI checkpointUri, Set entryTypes, Optional metadataEntry, Optional protocolEntry) + private CheckpointEntryIterator createCheckpointEntryIterator( + URI checkpointUri, + Set entryTypes, + Optional metadataEntry, + Optional protocolEntry, + TupleDomain partitionConstraint) throws IOException { TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); @@ -421,7 +602,8 @@ private CheckpointEntryIterator createCheckpointEntryIterator(URI checkpointUri, new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), true, - new DeltaLakeConfig().getDomainCompactionThreshold()); + new DeltaLakeConfig().getDomainCompactionThreshold(), + partitionConstraint); } private static TrinoOutputFile createOutputFile(String path) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java 
index c17f72519d4e..0b1260a50aa8 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -36,6 +36,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.block.Block; import io.trino.spi.block.SqlRow; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.BigintType; import io.trino.spi.type.Int128; import io.trino.spi.type.IntegerType; @@ -484,7 +485,8 @@ private CheckpointEntries readCheckpoint(String checkpointPath, MetadataEntry me new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), rowStatisticsEnabled, - new DeltaLakeConfig().getDomainCompactionThreshold()); + new DeltaLakeConfig().getDomainCompactionThreshold(), + TupleDomain.all()); CheckpointBuilder checkpointBuilder = new CheckpointBuilder(); while (checkpointEntryIterator.hasNext()) { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java index 9145e2d112c9..ac6e842b104a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java @@ -27,6 +27,7 @@ import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.parquet.ParquetReaderConfig; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.DoubleType; @@ -107,7 +108,8 @@ public void 
testParseParquetStatistics() new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), true, - new DeltaLakeConfig().getDomainCompactionThreshold()); + new DeltaLakeConfig().getDomainCompactionThreshold(), + TupleDomain.all()); MetadataEntry metadataEntry = getOnlyElement(metadataEntryIterator).getMetaData(); CheckpointEntryIterator protocolEntryIterator = new CheckpointEntryIterator( checkpointFile, @@ -121,7 +123,8 @@ public void testParseParquetStatistics() new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), true, - new DeltaLakeConfig().getDomainCompactionThreshold()); + new DeltaLakeConfig().getDomainCompactionThreshold(), + TupleDomain.all()); ProtocolEntry protocolEntry = getOnlyElement(protocolEntryIterator).getProtocol(); CheckpointEntryIterator checkpointEntryIterator = new CheckpointEntryIterator( @@ -136,7 +139,8 @@ public void testParseParquetStatistics() new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), true, - new DeltaLakeConfig().getDomainCompactionThreshold()); + new DeltaLakeConfig().getDomainCompactionThreshold(), + TupleDomain.all()); DeltaLakeTransactionLogEntry matchingAddFileEntry = null; while (checkpointEntryIterator.hasNext()) { DeltaLakeTransactionLogEntry entry = checkpointEntryIterator.next(); diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/README.md b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/README.md new file mode 100644 index 000000000000..a3ae6655153f --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/README.md @@ -0,0 +1,21 @@ +Data generated using OSS Delta Lake 3.0.0: + +```sql +CREATE TABLE test_partition_values_parsed ( + id INT, + int_part INT, + string_part STRING +) +USING delta +PARTITIONED BY (int_part, string_part) +LOCATION 's3://test-bucket/test_partition_values_parsed' +TBLPROPERTIES ( + 
delta.checkpoint.writeStatsAsStruct = true, + delta.checkpoint.writeStatsAsJson = false, + delta.checkpointInterval = 1 +); + +INSERT INTO test_partition_values_parsed VALUES (1, 10, 'part1'); +INSERT INTO test_partition_values_parsed VALUES (2, 20, 'part2'); +INSERT INTO test_partition_values_parsed VALUES (3, NULL, NULL); +``` diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..9c3414fefca2 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1699426607713,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[\"int_part\",\"string_part\"]","properties":"{\"delta.checkpoint.writeStatsAsStruct\":\"true\",\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpointInterval\":\"1\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.0.0","txnId":"8e5c07fa-7101-4a83-899b-6d124f3bb8d6"}} +{"metaData":{"id":"2f4075c9-1e3f-4fcc-b5b3-1f15d800a400","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"string_part\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["int_part","string_part"],"configuration":{"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false","delta.checkpointInterval":"1"},"createdTime":1699426607271}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} diff --git 
a/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000001.checkpoint.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000001.checkpoint.parquet new file mode 100644 index 0000000000000000000000000000000000000000..fa1bbfd0199f92012b6e883f9d91d9dffe9d7586 GIT binary patch literal 16816 zcmeHPeQXrR72iD{=ko`~5YKXMTrh_@Ok$fmY-8g(gai~BL@9AnTD79l`fd-dxew=V z?LZKbLlxvYu50xpN+@+ul)4B}iV};GDk2dgDM9Fzq9&oKMI1^i8u5pPP!&Ze>U*=h zvvYfA-yH_he1N%|ot^i4Z{ECl^XAQ%&G&Wsh(OB7Jo4%IU;a~2MI7+xAWlIjbQK7K z0RL{n`|~16isYY9{p<7`jzhScadXLY$JfX%z(QQR@#0tA_x#+Oh zdqr`gzOl7Y4tDwE`qq|!Qs1PscErA+;`|QQ1i@1)L)Kc{o2*uifRg)TW z;Mr%c^8;JnuD9VW;8xG7Z1`vq@4oQid48-@EP@&qTEvwXes_s)m0}TEud^}xs=xmc z=^_KK4!ty+vz+C}sZ~@9;2?w*!6t7@vwu~+PiYL+HwFFv`mW}##(H1S*Bof|dgUgs zSD$(3fBpWO#T<_Aip!0eIoWWQkFx*PP`NggII@9+OFxJR}GEYp0r6MVQT6lGZhJ{HMy zA~|;WZ$F2jDF#Fb0e$so5H46n4o(&>? zhiJbS2~R8)$~PopQP+lzLe+ZcYP?qt17-*OojCB1SIRgRH98BB(kZMLMCUrx13H?Z z98M^>k{IjFkv9+RDdh}DUvH{~-&eDR1YO*}j~y9h>je0A-NiWt1OqX!&7Xhzlb`YN z0BeJb1++oy+5h*~pJr{W)&sb?H@l_oqjIiT=@Q_F*eWH5knPb{=~|N>j%d*dUEP=C zXFWt3Uhh;<$;=`AD6AcX1#z5)IF+svJ~A;|*nHkf*DNjeFl(JgoBKxE-w|{w#J|wJFd(6mmw@vYsxUbjTg5~u+NJT^t>jQr6$K_x#Mo6Wj(%`A?UnYCtdYz~vRubj!7EN5+p=CWeaEDp^xYt7=&9cHar z9I7yD&EgOt!-j%KB-Y}BSsa;9w&*>&HlK-h(#49ymE?pmUdcPj9s};3Dssjs?^{56 zb&%nOZ0sf$GQAzFX2r>B27O`?D-ODuaA(cp;9{*7jQaY69i<8I8;?GcfOsh)lTxAC z<|nuc0|8Q@ z(b~IZb-PXquyb&645>i6MGp5U31TC*jdcLqYu|Oyo^2><+a3})R0<u9y~ZmWymG{X!Gxm1>JASj*p>Cs z`**5JLf5Nbjt4?fIUG`V>NOF0M`!Hu_0fQ`L#rw3k`oG61+?e+0VOE+gjJ|o8;e9j z>K1*ZKyb;Td<>=%N&vNu0C`G~&{Q1`J1BiNtypGB162sb$PHl!GIXP~}K> z`Y_OOsZe&P3iKoJKq#zyTi;FK2;CYJ`67>-6E|Dyz>EoOKGCFXAvPQF(T+KR zB%mbx@lZD!DK~V%SUe)D+T?=a?vBTLLjfgjwkV8Il9|j8DqpGQNnoD^24upQGy7W` z3-?5#$j7m|4 zixz;ZtwfUgYmoP80ANzX!b2AvN;zPY`fTvN-36Mm=C$+NNM=oM-v$@r5$k9zaSNW|!W7lhXe@1H9a4w15t>>i z{csz=oU6r2KRzuC8#af8IvBSCjUvz_1v+HqEQoz&skSK6*+ 
z*R)TVU^LgJJ@M1*3NribF47=)E&?-sQKSPi1vB1jQhuAHPM~eq({}n7aMXRC_Qmtu z(+)fj-%VBso-sIdF&1Kdmtd~N7(k58wy63!B))Jrj{G$sHbzP(PTxsT5fSN{73l~f zU8hK6khM3wm5yX}SX7uYi!i4UVqYB+#yZ0V|y+&b^0BacqbMZp0C9TB!k@y$&sI{TFZ2VjG zo@Vo=2st#@BG1bRd6YsXvqENas8moZ>S;txYJ$!)XJTeSd7nyUoM(~o0-{~fWXy_| z=2oeIR>*4zInY2g3D*z|x0+eNG|x(m9J`ab>B2tbh-&ehCtViYbBKG9;ts<;BhBzq zk|0zu19s_z%`;rt?)pfa;Ze!dC-LkAU*Gb=*}Z<;Z#}S?@-W|k;0`%W6sgZ_FX4Qs(GBR6bnMuKJr5Hqq?-VI&87hG@O3YwHi-f4J@4(5l_(wH=LX{0P+ zraK`oA?*=8$vbmPP4uk+h9gawns}!wonLAOFU{>0OZ#oo84*a4+mt7suI%Tfyb_*6 znJzZLA(!CUw;*?+nAa?pdZk_OC=e zGQ1Gd8OGVHS%P_(p_b?N7INFS5|bwrz)jzX)v~;ATE3UE#O>Dvu%rjQ+9$B$o_H4H z$m0A;+Svg6rdnMHxU-bJUqZPb{*4jv(kc3OeXUP%W)omYTNK)74ZF7~BoIEihvU zGf832P7RYfK^WRuE7}=E8*0U#4lXu#i58)Zv^c{VTx^kQ0&xx_j&!I-a(JB5sl^p= zE??(z#p2r<YCNq(A?k^T+7m0BpwSJm!uowvBw*Xn{}@JOL7Sk zAW_epW-jgNSuUf%JAiOT1J7A5_!(Ya?~e%(j^#tnofPK>v2}OOB0{2vVdtU|JfCF* zc`JmSX=w-A42W@#$lXv0Icq!^)^1&D8mGzSjFoF+Q5ACiiA`|nQoC@Od+#yX%!F{~ zH#*i6^c%%7kK9d!u2?Lr$Y`rMS{feIGxE<%o5pGO7JAg5E)Gnesjkp+*8v#a4Oka{ zci_fxw2KBNeg1m^^K!xrP`HG!iIS&{nl~WwgUG{!usap`aj0Y0r>D4taiee+AEV#f zU}$V|Ok<6ocYvK`+UGGi+NpJ3ou8S9H+%rQ(ZYD=>0WP&Tg-HA#uO-TFKaO^hQw^@ z3$%P01NF7D{@PjR5z^iiXPt)wGj4OG&$a3Nw;9vs9YoOI>0!B+pcz*N83U%~X^h%>t&kDB^Nz>?DkYsX z1E|ysj0}i%=oZ>dYIf;1u+C$IQohKq84NQQ?;fDQO4S?%UITRwCY@>BG$1eTA4~w8_|t1`lAV!Yn+xFX z>XqV+62~>J!XGY)J0rQfo7>ylB|pVW}YUwGkdt8V@? z+o>97awVXrN_m|d!9;&|{*8Z?=*HPMDdU*&sEtNgfUCXpC&NcG`OA~#lh3@TAC+b%^7JQT_GSP6hoqZ4_tZlJ`c1odkebI1t`>CI=x=tnwRo2}JyMh3 z+3fdvo!u?nO-_&B)8cD)yTxX=TQQo4pZw?%wj5fxSf+-HiN3ET_2lq@%X{?ZJiC+1 z@d{T2awv{laS&(kzjQwvYIG2sMfD>&3>zsR{U-9}-yR(>tM&$>U*J=U4&DXc_twO> z^++{KYrvQHdgC<>HU5AU^u=nHe*fN@uo%Lx0=}AtS=T=WAxqjUMPW#|Hy9MVgYX>? 
zNewk2DK7fNxLC9F-tF5P?j<)>%F&Jg9v3hE(eagBr5q`iuHuTxeF@IIaXUX%TnKaY zNx``2@@|p5clJbJ7OuOafw**QT#Uz-$8M$bUclXRJy)?N775$dtmUd!!&IYvVi3gY zf?x0d>g~s2Nf?V-g#`vHuFJ$*J5dklX#8R@CSfGPJOlmM$B#UE9^xGH4W2oOa_~AcU+%Q_tvV4YJ zRrg_^#oKKh{3a&5VC5$JXtLXuWoArr0!1@~#rV?+$&lMUUgD&OXjGFf4WdyGkv(hj zp|EBU7R0ft;@EAa`ZP!zVe+``wnCJvQBSAlDFC51^(zIc5yf@E48Kn)Fl|9-~sOnP8t6BTz_fE}YvoM3EZ4B0 zMi1BxX|Vw|+cY7(s<0DpHq-=+STLcFN55{j*^FbDmsd0jI6RoA8J)SFyXvZ8PpVDb z*Jkw4Kt&z3hGa&k4BgZ>=d@(`*k_esCCdclP$hJ+R)S~E6i;>+C<9(9Bm26U)7p!E3(mYa3)+)^_#pD*HnJQsS5~Zw}(wd1ftygO%W@x=yGf}Sf zmXmb~i~Tc+Pic)3!{t(mOUdbMVfXuVo9DQLY~ zGdV}=okRAhS{=2q?g2Y%j@rovWky4DnP|J_vF6A;^0G-8k(^H+P~rA}jhs;1=c-Ac zGGt@{Q`W&6(xvoX(wc({8T8It);wLyB-&TUnx~Z}81>kS|tpe*KX6PyElcKReBrJa_4T`b&cX}Wllzfy{^!Z4o zOl$5D<69I`fSrlKF%SpR4PvlYiosFdw6+0YTgSUNPzmgvDSWzD?-~iCfo4?p_`CNn7Qf;%+e}VV6(-JjW;b#ok~Xx>iO) zp+J0tvQi+pU{F40LNUpQ+C~9be#7R2H7c4eDMAB=B`Sp?eG-|SxxCaeW~s`-6^O;f zP*3JE&~>Slw#Frxhi_FND1B2oP2dV$9TWK?ubUMQTb*FWq(auZDZq9kT9_FRZK_>i zN2yPWd82_IG*aEt`6JPg7?(E}EO$>d(iiYaQEfy~gpy2Wf6%Q^CVLV%XMq7}@DU1p`+BVsDhXv|J}uPEVy%qVz>V zVj!&U8`V==U>GPe~JSy4BO_V+o^A z@CJZmHxfZOMFCO(z@&!2Ljw+_JY68{E^wrP)?5#|Qi|_WI|oi)J-2BrSbbkRYI^iX z&8e38_1ZsmBe4k=9pniU=XeRqU1jzE-!{#c` zj{Z2S3j_8BD5aZmPeN>=YknT}-oJ=+a*hO+^6hnCER!%TltVhQg*RmBnj;~j^0l0DM zegST}6ovA+Sy%yMI^ft3zX6&4t;af!SeI`gHJsx}aoRG_^d@7$twNWu7KR#j(FnpE z_&V<3D-*)7CA11_gj-;+0q7zE4N{;KROXfK*p91aNS5?T5}hl0mQ;xW@4=e@Z>O z9r2D*ycFOmc9>1HQe8sG@KQ#+Gl+LmmJcqcCc@LDlMF90!yw;Bh<8DjPs5{oQp-LW zdCEwhxEaWIIjOqJnK+buykwPuCCUx5Jb_>bDOd`?^tvA>yJcA|1zjtwbEL{U|i~&?y5I&2<11?nVP=%&kg*Y=AXPO>=KfB>mF zzU%^K^`*l`$hQ#kk}Rb#DP%TxniLFDo=3>t9;!wSQr+)aJnFC!;ZsC7Oc9cEm{2Oh zq)s(C#~{(Z<$!lWmI(0FY?>WA;kZfnX+JOQ;T^+sI&v#+=Qc{P)aO(x&karQSPw0S zT=Kk!9N{^~s4dI9=((ABpi!GaEyqzU$tGN=0XX+$s6~IuW+(%81Ih@Zyhu?-HI#Oy zT#I}vXQy$Bv|iZY7`1n>dOmA_w$vIw>(kHqB654Hi7bV9Fc)RU1pRg+GxeOGB~iru zZfLGy4iByX)c$7F#IAXHpc?$VxU4koTM!5b~Zp@8$9^>AJs}Q<-LJdXkoa|D4^>YrNmgPmR??BIn5ct>)6N2_Jr z`UK>dN--*h3`bkBFtPt@nf%g=U>R<&OnA&BoZx{Jxs7{in8TH$UI}MWrbBITPsBOS 
zRp%}g^O|KspU?-SC=|V+gu6;TgUjeXkF}F}&M~q;%LAB}^;J&lJ@NE2vH%Jg>Yb+6 zf_<2!SI=$tO5k>`9Sax-Ygoco&q}8mB~!|>Q|N->aNxqbQu!ya(tXmi_&Kt8Y6%{= zE{(?`t^CSZTtXIj?<)U97TPC0i(|;*BxNzUP+JqV{%fe^8qHF!e+(`(#IaKdI<^Gk zJjok5)ypgM(Ex&xe8_c|t@%N0+@7E%0tpezTbS?6J_ygs91Hb&QwLQ^nD4 ztu=)1NF*qUXsbC|8s2p?^3O|~>V@zIdRv|;4s4(CUZLu4)EV6rtV@6WK6M=Vu3n?h z|16;0?@t3oR|3{h^0ZOy5kYu2H?v%Ngg1j^gX8cd5JF`N1Vg)eoXzH-)I zIqN)9+RwyU=i$KA2T_@OZ6g1z<+OPx5%izMu-Z(%8Ncp17Lzz{5#%~Cqi5b_f*GqX zyckj5OqB;=41jUHCO$0>i_pI~W76h5UFOP_2Wkw7&0CB{#cj~7JWw;9J>Zqvl#d;0 zFWfRYUlWaM`Iqj69FrxdV4cUJsZ|%ofT?)`qc*Pmh-OZl%CXaiGucf=Ksaf*2ENOE4+S}Y-pD1}+JyNr~ b-RJ8T7r_Hgc%J`1{+sJ@bKE-kUo!j`hD4H9 literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000002.json new file mode 100644 index 000000000000..e6f9284d3789 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1699426626909,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"452"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.0.0","txnId":"64a2fdf1-ecd5-44b2-ad60-42d487414f5f"}} +{"add":{"path":"int_part=20/string_part=part2/part-00000-e0b4887e-95f6-4ce1-b96c-32c5cf472476.c000.snappy.parquet","partitionValues":{"int_part":"20","string_part":"part2"},"size":452,"modificationTime":1699426625000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet 
b/plugin/trino-delta-lake/src/test/resources/deltalake/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet new file mode 100644 index 0000000000000000000000000000000000000000..7215b844f02d48b854ff2c6cdeb3d951d773071b GIT binary patch literal 16925 zcmeHP4QyLi6~51L95+eRw99?7S8oex>SiN$?fkd6U4ODFsmd6o6a*UCi{Eo+=AU3Y zS-YvIDiu^A8l^y0s;R4BunCnCK{YA}6_lzBQ(#Oms;XjR3=~QQ)dr&)rf|-E@7?$O z?8J7mb^L7YlKbwx=R4<~d+xdCo_n`@*KZ9{LL2B3`ue3G{jthJYvI#DorKi5st6(Q z?;Fs5uA1(()8GB(J?A~Q51r7~-Vh6V<=-#yfS_onsl-?DA! z-t}9z4Q?BJ&pks!E7O@&G_hmv3eto+(bejxqWsFIP$tp}aKz)JtaL>MALze1)v`#Pn_a8UB~ z^~#E`OX&>&iZtvCcLXJ+t1H|a>5LSIBJRJFj!g<79e{H{2@H4j_4O*gRo#&uUsqTO z`i58agngYI;qGvxtGA=8x5vL(A=FNL=xX}tcYl1w7J?5uAxj?s)pK3cL0KI$Qw_{S zGi{~EujO7})w91K(gCF(MWO&$ER1F#!u6aC?5|9Gh?L~SUV({omUPVf%^E}TjS!wx@pb^OH6c zoztTuLx)@FZSH$@~a_-uJg{EE&U1~2F45=FAmi^q> z>4VjJH^cIQmAW$im)mcnE%eXRKm2u#Y5a@&m>SoTTF_III^P{&sRs{T_`So_&wanv zj~kI*=+HFxX8M#;J`2s{SC7ANf!mSQOxXORnOu4ND=(XS<(Y{({8G&R-`hVzhv~^b zfAQ~5lX#ms})T0w^$k*+{*cX*XAsB}iaONYb0;qKv1UoaBvmiqz$sVflBjOO{H z*T2YDt+5=5P)2(zZKEeY@vU#0m-CW-RgS6}kVA9+0EILEx9|SxFWjQRKJsTifDWL{ zZWpTPgpK~=6XVa+npAK_ct+E~Z-MtCEty>-O3UgNaOtDrOiO!9B&x*Zbj#`wKhlzr z;@A|GTiO?|I0l=cvO`Hhm&9l+CJo1+4TzNXmN@KOvXqfpRzI?Pcl#rBO_REB3Lhew z1s^@Lku<3r#_COE0sZ)XV!v-UjAAR@M!)yO)#qS?uZK^z9o(UA2hFe@WXL+(?EB7T zfgTp z5kN7#5AGk(2aRI-3x|GW-h%RN$5uhNH`RH#7Yl9|fxpz|7HdhVPn+Adjz?k;9hst! 
zH7rh8i9dZ1P6YzK8R8`~M9E;TXw3Is6xI&Hf;diH9Ji~^B#o(uEf{dSs!^^EtaWO1 zdb9!%deKDW)I$$w)}s+qvj8d+n9fj)id1;RI)ifTxJAP+woAj<9qQPbGJ4?dZzQ+;vY4Yf>+on(gSJfr>e51Idm~8LF9Y)~aWpvPxP# zqLZ!@k%LDZ;HCWnFFqsClXk7cB?sNA6;IUAeS%iL;-vjradH72(Tc}v=?1O%QZ3!0 z6;IUBd$eM%jx(96=f#{+oLXqq8pWvwqt+-+Ei!5s(R(!(=NqZ46(<+-4p)uhixaof(>AR~_IA2ghdbX)vwHbZ3mwzCOf2J+7njjZTJ5n`I-nJ&TX~14 zm-FI2FBk5tQQWsetp%gby?3BK4SwUnp)^EGaf#NGZkwwrvoirlby~mN=BkEl#h|>L zZ1I5gQ9IPsj47#fG?`Gp>S9tl^ZpSC1{ImnO0rCwR9gFplzBiS1=vNnIYu);x?PHm zDrq>-+qSd=Y`5LD;6Y{A5X4`w675J#Tvbq`vt@#?R=oxVv8SUCWBLM$jq#)$jYPu| zqHl}Fp_)K-Eyk!DACM9|6qL=Lk#@|7D?l`WQ&HFulyD}Q0(~(pK}?WLNjsEn5L)XZ zqRDFf64NtdiC5n`I>;jm9!iwJ{lwM>E^CnF7H@i}EqzNh>mH8wKF; z4IdBYxL8=E2@M#IloC&lDRfEU^s-r4tSbj+Af1uoBZbqz=A}v5nNeUI@}_7^dAD|! zz!`dTOys|L-kf;U>IXBXjjGmdQN9|{!t8it)9nf?>SQGyPDMx1NKI20Nv7gbMqOMm z-6N^wSX5S0Mvs~#Bbm?oU`wH1^(1i00s}JO8@c^$OvXmz3FPD05>i*cJr+%8D4bSs z05!0};z}Z8&@K!o6OrhS(G*+W^K~0_;go{jin$3e4b(*@8BWF^d&t&8V?3JJsz{(6 zef{w258yeHSEF447IfSMh@&ay(#joF13Qz>iBe9+rD#H5H@c^GsMVU(bhiy2F z@>;cc(hlvR9(&uPf_``_oz*XT5Hu(M(Tuf&=Uj9CE|@&+mSK)(+md$y=DFIlhPuR~ zZS<&(crQVosouTZ?r;xOfxfKn*4Tw{_XR~ZieD4NZwf%?%~g9qm*&H-Y144G3(}%NW39ZZ_6&ox1p_~=(B;tM2hk{MjqhK@QSwb-m zIaO~FF^iB-FvuK0+Gd8-LO2eUvqHUyP-j%>AelT1lsN$8cvA~4qP>QAFRG#eo@R$d zwPy++IbP0+H*pu>U03CUJG9yGOu;9|n`*Gg_a(%8%#X8?Gw|4&G-6dwp0kplLh|o0 z@+r7lG%vH6LR$_twa6mN&k*c_Dhq&_bw5Ka%wu2%x<$O#n_Co8vs!A3OLVlc9GCiK zUS=;Nv)BC8PrTASkpP9M5j$EEtNjty|E#_R>w6NUR z{BWgAF1G07z&ap)ekIP$RUp=ts}UE5?p(81tw>)*q`xwx$t8w=(vYk!iV3sCBFq#* z91JjF3L%Of4-9#%V9z1gBmqa7h)a5TT?~&06u!BHrt& zL~tC?V{d*)g~KUZrxSvBK=6*cL*=Ey%j+dN+saC;mxpF{tQR*xEP1$-o)C!l$Sp<2 z#ZKAW10A`=qUKjnE$6z>oGun4H=oHl%0(;6{*8e0GDA7Clnc?vmFrN?@cit~khY53 zy+@XYc=n$cKwJ8a0IxHT`6zPR+f7%)e!#e4fc`-%Gd(L%Bv8co?&?yDCQc&iR5xnk zvCwKts&QMg?Zjh%zW|9FU++A z9bA=bx9ip$k{R#LbqLd~qJg4Tmb!qyZ}lO-)Zf%mZRx)@pV` zFKz_ad##&gC$hKe9%Fzn?1H=@%N6Q5LRjhb;vPZVD|oZFW3Ip)R-5w^M(XMsGCb6W ziHY;gh5YhUW(97qL43?6W(6QcZZlpnx=LSAnw4;%A7r}N2e(SZd#I&+p}5y<5XZzZ 
zAVs0reWptYni)LR5AO5wD%wW86U&T9f@|5F6J_2Lo%_TxNMPtUphgMSVUb!hx6{b& z;A%{lTm){~(F@WZTB5Ze!OB=}5H~@0IB?<3s`?XHnZC>{eu*reU5y8>W34l=$V=8T z7X9J?bO#oAORN4w7RHyE#bso1ow1k(7TU3b55ki9wVI`x|CnyI>|=ZG2GHa0!u|2- za`r2AGFr$mcTdLEkN>|0@x>H^|CfjHtX?+|OfPH}1lJT!5)yIjfC4!<-a z!Of+gU1IvRn@Lx7r@z}DAg((KdZa%V)1O}ZQ^|+?`a?F?W6R445+F0qon|@x+2tvx zz$bTbDFWXm-qYz`UK@`A5KC4sAafwj zIWCPrB_yx$CRTmqsA`<0mWx(yOeQjrkx$xJUL@|o$ys_Fin)6K!U%$N0Z$5C(g4f@K@ z0>*2Ad7zlm76wX%HflUMs0^Y455gZYRK}s5zgC{%)x%ujEIvkiD8bSA;+Vx6-|7In z#Imnoa@1?$iUvPF53lo%P$! zx`L4Q4RO{LI57SBRN-2i&3_v)ZN)(Z`%h?iZl=AY7Y8?o&I2R+wf>KCO6B&UBSN2U|^NpJ>1*brG&dzbq3_V-auHElweO#=?e78 Y^00IVJokjh{Qu#9$#gExn%i58wkJGacy=eO;zZdh?0EyO}7xRBybL?knbb#SJSc_>o5@+7q=b-TBDqPeSiM!JCq=-QVZbVm;k^|WmWAXV)NZ~w>c=W z8PX23=nxw~+`c-@XDWMn%%f+MDoT(%A)Le-7_lAOX;`Rk2NqQA6-x;t?2-)|*7-(r zQyDbZMjM0{mr*axk#8!7>pNW)^o8U|={KF!8b%@mr-wZD8dSsqU(e9xDGbb+`2 zP2}@LV$^wQ3YkscWNHefRiXIZLKN>aBL*TbvN9?}TRg=k(YYAhFT_NptE*Tey&a1u zg&CU+iAk9xDok(yH%(joX^O#OxqMgxzur2YrdfR(vF=IFmtMUYmT{u%I_!8o-*fvq l2)lmAL_yzob+7A2S{d)mGaa|5b*Ngl6M=c#z^vL}Zg)YcTyvQblTii+J}J z`D>gu_2gxC-rJe?9=oH9Tb}~0Q%=8sKEEmsB?xQO0ysD(08p(|vD!z(=6l<&JSedl z(hjre5F0@3ULEFhmAyRX@w1sIN{~DuoFp0;u^roKTBvS^7S!wwO9>S`pb`N4pmlabCEyZiT5MRy050dVY%JAWp{d;Z%U6@-$!yB^0~tyWc>CW( zzDOlTT`Wx@v)P-Rm_lh)D1NsR#rxcdfyj$&85hD8Pl-u&E(Z52F;$bzRicsJjV05< zj7^5bbeX0qN^t-;jVt~%#bC8wKdgaY?;KAjSz{Nm;Y&Y|exnsFlTatFo{Nw zX|#%*H1$Ml+n9A@YOSSi4%@YNzZ`UVB^@|^G{=vZ1Fj_nL}MVkG-f`;B?CVez#n<1X_V(`ffnILg8HwL3HGWQVcB)~e zgZl=gvmqta0P0b^HJy`pPbN=W7O8V&huhm*Hf~$r zzHa@7)i<=ZwQpIywQXbD#%njXw>!?=wrEcksXex5DOvpDo+ZTlt36kcx<5#Jd?fHt z_8u?k{t{$FkM8l1+kOO;ku!TL$xWS4TuCDTai@=T|MGypp(f-Hbk=x7{$S8s6RZn* z>;0iRuNr~BiqhHHSr_(gx-pB|=rh^$=`8x{=oiPco9KLMBjgv-DkpSc)~$HM=DUjA zTHD)QbDhwy3SG`6qFoDI#pSGz!HZ(*sy25k8ZPHu(?JU2PV5|XK^gCzF4qFq+gxi% zo{JW-9z~zT{^{FE6oHSO1IE%e`U(1nqx-%pmC>yrcrMK)WZ4i5IFj@Jt0hv-`-^fQ zy*V3(hAqD+(6;N~h;#AU$OGT}34Z{Lr*r6Q+4PHVed`4qqTq-Hl%Ho+n_(oUQe&Hs zI0S|lQ6-1n?+fJ28CR5f@XY!)>&WGbzfn=(2Zu)%yz(o6MzoOxnj|jSi|B)Z-}UHiu*b#Y(z{e*4hSH_c{g 
z(H)#(=*9NY&jZ>WFKeIM$F)x=hisqw$xn}&%h}6xWFVMN7t%jI@YdI$eKd;-8ZA&z zx!Gt_8FNC*XgPg*^umDx6IEf2Mum#c-YCrPODE1goTHN&<{4ON=KKHid(`&x_YMs? zP4(y6UQQ%Xz(QI{$6Ms6g0P<=@|$>k*HEm_fN+H+}ba z-}qOiFg}9wmgwxWvw#F^zW4q&9x%6+s5vQo`LoTx2jkN7cYk{wd;cg+^Oc%3FkHrfa7zP`83H67SVGydS-0! zPiEhKzkP`7K!Oo?S4DhxZ=|BJq8}#uj(A0NMQ1e974ENS{N!B~J!-e~5oa`9QSG|o z2zW-39g#ku=-JuTrGgV((TL7Lbwzh1u7X#jRy5vq_ubWZxtgoZ{(R{*rLN&?NQve! zNna;%$9J5e-SNoC=ZWUm2i6lOI`UH6@IDe)O-`3cw&;$)2Esh6@4rZ-kuu%KZ@8tv zRZ>-39aviN>^CLp{?6-&GvKcdet#>mgL}8S@`ly4XgCJmGWhn3zvlgK*WBT?q(pN9 zSAVZyy%Q=LeZEh!h1K=96|5~N9DaZp{(VES?=tezM+x)hH(g5#=Jj1qY`5GEN`EDr z{w#~`d+D7c_A)Sy-0%90R1R7s*%QT5u4LarRy|y76RszB8)=d5Ef%=Sg_~)2xzG*G zcRAhQEu!D)np>Z+o^g?akmg)c+Q^;PJl7KLWZqZCJzIxsVG%l|g{}glBJ}nt&gVIa z9+c=e_WtlW=!u1T1$Mfb%dtIM${iwCUk-H%N4nywujAH8$7gzDAeZmr!Q3KPEa5C{Qm&n>Dwgc) zu-^!;4=0fz6vc`B%`c362h?kwph?ziKNkL%3=Y|fSs}1F6N&xWqrRkS_A6QjbSFqF%Wp$rZ6GX& zldp-x#z3jt$)0%EHc?6bQnw8mVctTmq+R#-OWh)ADq`22cz7-RcZ074N|Tt@P>_t2 z_@NPldYG6JRx4MlwG9UhD1-U6rfl?KA)C3ertGkm&Ae74+pc@4pc~<&8n12WkOCO< zNsVlW@IOo44#NdC2<*DM39!QBG+x`#y98L~U>ezy3e^mmtB5`$&@s2sptI{P9iR)Z zj=%oUn&dCV2)e?ObUcHu<&Vd1mK`0}YcF|R-ybE+Z(7n)|5HJr3J_ZyZX?%74wM?X zMsi@jkvm^79{dR1!nutb$)Pfld!dYO*9jJg+@Xau%yaiI5(MWL34-J0w2c!SSS)f! 
zD(D8DyZ3TYY~peu*^wn8IbkG^R2sRJLQzLOLQ#_*K`?TKNKP8bkt!qCNRBKua*gE3 zl}4_S99c#W=uJCTEhzSRMRLqbxADpy@d?r1G?K@c)5mf+#dr<9H=CzVub^W(^pg#A z7tcP_DD-x&k+$&MXRZ>-$*bt6dG6~?B6+G=Nc7-JkvzqdFjC&Txy9KJR_-(H{owU> ztJFzqv)tM7J9;3pN}ZKi?i`55ZwyzG)-oKKHprQ?E7I2=jrC|x`CV#%{Dxli!o#er z8V=JEt*otAjo-?P0_`6Wmao7z}DK_j?YAPw}T%IPL2%=6L&V` z=xmuF?5&=Ig4p__cSbZRin?RrXlJxTMf5htESQR22@mL?|i)kspA0E^lkv3RS>LQ}hg{XmR`!@D;MSCLacImIO;8kv{v$H=E z2Wtlp`B;^WJ!@cX*w6Laq4tHNJ!)4pzMJQCt9NXT-M+CW9JxcwafHQ(21c*`y$=3T@hM5wfnP?`!<-grTF;6KYlS1w29cbEA(xgy1J`x-JMcvV!tq~QpquUpe{c((!h-~avfCYpb0C8s@Gx_OOs*uHAE~;cW z)~!Z+^l_ulWVu>4R~u`KM8)ZEaORk!4dpPA^GeG~-&^g0P>Hj&7NR(%RX9Cr(<051 z^W-LZmB-#x2l0n82(5sM2$9dY!1>ljJs4R~$fRg|qq=t!^O~|a|y|R45;W-IqZB>8DV&*R45CS)V{1`%t zkcL*AMKLhM!2=!%0X=>|K?5=ybbP@#RP_|@$7+P)#6m!O7N-`pM;qvOh$-G= zIbEPtG)Y`4DV;t)rwnvqY$_E~Z30h8me}g4KofdGDq`LnQjuw$Bo&q3cT&-zW1F(l zqR*sH$$D1`=Db^_bsi+i*%`Azdio~nmd{>I4`&h21Z?LxODl72r7hWD6y{>zGr8r4 zD~>KWSc|RlgiSt^4dp!h^UJ_sBzRK^K}=FwjoX_zRe{sQOyTp$d0-7~C7!deNs5d~ zD#kh5N&s1($&nwiLCL1Ng;gx3;FkR6ik(LIm)6iK;(7Km&0N~-V58C6p;OJQ-7a|# zy!E_STxLW|jg)Mw5;0UGH%V3AM$0<6hOQ)@NzRMS2f%B}0)+AHl6!%&$%w(aMPL@0 z>XBI^*Icu~PXM>))*`oaB`~$fEd zc%tT&6{{;wx1+KJZvydNW_SbhX2i3s$OPoTJPYKz2>G@qB0y$o>Q5#jU&#rmQ7hE` zPXg3|^{BQ1_l!{Hb)di-a9c!s0`Z0!-YDR47a^Hw`KDE*z#FyVJ%@NNYVv^-GZUU^ z4Jz;kN-gr8LcDXDdshf^I>>NIJ|PJ*r$1T9k>~ooY;WFYw#n94Llo+X#?xb%xMCL^$B`rMo49Sb?naKrlbPdjLNJo}fkWL`dvkYkhkaRDMBUv3WCX5y0G(xcv9l!UBy|Thx3&1+@%r!G_xBO;X$(U>7L+ycU$F5alU`GG?IE3+3{Fya>cIq^px**g_j&KJcN;7=*ad%1jUOCs_{TD)xxaQiqoi^?g={ zH#0%C*v&VsxMvafvFlL7$CoG3aKaV>Q_NVA{8$bUH@WYzU*fTm=}))fp;$b&VvbcB z2UvLcIQ1df1Q@;oGQq$WXoeVin9~4ZLC6eIAO>mxz(c}moM}(|CR9cXks2r%-ONym zgz%|>#IN~jr+-r(LrxIN9AcdchzE$5W+O(1 z@O*Q|8pi;y0l}X#5kmxj4HR?MML}V7I0+ojo)PkvyJu|xJJpZJ3qXf6Dyy2E^cO~L z)&|UQxglZf&Dwwjb(jY{zg1$+%A^m9a+hQSz#*FT1i)9B^#lx;ZPpW*a$zfB5YKu7 zFkmjy^VPng+R~1A#aD&rqHzckR^-Y;P&sKM%XLSFUJ@u@a2-^nz@}pZf9;_ zM@4`eKdVqBPHs*)vB_AjlUqFJfF-+hqx}X}rYAFtw~@u6ZMg6B%vIB{NI3l|SUhtT 
zu)qsE+HYiGJegSxb^wc$jKw6d;9DD_5|*Q+)~=Z+e_H;uXA+ntT>D9U0_HQkH4AeX zg?V=y-Ap{MHzg4!@pNi(VP0?gGYNAHg*owY6y{Vj3ZorFElfP<$`80Q#p9OBsb)*O zatvV}z7gY!53ZaRCUHOtvVIfXd#@J(`_ z$1V?C4QCKatKIHc-wvPJtHRY3AG@XE<5y4IIYD2oPa*E5Qz-gx>+(T>60hy~`eL{H z^c(u_})a(CK+5jc`1u+z~9 z-gprOGFFJ?ETRAsoV(Rt$b^H5c-00Mc)G^fk-?;yYhyieI1k>x8NR$h`{IW5Yma-B zLWqjYTE}`P>0)tUUBWLh83-Bs*Ld|w6#q+UQ@{VRja{CaDh@PevTx$h+F4w%66K7> zPuo#6-^RLl9PR3sQ9k2Elj*wGC7_tLY7LYO`>1iJEHj7h-2yYd^d?~;H@oPaUG%bfS%k4+T%~QD&6h4ODg2#+X9mVgy9bt` zakvx@NS`Cy^gE)0cZTlApT%uRb`jMOC}S_gVtNIVu(^Ll?w#IHv(AihcU?4S=5gd% zXNCtq({bQ3Fkt%qXwfuctj*xxn7^4utgzBF_+Fb#OPg+I3yCxK^R>&|qI^0v z)-QaELK!pV_rnE%;QFE1vf?X{4CNMY{ z)%AAwmGiJ^x!>o9|MsA~Dzr1&6)q2lYW(#6M=c#z^vL}Zg)YcTyvQblTii+J}J z`D>gu_2gxC-rJe?9=oH9Tb}~0Q%=8sKEEmsB?xQO0ysD(08p(|vD!z(=6l<&JSedl z(hjre5F0@3ULEFhmAyRX@w1sIN{~DuoFp0;u^roKTBvS^H4E$wO9>S`pb`N4pmlabCEyZiT5MRy050dVY%JAWp{d;Z%U6@-$!yB^0~tyWc>CW( zzDOlTT`Wx@v)P-Rm_lh)D1NsR#rxcdfyj$&85hD8Pl-u&E(Z52F;$bzRicsJjV05< zj7^5bbeX0qN^t-;jVt~%#bC8wKdgaY?;KAjSz{Nm;Y&Y|exnsFlTO}7xRBybL?knbb#SJSc_>o5@+7q=b-TBDqPeSiM!JCq=-QVZbVm;k^|WmWAXV)NZ~w>c=W z8PX23=nxw~+`c-@XDWMn%%f+MDoT(%A)Le-7_lAOX;`Rk2UQE~6-x;t?2-)|*7-(r zQyDbZMjM0{mr*axk#8!7>pNW)^o8U|={KF!8b%@mr-wZD8dSsqU(e9xDGbb+`2 zP2}@LV$^wQ3YkscWNHefRiXIZLKN>aBL*TbvN9?}TRg=k(YYAhFT_NptE*Tey&a1u zg&CU+iAk9xDok(yH%(joX^O#OxqMgxzur2YrdfR(vF=IFmtMUYmT{u%I_!8o-*fvq l2)lmAL_yzob+7A2S{d)mGaa|5b*Ngl#gExn%i58wkJGacy=eO;zZdh?0Ey7%Q6rNo>j@_hf+Gdsw3dpk3B}H+wcK)2mtqQ5C6h&$kN`XQfZR|<9)PLCP zP>6zh;82MlaYh_CaNy7jhy&t)stN%D389`4;sCd*s!$F+^iUz*o86sVuQz{9kw9QY z_U_Dk-}{|6ZziXon#)i^FVKY{8iR)zjT18bMT8I%k;wr3OtA4nHkTVuv%Hp!SUOwC6&G@&xk4t32z%&$A3~?f5hSwV%m5wq@$5g&c_O3}q}Gi3t4(GBdp}LO zL@`44jLN7QiQ+fVZ-wCJ2z^I(F~Gb#(sz!1zXQrj%6h%VQd6mhRc{uo)KF^CWTj#w zHTBX;s$!IJ$}Faa_C4{DS+Up>s{=`;St=O|C71&yHk2we%P1O_k(ydrT^(AXM|vyP zJlG(eBUa*-8`Gp0(9O!y)CrQHZ(b$Q^Q)lw96d8c&-n3E){69H|Kb4`tZ$9-v%0>Q0SvRimpLU5$6Ew&!RAx4b7GZp1CQ}A0?kI zW{vBTBo{!M>%$`%ec1M4Vz1P@_Ecs#t7k_t8GUkI&*~$iBlEhhk7Va3nQ>`penihM 
zOpJ~h>9NUzo*rE+uyn3qOr#fCaUwgKnH*o3905B{(>Y%R2ki*PAq=&(O#(PY^FGeU z98QS+agreDrHtMI-x&RM!;vK40ttRVmC%4C$-BcMU_G?g=cIVQ<0J`~KbPp2eh;2o zd(oLa1?aF2n&mlGsMc{=WFXzm%2}h-Wbn3vi(@LY3WLVw_CZ`I)756haw+vy%=}Pg zb)GwpIV$bQVZ*3k-{Aw{9;U z$5(*5LAAQVu^(pU%YFAJX1cMkROcJ8*2@OnMaE<`j^*e$lq1?hX!k)`mCfQoGTV=a z<7*Vlnt!0H$Qp&ZS+mS)#Tf^f*kZL_HY^&ox#G2Yb=fSkx<}Dj;K!nB z;u3p(9x6?f*^ESuz=&U32cPwiblciL%WI?u--Fq2qEgeCkjTvV_k(hGb~m-V}j!bX0IJMwhy%cSA$cmYN1*} zV`ZT1fweKm3~v9G50v@FfqgFd9;)YsuE<2Dy z1Im>v84c^X8tgx;*rHs-C>Grmh}I12qUWBy!nO9A-Km434f743XSiNHa1K{W^QKqW68DL2irZv zJV!5s)5=Qdx;9S0v>vNsH&zfvvQETcc8< zgAWmn^z}ozg+Rl?3N?inIc|)ujB4*l`Ejsx6CQsZ6s*@bV9g27ImJ>GNzQ&kc}BY{ z<<(9|VQgo`1NfqCtX;cMYh*F7BxEwDy%8YUmcB?9wd2u0%q?0!hMu2{;lcDExx9v$@Wd8*hL zHxtp?nuXLc6mUD1SJm}SvFp!3qCNoEgKF$6mov(&_FHFOl``-LTJYZ>ZZx>x`uj>= zIgcsFwQI2N#@&Oq_XMCJqd^g}E5dC|+Zs|XC#U4KHINg<^OXG!_`bI-;kGog0{2=1 zaB*L@zY*7a3*x#h2zI`9$IEeW&%Tui*?n9WBE8!W3yu$WZNkyzYn27pPhng_FX*9p zzs1S#Ny=SGd#fw2UT_z(cdx+jW{zNEz4N<;zZ)Nd@-MfbJ&~}-nZYwp@Nm;sV&Y0& zv9x>L+pb|7qlj#VQV>C`_9xNL2m7X1i{?_Ki(52Nc(6| zlCul()z~7*JEglv%_-%mc5VOmccL(C_Xg6BeS16Dj)y|9#m#_={j_pI`{V%l$o3HW z$bCW|xzFBc&_`R4C0Z)6vi>-*{tma1*8WaSfPLH>0@mFp>L|atXe*vEC9fRQe%T(c zD2*`%PpKc!P|5be(z)&Mc3o@+ehuSrE2zWKehtD+MfUfV5P@zPbJ&BG4Eeqe@+sdu zqQu766o{tU^BTX%J8Z$N%i%Yw6{bd>J2OpG-8t^3%k?zBolQCycS#FwXWZw(g{E05 RCM(0nA@Tzy7(25q`T|QA>%oC|hr{Oo%`r)h?AqeEinFX5g(VV%+XaZysxHgT} z_d1ph6hvuwjOY$Lw5 ze{hi8r|0MD_7?0x_!hB4-~Du*%mI3(zO#0Tgy;``M5Ol)K=P~fdXi4TLy*GOUQ^lD zL`ewRpm-3J07FEvr{TE^z+PR_?tyuvBmkrVqQYv^s@le4dDkl6YBm7oGKk*VGpazU z(brvN0O{EcpF%;a0ubYQzDncnDv)HR;Am`p@$v6|3~OM8LSOc+m`<=2G6NVri_Bnl zsI|QC%)J188_Q}fp4S>snt?03-m@NRRA}Fuk5y&HS?=_X_ZPePuDi#2NeIx?$IYc zPZyFj3~QOOM2hU#x=KHs5dvsNH|Pyq;@DHNYflNVCz)Iph0|sxJpuYagUTSa&R=za zjnKJGDxxHE?2j;uAAmYT6+C5R3-X2L9mSFW%q>#gX8|wWaOQrp)U}b_HEt^a?prCx zoRtXjSCKvze3QQw4!_|{zYQXQ4|Zx-t#YG)1WM}g)3;|deEf{s^C1Krm}AGlBW0ur`hsv2VrGv@ z9cM|evXLo%fO3wiK4niWxZsR)zYf{tCqbk;V(Ml3s;gY^_ETwQHXt!beIH4W2(Nb} 
zT<}%8su+E?vJu8e*w|`AaH<(}iY$w&jPdg##7`O_G<;4}#fo{3Y{pP4_$px>_76;( zR=eD)H0?^G?u-LWaJ$i}88+B3qmrAg#$Ls=THcDu1|w;Mp^KYIfM1r>&@JnOEeMSr zt4m1K0F3y#O|V)2NVqHgH5Sx8_z3z|1%+B~R6Dgg@(FGhyL_Z-rEOEOE8C=1wQ5$~ z_GnL)8};qVPN!uc{YeCrKx5P8mSxzMdA$OPTiASX_QEVm0jdV2*o|_diklT-Wdt1R zs%3zh-nCk7)Mp%)&*O4F|IJnfu8)CvfD72~*WvPF`kpYF&CC4UP>RVt2a-!bnQ&F3 zZNJlm3)3=tgbNs^>6(DlH0)hZJu_UjT}MBuf~0okwofxmu1Kw6R<_i z7>9>|xcTkr>UgV^VB#x8BheT{ zQ#fdFtf5m=3LNPBNOXH7&ob;0;NDEwZPO#)bl969h-qM~5MRgKP81rxb z0J<0$8cG=tjqc>V{?xb|)|ttj}nAqBo=cGNAlk z)IJRq)z5`-3j()p0QVZND&W{Goo0<)Y zW60ni!J?{;D!WI|fT6ws)r(5(3zr+pruO&AqAFzI52Rqf{kT&9ee2IFS>(K+T+|-G zy(_z&0q+Swhm7{K5M35-VmgqJGCl>Rs2u_y3G1}(H{ko;#=`AMWI67G5a8my-2Fye z@2wx#O+hg8?Pk0T2k-1Bp&_@At3q7w=EI!gS5wDOO#4!0PW5OQ6_*QgXqIoWv6O>;H+MiEr|2Y){U5p9@NyyM7%(kH?<4e^r8##s(cdf8;Te+?M zVg^+{!Bne?nRLJO%q*m60>x*w2s^Hu%C`0}qWv@yQ)Raa<2@--V#p|o4i%c)9WWRu zY#6gE%608&1YFl`)TpxCx4?zeL0I7eEawYkUp#dgoC^|a$7~v1yaWVJiepk1MJ_EbFc+eN>4rLs{{F2 z|9VdQ;w+rC+kEn#@B{`&N1U}2Lz#v4@bqd-w@`XhxupGVj>kJ>0j!*O#~Ak2DMw(t zO*cRGNAx+c9bFBPn;)Hv{T*djyQgSRC}<00W*_efwKX_8LR*e!9L1HV2Hch(DWfCx zQAqwk!@+*|(h#t4lt&p>fe!>RSt%+Pw1>|}%fphaV72vZICZx`a;kRk_P|T+V|9qE zZg6oZ2FrQ2RV4q{Fmg9=IOP4Y1L=Mf?PIGvc}0=)kE7skYUKN@A+=~j_h9jMqi(5z vcdoA!HSLtGsah+=x{UKq18UxeE`$4=D0M1TbH2W0yheULN5~cUAJ~5ZyNchX literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000002.json new file mode 100644 index 000000000000..d3461be234eb --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ 
+{"commitInfo":{"version":2,"timestamp":1699495426681,"userId":"yuya.ebihara","userName":"yuya.ebihara","operation":"WRITE","operationParameters":{"queryId":"20231109_020344_00033_9eakg"},"clusterId":"trino-testversion-312e43c0-2e3d-4039-8fd3-9fe5e100d186","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true}} +{"add":{"path":"int_part=20/string_part=part2/20231109_020344_00033_9eakg_e6448c08-9b43-4fa1-8288-823fd3d692b9","partitionValues":{"int_part":"20","string_part":"part2"},"size":199,"modificationTime":1699495426664,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}","tags":{}}} diff --git a/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet b/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/_delta_log/00000000000000000003.checkpoint.parquet new file mode 100644 index 0000000000000000000000000000000000000000..fb248e83da3ff566a55060fb9e37bc1555a18640 GIT binary patch literal 7077 zcmeHMO>7&-6`ox#DUy5L*UT9Gt)TipgDe#(YVkEaII4`;$LP4t>5L0`;j6>EckmOj#)Y!tzAAWHU*1!r8pLDDk4YL(q1~7P*GK1N^ z-tyctzjEV$xg4@Q<)1KN9`N^oou$OHx!mH?jW==^-?()C`toY-&GRd(ORGy)ujF#G zE#G8@PEnM2A~#30TN^o5QgNe+a5_A&-zc4_^s)UWn&>pWY{_1ROXe6P?Q^;y|s%q8T`w z@D*Rz<9cdN&Cg*0r||qT{D3u2ZSLt)}w*>!MN0h1>Ack7jX$Bx< zAE2JV5@lkV!9Q&=VG?v}l}$cQQLg(;oy0F&azk)AqA-VIQoA1TK)O#f-<_t|FETRl z!7tN`V>Iwqa}9`vWexjLl9#+RTwjE6jROdeo#rLJsGmm5lb{#&D$ua+92nI}qfjr_ zjAFH7jsuKmy;?7826#84^499rt)foq?G+<}5fnGQ{P4uNaViC<8kAyG3)K?cEDtLKaKu)K25P!N z>J6&T5GhDW4k2(^V_7t!c3~l2uCj=lobyBWw5z8~2wq#O{LpnsLpHnS^(#o~A z1U;%2{lUrLmmwKV;;~@T>p{V=9WY?Km>2!xE+^jQWN%iQ9wt2=Zk6Z_zd$q;oP=l! 
zhK7UHcPlK&vCSxALcY&sQ~m>KoN>@5eY_88#@3t}EbSo4$S;YPAw8{ zy673}mF_gy=4LksE9SQ2ERC;>-Rt57`D4ECMu&uIg=QOU*VPxS%V|2IH>3TxOZ+7- zf8xqYpK-(1A6A(H$7{GE#4cXUA_B@5v(ov>5RRkUMrjQmR1Cef^S`i1@Ynxa(snqg zmY>_ztlQ%d5Qp;C{!wPpjxs$t7t6Co%=`*Y_n2siTkx!A7!P96nVR*9 zW0b+)JXuK^ROJtz0Y`lfs^^v15iS?SW%*AdS&56mA4tJ|yK#l?`_`FPvdDQ(yeL0} zd)I3#cm)rufi*0L7>yRko>~^rN!nf*hLVTWtAL zxA;Ji?~Z0uLiiqMo#lf&FRZP#-?GDYWu!G*80gJ7BE(tNxtQ(1PsCZ*M>V@X!}M%= z&34)&1Q%(A(OpXP9O!`fJ|}*_$zL3ke|&5bG&d-0L?E9NXl@R@9Y^pru2+1sKn|L8azbgS_rd9BtD<-sj~-!-&1C0&#mwsSlK|9K@dc7xz`NgbwPW~X+N07m^c3AR@v^IO%pSo2}nKgd< zuAx`phbQ~+vx19B(Odkpty1zQ{m8ArVUl;p4oY|TR2N(E_pgZl)Xz_We@cOOSo>3usm>C&?q$rc5jt5UkQD$C#d~tF{YHp$^ z6Hri4I%kC`&CW&dkqKC`m0Yfw47`N;7j(6!P>F NH5ujsbq4^w004thBAEaH literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=20/string_part=part2/20231109_020344_00033_9eakg_e6448c08-9b43-4fa1-8288-823fd3d692b9 b/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=20/string_part=part2/20231109_020344_00033_9eakg_e6448c08-9b43-4fa1-8288-823fd3d692b9 new file mode 100644 index 0000000000000000000000000000000000000000..c26b952c5e645d00c8da3e3dace9c344526bb110 GIT binary patch literal 199 zcmWG=3^EjD5ET)X&=F+3usm>EH&D3hd)2Tw^+W?p`LadJj#ZlWj? 
zP*9agf+;hFL5xXKMp8zNK?bPIl!J{y5+o|hB*r9WCT69^VI#o;G6O=2F=!z17$jwY z)&Wfd$^#WMnOQMt7!)KH6_%!!=;jvbf^AeNOD!tS%+FIONi8mcu{DxPGjmcD^7Iom M8Rh|X2LQbQ0EKKKo&W#< literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=__HIVE_DEFAULT_PARTITION__/string_part=__HIVE_DEFAULT_PARTITION__/20231109_020350_00034_9eakg_2a008dd8-da7f-496a-b404-ca455732578e b/plugin/trino-delta-lake/src/test/resources/trino432/partition_values_parsed/int_part=__HIVE_DEFAULT_PARTITION__/string_part=__HIVE_DEFAULT_PARTITION__/20231109_020350_00034_9eakg_2a008dd8-da7f-496a-b404-ca455732578e new file mode 100644 index 0000000000000000000000000000000000000000..979c12f8bf0acb6dd97e99b6fb60123736a0254f GIT binary patch literal 199 zcmWG=3^EjD5ET)X&=F+3usm>HRYq$rc5jt5UkQD$C#d~tF{YHp$^ z6Hri4I%kC`&CW&dkqKC`m0Yfw47`N;7j(6!P>F NH5ujsbq4^w004)EBBKBR literal 0 HcmV?d00001