Skip single file in a partition from OPTIMIZE

A single file without a delete in a partition can't be optimized any further.

raunaqmorarka committed Oct 25, 2024
1 parent b2c7e6e commit 84ac714
Showing 5 changed files with 133 additions and 11 deletions.
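
The heart of the change can be restated as a tiny standalone Java sketch (hypothetical, simplified names — not the connector's actual types): a partition whose only data file carries no delete files is already as compact as OPTIMIZE could make it, so rewriting it would just reproduce the same file.

import java.util.List;

// Hypothetical stand-in for the connector's file-scan task.
record ScannedFile(String path, boolean hasDeletes) {}

final class OptimizeSkipRule
{
    private OptimizeSkipRule() {}

    // A single file without deletes in a partition can't be optimized any further.
    static boolean canSkipPartition(List<ScannedFile> filesInPartition)
    {
        return filesInPartition.size() == 1 && !filesInPartition.get(0).hasDeletes();
    }
}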

File 1 of 5 (the split manager's getSplits)

@@ -98,7 +98,7 @@ public ConnectorSplitSource getSplits(
                 fileSystemFactory,
                 session,
                 table,
-                icebergTable.io().properties(),
+                icebergTable,
                 scan,
                 table.getMaxScannedFileSize(),
                 dynamicFilter,

File 2 of 5 (class IcebergSplitSource)

@@ -22,6 +22,7 @@
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
+import io.airlift.log.Logger;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.cache.NonEvictableCache;

@@ -49,6 +50,7 @@
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;

@@ -58,6 +60,7 @@
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

@@ -94,6 +97,7 @@
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
 import static io.trino.plugin.iceberg.IcebergUtil.getPathDomain;
 import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
+import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
 import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
 import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;

@@ -110,6 +114,7 @@
 public class IcebergSplitSource
         implements ConnectorSplitSource
 {
+    private static final Logger log = Logger.get(IcebergSplitSource.class);
     private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false);
     private static final ConnectorSplitBatch NO_MORE_SPLITS_BATCH = new ConnectorSplitBatch(ImmutableList.of(), true);

@@ -141,15 +146,18 @@ public class IcebergSplitSource
     private Iterator<FileScanTaskWithDomain> fileTasksIterator = emptyIterator();

     private final boolean recordScannedFiles;
+    private final int currentSpecId;
     private final ImmutableSet.Builder<DataFileWithDeleteFiles> scannedFiles = ImmutableSet.builder();
+    @Nullable
+    private Map<StructLikeWrapperWithFieldIdToIndex, Optional<FileScanTaskWithDomain>> scannedFilesByPartition = new HashMap<>();
     private long outputRowsLowerBound;
     private final CachingHostAddressProvider cachingHostAddressProvider;

     public IcebergSplitSource(
             IcebergFileSystemFactory fileSystemFactory,
             ConnectorSession session,
             IcebergTableHandle tableHandle,
-            Map<String, String> fileIoProperties,
+            Table icebergTable,
             Scan<?, FileScanTask, CombinedScanTask> tableScan,
             Optional<DataSize> maxScannedFileSize,
             DynamicFilter dynamicFilter,

@@ -163,7 +171,7 @@ public IcebergSplitSource(
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.session = requireNonNull(session, "session is null");
         this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
-        this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
+        this.fileIoProperties = requireNonNull(icebergTable.io().properties(), "fileIoProperties is null");
         this.tableScan = requireNonNull(tableScan, "tableScan is null");
         this.maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);
         this.fieldIdToType = primitiveFieldTypes(tableScan.schema());

@@ -173,6 +181,7 @@
         this.partitionConstraintMatcher = new PartitionConstraintMatcher(constraint);
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.recordScannedFiles = recordScannedFiles;
+        this.currentSpecId = icebergTable.spec().specId();
         this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
         this.projectedBaseColumns = tableHandle.getProjectedColumns().stream()
                 .map(column -> column.getBaseColumnIdentity().getId())

@@ -322,6 +331,37 @@ private List<FileScanTaskWithDomain> processFileScanTask(TupleDomain<IcebergColu
             return ImmutableList.of();
         }

+        if (!recordScannedFiles || scannedFilesByPartition == null) {
+            return ImmutableList.of(fileScanTaskWithDomain);
+        }
+
+        // Assess whether the partition that wholeFileTask belongs to should be included in OPTIMIZE.
+        // If the file was partitioned under an old spec, OPTIMIZE may be able to merge it with another file under the new partitioning spec.
+        // We don't know which partition of the new spec this file belongs to, so we include all files in OPTIMIZE.
+        if (currentSpecId != wholeFileTask.spec().specId()) {
+            Stream<FileScanTaskWithDomain> allQueuedTasks = scannedFilesByPartition.values().stream()
+                    .filter(Optional::isPresent)
+                    .map(Optional::get);
+            scannedFilesByPartition = null;
+            return Stream.concat(allQueuedTasks, Stream.of(fileScanTaskWithDomain)).collect(toImmutableList());
+        }
+        StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = createStructLikeWrapper(wholeFileTask);
+        Optional<FileScanTaskWithDomain> alreadyQueuedFileTask = scannedFilesByPartition.get(structLikeWrapperWithFieldIdToIndex);
+        if (alreadyQueuedFileTask != null) {
+            // Optional.empty() is a marker for partitions where we've seen enough files to avoid skipping them from OPTIMIZE
+            if (alreadyQueuedFileTask.isEmpty()) {
+                return ImmutableList.of(fileScanTaskWithDomain);
+            }
+            scannedFilesByPartition.put(structLikeWrapperWithFieldIdToIndex, Optional.empty());
+            return ImmutableList.of(alreadyQueuedFileTask.get(), fileScanTaskWithDomain);
+        }
+        // If the file has no deletions and it's the only file seen so far for the partition,
+        // then we skip it from split generation unless we encounter another file in the same partition.
+        if (fileHasNoDeletions) {
+            scannedFilesByPartition.put(structLikeWrapperWithFieldIdToIndex, Optional.of(fileScanTaskWithDomain));
+            return ImmutableList.of();
+        }
+        scannedFilesByPartition.put(structLikeWrapperWithFieldIdToIndex, Optional.empty());
         return ImmutableList.of(fileScanTaskWithDomain);
     }
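
To make the hunk above easier to follow, here is the buffering scheme as a self-contained sketch (illustrative type names, not the connector's). Tasks arrive one at a time, so the first deletion-free file of each partition is held back and only released once a second file proves the partition needs compaction; Optional.empty() marks partitions that are past holding anything back. Note that the key type P needs value-based equals/hashCode — evidently why the real code keys the map by StructLikeWrapperWithFieldIdToIndex rather than the raw partition struct.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// P identifies a partition, T is a file-scan task.
final class SingleFilePerPartitionBuffer<P, T>
{
    // Optional.empty() = partition released; Optional.of(task) = one deletion-free file held back.
    private final Map<P, Optional<T>> byPartition = new HashMap<>();

    // Returns the tasks that may be emitted as splits right now.
    List<T> offer(P partition, T task, boolean fileHasNoDeletions)
    {
        Optional<T> queued = byPartition.get(partition);
        if (queued != null) {
            if (queued.isEmpty()) {
                // Partition already released: emit immediately.
                return List.of(task);
            }
            // Second file in this partition: release the held-back file along with the new one.
            byPartition.put(partition, Optional.empty());
            return List.of(queued.get(), task);
        }
        if (fileHasNoDeletions) {
            // Sole file seen so far and nothing to merge it with: hold it back.
            byPartition.put(partition, Optional.of(task));
            return List.of();
        }
        // A file with deletes always needs rewriting.
        byPartition.put(partition, Optional.empty());
        return List.of(task);
    }
}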

@@ -398,7 +438,16 @@ public Optional<List<Object>> getTableExecuteSplitsInfo()
         if (!recordScannedFiles) {
             return Optional.empty();
         }
-        return Optional.of(ImmutableList.copyOf(scannedFiles.build()));
+        long filesSkipped = 0;
+        if (scannedFilesByPartition != null) {
+            filesSkipped = scannedFilesByPartition.values().stream()
+                    .filter(Optional::isPresent)
+                    .count();
+            scannedFilesByPartition = null;
+        }
+        List<Object> splitsInfo = ImmutableList.copyOf(scannedFiles.build());
+        log.info("Generated %d splits, skipped %d files for OPTIMIZE", splitsInfo.size(), filesSkipped);
+        return Optional.of(splitsInfo);
     }

     @Override
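
The skipped-file count logged above falls out of the same map: every partition still holding a deferred task when the source is drained contributed exactly one skipped file. A minimal sketch of that accounting, reusing the buffer shape from the previous example:

import java.util.Map;
import java.util.Optional;

final class SkipAccounting
{
    private SkipAccounting() {}

    // Each entry still holding a task is one file that was never emitted, i.e. skipped.
    static <P, T> long countSkippedFiles(Map<P, Optional<T>> byPartition)
    {
        return byPartition.values().stream()
                .filter(Optional::isPresent)
                .count();
    }
}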

File 3 of 5 (connector OPTIMIZE tests)

@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.Session;

@@ -5377,7 +5378,7 @@ public void testOptimizeForPartitionedTable()
         List<String> updatedFiles = getActiveFiles(tableName);
         // as we force repartitioning there should be only 3 partitions
         assertThat(updatedFiles).hasSize(3);
-        assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));
+        assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(ImmutableSet.copyOf(concat(initialFiles, updatedFiles)));

         assertUpdate("DROP TABLE " + tableName);
     }

@@ -5545,7 +5546,8 @@ public void testOptimizeCleansUpDeleteFiles()
         List<String> allDataFilesAfterFullOptimize = getAllDataFilesFromTableDirectory(tableName);
         assertThat(allDataFilesAfterFullOptimize)
                 .hasSize(5)
-                .doesNotContain(allDataFilesInitially.toArray(new String[0]));
+                // All files skipped from OPTIMIZE as they have no deletes and there's only one file per partition
+                .contains(allDataFilesAfterOptimizeWithWhere.toArray(new String[0]));

         assertThat(query("SELECT * FROM " + tableName))
                 .matches("SELECT * FROM nation WHERE nationkey != 7");

@@ -5578,6 +5580,65 @@ public void testOptimizeSystemTable()
                 .failure().hasMessage("This connector does not support table procedures");
     }

+    @Test
+    void testOptimizeOnlyOneFileShouldHaveNoEffect()
+    {
+        String tableName = "test_optimize_one_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " (a integer)");
+        assertUpdate("INSERT INTO " + tableName + " VALUES 1, 2", 2);
+
+        List<String> initialFiles = getActiveFiles(tableName);
+        assertThat(initialFiles).hasSize(1);
+
+        computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+        assertThat(query("SELECT a FROM " + tableName))
+                .matches("VALUES 1, 2");
+        assertThat(getActiveFiles(tableName))
+                .containsExactlyInAnyOrderElementsOf(initialFiles);
+
+        assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1);
+        // Calling OPTIMIZE after a DELETE should result in compaction
+        computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+        assertThat(query("SELECT a FROM " + tableName))
+                .matches("VALUES 2");
+        assertThat(getActiveFiles(tableName))
+                .hasSize(1)
+                .doesNotContainAnyElementsOf(initialFiles);
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    @Test
+    void testOptimizeAfterChangeInPartitioning()
+    {
+        String tableName = "test_optimize_after_change_in_partitioning_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['bucket(nationkey, 5)']) AS SELECT * FROM tpch.tiny.supplier", 100);
+        List<String> initialFiles = getActiveFiles(tableName);
+        assertThat(initialFiles).hasSize(5);
+
+        // OPTIMIZE shouldn't have to rewrite files
+        computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+        assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'");
+        assertThat(getActiveFiles(tableName))
+                .containsExactlyInAnyOrderElementsOf(initialFiles);
+
+        // A change in partitioning should result in OPTIMIZE rewriting all files
+        assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['nationkey']");
+        computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+        assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'");
+        List<String> filesAfterPartitioningChange = getActiveFiles(tableName);
+        assertThat(filesAfterPartitioningChange)
+                .hasSize(25)
+                .doesNotContainAnyElementsOf(initialFiles);
+
+        // OPTIMIZE shouldn't have to rewrite files anymore
+        computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+        assertThat(query("SELECT COUNT(*) FROM " + tableName)).matches("VALUES BIGINT '100'");
+        assertThat(getActiveFiles(tableName))
+                .hasSize(25)
+                .containsExactlyInAnyOrderElementsOf(filesAfterPartitioningChange);
+    }
+
     private List<String> getActiveFiles(String tableName)
     {
         return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn()
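
testOptimizeAfterChangeInPartitioning above exercises the spec-id escape hatch from the IcebergSplitSource hunk: a file written under an older partition spec may be mergeable with files of the new spec, and since its partition under the new spec is unknown, buffering is abandoned and every held-back file is emitted. A sketch of that bail-out under the same illustrative types as before:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SpecChangeBailout<P, T>
{
    private final int currentSpecId;
    // Null once per-partition skipping has been abandoned.
    private Map<P, Optional<T>> byPartition = new HashMap<>();

    SpecChangeBailout(int currentSpecId)
    {
        this.currentSpecId = currentSpecId;
    }

    // Emits the new task, plus every held-back file if the task's spec is stale.
    List<T> offer(int taskSpecId, T task)
    {
        if (byPartition == null || taskSpecId == currentSpecId) {
            // Already bailed out, or the spec is current (current-spec files would
            // continue into the per-partition buffer shown earlier).
            return List.of(task);
        }
        List<T> released = new ArrayList<>();
        byPartition.values().forEach(queued -> queued.ifPresent(released::add));
        released.add(task);
        byPartition = null; // from now on, nothing is skipped
        return released;
    }
}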

File 4 of 5 (split source tests)

@@ -168,7 +168,7 @@ public void testIncompleteDynamicFilterTimeout()
                 new DefaultIcebergFileSystemFactory(fileSystemFactory),
                 SESSION,
                 tableHandle,
-                ImmutableMap.of(),
+                nationTable,
                 nationTable.newScan(),
                 Optional.empty(),
                 new DynamicFilter()

@@ -443,7 +443,7 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHa
                 new DefaultIcebergFileSystemFactory(fileSystemFactory),
                 SESSION,
                 tableHandle,
-                ImmutableMap.of(),
+                nationTable,
                 nationTable.newScan(),
                 Optional.empty(),
                 dynamicFilter,

File 5 of 5 (V2 table OPTIMIZE tests)

@@ -310,7 +310,11 @@ public void testOptimizingV2TableRemovesEqualityDeletesWhenWholeTableIsScanned()
             throws Exception
     {
         String tableName = "test_optimize_table_cleans_equality_delete_file_when_whole_table_is_scanned" + randomNameSuffix();
-        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
+        assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])");
+        // Create multiple files per partition
+        for (int nationKey = 0; nationKey < 25; nationKey++) {
+            assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1);
+        }
         Table icebergTable = loadTable(tableName);
         assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
         writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {1L})));
Expand All @@ -329,7 +333,11 @@ public void testOptimizingV2TableDoesntRemoveEqualityDeletesWhenOnlyPartOfTheTab
throws Exception
{
String tableName = "test_optimize_table_with_equality_delete_file_for_different_partition_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])");
// Create multiple files per partition
for (int nationKey = 0; nationKey < 25; nationKey++) {
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1);
}
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
List<String> initialActiveFiles = getActiveFiles(tableName);

@@ -593,7 +601,11 @@ public void testOptimizingPartitionsOfV2TableWithGlobalEqualityDeleteFile()
             throws Exception
    {
         String tableName = "test_optimize_partitioned_table_with_global_equality_delete_file_" + randomNameSuffix();
-        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
+        assertUpdate("CREATE TABLE " + tableName + " (LIKE nation) WITH (partitioning = ARRAY['regionkey'])");
+        // Create multiple files per partition
+        for (int nationKey = 0; nationKey < 25; nationKey++) {
+            assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE nationkey = " + nationKey, 1);
+        }
         Table icebergTable = loadTable(tableName);
         assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
         writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {1L})));
