Skip single file in a partition from OPTIMIZE #23864

Merged: 5 commits, Oct 25, 2024

@@ -98,7 +98,7 @@ public ConnectorSplitSource getSplits(
fileSystemFactory,
session,
table,
-icebergTable.io().properties(),
+icebergTable,
scan,
table.getMaxScannedFileSize(),
dynamicFilter,

Large diffs are not rendered by default.

@@ -13,9 +13,7 @@
*/
package io.trino.plugin.iceberg;

-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import io.trino.spi.block.SqlRow;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
@@ -32,15 +30,12 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.NestedField;
-import org.apache.iceberg.util.StructLikeWrapper;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -49,17 +44,16 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
+import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.block.RowValueBuilder.buildRowValue;
import static io.trino.spi.type.BigintType.BIGINT;
@@ -219,10 +213,7 @@ private Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> getStatistic
Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics.Builder> partitions = new HashMap<>();
for (FileScanTask fileScanTask : fileScanTasks) {
DataFile dataFile = fileScanTask.file();
-Types.StructType structType = fileScanTask.spec().partitionType();
-StructLike partitionStruct = dataFile.partition();
-StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(partitionStruct);
-StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType);
+StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = createStructLikeWrapper(fileScanTask);

partitions.computeIfAbsent(
structLikeWrapperWithFieldIdToIndex,
@@ -262,10 +253,10 @@ private RecordCursor buildRecordCursor(Map<StructLikeWrapperWithFieldIdToIndex,
io.trino.spi.type.Type trinoType = partitionColumnType.rowType.getFields().get(i).getType();
Object value = null;
Integer fieldId = partitionColumnType.fieldIds.get(i);
-if (partitionStruct.fieldIdToIndex.containsKey(fieldId)) {
+if (partitionStruct.getFieldIdToIndex().containsKey(fieldId)) {
value = convertIcebergValueToTrino(
partitionTypes.get(i),
-partitionStruct.structLikeWrapper.get().get(partitionStruct.fieldIdToIndex.get(fieldId), partitionColumnClass.get(i)));
+partitionStruct.getStructLikeWrapper().get().get(partitionStruct.getFieldIdToIndex().get(fieldId), partitionColumnClass.get(i)));
}
writeNativeValue(trinoType, fields.get(i), value);
}
@@ -333,42 +324,5 @@ private static SqlRow getColumnMetricBlock(RowType columnMetricType, Object min,
});
}

-@VisibleForTesting
-static class StructLikeWrapperWithFieldIdToIndex
-{
-private final StructLikeWrapper structLikeWrapper;
-private final Map<Integer, Integer> fieldIdToIndex;
-
-public StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType)
-{
-this.structLikeWrapper = structLikeWrapper;
-ImmutableMap.Builder<Integer, Integer> fieldIdToIndex = ImmutableMap.builder();
-List<NestedField> fields = structType.fields();
-IntStream.range(0, fields.size())
-.forEach(i -> fieldIdToIndex.put(fields.get(i).fieldId(), i));
-this.fieldIdToIndex = fieldIdToIndex.buildOrThrow();
-}
-
-@Override
-public boolean equals(Object o)
-{
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-StructLikeWrapperWithFieldIdToIndex that = (StructLikeWrapperWithFieldIdToIndex) o;
-// Due to bogus implementation of equals in StructLikeWrapper https://github.com/apache/iceberg/issues/5064 order here matters.
-return Objects.equals(fieldIdToIndex, that.fieldIdToIndex) && Objects.equals(structLikeWrapper, that.structLikeWrapper);
-}
-
-@Override
-public int hashCode()
-{
-return Objects.hash(fieldIdToIndex, structLikeWrapper);
-}
-}

private record IcebergPartitionColumn(RowType rowType, List<Integer> fieldIds) {}
}
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.IntStream;

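/**
 * Pairs a partition's {@link StructLikeWrapper} with a map from Iceberg field id to its index in
 * the partition struct. Comparing the field-id map first keeps partition values written under
 * different partition specs distinct, since StructLikeWrapper itself compares values only by
 * position (see https://github.com/apache/iceberg/issues/5064).
 */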
public class StructLikeWrapperWithFieldIdToIndex
[Review (Member): "nice class name" / Reply (Author): "credit to @homar ;)"]
{
private final StructLikeWrapper structLikeWrapper;
private final Map<Integer, Integer> fieldIdToIndex;

public static StructLikeWrapperWithFieldIdToIndex createStructLikeWrapper(FileScanTask fileScanTask)
{
Types.StructType structType = fileScanTask.spec().partitionType();
StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(fileScanTask.file().partition());
return new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType);
}

@VisibleForTesting
StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType)
{
this.structLikeWrapper = structLikeWrapper;
ImmutableMap.Builder<Integer, Integer> fieldIdToIndex = ImmutableMap.builder();
List<Types.NestedField> fields = structType.fields();
IntStream.range(0, fields.size())
.forEach(i -> fieldIdToIndex.put(fields.get(i).fieldId(), i));
this.fieldIdToIndex = fieldIdToIndex.buildOrThrow();
}

public StructLikeWrapper getStructLikeWrapper()
{
return structLikeWrapper;
}

public Map<Integer, Integer> getFieldIdToIndex()
{
return fieldIdToIndex;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StructLikeWrapperWithFieldIdToIndex that = (StructLikeWrapperWithFieldIdToIndex) o;
// Due to bogus implementation of equals in StructLikeWrapper https://github.com/apache/iceberg/issues/5064 order here matters.
return Objects.equals(fieldIdToIndex, that.fieldIdToIndex) && Objects.equals(structLikeWrapper, that.structLikeWrapper);
}

@Override
public int hashCode()
{
return Objects.hash(fieldIdToIndex, structLikeWrapper);
}
}
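For context, a minimal sketch of how this wrapper serves as a grouping key (mirroring the getStatistics hunk in PartitionTable above; the surrounding variables are assumed from that diff, not part of this file):

    // Group data files by partition value; files written under different
    // partition specs collide only when their field ids and values match.
    Map<StructLikeWrapperWithFieldIdToIndex, List<DataFile>> byPartition = new HashMap<>();
    for (FileScanTask task : fileScanTasks) {
        byPartition
                .computeIfAbsent(createStructLikeWrapper(task), ignored -> new ArrayList<>())
                .add(task.file());
    }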
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
@@ -5377,7 +5378,7 @@ public void testOptimizeForPartitionedTable()
List<String> updatedFiles = getActiveFiles(tableName);
// as we force repartitioning there should be only 3 partitions
assertThat(updatedFiles).hasSize(3);
-assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));
+assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(ImmutableSet.copyOf(concat(initialFiles, updatedFiles)));

assertUpdate("DROP TABLE " + tableName);
}
@@ -5545,7 +5546,8 @@ public void testOptimizeCleansUpDeleteFiles()
List<String> allDataFilesAfterFullOptimize = getAllDataFilesFromTableDirectory(tableName);
assertThat(allDataFilesAfterFullOptimize)
.hasSize(5)
-.doesNotContain(allDataFilesInitially.toArray(new String[0]));
+// All files skipped from OPTIMIZE as they have no deletes and there's only one file per partition
+.contains(allDataFilesAfterOptimizeWithWhere.toArray(new String[0]));

assertThat(query("SELECT * FROM " + tableName))
.matches("SELECT * FROM nation WHERE nationkey != 7");
@@ -5578,6 +5580,65 @@ public void testOptimizeSystemTable()
.failure().hasMessage("This connector does not support table procedures");
}

+@Test
+void testOptimizeOnlyOneFileShouldHaveNoEffect()
+{
+String tableName = "test_optimize_one_file_" + randomNameSuffix();
+assertUpdate("CREATE TABLE " + tableName + " (a integer)");
+assertUpdate("INSERT INTO " + tableName + " VALUES 1, 2", 2);
+
+List<String> initialFiles = getActiveFiles(tableName);
+assertThat(initialFiles).hasSize(1);
+
+computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+assertThat(query("SELECT a FROM " + tableName))
+.matches("VALUES 1, 2");
+assertThat(getActiveFiles(tableName))
+.containsExactlyInAnyOrderElementsOf(initialFiles);
+
+assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1);
+// Calling optimize after adding a DELETE should result in compaction
+computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+assertThat(query("SELECT a FROM " + tableName))
+.matches("VALUES 2");
+assertThat(getActiveFiles(tableName))
+.hasSize(1)
+.doesNotContainAnyElementsOf(initialFiles);
+
+assertUpdate("DROP TABLE " + tableName);
+}
+
+@Test
+void testOptimizeAfterChangeInPartitioning()
+{
+String tableName = "test_optimize_after_change_in_partitioning_" + randomNameSuffix();
+assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['bucket(nationkey, 5)']) AS SELECT * FROM tpch.tiny.supplier", 100);
+List<String> initialFiles = getActiveFiles(tableName);
+assertThat(initialFiles).hasSize(5);
+
+// OPTIMIZE shouldn't have to rewrite files
+computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'");
+assertThat(getActiveFiles(tableName))
+.containsExactlyInAnyOrderElementsOf(initialFiles);
+
+// Change in partitioning should result in OPTIMIZE rewriting all files
+assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['nationkey']");
+computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'");
+List<String> filesAfterPartioningChange = getActiveFiles(tableName);
+assertThat(filesAfterPartioningChange)
+.hasSize(25)
+.doesNotContainAnyElementsOf(initialFiles);
+
+// OPTIMIZE shouldn't have to rewrite files anymore
+computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'");
+assertThat(getActiveFiles(tableName))
+.hasSize(25)
+.containsExactlyInAnyOrderElementsOf(filesAfterPartioningChange);
+}
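
The two tests above, together with the updated testOptimizeCleansUpDeleteFiles assertion, pin down the new behavior. A rough sketch of the skip condition (hypothetical names; inferred from the tests and the PR title, not copied from the implementation):

    // OPTIMIZE leaves a partition's only data file alone unless deletes must
    // be applied to it or it was written under an outdated partition spec.
    boolean skipRewrite = filesInPartition.size() == 1
            && fileScanTask.deletes().isEmpty()
            && fileScanTask.spec().specId() == icebergTable.spec().specId();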

private List<String> getActiveFiles(String tableName)
{
return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn()
@@ -168,7 +168,7 @@ public void testIncompleteDynamicFilterTimeout()
new DefaultIcebergFileSystemFactory(fileSystemFactory),
SESSION,
tableHandle,
-ImmutableMap.of(),
+nationTable,
nationTable.newScan(),
Optional.empty(),
new DynamicFilter()
@@ -443,7 +443,7 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHa
new DefaultIcebergFileSystemFactory(fileSystemFactory),
SESSION,
tableHandle,
-ImmutableMap.of(),
+nationTable,
nationTable.newScan(),
Optional.empty(),
dynamicFilter,
@@ -310,7 +310,11 @@ public void testOptimizingV2TableRemovesEqualityDeletesWhenWholeTableIsScanned()
throws Exception
{
String tableName = "test_optimize_table_cleans_equality_delete_file_when_whole_table_is_scanned" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])");
// Create multiple files per partition
for (int nationKey = 0; nationKey < 25; nationKey++) {
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1);
}
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {1L})));
@@ -329,7 +333,11 @@ public void testOptimizingV2TableDoesntRemoveEqualityDeletesWhenOnlyPartOfTheTab
throws Exception
{
String tableName = "test_optimize_table_with_equality_delete_file_for_different_partition_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])");
// Create multiple files per partition
for (int nationKey = 0; nationKey < 25; nationKey++) {
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1);
}
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
List<String> initialActiveFiles = getActiveFiles(tableName);
@@ -593,7 +601,11 @@ public void testOptimizingPartitionsOfV2TableWithGlobalEqualityDeleteFile()
throws Exception
{
String tableName = "test_optimize_partitioned_table_with_global_equality_delete_file_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])");
// Create multiple files per partition
for (int nationKey = 0; nationKey < 25; nationKey++) {
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1);
}
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {1L})));
@@ -36,8 +36,8 @@ public void testStructLikeWrapperWithFieldIdToIndexEquals()
NestedField.optional(1001, "level", IntegerType.get()));
PartitionData firstPartitionData = PartitionData.fromJson("{\"partitionValues\":[\"ERROR\",\"449245\"]}", new Type[] {StringType.get(), IntegerType.get()});
PartitionData secondPartitionData = PartitionData.fromJson("{\"partitionValues\":[\"449245\",\"ERROR\"]}", new Type[] {IntegerType.get(), StringType.get()});
-PartitionTable.StructLikeWrapperWithFieldIdToIndex first = new PartitionTable.StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(firstStructType).set(firstPartitionData), firstStructType);
-PartitionTable.StructLikeWrapperWithFieldIdToIndex second = new PartitionTable.StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(secondStructType).set(secondPartitionData), secondStructType);
+StructLikeWrapperWithFieldIdToIndex first = new StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(firstStructType).set(firstPartitionData), firstStructType);
+StructLikeWrapperWithFieldIdToIndex second = new StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper.forType(secondStructType).set(secondPartitionData), secondStructType);
assertThat(first).isNotEqualTo(second);
}
}