Skip to content

Commit

Permalink
Fix Iceberg MV failure after table roll back
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jul 12, 2023
1 parent 9845e7a commit 0bbc949
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,11 @@ public static Optional<Snapshot> firstSnapshotAfter(Table table, long baseSnapsh
checkArgument(current.snapshotId() != baseSnapshotId, "No snapshot after %s in %s, current snapshot is %s", baseSnapshotId, table, current);

while (true) {
checkArgument(current.parentId() != null, "Snapshot id %s is not valid in table %s, snapshot %s has no parent", baseSnapshotId, table, current);
if (current.parentId() == null) {
// Current is the first snapshot in the table, which means we reached end of table history not finding baseSnapshotId. This is possible
// when table was rolled back and baseSnapshotId is no longer referenced.
return Optional.empty();
}
if (current.parentId() == baseSnapshotId) {
return Optional.of(current);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,37 @@ public void testMaterializedViewOnExpiredTable()
assertUpdate("DROP MATERIALIZED VIEW mv_on_expired_the_mv");
}

@Test
public void testMaterializedViewOnTableRolledBack()
{
assertUpdate("CREATE TABLE mv_on_rolled_back_base_table(a integer)");
assertUpdate("""
CREATE MATERIALIZED VIEW mv_on_rolled_back_the_mv
GRACE PERIOD INTERVAL '0' SECOND
AS SELECT sum(a) s FROM mv_on_rolled_back_base_table""");

// Create some snapshots
assertUpdate("INSERT INTO mv_on_rolled_back_base_table VALUES 4", 1);
long firstSnapshot = getLatestSnapshotId("mv_on_rolled_back_base_table");
assertUpdate("INSERT INTO mv_on_rolled_back_base_table VALUES 8", 1);

// Base MV on a snapshot "in the future"
assertUpdate("REFRESH MATERIALIZED VIEW mv_on_rolled_back_the_mv", 1);
assertUpdate(format("CALL system.rollback_to_snapshot(CURRENT_SCHEMA, 'mv_on_rolled_back_base_table', %s)", firstSnapshot));

// View still can be queried
assertThat(query("TABLE mv_on_rolled_back_the_mv"))
.matches("VALUES BIGINT '4'");

// View can also be refreshed
assertUpdate("REFRESH MATERIALIZED VIEW mv_on_rolled_back_the_mv", 1);
assertThat(query("TABLE mv_on_rolled_back_the_mv"))
.matches("VALUES BIGINT '4'");

assertUpdate("DROP TABLE mv_on_rolled_back_base_table");
assertUpdate("DROP MATERIALIZED VIEW mv_on_rolled_back_the_mv");
}

@Test
public void testSqlFeatures()
{
Expand Down Expand Up @@ -763,4 +794,9 @@ private SchemaTableName getStorageTable(String catalogName, String schemaName, S
assertThat(materializedView).isPresent();
return materializedView.get().getStorageTable().get().getSchemaTableName();
}

private long getLatestSnapshotId(String tableName)
{
return (long) computeScalar(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName));
}
}

0 comments on commit 0bbc949

Please sign in to comment.