From 44bb855704a810b1f60c0ddd85665b1e92ffbbbc Mon Sep 17 00:00:00 2001 From: Star Poon Date: Thu, 18 Jan 2024 19:59:25 +0900 Subject: [PATCH] Add metadata_log_entries table to Iceberg --- docs/src/main/sphinx/connector/iceberg.md | 46 ++++++++++ .../trino/plugin/iceberg/IcebergMetadata.java | 1 + .../iceberg/MetadataLogEntriesTable.java | 83 +++++++++++++++++++ .../io/trino/plugin/iceberg/TableType.java | 1 + .../iceberg/BaseIcebergConnectorTest.java | 15 +++- .../iceberg/BaseIcebergSystemTables.java | 79 ++++++++++++++++++ .../TestIcebergMetastoreAccessOperations.java | 9 +- ...estIcebergGlueCatalogAccessOperations.java | 9 +- 8 files changed, 240 insertions(+), 3 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index f73405093b2b..6b284fc35163 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -825,6 +825,52 @@ The output of the query has the following columns: - Whether or not this snapshot is an ancestor of the current snapshot. ::: +##### `$metadata_log_entries` table + +The `$metadata_log_entries` table provides a view of metadata log entries +of the Iceberg table. 
+ +You can retrieve the information about the metadata log entries of the Iceberg +table `test_table` by using the following query: + +``` +SELECT * FROM "test_table$metadata_log_entries" +``` + +```text + timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number +---------------------------------------+----------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+------------------------ + 2024-01-16 15:55:31.172 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00000-39174715-be2a-48fa-9949-35413b8b736e.metadata.json | 1221802298419195590 | 0 | 1 + 2024-01-16 17:19:56.118 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00001-e40178c9-271f-4a96-ad29-eed5e7aef9b0.metadata.json | 7124386610209126943 | 0 | 2 +``` + +The output of the query has the following columns: + +:::{list-table} Metadata log entries columns +:widths: 30, 30, 40 +:header-rows: 1 + +* - Name + - Type + - Description +* - `timestamp` + - `TIMESTAMP(3) WITH TIME ZONE` + - The time when the metadata was created. +* - `file` + - `VARCHAR` + - The location of the metadata file. +* - `latest_snapshot_id` + - `BIGINT` + - The identifier of the latest snapshot when the metadata was updated. +* - `latest_schema_id` + - `INTEGER` + - The identifier of the latest schema when the metadata was updated. +* - `latest_sequence_number` + - `BIGINT` + - The data sequence number of the metadata file. 
+::: + + ##### `$snapshots` table The `$snapshots` table provides a detailed view of snapshots of the Iceberg diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 8f45318eac41..b36b36b0f5f3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -574,6 +574,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema return switch (tableType) { case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above. case HISTORY -> Optional.of(new HistoryTable(tableName, table)); + case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table)); case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table)); case PARTITIONS -> Optional.of(new PartitionTable(tableName, typeManager, table, getCurrentSnapshotId(table))); case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table))); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java new file mode 100644 index 000000000000..a7be41d9cd3e --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.util.PageListBuilder; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.FixedPageSource; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.TimeZoneKey; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; + +import java.util.Map; + +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataTableType.METADATA_LOG_ENTRIES; + +public class MetadataLogEntriesTable + extends BaseSystemTable +{ + private static final String TIMESTAMP_COLUMN_NAME = "timestamp"; + private static final String FILE_COLUMN_NAME = "file"; + private static final String LATEST_SNAPSHOT_ID_COLUMN_NAME = "latest_snapshot_id"; + private static final String LATEST_SCHEMA_ID_COLUMN_NAME = "latest_schema_id"; + private static final String LATEST_SEQUENCE_NUMBER_COLUMN_NAME = "latest_sequence_number"; 
+ + public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable) + { + this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); + + tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), + ImmutableList.builder() + .add(new ColumnMetadata(TIMESTAMP_COLUMN_NAME, TIMESTAMP_TZ_MILLIS)) + .add(new ColumnMetadata(FILE_COLUMN_NAME, VARCHAR)) + .add(new ColumnMetadata(LATEST_SNAPSHOT_ID_COLUMN_NAME, BIGINT)) + .add(new ColumnMetadata(LATEST_SCHEMA_ID_COLUMN_NAME, INTEGER)) + .add(new ColumnMetadata(LATEST_SEQUENCE_NUMBER_COLUMN_NAME, BIGINT)) + .build()); + } + + @Override + public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) + { + return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, METADATA_LOG_ENTRIES)); + } + + @Override + protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) + { + pagesBuilder.beginRow(); + + pagesBuilder.appendTimestampTzMillis( + structLike.get(columnNameToPositionInSchema.get(TIMESTAMP_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND, + timeZoneKey); + pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(FILE_COLUMN_NAME), String.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SNAPSHOT_ID_COLUMN_NAME), Long.class)); + pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get(LATEST_SCHEMA_ID_COLUMN_NAME), Integer.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SEQUENCE_NUMBER_COLUMN_NAME), Long.class)); + pagesBuilder.endRow(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java index cea961b4d51a..a878f9dada65 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java @@ -17,6 +17,7 @@ public enum TableType { DATA, HISTORY, + METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 612b010df10b..d19736b6cf7a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6308,18 +6308,23 @@ public void testReadFromVersionedTableWithExpiredHistory() } @Test - public void testDeleteRetainsTableHistory() + public void testDeleteRetainsTableHistoryAndMetadataFile() { String tableName = "test_delete_retains_table_history_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + "(c1 INT, c2 INT)"); assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1), (2, 2), (3, 3)", 3); assertUpdate("INSERT INTO " + tableName + " VALUES (3, 3), (4, 4), (5, 5)", 3); List snapshots = getTableHistory(tableName); + List metadataLogEntries = getTableMetadataLogEntries(tableName); assertUpdate("DELETE FROM " + tableName + " WHERE c1 < 4", 4); List snapshotsAfterDelete = getTableHistory(tableName); assertThat(snapshotsAfterDelete.size()).isGreaterThan(snapshots.size()); assertThat(snapshotsAfterDelete).containsAll(snapshots); + + List metadataLogEntriesAfterDelete = getTableMetadataLogEntries(tableName); + assertThat(metadataLogEntriesAfterDelete.size()).isGreaterThan(metadataLogEntries.size()); + assertThat(metadataLogEntriesAfterDelete).containsAll(metadataLogEntries); assertUpdate("DROP TABLE " + tableName); } @@ -7859,6 +7864,14 @@ private List getTableHistory(String tableName) .collect(toImmutableList()); } + private List getTableMetadataLogEntries(String 
tableName) + { + return getQueryRunner().execute(format("SELECT latest_sequence_number FROM \"%s$metadata_log_entries\"", tableName)) + .getOnlyColumn() + .map(Long.class::cast) + .collect(toImmutableList()); + } + private long getCurrentSnapshotId(String tableName) { return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index f120f942caf0..237ceb2e292c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.TestInstance; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -197,6 +199,83 @@ public void testHistoryTable() assertQuery("SELECT count(*) FROM test_schema.\"test_table$history\"", "VALUES 3"); } + @Test + public void testMetadataLogEntriesTable() + { + assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$metadata_log_entries\"", + "VALUES ('timestamp', 'timestamp(3) with time zone', '', '')," + + "('file', 'varchar', '', '')," + + "('latest_snapshot_id', 'bigint', '', '')," + + "('latest_schema_id', 'integer', '', '')," + + "('latest_sequence_number', 'bigint', '', '')"); + + List latestSchemaId = new ArrayList<>(); + List latestSequenceNumber = new ArrayList<>(); + // Update this test once Iceberg fixes metadata_log_entries after RTAS (https://github.com/apache/iceberg/issues/9723) + assertUpdate("CREATE TABLE test_schema.test_metadata_log_entries (c1 BIGINT)"); + latestSchemaId.add(0); + latestSequenceNumber.add(1L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + 
assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1); + // INSERT creates two commits (https://github.com/trinodb/trino/issues/15439) that share the same snapshotId + latestSchemaId.add(0); + latestSchemaId.add(0); + latestSequenceNumber.add(2L); + latestSequenceNumber.add(2L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + + assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries ADD COLUMN c2 VARCHAR"); + latestSchemaId.add(0); + latestSequenceNumber.add(2L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + + assertUpdate("DELETE FROM test_schema.test_metadata_log_entries WHERE c1 = 1", 1); + latestSchemaId.add(1); + latestSequenceNumber.add(3L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + + assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries execute optimize"); + latestSchemaId.add(1); + latestSequenceNumber.add(4L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + + assertUpdate("CREATE OR REPLACE TABLE test_schema.test_metadata_log_entries (c3 INTEGER)"); + // Iceberg unintentionally deletes the history entries after RTAS + // Update this test after Iceberg releases the fix (https://github.com/apache/iceberg/issues/9723) + latestSchemaId = new ArrayList<>(); + latestSequenceNumber = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + latestSchemaId.add(null); + latestSequenceNumber.add(null); + } + + latestSchemaId.add(2); + latestSequenceNumber.add(5L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + + assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1); + latestSchemaId.add(2); + latestSequenceNumber.add(6L); + latestSchemaId.add(2); + latestSequenceNumber.add(6L); + testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber); + + assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries"); + } + + private void testMetadataLogEntriesHelper(List 
latestSchemaId, List latestSequenceNumber) + { + MaterializedResult result = computeActual("SELECT * FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" ORDER BY timestamp"); + List materializedRows = result.getMaterializedRows(); + + assertThat(result.getRowCount()).isEqualTo(latestSchemaId.size()); + for (int i = 0; i < result.getRowCount(); i++) { + assertThat(materializedRows.get(i).getField(3)).isEqualTo(latestSchemaId.get(i)); + assertThat(materializedRows.get(i).getField(4)).isEqualTo(latestSequenceNumber.get(i)); + } + } + @Test public void testSnapshotsTable() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 4466cd1ddff0..c0dc4501be57 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -42,6 +42,7 @@ import static io.trino.plugin.iceberg.TableType.HISTORY; import static io.trino.plugin.iceberg.TableType.MANIFESTS; import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; +import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; import static io.trino.plugin.iceberg.TableType.REFS; @@ -301,6 +302,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + // select from $metadata_log_entries + assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"", + ImmutableMultiset.builder() + .addCopies(GET_TABLE, 1) + .build()); + // select from $snapshots assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"", ImmutableMultiset.builder() @@ -336,7 +343,7 @@ public void 
testSelectSystemTable() // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); + .containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 9b3275d12393..7af4da1ac840 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -50,6 +50,7 @@ import static io.trino.plugin.iceberg.TableType.HISTORY; import static io.trino.plugin.iceberg.TableType.MANIFESTS; import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; +import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; import static io.trino.plugin.iceberg.TableType.REFS; @@ -447,6 +448,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + // select from $metadata_log_entries + assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"", + ImmutableMultiset.builder() + .addCopies(GET_TABLE, 1) + .build()); + // select from $snapshots assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"", ImmutableMultiset.builder() @@ -488,7 +495,7 @@ public void testSelectSystemTable() // This test should get updated if a new system table is added. 
assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); + .containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } finally { getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots");