Skip to content

Commit

Permalink
Add metadata_log_entries table to Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
oneonestar committed Mar 11, 2024
1 parent 4c3e085 commit 44bb855
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 3 deletions.
46 changes: 46 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,52 @@ The output of the query has the following columns:
- Whether or not this snapshot is an ancestor of the current snapshot.
:::

##### `$metadata_log_entries` table

The `$metadata_log_entries` table provides a view of metadata log entries
of the Iceberg table.

You can retrieve the information about the metadata log entries of the Iceberg
table `test_table` by using the following query:

```
SELECT * FROM "test_table$metadata_log_entries"
```

```text
timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number
---------------------------------------+----------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+------------------------
2024-01-16 15:55:31.172 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00000-39174715-be2a-48fa-9949-35413b8b736e.metadata.json | 1221802298419195590 | 0 | 1
2024-01-16 17:19:56.118 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00001-e40178c9-271f-4a96-ad29-eed5e7aef9b0.metadata.json | 7124386610209126943 | 0 | 2
```

The output of the query has the following columns:

:::{list-table} Metadata log entries columns
:widths: 30, 30, 40
:header-rows: 1

* - Name
- Type
- Description
* - `timestamp`
- `TIMESTAMP(3) WITH TIME ZONE`
- The time when the metadata was created.
* - `file`
- `VARCHAR`
- The location of the metadata file.
* - `latest_snapshot_id`
- `BIGINT`
- The identifier of the latest snapshot when the metadata was updated.
* - `latest_schema_id`
- `INTEGER`
- The identifier of the latest schema when the metadata was updated.
* - `latest_sequence_number`
- `BIGINT`
- The data sequence number of the metadata file.
:::


##### `$snapshots` table

The `$snapshots` table provides a detailed view of snapshots of the Iceberg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
return switch (tableType) {
case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above.
case HISTORY -> Optional.of(new HistoryTable(tableName, table));
case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table));
case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table));
case PARTITIONS -> Optional.of(new PartitionTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.util.PageListBuilder;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.Map;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataTableType.METADATA_LOG_ENTRIES;

public class MetadataLogEntriesTable
extends BaseSystemTable
{
private static final String TIMESTAMP_COLUMN_NAME = "timestamp";
private static final String FILE_COLUMN_NAME = "file";
private static final String LATEST_SNAPSHOT_ID_COLUMN_NAME = "latest_snapshot_id";
private static final String LATEST_SCHEMA_ID_COLUMN_NAME = "latest_schema_id";
private static final String LATEST_SEQUENCE_NUMBER_COLUMN_NAME = "latest_sequence_number";

public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata(TIMESTAMP_COLUMN_NAME, TIMESTAMP_TZ_MILLIS))
.add(new ColumnMetadata(FILE_COLUMN_NAME, VARCHAR))
.add(new ColumnMetadata(LATEST_SNAPSHOT_ID_COLUMN_NAME, BIGINT))
.add(new ColumnMetadata(LATEST_SCHEMA_ID_COLUMN_NAME, INTEGER))
.add(new ColumnMetadata(LATEST_SEQUENCE_NUMBER_COLUMN_NAME, BIGINT))
.build());
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, METADATA_LOG_ENTRIES));
}

@Override
protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
{
pagesBuilder.beginRow();

pagesBuilder.appendTimestampTzMillis(
structLike.get(columnNameToPositionInSchema.get(TIMESTAMP_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND,
timeZoneKey);
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(FILE_COLUMN_NAME), String.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SNAPSHOT_ID_COLUMN_NAME), Long.class));
pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get(LATEST_SCHEMA_ID_COLUMN_NAME), Integer.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SEQUENCE_NUMBER_COLUMN_NAME), Long.class));
pagesBuilder.endRow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public enum TableType
{
DATA,
HISTORY,
METADATA_LOG_ENTRIES,
SNAPSHOTS,
MANIFESTS,
PARTITIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6308,18 +6308,23 @@ public void testReadFromVersionedTableWithExpiredHistory()
}

@Test
public void testDeleteRetainsTableHistory()
public void testDeleteRetainsTableHistoryAndMetadataFile()
{
String tableName = "test_delete_retains_table_history_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + "(c1 INT, c2 INT)");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1), (2, 2), (3, 3)", 3);
assertUpdate("INSERT INTO " + tableName + " VALUES (3, 3), (4, 4), (5, 5)", 3);
List<Long> snapshots = getTableHistory(tableName);
List<Long> metadataLogEntries = getTableMetadataLogEntries(tableName);

assertUpdate("DELETE FROM " + tableName + " WHERE c1 < 4", 4);
List<Long> snapshotsAfterDelete = getTableHistory(tableName);
assertThat(snapshotsAfterDelete.size()).isGreaterThan(snapshots.size());
assertThat(snapshotsAfterDelete).containsAll(snapshots);

List<Long> metadataLogEntriesAfterDelete = getTableMetadataLogEntries(tableName);
assertThat(metadataLogEntriesAfterDelete.size()).isGreaterThan(metadataLogEntries.size());
assertThat(metadataLogEntriesAfterDelete).containsAll(metadataLogEntries);
assertUpdate("DROP TABLE " + tableName);
}

Expand Down Expand Up @@ -7859,6 +7864,14 @@ private List<Long> getTableHistory(String tableName)
.collect(toImmutableList());
}

private List<Long> getTableMetadataLogEntries(String tableName)
{
return getQueryRunner().execute(format("SELECT latest_sequence_number FROM \"%s$metadata_log_entries\"", tableName))
.getOnlyColumn()
.map(Long.class::cast)
.collect(toImmutableList());
}

private long getCurrentSnapshotId(String tableName)
{
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.jupiter.api.TestInstance;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand Down Expand Up @@ -197,6 +199,83 @@ public void testHistoryTable()
assertQuery("SELECT count(*) FROM test_schema.\"test_table$history\"", "VALUES 3");
}

@Test
public void testMetadataLogEntriesTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$metadata_log_entries\"",
"VALUES ('timestamp', 'timestamp(3) with time zone', '', '')," +
"('file', 'varchar', '', '')," +
"('latest_snapshot_id', 'bigint', '', '')," +
"('latest_schema_id', 'integer', '', '')," +
"('latest_sequence_number', 'bigint', '', '')");

List<Integer> latestSchemaId = new ArrayList<>();
List<Long> latestSequenceNumber = new ArrayList<>();
// Update this test after Iceberg fixed metadata_log_entries after RTAS (https://github.com/apache/iceberg/issues/9723)
assertUpdate("CREATE TABLE test_schema.test_metadata_log_entries (c1 BIGINT)");
latestSchemaId.add(0);
latestSequenceNumber.add(1L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1);
// INSERT create two commits (https://github.com/trinodb/trino/issues/15439) and share a same snapshotId
latestSchemaId.add(0);
latestSchemaId.add(0);
latestSequenceNumber.add(2L);
latestSequenceNumber.add(2L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries ADD COLUMN c2 VARCHAR");
latestSchemaId.add(0);
latestSequenceNumber.add(2L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("DELETE FROM test_schema.test_metadata_log_entries WHERE c1 = 1", 1);
latestSchemaId.add(1);
latestSequenceNumber.add(3L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries execute optimize");
latestSchemaId.add(1);
latestSequenceNumber.add(4L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("CREATE OR REPLACE TABLE test_schema.test_metadata_log_entries (c3 INTEGER)");
// Iceberg unintentionally deleted the history entries after RTAS
// Update this test after Iceberg release the fix (https://github.com/apache/iceberg/issues/9723)
latestSchemaId = new ArrayList<>();
latestSequenceNumber = new ArrayList<>();
for (int i = 0; i < 6; i++) {
latestSchemaId.add(null);
latestSequenceNumber.add(null);
}

latestSchemaId.add(2);
latestSequenceNumber.add(5L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1);
latestSchemaId.add(2);
latestSequenceNumber.add(6L);
latestSchemaId.add(2);
latestSequenceNumber.add(6L);
testMetadataLogEntriesHelper(latestSchemaId, latestSequenceNumber);

assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries");
}

private void testMetadataLogEntriesHelper(List<Integer> latestSchemaId, List<Long> latestSequenceNumber)
{
MaterializedResult result = computeActual("SELECT * FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" ORDER BY timestamp");
List<MaterializedRow> materializedRows = result.getMaterializedRows();

assertThat(result.getRowCount()).isEqualTo(latestSchemaId.size());
for (int i = 0; i < result.getRowCount(); i++) {
assertThat(materializedRows.get(i).getField(3)).isEqualTo(latestSchemaId.get(i));
assertThat(materializedRows.get(i).getField(4)).isEqualTo(latestSequenceNumber.get(i));
}
}

@Test
public void testSnapshotsTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static io.trino.plugin.iceberg.TableType.HISTORY;
import static io.trino.plugin.iceberg.TableType.MANIFESTS;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES;
import static io.trino.plugin.iceberg.TableType.PARTITIONS;
import static io.trino.plugin.iceberg.TableType.PROPERTIES;
import static io.trino.plugin.iceberg.TableType.REFS;
Expand Down Expand Up @@ -301,6 +302,12 @@ public void testSelectSystemTable()
.addCopies(GET_TABLE, 1)
.build());

// select from $metadata_log_entries
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"",
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLE, 1)
.build());

// select from $snapshots
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"",
ImmutableMultiset.<MetastoreMethod>builder()
Expand Down Expand Up @@ -336,7 +343,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static io.trino.plugin.iceberg.TableType.HISTORY;
import static io.trino.plugin.iceberg.TableType.MANIFESTS;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES;
import static io.trino.plugin.iceberg.TableType.PARTITIONS;
import static io.trino.plugin.iceberg.TableType.PROPERTIES;
import static io.trino.plugin.iceberg.TableType.REFS;
Expand Down Expand Up @@ -447,6 +448,12 @@ public void testSelectSystemTable()
.addCopies(GET_TABLE, 1)
.build());

// select from $metadata_log_entries
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"",
ImmutableMultiset.builder()
.addCopies(GET_TABLE, 1)
.build());

// select from $snapshots
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"",
ImmutableMultiset.builder()
Expand Down Expand Up @@ -488,7 +495,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}
finally {
getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots");
Expand Down

0 comments on commit 44bb855

Please sign in to comment.