fixup! Add metadata_log_entries table to Iceberg
oneonestar committed Mar 12, 2024
1 parent 2fafea4 commit d7547f1
Showing 5 changed files with 62 additions and 55 deletions.
File 1 of 5
@@ -16,14 +16,10 @@
 import com.google.common.collect.ImmutableList;
 import io.trino.plugin.iceberg.util.PageListBuilder;
 import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.connector.ConnectorPageSource;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableMetadata;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-import io.trino.spi.connector.FixedPageSource;
 import io.trino.spi.connector.SchemaTableName;
-import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TimeZoneKey;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;

@@ -48,9 +44,10 @@ public class MetadataLogEntriesTable

    public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable)
    {
-        this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
-
-        tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
+        super(icebergTable);
+        requireNonNull(tableName, "tableName is null");
+        tableMetadata = new ConnectorTableMetadata(
+                tableName,
                ImmutableList.<ColumnMetadata>builder()
                        .add(new ColumnMetadata(TIMESTAMP_COLUMN_NAME, TIMESTAMP_TZ_MILLIS))
                        .add(new ColumnMetadata(FILE_COLUMN_NAME, VARCHAR))
@@ -61,9 +58,9 @@ public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable)
    }

    @Override
-    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
+    protected MetadataTableType getMetadataTableType()
    {
-        return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, METADATA_LOG_ENTRIES));
+        return METADATA_LOG_ENTRIES;
    }

    @Override
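The override removed above moves into a shared superclass, and each metadata table subclass now only names the Iceberg metadata table backing it. A minimal sketch of that template-method shape, assuming a base class roughly like the following (the class name and the placement of buildPages are inferred from the diff, not quoted from the Trino source):

import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;

import java.util.List;

import static java.util.Objects.requireNonNull;

public abstract class BaseSystemTable
        implements SystemTable
{
    private final Table icebergTable;

    protected BaseSystemTable(Table icebergTable)
    {
        this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
    }

    // Each subclass declares which Iceberg metadata table backs it,
    // e.g. METADATA_LOG_ENTRIES in the class above.
    protected abstract MetadataTableType getMetadataTableType();

    @Override
    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
    {
        // Shared implementation of the override each subclass used to carry.
        return new FixedPageSource(buildPages(getTableMetadata(), session, icebergTable, getMetadataTableType()));
    }

    // In the real refactor this is a shared, concrete helper that scans the
    // Iceberg metadata table and materializes its rows as pages; it is
    // declared abstract here only to keep the sketch compilable.
    protected abstract List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable, MetadataTableType metadataTableType);
}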
File 2 of 5
@@ -6308,23 +6308,33 @@ public void testReadFromVersionedTableWithExpiredHistory()
    }

    @Test
-    public void testDeleteRetainsTableHistoryAndMetadataFile()
+    public void testDeleteRetainsTableHistory()
    {
        String tableName = "test_delete_retains_table_history_" + randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + "(c1 INT, c2 INT)");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1), (2, 2), (3, 3)", 3);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, 3), (4, 4), (5, 5)", 3);
        List<Long> snapshots = getTableHistory(tableName);
-        List<Long> metadataLogEntries = getTableMetadataLogEntries(tableName);

        assertUpdate("DELETE FROM " + tableName + " WHERE c1 < 4", 4);
        List<Long> snapshotsAfterDelete = getTableHistory(tableName);
        assertThat(snapshotsAfterDelete.size()).isGreaterThan(snapshots.size());
        assertThat(snapshotsAfterDelete).containsAll(snapshots);
-        List<Long> metadataLogEntriesAfterDelete = getTableMetadataLogEntries(tableName);
-        assertThat(metadataLogEntriesAfterDelete.size()).isGreaterThan(metadataLogEntries.size());
-        assertThat(metadataLogEntriesAfterDelete).containsAll(metadataLogEntries);
+    }
+
+    @Test
+    public void testDeleteRetainsMetadataFile()
+    {
+        String tableName = "test_delete_retains_metadata_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + "(c1 INT, c2 INT)");
+        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1), (2, 2), (3, 3)", 3);
+        assertUpdate("INSERT INTO " + tableName + " VALUES (3, 3), (4, 4), (5, 5)", 3);
+        List<Long> metadataLogEntries = getLatestSequenceNumbersInMetadataLogEntries(tableName);
+
+        assertUpdate("DELETE FROM " + tableName + " WHERE c1 < 4", 4);
+        List<Long> metadataLogEntriesAfterDelete = getLatestSequenceNumbersInMetadataLogEntries(tableName);
+        assertThat(metadataLogEntriesAfterDelete)
+                .hasSizeGreaterThan(metadataLogEntries.size())
+                .containsAll(metadataLogEntries);
        assertUpdate("DROP TABLE " + tableName);
    }

@@ -7864,7 +7874,7 @@ private List<Long> getTableHistory(String tableName)
                .collect(toImmutableList());
    }

-    private List<Long> getTableMetadataLogEntries(String tableName)
+    private List<Long> getLatestSequenceNumbersInMetadataLogEntries(String tableName)
    {
        return getQueryRunner().execute(format("SELECT latest_sequence_number FROM \"%s$metadata_log_entries\"", tableName))
                .getOnlyColumn()
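For context, getTableHistory, whose tail is visible in the hunk above, reads snapshot IDs from the hidden $history table; presumably it mirrors the renamed helper (a reconstruction, relying on the same static imports as the test class):

private List<Long> getTableHistory(String tableName)
{
    // Reads the snapshot ids recorded in the Iceberg $history metadata table.
    return getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$history\"", tableName))
            .getOnlyColumn()
            .map(Long.class::cast)
            .collect(toImmutableList());
}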
File 3 of 5
@@ -209,70 +209,70 @@ public void testMetadataLogEntriesTable()
"('latest_schema_id', 'integer', '', '')," +
"('latest_sequence_number', 'bigint', '', '')");

List<Integer> latestSchemaId = new ArrayList<>();
List<Long> latestSequenceNumber = new ArrayList<>();
// Update this test after Iceberg fixed metadata_log_entries after RTAS (https://github.com/apache/iceberg/issues/9723)
List<Integer> latestSchemaIds = new ArrayList<>();
List<Long> latestSequenceNumbers = new ArrayList<>();

assertUpdate("CREATE TABLE test_schema.test_metadata_log_entries (c1 BIGINT)");
latestSchemaId.add(0);
latestSequenceNumber.add(1L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
latestSchemaIds.add(0);
latestSequenceNumbers.add(1L);
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1);
// INSERT create two commits (https://github.com/trinodb/trino/issues/15439) and share a same snapshotId
latestSchemaId.add(0);
latestSchemaId.add(0);
latestSequenceNumber.add(2L);
latestSequenceNumber.add(2L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
latestSchemaIds.add(0);
latestSchemaIds.add(0);
latestSequenceNumbers.add(2L);
latestSequenceNumbers.add(2L);
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries ADD COLUMN c2 VARCHAR");
latestSchemaId.add(0);
latestSequenceNumber.add(2L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
latestSchemaIds.add(0);
latestSequenceNumbers.add(2L);
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("DELETE FROM test_schema.test_metadata_log_entries WHERE c1 = 1", 1);
latestSchemaId.add(1);
latestSequenceNumber.add(3L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
latestSchemaIds.add(1);
latestSequenceNumbers.add(3L);
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries execute optimize");
latestSchemaId.add(1);
latestSequenceNumber.add(4L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
latestSchemaIds.add(1);
latestSequenceNumbers.add(4L);
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("CREATE OR REPLACE TABLE test_schema.test_metadata_log_entries (c3 INTEGER)");
// Iceberg unintentionally deleted the history entries after RTAS
// Update this test after Iceberg release the fix (https://github.com/apache/iceberg/issues/9723)
latestSchemaId = new ArrayList<>();
latestSequenceNumber = new ArrayList<>();
latestSchemaIds = new ArrayList<>();
latestSequenceNumbers = new ArrayList<>();
for (int i = 0; i < 6; i++) {
latestSchemaId.add(null);
latestSequenceNumber.add(null);
latestSchemaIds.add(null);
latestSequenceNumbers.add(null);
}

-        latestSchemaId.add(2);
-        latestSequenceNumber.add(5L);
-        testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
+        latestSchemaIds.add(2);
+        latestSequenceNumbers.add(5L);
+        assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1);
latestSchemaId.add(2);
latestSequenceNumber.add(6L);
latestSchemaId.add(2);
latestSequenceNumber.add(6L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);
latestSchemaIds.add(2);
latestSequenceNumbers.add(6L);
latestSchemaIds.add(2);
latestSequenceNumbers.add(6L);
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);

assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries");
}

-    private void testMetadataLogEntriesHelper(List<Integer> latestSchemaId, List<Long> latestSequenceNumber)
+    private void assertMetadataLogEntries(List<Integer> latestSchemaIds, List<Long> latestSequenceNumbers)
    {
-        MaterializedResult result = computeActual("SELECT * FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" ORDER BY timestamp");
+        MaterializedResult result = computeActual("SELECT latest_schema_id, latest_sequence_number FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" ORDER BY timestamp");
        List<MaterializedRow> materializedRows = result.getMaterializedRows();

-        assertThat(result.getRowCount()).isEqualTo(latestSchemaId.size());
+        assertThat(result.getRowCount()).isEqualTo(latestSchemaIds.size());
        for (int i = 0; i < result.getRowCount(); i++) {
-            assertThat(materializedRows.get(i).getField(3)).isEqualTo(latestSchemaId.get(i));
-            assertThat(materializedRows.get(i).getField(4)).isEqualTo(latestSequenceNumber.get(i));
+            assertThat(materializedRows.get(i).getField(0)).isEqualTo(latestSchemaIds.get(i));
+            assertThat(materializedRows.get(i).getField(1)).isEqualTo(latestSequenceNumbers.get(i));
        }
    }

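The progression the test encodes (one metadata.json entry per commit, two per INSERT, and a history reset after CREATE OR REPLACE) can be inspected with the same query the new assertion helper runs. A hypothetical snippet against the same test fixture, not part of the commit:

// Assumes the AbstractTestQueryFramework fixture used by the test above.
MaterializedResult log = computeActual(
        "SELECT latest_schema_id, latest_sequence_number " +
        "FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" " +
        "ORDER BY timestamp");
// INSERT currently appends two entries sharing one snapshot
// (https://github.com/trinodb/trino/issues/15439).
for (MaterializedRow row : log.getMaterializedRows()) {
    System.out.printf("latest_schema_id=%s, latest_sequence_number=%s%n", row.getField(0), row.getField(1));
}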
File 4 of 5
@@ -305,7 +305,7 @@ public void testSelectSystemTable()
        // select from $metadata_log_entries
        assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"",
                ImmutableMultiset.<MetastoreMethod>builder()
-                        .addCopies(GET_TABLE, 1)
+                        .add(GET_TABLE)
                        .build());

        // select from $snapshots
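This file and the next make the same one-line cleanup: for a single expected call, add(element) on Guava's ImmutableMultiset.Builder is equivalent to addCopies(element, 1), just terser. A standalone check of that equivalence:

import com.google.common.collect.ImmutableMultiset;

public class MultisetAddDemo
{
    public static void main(String[] args)
    {
        // addCopies(e, 1) and add(e) build equal multisets, so the diff only
        // changes style, not the asserted number of GET_TABLE invocations.
        ImmutableMultiset<String> viaAdd = ImmutableMultiset.<String>builder()
                .add("GET_TABLE")
                .build();
        ImmutableMultiset<String> viaAddCopies = ImmutableMultiset.<String>builder()
                .addCopies("GET_TABLE", 1)
                .build();
        System.out.println(viaAdd.equals(viaAddCopies)); // prints: true
    }
}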
File 5 of 5
@@ -451,7 +451,7 @@ public void testSelectSystemTable()
        // select from $metadata_log_entries
        assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"",
                ImmutableMultiset.builder()
-                        .addCopies(GET_TABLE, 1)
+                        .add(GET_TABLE)
                        .build());

        // select from $snapshots
