Disallow querying Iceberg tables

The Hive connector cannot read from Iceberg tables, which is the reason
why querying such tables shouldn't be permitted within the Hive connector.

When trying to query an Iceberg table through the Hive connector
(without Iceberg redirection enabled), the user now receives a Hive
unsupported-format exception.

It is no longer possible to create a view in Hive that selects from an
Iceberg table.

When querying the special Hive system tables

- $properties
- $partitions

of an Iceberg table through the Hive connector, the user now receives a
table-not-found exception.
findinpath authored and findepi committed Jan 19, 2022
1 parent e239035 commit 12a6b5f
Showing 10 changed files with 96 additions and 50 deletions.
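The diff implements this with one pattern in two flavors: direct table access fails loudly, while the $properties and $partitions system-table providers skip Iceberg tables silently, which is why those lookups surface as table-not-found. A condensed orientation sketch restating the relevant hunks below (the partitions provider class name is inferred from context, as that hunk does not show it):

    // In HiveMetadata.getTableHandle: direct access to an Iceberg table fails loudly.
    if (isIcebergTable(table)) {
        throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, format("Cannot query Iceberg table '%s'", tableName));
    }

    // In PartitionsSystemTableProvider and PropertiesSystemTableProvider: Iceberg
    // tables are skipped by returning Optional.empty(), so the engine reports
    // "Table ... does not exist" instead of an unsupported-format error.
    if (sourceTable == null || isDeltaLakeTable(sourceTable) || isIcebergTable(sourceTable)) {
        return Optional.empty();
    }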
@@ -166,6 +166,13 @@ public void testHideDeltaLakeTables()
throw new SkipException("not supported");
}

@Override
public void testDisallowQueryingOfIcebergTables()
{
// Alluxio metastore does not support create operations
throw new SkipException("not supported");
}

@Override
public void testIllegalStorageFormatDuringTableScan()
{
@@ -267,6 +267,7 @@
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
@@ -436,6 +437,9 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName
if (isDeltaLakeTable(table)) {
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, format("Cannot query Delta Lake table '%s'", tableName));
}
if (isIcebergTable(table)) {
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, format("Cannot query Iceberg table '%s'", tableName));
}

// we must not allow system tables due to how permissions are checked in SystemTableAwareAccessControl
if (getSourceTableNameFromSystemTable(systemTableProviders, tableName).isPresent()) {
@@ -43,6 +43,7 @@
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.SystemTables.createSystemTable;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
@@ -82,7 +83,7 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSess
Table sourceTable = metadata.getMetastore()
.getTable(new HiveIdentity(session), sourceTableName.getSchemaName(), sourceTableName.getTableName())
.orElse(null);
if (sourceTable == null || isDeltaLakeTable(sourceTable)) {
if (sourceTable == null || isDeltaLakeTable(sourceTable) || isIcebergTable(sourceTable)) {
return Optional.empty();
}
verifyOnline(sourceTableName, Optional.empty(), getProtectMode(sourceTable), sourceTable.getParameters());
@@ -34,6 +34,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.SystemTableHandler.PROPERTIES;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.SystemTables.createSystemTable;

public class PropertiesSystemTableProvider
@@ -61,7 +62,7 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSess
.getTable(new HiveIdentity(session), sourceTableName.getSchemaName(), sourceTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

if (isDeltaLakeTable(table)) {
if (isDeltaLakeTable(table) || isIcebergTable(table)) {
return Optional.empty();
}
Map<String, String> sortedTableParameters = ImmutableSortedMap.copyOf(table.getParameters());
@@ -180,6 +180,9 @@ public final class HiveUtil
public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider";
public static final String DELTA_LAKE_PROVIDER = "delta";

public static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

private static final DateTimeFormatter HIVE_DATE_PARSER = ISODateTimeFormat.date().withZoneUTC();
private static final DateTimeFormatter HIVE_TIMESTAMP_PARSER;
private static final Field COMPRESSION_CODECS_FIELD;
@@ -191,8 +194,6 @@ public final class HiveUtil
private static final String BIG_DECIMAL_POSTFIX = "BD";

private static final Splitter COLUMN_NAMES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
private static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
private static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

static {
DateTimeParser[] timestampWithoutTimeZoneParser = {
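The two ICEBERG_TABLE_TYPE_* constants are promoted from private to public above so that tests can stamp metastore tables as Iceberg. The body of HiveUtil.isIcebergTable is not part of this diff; a minimal sketch of what it presumably does, assuming a case-insensitive match on the table_type parameter that Iceberg writes into the Hive metastore:

    // Sketch (assumed, not shown in this diff): an Iceberg table is recognized
    // solely by its Hive metastore parameter table_type=iceberg.
    public static boolean isIcebergTable(Table table)
    {
        return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME));
    }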
@@ -260,6 +260,8 @@
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.cachingHiveMetastore;
import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
import static io.trino.plugin.hive.util.HiveUtil.DELTA_LAKE_PROVIDER;
import static io.trino.plugin.hive.util.HiveUtil.ICEBERG_TABLE_TYPE_NAME;
import static io.trino.plugin.hive.util.HiveUtil.ICEBERG_TABLE_TYPE_VALUE;
import static io.trino.plugin.hive.util.HiveUtil.SPARK_TABLE_PROVIDER_KEY;
import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
@@ -2943,6 +2945,56 @@ public void testHideDeltaLakeTables()
}
}

@Test
public void testDisallowQueryingOfIcebergTables()
{
ConnectorSession session = newSession();
HiveIdentity identity = new HiveIdentity(session);
SchemaTableName tableName = temporaryTable("trino_iceberg_table");

Table.Builder table = Table.builder()
.setDatabaseName(tableName.getSchemaName())
.setTableName(tableName.getTableName())
.setOwner(Optional.of(session.getUser()))
.setTableType(MANAGED_TABLE.name())
.setPartitionColumns(List.of(new Column("a_partition_column", HIVE_INT, Optional.empty())))
.setDataColumns(List.of(new Column("a_column", HIVE_STRING, Optional.empty())))
.setParameter(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE);
table.getStorageBuilder()
.setStorageFormat(fromHiveStorageFormat(PARQUET))
.setLocation(getTableDefaultLocation(
metastoreClient.getDatabase(tableName.getSchemaName()).orElseThrow(),
new HdfsContext(session.getIdentity()),
hdfsEnvironment,
tableName.getSchemaName(),
tableName.getTableName()).toString());
metastoreClient.createTable(identity, table.build(), NO_PRIVILEGES);

try {
// Verify that the created table, being an Iceberg table, can't be queried in Hive
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.beginQuery(session);
assertThatThrownBy(() -> getTableHandle(metadata, tableName))
.hasMessage(format("Cannot query Iceberg table '%s'", tableName));
}

// Verify that table handles for the hidden `$properties` and `$partitions` Hive system tables can't be obtained for Iceberg tables
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.beginQuery(session);
SchemaTableName propertiesTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$properties", tableName.getTableName()));
assertThat(metadata.getSystemTable(newSession(), propertiesTableName)).isEmpty();
SchemaTableName partitionsTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$partitions", tableName.getTableName()));
assertThat(metadata.getSystemTable(newSession(), partitionsTableName)).isEmpty();
}
}
finally {
// Clean up
metastoreClient.dropTable(identity, tableName.getSchemaName(), tableName.getTableName(), true);
}
}

@Test
public void testUpdateBasicTableStatistics()
throws Exception
@@ -61,4 +61,10 @@ public void testHideDeltaLakeTables()
{
throw new SkipException("not supported");
}

@Override
public void testDisallowQueryingOfIcebergTables()
{
throw new SkipException("not supported");
}
}
@@ -230,9 +230,9 @@ public void testDropTable()
String icebergTableName = "iceberg.default." + tableName;

createIcebergTable(icebergTableName, false);
onTrino().executeQuery("DROP TABLE " + hiveTableName);
assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist");
//TODO restore test assertions after adding redirection awareness to the DropTableTask
assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + hiveTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");
}

@Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
@@ -305,18 +305,11 @@ public void testAlterTableRename()

createIcebergTable(icebergTableName, false);

onTrino().executeQuery("ALTER TABLE " + hiveTableName + " RENAME TO " + tableName + "_new");
//TODO restore test assertions after adding redirection awareness to the RenameTableTask
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + hiveTableName + " RENAME TO " + tableName + "_new"))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");

assertQueryFailure(() -> onTrino().executeQuery("TABLE " + hiveTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + hiveTableName + "' does not exist");
assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist");

assertResultsEqual(
onTrino().executeQuery("TABLE " + icebergTableName + "_new"),
onTrino().executeQuery("TABLE " + hiveTableName + "_new"));

onTrino().executeQuery("DROP TABLE " + icebergTableName + "_new");
onTrino().executeQuery("DROP TABLE " + icebergTableName);
}

@Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
@@ -328,15 +321,9 @@ public void testAlterTableAddColumn()

createIcebergTable(icebergTableName, false);

onTrino().executeQuery("ALTER TABLE " + hiveTableName + " ADD COLUMN some_new_column double");

// TODO: ALTER TABLE succeeded, but new column was not added
Assertions.assertThat(onTrino().executeQuery("DESCRIBE " + icebergTableName).column(1))
.containsOnly("nationkey", "name", "regionkey", "comment");

assertResultsEqual(
onTrino().executeQuery("TABLE " + icebergTableName),
onTrino().executeQuery("SELECT * /*, NULL*/ FROM tpch.tiny.nation"));
//TODO restore test assertions after adding redirection awareness to the AddColumnTask
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + hiveTableName + " ADD COLUMN some_new_column double"))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");

onTrino().executeQuery("DROP TABLE " + icebergTableName);
}
@@ -353,11 +340,9 @@ public void testCommentTable()
assertTableComment("hive", "default", tableName).isNull();
assertTableComment("iceberg", "default", tableName).isNull();

onTrino().executeQuery("COMMENT ON TABLE " + hiveTableName + " IS 'This is my table, there are many like it but this one is mine'");

// TODO: COMMENT ON TABLE succeeded, but comment was not preserved
assertTableComment("hive", "default", tableName).isNull();
assertTableComment("iceberg", "default", tableName).isNull();
//TODO restore test assertions after adding redirection awareness to the CommentTask
assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE " + hiveTableName + " IS 'This is my table, there are many like it but this one is mine'"))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");

onTrino().executeQuery("DROP TABLE " + icebergTableName);
}
@@ -17,12 +17,12 @@
import org.testng.annotations.Test;

import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.HMS_ONLY;
import static io.trino.tests.product.TestGroups.ICEBERG;
import static io.trino.tests.product.TestGroups.STORAGE_FORMATS;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;

public class TestIcebergHiveTablesCompatibility
extends ProductTest
@@ -49,15 +49,13 @@ public void testHiveSelectFromIcebergTable()
onTrino().executeQuery("CREATE TABLE iceberg.default." + tableName + "(a bigint)");

assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM hive.default." + tableName))
// TODO (https://github.com/trinodb/trino/issues/8693) throw specific exception message
.hasMessageMatching("Query failed \\(#\\w+\\):\\Q Unable to create input format org.apache.hadoop.mapred.FileInputFormat");
.hasMessageMatching(format("Query failed \\(#\\w+\\):\\Q Cannot query Iceberg table 'default.%s'", tableName));

assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM hive.default.\"" + tableName + "$partitions\""))
.hasMessageMatching("Query failed \\(#\\w+\\):\\Q line 1:15: Table 'hive.default." + tableName + "$partitions' does not exist");

// TODO (https://github.com/trinodb/trino/issues/8693) should fail
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.\"" + tableName + "$properties\""))
.hasRowsCount(1);
assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM hive.default.\"" + tableName + "$properties\""))
.hasMessageMatching("Query failed \\(#\\w+\\):\\Q line 1:15: Table 'hive.default." + tableName + "$properties' does not exist");

onTrino().executeQuery("DROP TABLE iceberg.default." + tableName);
}
@@ -21,6 +21,7 @@
import java.util.List;

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.HMS_ONLY;
import static io.trino.tests.product.TestGroups.ICEBERG;
@@ -53,8 +54,8 @@ public void testIcebergHiveViewsCompatibility()
onTrino().executeQuery("CREATE VIEW hive.default.hive_view_qualified_hive AS SELECT * FROM hive.default.hive_table");
onTrino().executeQuery("CREATE VIEW hive.default.hive_view_unqualified_hive AS SELECT * FROM hive_table");
onTrino().executeQuery("CREATE VIEW hive.default.hive_view_qualified_iceberg AS SELECT * FROM iceberg.default.iceberg_table");
// this should probably fail but it does not now; testing current behavior as a documentation
onTrino().executeQuery("CREATE VIEW hive.default.hive_view_unqualified_iceberg AS SELECT * FROM iceberg_table");
assertQueryFailure(() -> onTrino().executeQuery("CREATE VIEW hive.default.hive_view_unqualified_iceberg AS SELECT * FROM iceberg_table"))
.hasMessageMatching("Query failed \\(#\\w+\\):\\Q Cannot query Iceberg table 'default.iceberg_table'");

onTrino().executeQuery("USE iceberg.default"); // for sake of unqualified table references
onTrino().executeQuery("CREATE VIEW iceberg.default.iceberg_view_qualified_hive AS SELECT * FROM hive.default.hive_table");
@@ -75,7 +76,6 @@
.add(row("hive_view_qualified_hive"))
.add(row("hive_view_unqualified_hive"))
.add(row("hive_view_qualified_iceberg"))
.add(row("hive_view_unqualified_iceberg"))
.add(row("iceberg_view_qualified_hive"))
.add(row("iceberg_view_qualified_iceberg"))
.add(row("iceberg_view_unqualified_iceberg"))
@@ -88,7 +88,6 @@
.add(row("hive_view_qualified_hive"))
.add(row("hive_view_unqualified_hive"))
.add(row("hive_view_qualified_iceberg"))
.add(row("hive_view_unqualified_iceberg"))
.add(row("iceberg_view_qualified_hive"))
.add(row("iceberg_view_qualified_iceberg"))
.add(row("iceberg_view_unqualified_iceberg"))
@@ -98,10 +97,6 @@
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.hive_view_qualified_hive")).containsOnly(row(1));
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.hive_view_unqualified_hive")).containsOnly(row(1));
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.hive_view_qualified_iceberg")).containsOnly(row(2));
assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM hive.default.hive_view_unqualified_iceberg"))
// hive connector tries to read from iceberg table
// TODO: make query fail with nicer message
.hasMessageContaining("Unable to create input format org.apache.hadoop.mapred.FileInputFormat");
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.iceberg_view_qualified_hive")).containsOnly(row(1));
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.iceberg_view_qualified_iceberg")).containsOnly(row(2));
assertThat(onTrino().executeQuery("SELECT * FROM hive.default.iceberg_view_unqualified_iceberg")).containsOnly(row(2));
@@ -110,10 +105,6 @@
assertThat(onTrino().executeQuery("SELECT * FROM iceberg.default.hive_view_qualified_hive")).containsOnly(row(1));
assertThat(onTrino().executeQuery("SELECT * FROM iceberg.default.hive_view_unqualified_hive")).containsOnly(row(1));
assertThat(onTrino().executeQuery("SELECT * FROM iceberg.default.hive_view_qualified_iceberg")).containsOnly(row(2));
assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM iceberg.default.hive_view_unqualified_iceberg"))
// hive connector tries to read from iceberg table
// TODO: make query fail with nicer message
.hasMessageContaining("Unable to create input format org.apache.hadoop.mapred.FileInputFormat");
assertThat(onTrino().executeQuery("SELECT * FROM iceberg.default.iceberg_view_qualified_hive")).containsOnly(row(1));
assertThat(onTrino().executeQuery("SELECT * FROM iceberg.default.iceberg_view_qualified_iceberg")).containsOnly(row(2));
assertThat(onTrino().executeQuery("SELECT * FROM iceberg.default.iceberg_view_unqualified_iceberg")).containsOnly(row(2));