
Improve performance of Iceberg remove_orphan_files #14383

Merged · 6 commits · Oct 17, 2022
@@ -98,6 +98,7 @@
 import org.apache.datasketches.theta.CompactSketch;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
@@ -107,11 +108,12 @@
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
-import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
@@ -155,7 +157,6 @@
 import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static com.google.common.base.Preconditions.checkArgument;
@@ -168,9 +169,6 @@
 import static com.google.common.collect.Iterables.getLast;
 import static com.google.common.collect.Maps.transformValues;
 import static com.google.common.collect.Sets.difference;
-import static com.google.common.collect.Sets.union;
-import static com.google.common.collect.Streams.concat;
-import static com.google.common.collect.Streams.stream;
 import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
 import static io.trino.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
 import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
@@ -1242,39 +1240,55 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle)

         long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
         removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
-        removeOrphanMetadataFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
     }

     private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp)
     {
-        Set<String> validDataFilePaths = stream(table.snapshots())
-                .map(Snapshot::snapshotId)
-                .flatMap(snapshotId -> stream(table.newScan().useSnapshot(snapshotId).planFiles()))
-                .map(fileScanTask -> fileName(fileScanTask.file().path().toString()))
-                .collect(toImmutableSet());
-        Set<String> validDeleteFilePaths = stream(table.snapshots())
-                .map(Snapshot::snapshotId)
-                .flatMap(snapshotId -> stream(table.newScan().useSnapshot(snapshotId).planFiles()))
-                .flatMap(fileScanTask -> fileScanTask.deletes().stream().map(file -> fileName(file.path().toString())))
-                .collect(Collectors.toUnmodifiableSet());
-        scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, union(validDataFilePaths, validDeleteFilePaths), "/data");
+        Set<String> processedManifestFilePaths = new HashSet<>();
+        // Similarly to issues like https://github.com/trinodb/trino/issues/13759, equivalent paths may have different String
+        // representations due to things like double slashes. Using file names may result in retaining files which could be removed.
+        // However, in practice Iceberg metadata and data files have UUIDs in their names which makes this unlikely.
+        ImmutableSet.Builder<String> validMetadataFileNames = ImmutableSet.builder();
+        ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();
+
+        for (Snapshot snapshot : table.snapshots()) {
+            if (snapshot.manifestListLocation() != null) {
+                validMetadataFileNames.add(fileName(snapshot.manifestListLocation()));
+            }
+
+            for (ManifestFile manifest : snapshot.allManifests(table.io())) {
+                if (!processedManifestFilePaths.add(manifest.path())) {
    [Review thread on the line above]

    Member: This used to do

        validMetadataFileNames.add(fileName(manifest.path()))

    Do we not need to populate validMetadataFileNames? Is it because metadataFileLocations(table, false).stream() will do this for us? If so, maybe we don't need validMetadataFileNames.add(fileName(snapshot.manifestListLocation())); either?

    Member (author): That's what I get for thinking "I don't need to rerun the tests, it was just a small refactor" 😦

+                    // Already read this manifest
+                    continue;
+                }
+
+                validMetadataFileNames.add(fileName(manifest.path()));
+                try (ManifestReader<? extends ContentFile<?>> manifestReader = readerForManifest(table, manifest)) {
+                    for (ContentFile<?> contentFile : manifestReader) {
+                        validDataFileNames.add(fileName(contentFile.path().toString()));
+                    }
+                }
+                catch (IOException e) {
+                    throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Unable to list manifest file content from " + manifest.path(), e);
+                }
+            }
+        }
+
+        metadataFileLocations(table, false).stream()
+                .map(IcebergUtil::fileName)
+                .forEach(validMetadataFileNames::add);
+        validMetadataFileNames.add(fileName(versionHintLocation(table)));
+
+        scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, validDataFileNames.build(), "data");
+        scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, validMetadataFileNames.build(), "metadata");
     }
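
An aside on the code comment above: comparing by file name rather than by full path makes the validity check robust to redundant slashes. A minimal, self-contained sketch of why (not part of the PR; the fileName logic below assumes Trino's IcebergUtil.fileName keeps the last path segment):

    public class FileNameComparison
    {
        // Stands in for IcebergUtil.fileName (assumed: last path segment).
        static String fileName(String path)
        {
            return path.substring(path.lastIndexOf('/') + 1);
        }

        public static void main(String[] args)
        {
            // Equivalent locations that differ as strings (double slash),
            // as in https://github.com/trinodb/trino/issues/13759.
            String a = "s3://bucket/table/data//abc-uuid.parquet";
            String b = "s3://bucket/table/data/abc-uuid.parquet";
            System.out.println(a.equals(b));                     // false
            System.out.println(fileName(a).equals(fileName(b))); // true
        }
    }

As the comment notes, the trade-off is that two distinct files with the same name would both be retained, which the UUIDs in Iceberg file names make unlikely in practice.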

-    private void removeOrphanMetadataFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp)
+    private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table table, ManifestFile manifest)
     {
-        ImmutableSet<String> manifests = stream(table.snapshots())
-                .flatMap(snapshot -> snapshot.allManifests(table.io()).stream())
-                .map(ManifestFile::path)
-                .collect(toImmutableSet());
-        List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
-        List<String> otherMetadataFiles = concat(
-                metadataFileLocations(table, false).stream(),
-                Stream.of(versionHintLocation(table)))
-                .collect(toImmutableList());
-        Set<String> validMetadataFiles = concat(manifests.stream(), manifestLists.stream(), otherMetadataFiles.stream())
-                .map(IcebergUtil::fileName)
-                .collect(toImmutableSet());
-        scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, validMetadataFiles, "metadata");
+        return switch (manifest.content()) {
+            case DATA -> ManifestFiles.read(manifest, table.io());
+            case DELETES -> ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());
+        };
     }
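
The switch above distinguishes the two manifest kinds because Iceberg reads them with different factories: ManifestFiles.readDeleteManifest needs the table's partition specs keyed by spec id (table.specs()), while data manifests can be read with just the FileIO. A hedged usage sketch of the same selection logic (countEntries is hypothetical, not part of the PR):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import org.apache.iceberg.ContentFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestReader;
    import org.apache.iceberg.Table;

    final class ManifestReaders
    {
        private ManifestReaders() {}

        // Hypothetical helper mirroring readerForManifest: counts the entries
        // in one manifest, choosing the reader by the manifest's content type.
        static long countEntries(Table table, ManifestFile manifest)
        {
            try (ManifestReader<? extends ContentFile<?>> reader = switch (manifest.content()) {
                case DATA -> ManifestFiles.read(manifest, table.io());
                case DELETES -> ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());
            }) {
                long entries = 0;
                for (ContentFile<?> ignored : reader) {
                    entries++;
                }
                return entries;
            }
            catch (IOException e) {
                throw new UncheckedIOException("Failed reading manifest " + manifest.path(), e);
            }
        }
    }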

     private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp, Set<String> validFiles, String subfolder)

@@ -5260,11 +5260,14 @@ public void testRemoveOrphanFiles()
         Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
         assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
         assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
+        assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2), ('three', 3)", 2);
+        assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 1);
    [Review thread on the two lines above]

    Member: why change existing test?

    Member (author): It didn't exercise delete files, and also didn't validate that we didn't corrupt the table. Seemed like reasonable additions.

    Member: Agreed. Let's have it as a separate commit, so that we can capture the rationale in the commit message. Otherwise it looks like a left-over from local testing.
         String location = getTableLocation(tableName);
         Path orphanFile = Files.createFile(Path.of(getIcebergTableDataPath(location).toString(), "invalidData." + format));
         List<String> initialDataFiles = getAllDataFilesFromTableDirectory(tableName);

         assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')");
+        assertQuery("SELECT * FROM " + tableName, "VALUES ('one', 1), ('three', 3)");

         List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(tableName);
         assertThat(updatedDataFiles.size()).isLessThan(initialDataFiles.size());
@@ -49,6 +49,7 @@
 import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
 import static io.trino.testing.QueryAssertions.copyTpchTables;
 import static io.trino.testing.TestingSession.testSessionBuilder;
+import static io.trino.testing.sql.TestTable.randomTableSuffix;
 import static java.lang.String.format;
 import static java.util.Collections.nCopies;
 import static java.util.Objects.requireNonNull;

@@ -384,10 +385,41 @@ public void testPredicateWithVarcharCastToDate()
assertUpdate("DROP TABLE test_varchar_as_date_predicate");
}

@Test
public void testRemoveOrphanFiles()
    [Review thread]

    Member: are you planning on adding a test for snapshot expiration as well?

    Member (author): I can, probably in a separate PR.

    Member: I guess this might be covered by @homar's #14434

+    {
+        String tableName = "test_remove_orphan_files_" + randomTableSuffix();
+        Session sessionWithShortRetentionUnlocked = Session.builder(getSession())
+                .setCatalogSessionProperty("iceberg", "remove_orphan_files_min_retention", "0s")
+                .build();
+        assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
+        assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
+        assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2), ('three', 3)", 2);
+        assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 1);
+
+        assertFileSystemAccesses(
+                sessionWithShortRetentionUnlocked,
+                "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')",
+                ImmutableMultiset.builder()
+                        .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
+                        .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 4)
+                        .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 4)
+                        .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 6)
+                        .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 6)
+                        .build());
+
+        assertUpdate("DROP TABLE " + tableName);
+    }

    [Review thread on lines +407 to +408, the MANIFEST operation counts]

    Member (author): Looks like we're still reading each manifest twice, I'll take a look at that tomorrow.

    Member: I think that first every manifest is read here:

        for (ManifestFile manifest : snapshot.allManifests(table.io()))

    and then again here:

        ManifestFiles.read(manifest, table.io());
        ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());

    but this is still a huge improvement, as previously these were the numbers:

        .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 4)
        .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 4)
        .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 24)
        .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 24)

    Member (author): Looks like it's part of how the ManifestReader works. The factory ManifestFiles.read opens the file once to read the metadata, and then calling iterator opens the file again. I don't think there's much we can do about that.
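
The double read described in the thread can be observed with a delegating FileIO whose InputFiles count stream opens, mirroring the INPUT_FILE_NEW_STREAM counter used by this test. A minimal sketch under that assumption (StreamCountingFileIO is illustrative, not part of the PR):

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.io.SeekableInputStream;

    // Wraps a FileIO so every InputFile it hands out counts newStream() calls.
    // Per the explanation above, reading a manifest through ManifestFiles.read
    // should bump the counter twice per manifest: once for the metadata read
    // and once when the entry iterator opens the file again.
    final class StreamCountingFileIO implements FileIO
    {
        private final FileIO delegate;
        final AtomicInteger streamsOpened = new AtomicInteger();

        StreamCountingFileIO(FileIO delegate)
        {
            this.delegate = delegate;
        }

        @Override
        public InputFile newInputFile(String path)
        {
            InputFile file = delegate.newInputFile(path);
            return new InputFile()
            {
                @Override
                public long getLength()
                {
                    return file.getLength();
                }

                @Override
                public SeekableInputStream newStream()
                {
                    streamsOpened.incrementAndGet();
                    return file.newStream();
                }

                @Override
                public String location()
                {
                    return file.location();
                }

                @Override
                public boolean exists()
                {
                    return file.exists();
                }
            };
        }

        @Override
        public OutputFile newOutputFile(String path)
        {
            return delegate.newOutputFile(path);
        }

        @Override
        public void deleteFile(String path)
        {
            delegate.deleteFile(path);
        }
    }

Under this reading, two stream opens for each of three manifests would account for the six MANIFEST operations asserted in the test above.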

     private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<Object> expectedAccesses)
+    {
+        assertFileSystemAccesses(TEST_SESSION, query, expectedAccesses);
+    }
+
+    private void assertFileSystemAccesses(Session session, @Language("SQL") String query, Multiset<Object> expectedAccesses)
     {
         resetCounts();
-        getDistributedQueryRunner().executeWithQueryId(TEST_SESSION, query);
+        getDistributedQueryRunner().executeWithQueryId(session, query);
         assertThat(ImmutableMultiset.<Object>copyOf(getOperations())).containsExactlyInAnyOrderElementsOf(expectedAccesses);
     }
