Skip to content

Commit

Permalink
Fix past table versions listing
Browse files Browse the repository at this point in the history
When listing past table versions, correctly break
iteration once we get to a file which no longer exists,
e.g. removed by an external system.

Co-authored-by: Marius Grama <[email protected]>
  • Loading branch information
2 people authored and ebyhr committed Jul 25, 2024
1 parent 934709d commit 26e5bc4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ public List<Long> getPastTableVersions(TrinoFileSystem fileSystem, String transa
}
catch (FileNotFoundException e) {
// no longer exists, break iteration
return null;
break;
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.io.Resources;
import io.airlift.units.DataSize;
import io.trino.Session;
Expand Down Expand Up @@ -3194,6 +3195,50 @@ private void testReadChangesFromCtasTable(ColumnMappingMode mode)
""");
}

/**
 * Runs {@code system.vacuum} against a table whose transaction log entries for versions 0 and 1
 * (plus the version 1 checkpoint) were deleted directly from object storage, and checks which
 * data files survive a long-retention and then a short-retention vacuum.
 */
@Test
public void testVacuumTableUsingVersionDeletedCheckpoints()
        throws Exception
{
    // Session that lowers the catalog's vacuum_min_retention to 0s so the later '1s' vacuum is accepted
    String catalogName = getSession().getCatalog().orElseThrow();
    Session zeroMinRetentionSession = Session.builder(getSession())
            .setCatalogSessionProperty(catalogName, "vacuum_min_retention", "0s")
            .build();

    String tableName = "test_vacuum_deleted_version_" + randomNameSuffix();
    String tableLocation = "s3://%s/%s/%s".formatted(bucketName, SCHEMA, tableName);
    String deltaLog = "%s/%s/_delta_log".formatted(SCHEMA, tableName);

    String selectAll = "SELECT * FROM " + tableName;
    String expectedRows = "VALUES 2, 3";
    String vacuumTemplate = "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '%s', retention => '%s')";

    // Version 0: table created with checkpoint_interval = 1
    assertUpdate(
            "CREATE TABLE %s WITH (location = '%s', checkpoint_interval = 1) AS SELECT 1 id".formatted(tableName, tableLocation),
            1);
    Set<String> filesAfterCreate = getActiveFiles(tableName);

    // Versions 1 and 2
    assertUpdate("INSERT INTO " + tableName + " VALUES 2", 1);
    assertUpdate("UPDATE " + tableName + " SET id = 3 WHERE id = 1", 1);
    Stopwatch sinceUpdate = Stopwatch.createStarted();

    // Drop the log entries of versions 0 and 1 behind the connector's back
    assertThat(minioClient.listObjects(bucketName, deltaLog)).hasSize(7);
    String[] removedLogEntries = {
            "/00000000000000000000.json",
            "/00000000000000000001.json",
            "/00000000000000000001.checkpoint.parquet",
    };
    for (String logEntry : removedLogEntries) {
        minioClient.removeObject(bucketName, deltaLog + logEntry);
    }
    assertThat(minioClient.listObjects(bucketName, deltaLog)).hasSize(4);

    // The table must remain readable from what is left of the log
    assertQuery(selectAll, expectedRows);
    Set<String> filesAfterUpdate = getActiveFiles(tableName);

    // A 7 day retention keeps everything: files from the initial and the current state are all still present
    assertUpdate(vacuumTemplate.formatted(tableName, "7d"));
    assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(Sets.union(filesAfterCreate, filesAfterUpdate));
    assertQuery(selectAll, expectedRows);

    // Wait until just over one second has passed since the UPDATE, then vacuum with a 1s retention
    long remainingMillis = 1_000 - sinceUpdate.elapsed(MILLISECONDS) + 1;
    MILLISECONDS.sleep(remainingMillis);
    assertUpdate(zeroMinRetentionSession, vacuumTemplate.formatted(tableName, "1s"));

    // Only the currently active files are expected to be left
    assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(filesAfterUpdate);
    assertQuery(selectAll, expectedRows);

    assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testVacuumDeletesCdfFiles()
throws InterruptedException
Expand Down

0 comments on commit 26e5bc4

Please sign in to comment.