From 9bcd0854d4de330ae336fd2cde4503a2e70ab222 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Mon, 21 Oct 2024 14:28:02 +0530 Subject: [PATCH 1/5] Avoid fileStatisticsDomain as member variable in IcebergSplitSource Keeping domain attached to the relevant FileScanTask is safer than handling it as member variable of the class --- .../plugin/iceberg/IcebergSplitSource.java | 50 ++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 5158f0e8742b..1641c6dbe71d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.io.Closer; import io.airlift.units.DataSize; @@ -137,8 +138,7 @@ public class IcebergSplitSource private CloseableIterable fileScanIterable; private long targetSplitSize; private CloseableIterator fileScanIterator; - private Iterator fileTasksIterator = emptyIterator(); - private TupleDomain fileStatisticsDomain; + private Iterator fileTasksIterator = emptyIterator(); private final boolean recordScannedFiles; private final ImmutableSet.Builder scannedFiles = ImmutableSet.builder(); @@ -265,8 +265,8 @@ private CompletableFuture getNextBatchInternal(int maxSize) FileScanTask wholeFileTask = fileScanIterator.next(); boolean fileHasNoDeletions = wholeFileTask.deletes().isEmpty(); - fileStatisticsDomain = createFileStatisticsDomain(wholeFileTask); - if (pruneFileScanTask(wholeFileTask, fileHasNoDeletions, dynamicFilterPredicate, fileStatisticsDomain)) { + FileScanTaskWithDomain fileScanTaskWithDomain = createFileScanTaskWithDomain(wholeFileTask); + if (pruneFileScanTask(fileScanTaskWithDomain, fileHasNoDeletions, dynamicFilterPredicate)) { continue; } @@ -291,21 +291,22 @@ private CompletableFuture getNextBatchInternal(int maxSize) } if (fileHasNoDeletions && noDataColumnsProjected(wholeFileTask)) { - fileTasksIterator = List.of(wholeFileTask).iterator(); + fileTasksIterator = List.of(fileScanTaskWithDomain).iterator(); } else { - fileTasksIterator = wholeFileTask.split(targetSplitSize).iterator(); + fileTasksIterator = fileScanTaskWithDomain.split(targetSplitSize); } // In theory, .split() could produce empty iterator, so let's evaluate the outer loop condition again. 
continue; } - splits.add(toIcebergSplit(fileTasksIterator.next(), fileStatisticsDomain)); + splits.add(toIcebergSplit(fileTasksIterator.next())); } return completedFuture(new ConnectorSplitBatch(splits, isFinished())); } - private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDeletions, TupleDomain dynamicFilterPredicate, TupleDomain fileStatisticsDomain) + private boolean pruneFileScanTask(FileScanTaskWithDomain fileScanTaskWithDomain, boolean fileHasNoDeletions, TupleDomain dynamicFilterPredicate) { + FileScanTask fileScanTask = fileScanTaskWithDomain.fileScanTask(); if (fileHasNoDeletions && maxScannedFileSizeInBytes.isPresent() && fileScanTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) { @@ -338,7 +339,7 @@ private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDe dynamicFilterPredicate)) { return true; } - if (!fileStatisticsDomain.overlaps(dynamicFilterPredicate)) { + if (!fileScanTaskWithDomain.fileStatisticsDomain().overlaps(dynamicFilterPredicate)) { return true; } } @@ -390,18 +391,30 @@ public void close() } } - private TupleDomain createFileStatisticsDomain(FileScanTask wholeFileTask) + private FileScanTaskWithDomain createFileScanTaskWithDomain(FileScanTask wholeFileTask) { List predicatedColumns = wholeFileTask.schema().columns().stream() .filter(column -> predicatedColumnIds.contains(column.fieldId())) .map(column -> getColumnHandle(column, typeManager)) .collect(toImmutableList()); - return createFileStatisticsDomain( - fieldIdToType, - wholeFileTask.file().lowerBounds(), - wholeFileTask.file().upperBounds(), - wholeFileTask.file().nullValueCounts(), - predicatedColumns); + return new FileScanTaskWithDomain( + wholeFileTask, + createFileStatisticsDomain( + fieldIdToType, + wholeFileTask.file().lowerBounds(), + wholeFileTask.file().upperBounds(), + wholeFileTask.file().nullValueCounts(), + predicatedColumns)); + } + + private record FileScanTaskWithDomain(FileScanTask fileScanTask, TupleDomain fileStatisticsDomain) + { + Iterator split(long targetSplitSize) + { + return Iterators.transform( + fileScanTask().split(targetSplitSize).iterator(), + task -> new FileScanTaskWithDomain(task, fileStatisticsDomain)); + } } @VisibleForTesting @@ -521,8 +534,9 @@ static boolean partitionMatchesPredicate( return true; } - private IcebergSplit toIcebergSplit(FileScanTask task, TupleDomain fileStatisticsDomain) + private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain) { + FileScanTask task = taskWithDomain.fileScanTask(); return new IcebergSplit( task.file().path().toString(), task.start(), @@ -536,7 +550,7 @@ private IcebergSplit toIcebergSplit(FileScanTask task, TupleDomain Date: Mon, 21 Oct 2024 14:28:02 +0530 Subject: [PATCH 2/5] Refactor IcebergSplitSource to allow processing of multiple FileScanTask This will be used in a subsequent commit to skip unnecessary files from optimize --- .../plugin/iceberg/IcebergSplitSource.java | 81 ++++++++++++------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 1641c6dbe71d..383a18ec6c45 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -262,40 +262,13 @@ private CompletableFuture getNextBatchInternal(int maxSize) finish(); break; } - FileScanTask
wholeFileTask = fileScanIterator.next(); - boolean fileHasNoDeletions = wholeFileTask.deletes().isEmpty(); - FileScanTaskWithDomain fileScanTaskWithDomain = createFileScanTaskWithDomain(wholeFileTask); - if (pruneFileScanTask(fileScanTaskWithDomain, fileHasNoDeletions, dynamicFilterPredicate)) { + List fileScanTasks = processFileScanTask(dynamicFilterPredicate); + if (fileScanTasks.isEmpty()) { continue; } - if (recordScannedFiles) { - // Equality deletes can only be cleaned up if the whole table has been optimized. - // Equality and position deletes may apply to many files, however position deletes are always local to a partition - // https://github.com/apache/iceberg/blob/70c506ebad2dfc6d61b99c05efd59e884282bfa6/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java#L61 - // OPTIMIZE supports only enforced predicates which select whole partitions, so if there is no path or fileModifiedTime predicate, then we can clean up position deletes - List fullyAppliedDeletes = wholeFileTask.deletes().stream() - .filter(deleteFile -> switch (deleteFile.content()) { - case POSITION_DELETES -> pathDomain.isAll() && fileModifiedTimeDomain.isAll(); - case EQUALITY_DELETES -> tableHandle.getEnforcedPredicate().isAll(); - case DATA -> throw new IllegalStateException("Unexpected delete file: " + deleteFile); - }) - .collect(toImmutableList()); - scannedFiles.add(new DataFileWithDeleteFiles(wholeFileTask.file(), fullyAppliedDeletes)); - } - - if (fileHasNoDeletions) { - // There were no deletions, so we will produce splits covering the whole file - outputRowsLowerBound = saturatedAdd(outputRowsLowerBound, wholeFileTask.file().recordCount()); - } - - if (fileHasNoDeletions && noDataColumnsProjected(wholeFileTask)) { - fileTasksIterator = List.of(fileScanTaskWithDomain).iterator(); - } - else { - fileTasksIterator = fileScanTaskWithDomain.split(targetSplitSize); - } + fileTasksIterator = prepareFileTasksIterator(fileScanTasks); // In theory, .split() could produce empty iterator, so let's evaluate the outer loop condition again. continue; } @@ -304,6 +277,54 @@ private CompletableFuture getNextBatchInternal(int maxSize) return completedFuture(new ConnectorSplitBatch(splits, isFinished())); } + private Iterator prepareFileTasksIterator(List fileScanTasks) + { + ImmutableList.Builder scanTaskBuilder = ImmutableList.builder(); + for (FileScanTaskWithDomain fileScanTaskWithDomain : fileScanTasks) { + FileScanTask wholeFileTask = fileScanTaskWithDomain.fileScanTask(); + if (recordScannedFiles) { + // Equality deletes can only be cleaned up if the whole table has been optimized. 
+ // Equality and position deletes may apply to many files, however position deletes are always local to a partition + // https://github.com/apache/iceberg/blob/70c506ebad2dfc6d61b99c05efd59e884282bfa6/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java#L61 + // OPTIMIZE supports only enforced predicates which select whole partitions, so if there is no path or fileModifiedTime predicate, then we can clean up position deletes + List fullyAppliedDeletes = wholeFileTask.deletes().stream() + .filter(deleteFile -> switch (deleteFile.content()) { + case POSITION_DELETES -> pathDomain.isAll() && fileModifiedTimeDomain.isAll(); + case EQUALITY_DELETES -> tableHandle.getEnforcedPredicate().isAll(); + case DATA -> throw new IllegalStateException("Unexpected delete file: " + deleteFile); + }) + .collect(toImmutableList()); + scannedFiles.add(new DataFileWithDeleteFiles(wholeFileTask.file(), fullyAppliedDeletes)); + } + + boolean fileHasNoDeletions = wholeFileTask.deletes().isEmpty(); + if (fileHasNoDeletions) { + // There were no deletions, so we will produce splits covering the whole file + outputRowsLowerBound = saturatedAdd(outputRowsLowerBound, wholeFileTask.file().recordCount()); + } + + if (fileHasNoDeletions && noDataColumnsProjected(wholeFileTask)) { + scanTaskBuilder.add(fileScanTaskWithDomain); + } + else { + scanTaskBuilder.addAll(fileScanTaskWithDomain.split(targetSplitSize)); + } + } + return scanTaskBuilder.build().iterator(); + } + + private List processFileScanTask(TupleDomain dynamicFilterPredicate) + { + FileScanTask wholeFileTask = fileScanIterator.next(); + boolean fileHasNoDeletions = wholeFileTask.deletes().isEmpty(); + FileScanTaskWithDomain fileScanTaskWithDomain = createFileScanTaskWithDomain(wholeFileTask); + if (pruneFileScanTask(fileScanTaskWithDomain, fileHasNoDeletions, dynamicFilterPredicate)) { + return ImmutableList.of(); + } + + return ImmutableList.of(fileScanTaskWithDomain); + } + private boolean pruneFileScanTask(FileScanTaskWithDomain fileScanTaskWithDomain, boolean fileHasNoDeletions, TupleDomain dynamicFilterPredicate) { FileScanTask fileScanTask = fileScanTaskWithDomain.fileScanTask(); From ec5db344fe738786965ef3964dd3a4761839fe0a Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Tue, 22 Oct 2024 14:07:45 +0530 Subject: [PATCH 3/5] Extract StructLikeWrapperWithFieldIdToIndex to separate class --- .../trino/plugin/iceberg/PartitionTable.java | 45 +----------- .../StructLikeWrapperWithFieldIdToIndex.java | 69 +++++++++++++++++++ ...stStructLikeWrapperWithFieldIdToIndex.java | 4 +- 3 files changed, 73 insertions(+), 45 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java index 6ff4ee439198..7689dbf3dc95 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java @@ -13,9 +13,7 @@ */ package io.trino.plugin.iceberg; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.trino.spi.block.SqlRow; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorSession; @@ -49,10 +47,8 @@ import java.util.HashSet; import java.util.List; import 
java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.IntStream; import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -262,10 +258,10 @@ private RecordCursor buildRecordCursor(Map fieldIdToIndex; - - public StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType) - { - this.structLikeWrapper = structLikeWrapper; - ImmutableMap.Builder fieldIdToIndex = ImmutableMap.builder(); - List fields = structType.fields(); - IntStream.range(0, fields.size()) - .forEach(i -> fieldIdToIndex.put(fields.get(i).fieldId(), i)); - this.fieldIdToIndex = fieldIdToIndex.buildOrThrow(); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - StructLikeWrapperWithFieldIdToIndex that = (StructLikeWrapperWithFieldIdToIndex) o; - // Due to bogus implementation of equals in StructLikeWrapper https://github.com/apache/iceberg/issues/5064 order here matters. - return Objects.equals(fieldIdToIndex, that.fieldIdToIndex) && Objects.equals(structLikeWrapper, that.structLikeWrapper); - } - - @Override - public int hashCode() - { - return Objects.hash(fieldIdToIndex, structLikeWrapper); - } - } - private record IcebergPartitionColumn(RowType rowType, List fieldIds) {} } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java new file mode 100644 index 000000000000..f31dccf2ce10 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeWrapper; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; + +public class StructLikeWrapperWithFieldIdToIndex +{ + private final StructLikeWrapper structLikeWrapper; + private final Map fieldIdToIndex; + + public StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType) + { + this.structLikeWrapper = structLikeWrapper; + ImmutableMap.Builder fieldIdToIndex = ImmutableMap.builder(); + List fields = structType.fields(); + IntStream.range(0, fields.size()) + .forEach(i -> fieldIdToIndex.put(fields.get(i).fieldId(), i)); + this.fieldIdToIndex = fieldIdToIndex.buildOrThrow(); + } + + public StructLikeWrapper getStructLikeWrapper() + { + return structLikeWrapper; + } + + public Map getFieldIdToIndex() + { + return fieldIdToIndex; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StructLikeWrapperWithFieldIdToIndex that = (StructLikeWrapperWithFieldIdToIndex) o; + // Due to bogus implementation of equals in StructLikeWrapper https://github.com/apache/iceberg/issues/5064 order here matters. + return Objects.equals(fieldIdToIndex, that.fieldIdToIndex) && Objects.equals(structLikeWrapper, that.structLikeWrapper); + } + + @Override + public int hashCode() + { + return Objects.hash(fieldIdToIndex, structLikeWrapper); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestStructLikeWrapperWithFieldIdToIndex.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestStructLikeWrapperWithFieldIdToIndex.java index d0d5b48afaae..4cb3de25b720 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestStructLikeWrapperWithFieldIdToIndex.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestStructLikeWrapperWithFieldIdToIndex.java @@ -36,8 +36,8 @@ public void testStructLikeWrapperWithFieldIdToIndexEquals() NestedField.optional(1001, "level", IntegerType.get())); PartitionData firstPartitionData = PartitionData.fromJson("{\"partitionValues\":[\"ERROR\",\"449245\"]}", new Type[] {StringType.get(), IntegerType.get()}); PartitionData secondPartitionData = PartitionData.fromJson("{\"partitionValues\":[\"449245\",\"ERROR\"]}", new Type[] {IntegerType.get(), StringType.get()}); - PartitionTable.StructLikeWrapperWithFieldIdToIndex first = new PartitionTable.StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(firstStructType).set(firstPartitionData), firstStructType); - PartitionTable.StructLikeWrapperWithFieldIdToIndex second = new PartitionTable.StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(secondStructType).set(secondPartitionData), secondStructType); + StructLikeWrapperWithFieldIdToIndex first = new StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(firstStructType).set(firstPartitionData), firstStructType); + StructLikeWrapperWithFieldIdToIndex second = new StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(secondStructType).set(secondPartitionData), secondStructType); assertThat(first).isNotEqualTo(second); } } From b2c7e6eec68c1343da71aa619be7676fd55676b7 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 23 Oct 2024 19:30:14 +0530 Subject: [PATCH 4/5] Add 
StructLikeWrapperWithFieldIdToIndex#createStructLikeWrapper --- .../java/io/trino/plugin/iceberg/PartitionTable.java | 9 ++------- .../iceberg/StructLikeWrapperWithFieldIdToIndex.java | 12 +++++++++++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java index 7689dbf3dc95..7e0c4de227e4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java @@ -30,15 +30,12 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; -import org.apache.iceberg.util.StructLikeWrapper; import java.io.IOException; import java.io.UncheckedIOException; @@ -56,6 +53,7 @@ import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino; import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions; import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; +import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.spi.block.RowValueBuilder.buildRowValue; import static io.trino.spi.type.BigintType.BIGINT; @@ -215,10 +213,7 @@ private Map getStatistic Map partitions = new HashMap<>(); for (FileScanTask fileScanTask : fileScanTasks) { DataFile dataFile = fileScanTask.file(); - Types.StructType structType = fileScanTask.spec().partitionType(); - StructLike partitionStruct = dataFile.partition(); - StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(partitionStruct); - StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType); + StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = createStructLikeWrapper(fileScanTask); partitions.computeIfAbsent( structLikeWrapperWithFieldIdToIndex, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java index f31dccf2ce10..4fa5c08f1a9c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/StructLikeWrapperWithFieldIdToIndex.java @@ -13,7 +13,9 @@ */ package io.trino.plugin.iceberg; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeWrapper; @@ -27,7 +29,15 @@ public class StructLikeWrapperWithFieldIdToIndex private final StructLikeWrapper structLikeWrapper; private final Map fieldIdToIndex; - public StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType) + public static StructLikeWrapperWithFieldIdToIndex 
createStructLikeWrapper(FileScanTask fileScanTask) + { + Types.StructType structType = fileScanTask.spec().partitionType(); + StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(fileScanTask.file().partition()); + return new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType); + } + + @VisibleForTesting + StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType) { this.structLikeWrapper = structLikeWrapper; ImmutableMap.Builder fieldIdToIndex = ImmutableMap.builder(); From 84ac7143c8b9f5efefc7cdf9b5e0277dc224eb38 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Mon, 14 Oct 2024 17:48:25 +0530 Subject: [PATCH 5/5] Skip single file in a partition from OPTIMIZE Single file without a delete in a partition can't be optimized any further --- .../plugin/iceberg/IcebergSplitManager.java | 2 +- .../plugin/iceberg/IcebergSplitSource.java | 55 +++++++++++++++- .../iceberg/BaseIcebergConnectorTest.java | 65 ++++++++++++++++++- .../iceberg/TestIcebergSplitSource.java | 4 +- .../trino/plugin/iceberg/TestIcebergV2.java | 18 ++++- 5 files changed, 133 insertions(+), 11 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index 01ff4e10a99a..2fe083b0ca67 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -98,7 +98,7 @@ public ConnectorSplitSource getSplits( fileSystemFactory, session, table, - icebergTable.io().properties(), + icebergTable, scan, table.getMaxScannedFileSize(), dynamicFilter, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 383a18ec6c45..58da28fdfeef 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.io.Closer; +import io.airlift.log.Logger; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.cache.NonEvictableCache; @@ -49,6 +50,7 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Scan; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -58,6 +60,7 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -94,6 +97,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues; import static io.trino.plugin.iceberg.IcebergUtil.getPathDomain; import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; +import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper; import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; @@ -110,6 +114,7 @@ public class IcebergSplitSource implements 
ConnectorSplitSource { + private static final Logger log = Logger.get(IcebergSplitSource.class); private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false); private static final ConnectorSplitBatch NO_MORE_SPLITS_BATCH = new ConnectorSplitBatch(ImmutableList.of(), true); @@ -141,7 +146,10 @@ public class IcebergSplitSource private Iterator fileTasksIterator = emptyIterator(); private final boolean recordScannedFiles; + private final int currentSpecId; private final ImmutableSet.Builder scannedFiles = ImmutableSet.builder(); + @Nullable + private Map> scannedFilesByPartition = new HashMap<>(); private long outputRowsLowerBound; private final CachingHostAddressProvider cachingHostAddressProvider; @@ -149,7 +157,7 @@ public IcebergSplitSource( IcebergFileSystemFactory fileSystemFactory, ConnectorSession session, IcebergTableHandle tableHandle, - Map fileIoProperties, + Table icebergTable, Scan tableScan, Optional maxScannedFileSize, DynamicFilter dynamicFilter, @@ -163,7 +171,7 @@ public IcebergSplitSource( this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.session = requireNonNull(session, "session is null"); this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); - this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null"); + this.fileIoProperties = requireNonNull(icebergTable.io().properties(), "fileIoProperties is null"); this.tableScan = requireNonNull(tableScan, "tableScan is null"); this.maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes); this.fieldIdToType = primitiveFieldTypes(tableScan.schema()); @@ -173,6 +181,7 @@ public IcebergSplitSource( this.partitionConstraintMatcher = new PartitionConstraintMatcher(constraint); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.recordScannedFiles = recordScannedFiles; + this.currentSpecId = icebergTable.spec().specId(); this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; this.projectedBaseColumns = tableHandle.getProjectedColumns().stream() .map(column -> column.getBaseColumnIdentity().getId()) @@ -322,6 +331,37 @@ private List processFileScanTask(TupleDomain allQueuedTasks = scannedFilesByPartition.values().stream() + .filter(Optional::isPresent) + .map(Optional::get); + scannedFilesByPartition = null; + return Stream.concat(allQueuedTasks, Stream.of(fileScanTaskWithDomain)).collect(toImmutableList()); + } + StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = createStructLikeWrapper(wholeFileTask); + Optional alreadyQueuedFileTask = scannedFilesByPartition.get(structLikeWrapperWithFieldIdToIndex); + if (alreadyQueuedFileTask != null) { + // Optional.empty() is a marker for partitions where we've seen enough files to avoid skipping them from OPTIMIZE + if (alreadyQueuedFileTask.isEmpty()) { + return ImmutableList.of(fileScanTaskWithDomain); + } + scannedFilesByPartition.put(structLikeWrapperWithFieldIdToIndex, Optional.empty()); + return ImmutableList.of(alreadyQueuedFileTask.get(), fileScanTaskWithDomain); + } + // If file has no deletions, and it's the only file seen so far for the partition + // then we skip it from splits generation unless we encounter another file in the same partition + if (fileHasNoDeletions) { + scannedFilesByPartition.put(structLikeWrapperWithFieldIdToIndex, Optional.of(fileScanTaskWithDomain)); + return ImmutableList.of(); + } + scannedFilesByPartition.put(structLikeWrapperWithFieldIdToIndex, Optional.empty()); 
return ImmutableList.of(fileScanTaskWithDomain); } @@ -398,7 +438,16 @@ public Optional> getTableExecuteSplitsInfo() if (!recordScannedFiles) { return Optional.empty(); } - return Optional.of(ImmutableList.copyOf(scannedFiles.build())); + long filesSkipped = 0; + if (scannedFilesByPartition != null) { + filesSkipped = scannedFilesByPartition.values().stream() + .filter(Optional::isPresent) + .count(); + scannedFilesByPartition = null; + } + List splitsInfo = ImmutableList.copyOf(scannedFiles.build()); + log.info("Generated %d splits, skipped %d files for OPTIMIZE", splitsInfo.size(), filesSkipped); + return Optional.of(splitsInfo); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index aab9d6b1fc5f..dc11682ec5f1 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.Session; @@ -5377,7 +5378,7 @@ public void testOptimizeForPartitionedTable() List updatedFiles = getActiveFiles(tableName); // as we force repartitioning there should be only 3 partitions assertThat(updatedFiles).hasSize(3); - assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles)); + assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(ImmutableSet.copyOf(concat(initialFiles, updatedFiles))); assertUpdate("DROP TABLE " + tableName); } @@ -5545,7 +5546,8 @@ public void testOptimizeCleansUpDeleteFiles() List allDataFilesAfterFullOptimize = getAllDataFilesFromTableDirectory(tableName); assertThat(allDataFilesAfterFullOptimize) .hasSize(5) - .doesNotContain(allDataFilesInitially.toArray(new String[0])); + // All files skipped from OPTIMIZE as they have no deletes and there's only one file per partition + .contains(allDataFilesAfterOptimizeWithWhere.toArray(new String[0])); assertThat(query("SELECT * FROM " + tableName)) .matches("SELECT * FROM nation WHERE nationkey != 7"); @@ -5578,6 +5580,65 @@ public void testOptimizeSystemTable() .failure().hasMessage("This connector does not support table procedures"); } + @Test + void testOptimizeOnlyOneFileShouldHaveNoEffect() + { + String tableName = "test_optimize_one_file_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (a integer)"); + assertUpdate("INSERT INTO " + tableName + " VALUES 1, 2", 2); + + List initialFiles = getActiveFiles(tableName); + assertThat(initialFiles).hasSize(1); + + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(query("SELECT a FROM " + tableName)) + .matches("VALUES 1, 2"); + assertThat(getActiveFiles(tableName)) + .containsExactlyInAnyOrderElementsOf(initialFiles); + + assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1); + // Calling optimize after adding a DELETE should result in compaction + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(query("SELECT a FROM " + tableName)) + .matches("VALUES 2"); + assertThat(getActiveFiles(tableName)) + .hasSize(1) + 
.doesNotContainAnyElementsOf(initialFiles); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testOptimizeAfterChangeInPartitioning() + { + String tableName = "test_optimize_after_change_in_partitioning_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['bucket(nationkey, 5)']) AS SELECT * FROM tpch.tiny.supplier", 100); + List initialFiles = getActiveFiles(tableName); + assertThat(initialFiles).hasSize(5); + + // OPTIMIZE shouldn't have to rewrite files + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'"); + assertThat(getActiveFiles(tableName)) + .containsExactlyInAnyOrderElementsOf(initialFiles); + + // Change in partitioning should result in OPTIMIZE rewriting all files + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['nationkey']"); + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'"); + List filesAfterPartioningChange = getActiveFiles(tableName); + assertThat(filesAfterPartioningChange) + .hasSize(25) + .doesNotContainAnyElementsOf(initialFiles); + + // OPTIMIZE shouldn't have to rewrite files anymore + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'"); + assertThat(getActiveFiles(tableName)) + .hasSize(25) + .containsExactlyInAnyOrderElementsOf(filesAfterPartioningChange); + } + private List getActiveFiles(String tableName) { return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index de96d8d8bd94..db9a145fba8f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -168,7 +168,7 @@ public void testIncompleteDynamicFilterTimeout() new DefaultIcebergFileSystemFactory(fileSystemFactory), SESSION, tableHandle, - ImmutableMap.of(), + nationTable, nationTable.newScan(), Optional.empty(), new DynamicFilter() @@ -443,7 +443,7 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHa new DefaultIcebergFileSystemFactory(fileSystemFactory), SESSION, tableHandle, - ImmutableMap.of(), + nationTable, nationTable.newScan(), Optional.empty(), dynamicFilter, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index ae310c3e1200..a660869fde8c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -310,7 +310,11 @@ public void testOptimizingV2TableRemovesEqualityDeletesWhenWholeTableIsScanned() throws Exception { String tableName = "test_optimize_table_cleans_equality_delete_file_when_whole_table_is_scanned" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25); + assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])"); + // Create multiple 
files per partition + for (int nationKey = 0; nationKey < 25; nationKey++) { + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1); + } Table icebergTable = loadTable(tableName); assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0"); writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {1L}))); @@ -329,7 +333,11 @@ public void testOptimizingV2TableDoesntRemoveEqualityDeletesWhenOnlyPartOfTheTab throws Exception { String tableName = "test_optimize_table_with_equality_delete_file_for_different_partition_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25); + assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])"); + // Create multiple files per partition + for (int nationKey = 0; nationKey < 25; nationKey++) { + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1); + } Table icebergTable = loadTable(tableName); assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0"); List initialActiveFiles = getActiveFiles(tableName); @@ -593,7 +601,11 @@ public void testOptimizingPartitionsOfV2TableWithGlobalEqualityDeleteFile() throws Exception { String tableName = "test_optimize_partitioned_table_with_global_equality_delete_file_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25); + assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])"); + // Create multiple files per partition + for (int nationKey = 0; nationKey < 25; nationKey++) { + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1); + } Table icebergTable = loadTable(tableName); assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0"); writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {1L})));
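The core of the final patch is the bookkeeping in processFileScanTask: the first data file seen in each partition is held back rather than turned into splits, and it is only emitted once a second file, a delete file, or a file written under an older partition spec shows that the partition genuinely needs rewriting. The following is a simplified, self-contained sketch of that idea; PartitionKey and DataFile are illustrative stand-ins rather than the Iceberg/Trino types used in the patch, and the real implementation additionally tracks statistics domains and split weights.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class OptimizeFileSkippingSketch
{
    // Illustrative stand-ins, not the Iceberg/Trino types
    record PartitionKey(String value) {}

    record DataFile(String path, boolean hasDeletes, boolean writtenWithCurrentSpec) {}

    // Optional.empty() marks partitions that may no longer be skipped;
    // a null map disables the heuristic entirely (the partition-spec-change case)
    private Map<PartitionKey, Optional<DataFile>> heldBackFileByPartition = new HashMap<>();

    List<DataFile> process(PartitionKey partition, DataFile file)
    {
        if (heldBackFileByPartition == null) {
            return List.of(file);
        }
        if (!file.writtenWithCurrentSpec()) {
            // A file from an older partition spec always needs rewriting; flush every
            // held-back file and stop skipping for the rest of the scan
            List<DataFile> flushed = new ArrayList<>();
            heldBackFileByPartition.values().forEach(held -> held.ifPresent(flushed::add));
            heldBackFileByPartition = null;
            flushed.add(file);
            return flushed;
        }
        Optional<DataFile> held = heldBackFileByPartition.get(partition);
        if (held != null) {
            if (held.isEmpty()) {
                // Partition has already produced output, nothing is skipped here
                return List.of(file);
            }
            // Second file in the partition: emit the held-back file together with this one
            heldBackFileByPartition.put(partition, Optional.empty());
            return List.of(held.get(), file);
        }
        if (!file.hasDeletes()) {
            // The lone clean file in its partition is already "optimized"; hold it back
            heldBackFileByPartition.put(partition, Optional.of(file));
            return List.of();
        }
        // A file with deletes must be rewritten regardless
        heldBackFileByPartition.put(partition, Optional.empty());
        return List.of(file);
    }
}

Partitions whose entry is still a present Optional when the scan finishes are exactly the files OPTIMIZE leaves untouched, which corresponds to the filesSkipped count logged in getTableExecuteSplitsInfo in the patch.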