Skip to content

Commit

Permalink
Support Iceberg refs system table
Browse files Browse the repository at this point in the history
  • Loading branch information
jackye1995 authored and electrum committed Mar 14, 2023
1 parent ce42133 commit 3d9316f
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 22 deletions.
44 changes: 44 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,50 @@ The output of the query has the following columns:
- ``array(integer)``
- The set of field IDs used for equality comparison in equality delete files

``$refs`` table
^^^^^^^^^^^^^^^

The ``$refs`` table provides information about Iceberg references including branches and tags.

You can retrieve the references of the Iceberg table ``test_table`` by using the following query::

SELECT * FROM "test_table$refs"

.. code-block:: text
name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
----------------+--------+-------------+-------------------------+-----------------------+------------------------+
example_tag | TAG | 10000000000 | 10000 | null | null |
example_branch | BRANCH | 20000000000 | 20000 | 2 | 30000 |
The output of the query has the following columns:

.. list-table:: Refs columns
:widths: 20, 30, 50
:header-rows: 1

* - Name
- Type
- Description
* - ``name``
- ``varchar``
- Name of the reference
* - ``type``
- ``varchar``
- Type of the reference, either ``BRANCH`` or ``TAG``
* - ``snapshot_id``
- ``bigint``
- The snapshot ID of the reference
* - ``max_reference_age_in_ms``
- ``bigint``
- The maximum age of the reference before it could be expired.
* - ``min_snapshots_to_keep``
- ``integer``
- For branch only, the minimum number of snapshots to keep in a branch.
* - ``max_snapshot_age_in_ms``
- ``bigint``
- For branch only, the max snapshot age allowed in a branch. Older snapshots in the branch will be expired.

.. _iceberg-materialized-views:

Materialized views
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case MANIFESTS -> Optional.of(new ManifestsTable(systemTableName, table, getCurrentSnapshotId(table)));
case FILES -> Optional.of(new FilesTable(systemTableName, typeManager, table, getCurrentSnapshotId(table)));
case PROPERTIES -> Optional.of(new PropertiesTable(systemTableName, table));
case REFS -> Optional.of(new RefsTable(systemTableName, table));
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import org.apache.iceberg.Table;

import java.util.List;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class RefsTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;

public RefsTable(SchemaTableName tableName, Table icebergTable)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("name", VARCHAR))
.add(new ColumnMetadata("type", VARCHAR))
.add(new ColumnMetadata("snapshot_id", BIGINT))
.add(new ColumnMetadata("max_reference_age_in_ms", BIGINT))
.add(new ColumnMetadata("min_snapshots_to_keep", INTEGER))
.add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT))
.build());
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

icebergTable.refs().forEach((refName, ref) -> {
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(refName);
pagesBuilder.appendVarchar(ref.isBranch() ? "BRANCH" : "TAG");
pagesBuilder.appendBigint(ref.snapshotId());
pagesBuilder.appendBigint(ref.maxRefAgeMs());
pagesBuilder.appendInteger(ref.minSnapshotsToKeep());
pagesBuilder.appendBigint(ref.maxSnapshotAgeMs());
pagesBuilder.endRow();
});

return pagesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,13 @@ private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Conne
pagesBuilder.beginRow();
pagesBuilder.appendTimestampTzMillis(snapshot.timestampMillis(), timeZoneKey);
pagesBuilder.appendBigint(snapshot.snapshotId());
if (checkNonNull(snapshot.parentId(), pagesBuilder)) {
pagesBuilder.appendBigint(snapshot.parentId());
}
if (checkNonNull(snapshot.operation(), pagesBuilder)) {
pagesBuilder.appendVarchar(snapshot.operation());
}
if (checkNonNull(snapshot.manifestListLocation(), pagesBuilder)) {
pagesBuilder.appendVarchar(snapshot.manifestListLocation());
}
if (checkNonNull(snapshot.summary(), pagesBuilder)) {
pagesBuilder.appendVarcharVarcharMap(snapshot.summary());
}
pagesBuilder.appendBigint(snapshot.parentId());
pagesBuilder.appendVarchar(snapshot.operation());
pagesBuilder.appendVarchar(snapshot.manifestListLocation());
pagesBuilder.appendVarcharVarcharMap(snapshot.summary());
pagesBuilder.endRow();
});

return pagesBuilder.build();
}

private static boolean checkNonNull(Object object, PageListBuilder pagesBuilder)
{
if (object == null) {
pagesBuilder.appendNull();
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public enum TableType
PARTITIONS,
FILES,
PROPERTIES,
REFS
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,25 @@ public void appendInteger(int value)
INTEGER.writeLong(nextColumn(), value);
}

public void appendInteger(Integer value)
{
if (checkNonNull(value)) {
appendInteger(value.intValue());
}
}

public void appendBigint(long value)
{
BIGINT.writeLong(nextColumn(), value);
}

public void appendBigint(Long value)
{
if (checkNonNull(value)) {
appendBigint(value.longValue());
}
}

public void appendTimestampTzMillis(long millisUtc, TimeZoneKey timeZoneKey)
{
TIMESTAMP_TZ_MILLIS.writeLong(nextColumn(), packDateTimeWithZone(millisUtc, timeZoneKey));
Expand Down Expand Up @@ -190,4 +204,13 @@ public static PageListBuilder forTable(ConnectorTableMetadata table)
.map(ColumnMetadata::getType)
.collect(toImmutableList()));
}

private boolean checkNonNull(Object object)
{
if (object == null) {
appendNull();
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static io.trino.plugin.iceberg.TableType.MANIFESTS;
import static io.trino.plugin.iceberg.TableType.PARTITIONS;
import static io.trino.plugin.iceberg.TableType.PROPERTIES;
import static io.trino.plugin.iceberg.TableType.REFS;
import static io.trino.plugin.iceberg.TableType.SNAPSHOTS;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
Expand Down Expand Up @@ -295,7 +296,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES);
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,49 @@ public void testStatsFilePruning()
}
}

@Test
public void testSnapshotReferenceSystemTable()
{
String tableName = "test_snapshot_reference_system_table_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = this.loadTable(tableName);
long snapshotId1 = icebergTable.currentSnapshot().snapshotId();
icebergTable.manageSnapshots()
.createTag("test-tag", snapshotId1)
.setMaxRefAgeMs("test-tag", 1)
.commit();

assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation LIMIT 5", 5);
icebergTable.refresh();
long snapshotId2 = icebergTable.currentSnapshot().snapshotId();
icebergTable.manageSnapshots()
.createBranch("test-branch", snapshotId2)
.setMaxSnapshotAgeMs("test-branch", 1)
.commit();

assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation LIMIT 5", 5);
icebergTable.refresh();
long snapshotId3 = icebergTable.currentSnapshot().snapshotId();
icebergTable.manageSnapshots()
.createBranch("test-branch2", snapshotId3)
.setMinSnapshotsToKeep("test-branch2", 1)
.commit();

assertQuery("SHOW COLUMNS FROM \"" + tableName + "$refs\"",
"VALUES ('name', 'varchar', '', '')," +
"('type', 'varchar', '', '')," +
"('snapshot_id', 'bigint', '', '')," +
"('max_reference_age_in_ms', 'bigint', '', '')," +
"('min_snapshots_to_keep', 'integer', '', '')," +
"('max_snapshot_age_in_ms', 'bigint', '', '')");

assertQuery("SELECT * FROM \"" + tableName + "$refs\"",
"VALUES ('test-tag', 'TAG', " + snapshotId1 + ", 1, null, null)," +
"('test-branch', 'BRANCH', " + snapshotId2 + ", null, null, 1)," +
"('test-branch2', 'BRANCH', " + snapshotId3 + ", null, 1, null)," +
"('main', 'BRANCH', " + snapshotId3 + ", null, null, null)");
}

private void writeEqualityDeleteToNationTable(Table icebergTable)
throws Exception
{
Expand Down

0 comments on commit 3d9316f

Please sign in to comment.