Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Iceberg, Delta $data system table #16650

Merged
merged 5 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -662,20 +662,7 @@ These metadata tables contain information about the internal structure
of the Delta Lake table. You can query each metadata table by appending the
metadata table name to the table name::

SELECT * FROM "test_table$data"

``$data`` table
^^^^^^^^^^^^^^^

The ``$data`` table is an alias for the Delta Lake table itself.

The statement::

SELECT * FROM "test_table$data"

is equivalent to::

SELECT * FROM test_table
SELECT * FROM "test_table$history"

``$history`` table
^^^^^^^^^^^^^^^^^^
Expand Down
15 changes: 1 addition & 14 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1046,20 +1046,7 @@ These metadata tables contain information about the internal structure
of the Iceberg table. You can query each metadata table by appending the
metadata table name to the table name::

SELECT * FROM "test_table$data"

``$data`` table
^^^^^^^^^^^^^^^

The ``$data`` table is an alias for the Iceberg table itself.

The statement::

SELECT * FROM "test_table$data"

is equivalent to::

SELECT * FROM test_table
SELECT * FROM "test_table$properties"

``$properties`` table
^^^^^^^^^^^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,20 @@ protected Optional<DataMappingTestSetup> filterCaseSensitiveDataMappingTestData(
return Optional.of(dataMappingTestSetup);
}

@Override
public void testNoDataSystemTable()
{
// TODO (https://github.com/trinodb/trino/issues/6515): Big Query throws an error when trying to read "some_table$data".
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertThatThrownBy(super::testNoDataSystemTable)
.hasMessageFindingMatch("\\Q" +
"Expecting message:\n" +
" \"Cannot read partition information from a table that is not partitioned: \\E\\S+\\Q:tpch.nation$data\"\n" +
"to match regex:\n" +
" \"line 1:1: Table '\\w+.\\w+.nation\\$data' does not exist\"\n" +
"but did not.");
throw new SkipException("TODO");
}

@Override
protected boolean isColumnNameRejected(Exception exception, String columnName, boolean delimited)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.VerifyException;
import com.google.common.collect.Comparators;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -407,8 +408,7 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
return null;
}
DeltaLakeTableName deltaLakeTableName = DeltaLakeTableName.from(tableName.getTableName());
SchemaTableName dataTableName = new SchemaTableName(tableName.getSchemaName(), deltaLakeTableName.getTableName());
SchemaTableName dataTableName = new SchemaTableName(tableName.getSchemaName(), tableName.getTableName());
Optional<Table> table = metastore.getTable(dataTableName.getSchemaName(), dataTableName.getTableName());
if (table.isEmpty()) {
return null;
Expand Down Expand Up @@ -2455,10 +2455,9 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
if (tableType.isEmpty()) {
return Optional.empty();
}
DeltaLakeTableName deltaLakeTableName = new DeltaLakeTableName(name, tableType.get());
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), deltaLakeTableName.getTableNameWithType());
return switch (deltaLakeTableName.getTableType()) {
case DATA -> Optional.empty(); // Handled above
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameWithType(name, tableType.get()));
return switch (tableType.get()) {
case DATA -> throw new VerifyException("Unexpected DATA table type"); // Handled above.
case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
systemTableName,
getCommitInfoEntries(tableHandle.getSchemaTableName(), session),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,68 +22,22 @@

import static io.trino.plugin.deltalake.DeltaLakeTableType.DATA;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class DeltaLakeTableName
public final class DeltaLakeTableName
{
private DeltaLakeTableName() {}

private static final Pattern TABLE_PATTERN = Pattern.compile("" +
"(?<table>[^$@]+)" +
"(?:\\$(?<type>[^@]+))?");

private final String tableName;
private final DeltaLakeTableType tableType;

public DeltaLakeTableName(String tableName, DeltaLakeTableType tableType)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableType = requireNonNull(tableType, "tableType is null");
}

public String getTableName()
{
return tableName;
}

public DeltaLakeTableType getTableType()
{
return tableType;
}

public String getTableNameWithType()
public static String tableNameWithType(String tableName, DeltaLakeTableType tableType)
{
requireNonNull(tableName, "tableName is null");
return tableName + "$" + tableType.name().toLowerCase(Locale.ENGLISH);
}

@Override
public String toString()
{
return getTableNameWithType();
}

public static DeltaLakeTableName from(String name)
{
Matcher match = TABLE_PATTERN.matcher(name);
if (!match.matches()) {
throw new TrinoException(NOT_SUPPORTED, "Invalid Delta Lake table name: " + name);
}

String table = match.group("table");
String typeString = match.group("type");

DeltaLakeTableType type = DeltaLakeTableType.DATA;
if (typeString != null) {
try {
type = DeltaLakeTableType.valueOf(typeString.toUpperCase(Locale.ENGLISH));
}
catch (IllegalArgumentException e) {
throw new TrinoException(NOT_SUPPORTED, format("Invalid Delta Lake table name (unknown type '%s'): %s", typeString, name));
}
}

return new DeltaLakeTableName(table, type);
}

public static String tableNameFrom(String name)
{
Matcher match = TABLE_PATTERN.matcher(name);
Expand All @@ -105,7 +59,12 @@ public static Optional<DeltaLakeTableType> tableTypeFrom(String name)
return Optional.of(DATA);
}
try {
return Optional.of(DeltaLakeTableType.valueOf(typeString.toUpperCase(Locale.ENGLISH)));
DeltaLakeTableType parsedType = DeltaLakeTableType.valueOf(typeString.toUpperCase(Locale.ENGLISH));
if (parsedType == DATA) {
// $data cannot be encoded in table name
return Optional.empty();
}
return Optional.of(parsedType);
}
catch (IllegalArgumentException e) {
return Optional.empty();
Expand All @@ -119,15 +78,6 @@ public static boolean isDataTable(String name)
throw new TrinoException(NOT_SUPPORTED, "Invalid Delta Lake table name: " + name);
}
String typeString = match.group("type");
if (typeString == null) {
return true;
}
try {
DeltaLakeTableType type = DeltaLakeTableType.valueOf(typeString.toUpperCase(Locale.ENGLISH));
return type == DATA;
}
catch (IllegalArgumentException e) {
return false;
}
return typeString == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,6 @@ protected DistributedQueryRunner createQueryRunner()
ImmutableMap.of("delta.enable-non-concurrent-writes", "true"));
}

@Test
public void testDataTable()
{
    // Verifies that "<table>$data" behaves as an alias for the table itself.
    String table = "test_data_table";
    try {
        assertUpdate("CREATE TABLE " + table + " (_bigint BIGINT)");
        assertUpdate("INSERT INTO " + table + " VALUES 1, 2, 3", 3);

        assertQuery("SELECT * FROM " + table, "VALUES 1, 2, 3");
        assertQuery("SELECT * FROM \"" + table + "$data\"", "VALUES 1, 2, 3");
    }
    finally {
        // Always clean up, even when one of the assertions above fails.
        assertUpdate("DROP TABLE IF EXISTS " + table);
    }
}

@Test
public void testHistoryTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.deltalake;

import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

import java.util.Optional;
Expand All @@ -28,15 +29,15 @@
public class TestDeltaLakeTableName
{
@Test
public void testFrom()
public void testParse()
{
assertFrom("abc", "abc", DATA);
assertFrom("abc$data", "abc", DATA);
assertFrom("abc$history", "abc", DeltaLakeTableType.HISTORY);
assertParseNameAndType("abc", "abc", DATA);
assertParseNameAndType("abc$history", "abc", DeltaLakeTableType.HISTORY);

assertNoValidTableType("abc$data");
assertInvalid("abc@123", "Invalid Delta Lake table name: abc@123");
assertInvalid("abc@xyz", "Invalid Delta Lake table name: abc@xyz");
assertInvalid("abc$what", "Invalid Delta Lake table name (unknown type 'what'): abc$what");
assertNoValidTableType("abc$what");
assertInvalid("abc@123$data@456", "Invalid Delta Lake table name: abc@123$data@456");
assertInvalid("xyz$data@456", "Invalid Delta Lake table name: xyz$data@456");
}
Expand All @@ -45,8 +46,8 @@ public void testFrom()
public void testIsDataTable()
{
assertTrue(DeltaLakeTableName.isDataTable("abc"));
assertTrue(DeltaLakeTableName.isDataTable("abc$data"));

assertFalse(DeltaLakeTableName.isDataTable("abc$data")); // it's invalid
assertFalse(DeltaLakeTableName.isDataTable("abc$history"));
assertFalse(DeltaLakeTableName.isDataTable("abc$invalid"));
}
Expand All @@ -64,30 +65,35 @@ public void testTableNameFrom()
public void testTableTypeFrom()
{
assertEquals(DeltaLakeTableName.tableTypeFrom("abc"), Optional.of(DATA));
assertEquals(DeltaLakeTableName.tableTypeFrom("abc$data"), Optional.of(DATA));
assertEquals(DeltaLakeTableName.tableTypeFrom("abc$data"), Optional.empty()); // it's invalid
assertEquals(DeltaLakeTableName.tableTypeFrom("abc$history"), Optional.of(HISTORY));

assertEquals(DeltaLakeTableName.tableTypeFrom("abc$invalid"), Optional.empty());
}

@Test
public void testGetTableNameWithType()
public void testTableNameWithType()
{
assertEquals(new DeltaLakeTableName("abc", DATA).getTableNameWithType(), "abc$data");
assertEquals(new DeltaLakeTableName("abc", HISTORY).getTableNameWithType(), "abc$history");
assertEquals(DeltaLakeTableName.tableNameWithType("abc", DATA), "abc$data");
assertEquals(DeltaLakeTableName.tableNameWithType("abc", HISTORY), "abc$history");
}

// Asserts that the given name does not resolve to any system-table type
// (e.g. an explicit "$data" suffix or an unknown suffix is rejected).
private static void assertNoValidTableType(String inputName)
{
    Assertions.assertThat(DeltaLakeTableName.tableTypeFrom(inputName)).isEmpty();
}

private static void assertInvalid(String inputName, String message)
{
assertTrinoExceptionThrownBy(() -> DeltaLakeTableName.from(inputName))
assertTrinoExceptionThrownBy(() -> DeltaLakeTableName.tableTypeFrom(inputName))
.hasErrorCode(NOT_SUPPORTED)
.hasMessage(message);
}

private static void assertFrom(String inputName, String tableName, DeltaLakeTableType tableType)
private static void assertParseNameAndType(String inputName, String tableName, DeltaLakeTableType tableType)
{
DeltaLakeTableName name = DeltaLakeTableName.from(inputName);
assertEquals(name.getTableName(), tableName);
assertEquals(name.getTableType(), tableType);
assertEquals(DeltaLakeTableName.tableNameFrom(inputName), tableName);
assertEquals(DeltaLakeTableName.tableTypeFrom(inputName), Optional.of(tableType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,10 @@ public IcebergTableHandle getTableHandle(
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
return null;
}
IcebergTableName name = IcebergTableName.from(tableName.getTableName());

BaseTable table;
try {
table = (BaseTable) catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
table = (BaseTable) catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), tableName.getTableName()));
}
catch (TableNotFoundException e) {
return null;
Expand All @@ -392,8 +391,8 @@ public IcebergTableHandle getTableHandle(
String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING);
return new IcebergTableHandle(
tableName.getSchemaName(),
name.getTableName(),
name.getTableType(),
tableName.getTableName(),
DATA,
tableSnapshotId,
SchemaParser.toJson(tableSchema),
table.sortOrder().fields().stream()
Expand Down Expand Up @@ -476,10 +475,9 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
if (tableType.isEmpty()) {
return Optional.empty();
}
IcebergTableName icebergTableName = new IcebergTableName(name, tableType.get());
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), icebergTableName.getTableNameWithType());
return switch (icebergTableName.getTableType()) {
case DATA -> Optional.empty(); // Handled above.
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), IcebergTableName.tableNameWithType(name, tableType.get()));
return switch (tableType.get()) {
case DATA -> throw new VerifyException("Unexpected DATA table type"); // Handled above.
case HISTORY -> Optional.of(new HistoryTable(systemTableName, table));
case SNAPSHOTS -> Optional.of(new SnapshotsTable(systemTableName, typeManager, table));
case PARTITIONS -> Optional.of(new PartitionTable(systemTableName, typeManager, table, getCurrentSnapshotId(table)));
Expand Down
Loading