From ad40089c58b5c9fbd6b1c7d7073efcf352a1edea Mon Sep 17 00:00:00 2001 From: Star Poon Date: Wed, 21 Feb 2024 22:01:14 +0900 Subject: [PATCH 1/2] Extract base class from SnapshotsTable --- .../trino/plugin/iceberg/BaseSystemTable.java | 109 ++++++++++++++++++ .../io/trino/plugin/iceberg/IcebergUtil.java | 17 --- .../trino/plugin/iceberg/SnapshotsTable.java | 82 +++---------- 3 files changed, 122 insertions(+), 86 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java new file mode 100644 index 000000000000..871d62f8f641 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.plugin.iceberg.util.PageListBuilder; +import io.trino.spi.Page; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.FixedPageSource; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.TimeZoneKey; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Maps.immutableEntry; +import static com.google.common.collect.Streams.mapWithIndex; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance; + +public abstract class BaseSystemTable + implements SystemTable +{ + private final Table icebergTable; + private final ConnectorTableMetadata tableMetadata; + private final MetadataTableType metadataTableType; + + BaseSystemTable(Table icebergTable, ConnectorTableMetadata tableMetadata, MetadataTableType metadataTableType) + { + this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); + this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null"); + this.metadataTableType = requireNonNull(metadataTableType, "metadataTableType is null"); + } + + @Override + public Distribution getDistribution() + { + return Distribution.SINGLE_COORDINATOR; + } + + @Override + public ConnectorTableMetadata getTableMetadata() + { + return tableMetadata; + } + + @Override + public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) + { + return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, metadataTableType)); + } + + private List buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable, MetadataTableType metadataTableType) + { + PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); + + TableScan tableScan = createMetadataTableInstance(icebergTable, metadataTableType).newScan(); + TimeZoneKey timeZoneKey = session.getTimeZoneKey(); + + Map columnNameToPosition = mapWithIndex(tableScan.schema().columns().stream(), + (column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue())) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + + try (CloseableIterable fileScanTasks = tableScan.planFiles()) { + fileScanTasks.forEach(fileScanTask -> addRows((DataTask) fileScanTask, pagesBuilder, timeZoneKey, columnNameToPosition)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + + return pagesBuilder.build(); + } + + private void addRows(DataTask dataTask, PageListBuilder pagesBuilder, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) + { + try (CloseableIterable dataRows = dataTask.rows()) { + dataRows.forEach(dataTaskRow -> addRow(pagesBuilder, dataTaskRow, timeZoneKey, columnNameToPositionInSchema)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected abstract void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 52c3d173e6d2..1f3706937b77 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -55,7 +55,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HistoryEntry; -import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -66,7 +65,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Type.PrimitiveType; @@ -100,8 +98,6 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; -import static com.google.common.collect.Maps.immutableEntry; -import static com.google.common.collect.Streams.mapWithIndex; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.base.io.ByteBuffers.getWrappedBytes; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; @@ -157,7 +153,6 @@ import static java.math.RoundingMode.UNNECESSARY; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; -import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; @@ -844,18 +839,6 @@ public static void commit(SnapshotUpdate update, ConnectorSession session) update.commit(); } - public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType) - { - return createMetadataTableInstance(icebergTable, metadataTableType).newScan(); - } - - public static Map columnNameToPositionInSchema(Schema schema) - { - return mapWithIndex(schema.columns().stream(), - (column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue())) - .collect(toImmutableMap(Entry::getKey, Entry::getValue)); - } - public static String getLatestMetadataLocation(TrinoFileSystem fileSystem, String location) { List latestMetadataLocations = new ArrayList<>(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java index 81f9b60699dc..7cfbbbf35896 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java @@ -15,33 +15,17 @@ import com.google.common.collect.ImmutableList; import io.trino.plugin.iceberg.util.PageListBuilder; -import io.trino.spi.Page; import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.connector.ConnectorPageSource; -import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.ConnectorTransactionHandle; -import io.trino.spi.connector.FixedPageSource; import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.SystemTable; -import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TimeZoneKey; import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; -import org.apache.iceberg.DataTask; -import org.apache.iceberg.FileScanTask; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.io.CloseableIterable; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.List; import java.util.Map; -import static io.trino.plugin.iceberg.IcebergUtil.buildTableScan; -import static io.trino.plugin.iceberg.IcebergUtil.columnNameToPositionInSchema; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; @@ -50,10 +34,8 @@ import static org.apache.iceberg.MetadataTableType.SNAPSHOTS; public class SnapshotsTable - implements SystemTable + extends BaseSystemTable { - private final ConnectorTableMetadata tableMetadata; - private final Table icebergTable; private static final String COMMITTED_AT_COLUMN_NAME = "committed_at"; private static final String SNAPSHOT_ID_COLUMN_NAME = "snapshot_id"; private static final String PARENT_ID_COLUMN_NAME = "parent_id"; @@ -63,10 +45,18 @@ public class SnapshotsTable public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable) { - requireNonNull(typeManager, "typeManager is null"); + super( + requireNonNull(icebergTable, "icebergTable is null"), + createConnectorTableMetadata( + requireNonNull(tableName, "tableName is null"), + requireNonNull(typeManager, "typeManager is null")), + SNAPSHOTS); + } - this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); - tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), + private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName, TypeManager typeManager) + { + return new ConnectorTableMetadata( + tableName, ImmutableList.builder() .add(new ColumnMetadata(COMMITTED_AT_COLUMN_NAME, TIMESTAMP_TZ_MILLIS)) .add(new ColumnMetadata(SNAPSHOT_ID_COLUMN_NAME, BIGINT)) @@ -78,53 +68,7 @@ public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table } @Override - public Distribution getDistribution() - { - return Distribution.SINGLE_COORDINATOR; - } - - @Override - public ConnectorTableMetadata getTableMetadata() - { - return tableMetadata; - } - - @Override - public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) - { - return new FixedPageSource(buildPages(tableMetadata, session, icebergTable)); - } - - private static List buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable) - { - PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); - - TableScan tableScan = buildTableScan(icebergTable, SNAPSHOTS); - TimeZoneKey timeZoneKey = session.getTimeZoneKey(); - - Map columnNameToPosition = columnNameToPositionInSchema(tableScan.schema()); - - try (CloseableIterable fileScanTasks = tableScan.planFiles()) { - fileScanTasks.forEach(fileScanTask -> addRows((DataTask) fileScanTask, pagesBuilder, timeZoneKey, columnNameToPosition)); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - - return pagesBuilder.build(); - } - - private static void addRows(DataTask dataTask, PageListBuilder pagesBuilder, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) - { - try (CloseableIterable dataRows = dataTask.rows()) { - dataRows.forEach(dataTaskRow -> addRow(pagesBuilder, dataTaskRow, timeZoneKey, columnNameToPositionInSchema)); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private static void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) + protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) { pagesBuilder.beginRow(); From d67333228f3f8044640a922df6e4c3c1cdcb3e7d Mon Sep 17 00:00:00 2001 From: Star Poon Date: Thu, 18 Jan 2024 19:59:25 +0900 Subject: [PATCH 2/2] Add metadata_log_entries table to Iceberg --- docs/src/main/sphinx/connector/iceberg.md | 46 +++++++++++ .../trino/plugin/iceberg/IcebergMetadata.java | 1 + .../iceberg/MetadataLogEntriesTable.java | 79 +++++++++++++++++++ .../io/trino/plugin/iceberg/TableType.java | 1 + .../iceberg/BaseIcebergConnectorTest.java | 24 ++++++ .../iceberg/BaseIcebergSystemTables.java | 74 +++++++++++++++++ .../TestIcebergMetastoreAccessOperations.java | 9 ++- ...estIcebergGlueCatalogAccessOperations.java | 9 ++- 8 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index f73405093b2b..6b284fc35163 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -825,6 +825,52 @@ The output of the query has the following columns: - Whether or not this snapshot is an ancestor of the current snapshot. ::: +##### `$metadata_log_entries` table + +The `$metadata_log_entries` table provides a view of metadata log entries +of the Iceberg table. + +You can retrieve the information about the metadata log entries of the Iceberg +table `test_table` by using the following query: + +``` +SELECT * FROM "test_table$metadata_log_entries" +``` + +```text + timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number +---------------------------------------+----------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+------------------------ + 2024-01-16 15:55:31.172 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00000-39174715-be2a-48fa-9949-35413b8b736e.metadata.json | 1221802298419195590 | 0 | 1 + 2024-01-16 17:19:56.118 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00001-e40178c9-271f-4a96-ad29-eed5e7aef9b0.metadata.json | 7124386610209126943 | 0 | 2 +``` + +The output of the query has the following columns: + +:::{list-table} Metadata log entries columns +:widths: 30, 30, 40 +:header-rows: 1 + +* - Name + - Type + - Description +* - `timestamp` + - `TIMESTAMP(3) WITH TIME ZONE` + - The time when the metadata was created. +* - `file` + - `VARCHAR` + - The location of the metadata file. +* - `latest_snapshot_id` + - `BIGINT` + - The identifier of the latest snapshot when the metadata was updated. +* - `latest_schema_id` + - `INTEGER` + - The identifier of the latest schema when the metadata was updated. +* - `latest_sequence_number` + - `BIGINT` + - The data sequence number of the metadata file. +::: + + ##### `$snapshots` table The `$snapshots` table provides a detailed view of snapshots of the Iceberg diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 3650d627f64a..5129aede8ce4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -575,6 +575,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema return switch (tableType) { case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above. case HISTORY -> Optional.of(new HistoryTable(tableName, table)); + case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table)); case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table)); case PARTITIONS -> Optional.of(new PartitionTable(tableName, typeManager, table, getCurrentSnapshotId(table))); case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table))); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java new file mode 100644 index 000000000000..5dbdb458d84d --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.util.PageListBuilder; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TimeZoneKey; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; + +import java.util.Map; + +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataTableType.METADATA_LOG_ENTRIES; + +public class MetadataLogEntriesTable + extends BaseSystemTable +{ + private static final String TIMESTAMP_COLUMN_NAME = "timestamp"; + private static final String FILE_COLUMN_NAME = "file"; + private static final String LATEST_SNAPSHOT_ID_COLUMN_NAME = "latest_snapshot_id"; + private static final String LATEST_SCHEMA_ID_COLUMN_NAME = "latest_schema_id"; + private static final String LATEST_SEQUENCE_NUMBER_COLUMN_NAME = "latest_sequence_number"; + + public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable) + { + super( + requireNonNull(icebergTable, "icebergTable is null"), + createConnectorTableMetadata(requireNonNull(tableName, "tableName is null")), + METADATA_LOG_ENTRIES); + } + + private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName) + { + return new ConnectorTableMetadata( + tableName, + ImmutableList.builder() + .add(new ColumnMetadata(TIMESTAMP_COLUMN_NAME, TIMESTAMP_TZ_MILLIS)) + .add(new ColumnMetadata(FILE_COLUMN_NAME, VARCHAR)) + .add(new ColumnMetadata(LATEST_SNAPSHOT_ID_COLUMN_NAME, BIGINT)) + .add(new ColumnMetadata(LATEST_SCHEMA_ID_COLUMN_NAME, INTEGER)) + .add(new ColumnMetadata(LATEST_SEQUENCE_NUMBER_COLUMN_NAME, BIGINT)) + .build()); + } + + @Override + protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) + { + pagesBuilder.beginRow(); + + pagesBuilder.appendTimestampTzMillis( + structLike.get(columnNameToPositionInSchema.get(TIMESTAMP_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND, + timeZoneKey); + pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(FILE_COLUMN_NAME), String.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SNAPSHOT_ID_COLUMN_NAME), Long.class)); + pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get(LATEST_SCHEMA_ID_COLUMN_NAME), Integer.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SEQUENCE_NUMBER_COLUMN_NAME), Long.class)); + pagesBuilder.endRow(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java index cea961b4d51a..a878f9dada65 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java @@ -17,6 +17,7 @@ public enum TableType { DATA, HISTORY, + METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 0ae32ea8b156..12c2853b4fb3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6354,6 +6354,22 @@ public void testDeleteRetainsTableHistory() List snapshotsAfterDelete = getTableHistory(tableName); assertThat(snapshotsAfterDelete.size()).isGreaterThan(snapshots.size()); assertThat(snapshotsAfterDelete).containsAll(snapshots); + } + + @Test + public void testDeleteRetainsMetadataFile() + { + String tableName = "test_delete_retains_metadata_file_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(c1 INT, c2 INT)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1), (2, 2), (3, 3)", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 3), (4, 4), (5, 5)", 3); + List metadataLogEntries = getLatestSequenceNumbersInMetadataLogEntries(tableName); + + assertUpdate("DELETE FROM " + tableName + " WHERE c1 < 4", 4); + List metadataLogEntriesAfterDelete = getLatestSequenceNumbersInMetadataLogEntries(tableName); + assertThat(metadataLogEntriesAfterDelete) + .hasSizeGreaterThan(metadataLogEntries.size()) + .containsAll(metadataLogEntries); assertUpdate("DROP TABLE " + tableName); } @@ -7893,6 +7909,14 @@ private List getTableHistory(String tableName) .collect(toImmutableList()); } + private List getLatestSequenceNumbersInMetadataLogEntries(String tableName) + { + return getQueryRunner().execute(format("SELECT latest_sequence_number FROM \"%s$metadata_log_entries\"", tableName)) + .getOnlyColumn() + .map(Long.class::cast) + .collect(toImmutableList()); + } + private long getCurrentSnapshotId(String tableName) { return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index f120f942caf0..20f934795c5a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.TestInstance; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -100,6 +102,7 @@ public void tearDown() assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_drop_column"); assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_nan"); assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_dml"); + assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries"); assertUpdate("DROP SCHEMA IF EXISTS test_schema"); } @@ -197,6 +200,77 @@ public void testHistoryTable() assertQuery("SELECT count(*) FROM test_schema.\"test_table$history\"", "VALUES 3"); } + @Test + public void testMetadataLogEntriesTable() + { + assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$metadata_log_entries\"", + "VALUES ('timestamp', 'timestamp(3) with time zone', '', '')," + + "('file', 'varchar', '', '')," + + "('latest_snapshot_id', 'bigint', '', '')," + + "('latest_schema_id', 'integer', '', '')," + + "('latest_sequence_number', 'bigint', '', '')"); + + List latestSchemaIds = new ArrayList<>(); + List latestSequenceNumbers = new ArrayList<>(); + + assertUpdate("CREATE TABLE test_schema.test_metadata_log_entries (c1 BIGINT)"); + latestSchemaIds.add(0); + latestSequenceNumbers.add(1L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1); + // INSERT create two commits (https://github.com/trinodb/trino/issues/15439) and share a same snapshotId + latestSchemaIds.add(0); + latestSchemaIds.add(0); + latestSequenceNumbers.add(2L); + latestSequenceNumbers.add(2L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries ADD COLUMN c2 VARCHAR"); + latestSchemaIds.add(0); + latestSequenceNumbers.add(2L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + assertUpdate("DELETE FROM test_schema.test_metadata_log_entries WHERE c1 = 1", 1); + latestSchemaIds.add(1); + latestSequenceNumbers.add(3L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + // OPTIMIZE create two commits: update snapshot and rewrite statistics + assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries execute optimize"); + latestSchemaIds.add(1); + latestSchemaIds.add(1); + latestSequenceNumbers.add(4L); + latestSequenceNumbers.add(4L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + assertUpdate("CREATE OR REPLACE TABLE test_schema.test_metadata_log_entries (c3 INTEGER)"); + latestSchemaIds.add(2); + latestSequenceNumbers.add(5L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1); + latestSchemaIds.add(2); + latestSequenceNumbers.add(6L); + latestSchemaIds.add(2); + latestSequenceNumbers.add(6L); + assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers); + + assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries"); + } + + private void assertMetadataLogEntries(List latestSchemaIds, List latestSequenceNumbers) + { + MaterializedResult result = computeActual("SELECT latest_schema_id, latest_sequence_number FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" ORDER BY timestamp"); + List materializedRows = result.getMaterializedRows(); + + assertThat(result.getRowCount()).isEqualTo(latestSchemaIds.size()); + for (int i = 0; i < result.getRowCount(); i++) { + assertThat(materializedRows.get(i).getField(0)).isEqualTo(latestSchemaIds.get(i)); + assertThat(materializedRows.get(i).getField(1)).isEqualTo(latestSequenceNumbers.get(i)); + } + } + @Test public void testSnapshotsTable() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 888ffe863968..e5f2280a64c1 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -41,6 +41,7 @@ import static io.trino.plugin.iceberg.TableType.HISTORY; import static io.trino.plugin.iceberg.TableType.MANIFESTS; import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; +import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; import static io.trino.plugin.iceberg.TableType.REFS; @@ -300,6 +301,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + // select from $metadata_log_entries + assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"", + ImmutableMultiset.builder() + .add(GET_TABLE) + .build()); + // select from $snapshots assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"", ImmutableMultiset.builder() @@ -335,7 +342,7 @@ public void testSelectSystemTable() // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); + .containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 9b3275d12393..ba0f15f73e5a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -50,6 +50,7 @@ import static io.trino.plugin.iceberg.TableType.HISTORY; import static io.trino.plugin.iceberg.TableType.MANIFESTS; import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; +import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; import static io.trino.plugin.iceberg.TableType.REFS; @@ -447,6 +448,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + // select from $metadata_log_entries + assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"", + ImmutableMultiset.builder() + .add(GET_TABLE) + .build()); + // select from $snapshots assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"", ImmutableMultiset.builder() @@ -488,7 +495,7 @@ public void testSelectSystemTable() // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); + .containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } finally { getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots");