Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow querying iceberg tables in hive #10441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public void testHideDeltaLakeTables()
throw new SkipException("not supported");
}

@Override
public void testDisallowQueryingOfIcebergTables()
{
    // The Alluxio metastore does not support create operations, so the Iceberg
    // table fixture required by the base-class test cannot be set up here.
    throw new SkipException("not supported");
}

@Override
public void testIllegalStorageFormatDuringTableScan()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
Expand Down Expand Up @@ -436,6 +437,9 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName
if (isDeltaLakeTable(table)) {
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, format("Cannot query Delta Lake table '%s'", tableName));
}
if (isIcebergTable(table)) {
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, format("Cannot query Iceberg table '%s'", tableName));
}

// we must not allow system tables due to how permissions are checked in SystemTableAwareAccessControl
if (getSourceTableNameFromSystemTable(systemTableProviders, tableName).isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.hive;

import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
Expand All @@ -22,6 +23,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;

import javax.inject.Inject;

Expand All @@ -33,7 +35,15 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.SystemTableHandler.PARTITIONS;
import static io.trino.plugin.hive.metastore.MetastoreUtil.getProtectMode;
import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyOnline;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.SystemTables.createSystemTable;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand All @@ -43,11 +53,13 @@ public class PartitionsSystemTableProvider
implements SystemTableProvider
{
private final HivePartitionManager partitionManager;
private final TypeManager typeManager;

@Inject
public PartitionsSystemTableProvider(HivePartitionManager partitionManager)
public PartitionsSystemTableProvider(HivePartitionManager partitionManager, TypeManager typeManager)
{
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
Expand All @@ -68,11 +80,20 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSess
}

SchemaTableName sourceTableName = PARTITIONS.getSourceTableName(tableName);
HiveTableHandle sourceTableHandle = metadata.getTableHandle(session, sourceTableName);

if (sourceTableHandle == null) {
Table sourceTable = metadata.getMetastore()
.getTable(new HiveIdentity(session), sourceTableName.getSchemaName(), sourceTableName.getTableName())
.orElse(null);
if (sourceTable == null || isDeltaLakeTable(sourceTable) || isIcebergTable(sourceTable)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up: this returns empty when the table is not found, while PropertiesSystemTableProvider throws in such a case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually PropertiesSystemTableProvider returns Optional.empty() as well.
This is covered by the previous test 3fb545e#diff-92c37834248d69878a49876bfa440c9f6afbee04b1363d402a04cf7fddc7bb62R2905-R2913

return Optional.empty();
}
verifyOnline(sourceTableName, Optional.empty(), getProtectMode(sourceTable), sourceTable.getParameters());
HiveTableHandle sourceTableHandle = new HiveTableHandle(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE that compared to HiveMetadata#getTableHandle I didn't include here anymore the check

        // we must not allow system tables due to how permissions are checked in SystemTableAwareAccessControl
        if (getSourceTableNameFromSystemTable(systemTableProviders, tableName).isPresent()) {
            throw new TrinoException(HIVE_INVALID_METADATA, "Unexpected table present in Hive metastore: " + tableName);
        }

sourceTableName.getSchemaName(),
sourceTableName.getTableName(),
sourceTable.getParameters(),
getPartitionKeyColumnHandles(sourceTable, typeManager),
getRegularColumnHandles(sourceTable, typeManager, getTimestampPrecision(session)),
getHiveBucketHandle(session, sourceTable, typeManager));

List<HiveColumnHandle> partitionColumns = sourceTableHandle.getPartitionColumns();
if (partitionColumns.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.collect.ImmutableSortedMap;
import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
Expand All @@ -33,11 +32,10 @@
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.SystemTableHandler.PROPERTIES;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable;
import static io.trino.plugin.hive.util.SystemTables.createSystemTable;
import static java.lang.String.format;

public class PropertiesSystemTableProvider
implements SystemTableProvider
Expand All @@ -64,8 +62,8 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSess
.getTable(new HiveIdentity(session), sourceTableName.getSchemaName(), sourceTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
findinpath marked this conversation as resolved.
Show resolved Hide resolved

if (isDeltaLakeTable(table)) {
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, format("Cannot query Delta Lake table '%s'", sourceTableName));
if (isDeltaLakeTable(table) || isIcebergTable(table)) {
return Optional.empty();
}
findinpath marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> sortedTableParameters = ImmutableSortedMap.copyOf(table.getParameters());
List<ColumnMetadata> columns = sortedTableParameters.keySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ public final class HiveUtil
public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider";
public static final String DELTA_LAKE_PROVIDER = "delta";

public static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

private static final DateTimeFormatter HIVE_DATE_PARSER = ISODateTimeFormat.date().withZoneUTC();
private static final DateTimeFormatter HIVE_TIMESTAMP_PARSER;
private static final Field COMPRESSION_CODECS_FIELD;
Expand All @@ -191,8 +194,6 @@ public final class HiveUtil
private static final String BIG_DECIMAL_POSTFIX = "BD";

private static final Splitter COLUMN_NAMES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
private static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
private static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

static {
DateTimeParser[] timestampWithoutTimeZoneParser = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.cachingHiveMetastore;
import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
import static io.trino.plugin.hive.util.HiveUtil.DELTA_LAKE_PROVIDER;
import static io.trino.plugin.hive.util.HiveUtil.ICEBERG_TABLE_TYPE_NAME;
import static io.trino.plugin.hive.util.HiveUtil.ICEBERG_TABLE_TYPE_VALUE;
import static io.trino.plugin.hive.util.HiveUtil.SPARK_TABLE_PROVIDER_KEY;
import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues;
Expand Down Expand Up @@ -839,7 +841,7 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
TupleDomain.all()));
},
ImmutableSet.of(
new PartitionsSystemTableProvider(partitionManager),
new PartitionsSystemTableProvider(partitionManager, TESTING_TYPE_MANAGER),
new PropertiesSystemTableProvider()),
(metastore) -> new NoneHiveMaterializedViewMetadata()
{
Expand Down Expand Up @@ -2899,22 +2901,14 @@ public void testHideDeltaLakeTables()
.hasMessage(format("Cannot query Delta Lake table '%s'", tableName));
}

// Verify the hidden `$properties` Delta Lake table handle can't be obtained within the hive connector
SchemaTableName propertiesTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$properties", tableName.getTableName()));
// Verify the hidden `$properties` and `$partitions` Delta Lake table handle can't be obtained within the hive connector
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.beginQuery(session);
assertThatThrownBy(() -> metadata.getSystemTable(newSession(), propertiesTableName))
.hasMessage(format("Cannot query Delta Lake table '%s'", tableName));
}

// Verify the hidden `$partitions` Delta Lake table handle can't be obtained within the hive connector
SchemaTableName partitionsTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$partitions", tableName.getTableName()));
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.beginQuery(session);
assertThatThrownBy(() -> metadata.getSystemTable(newSession(), partitionsTableName))
.hasMessage(format("Cannot query Delta Lake table '%s'", tableName));
SchemaTableName propertiesTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$properties", tableName.getTableName()));
assertThat(metadata.getSystemTable(newSession(), propertiesTableName)).isEmpty();
SchemaTableName partitionsTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$partitions", tableName.getTableName()));
assertThat(metadata.getSystemTable(newSession(), partitionsTableName)).isEmpty();
}

// Assert that table is hidden
Expand Down Expand Up @@ -2951,6 +2945,56 @@ public void testHideDeltaLakeTables()
}
}

// Registers a managed table in the metastore with the Iceberg marker parameter
// (table_type=iceberg) and verifies the Hive connector refuses to hand out a
// table handle for it, and that its hidden system tables resolve to empty.
@Test
public void testDisallowQueryingOfIcebergTables()
{
ConnectorSession session = newSession();
HiveIdentity identity = new HiveIdentity(session);
SchemaTableName tableName = temporaryTable("trino_iceberg_table");

// Build a metastore table that looks like an Iceberg table: the marker is the
// ICEBERG_TABLE_TYPE_NAME/VALUE parameter; columns and format are otherwise ordinary.
Table.Builder table = Table.builder()
.setDatabaseName(tableName.getSchemaName())
.setTableName(tableName.getTableName())
.setOwner(Optional.of(session.getUser()))
.setTableType(MANAGED_TABLE.name())
.setPartitionColumns(List.of(new Column("a_partition_column", HIVE_INT, Optional.empty())))
.setDataColumns(List.of(new Column("a_column", HIVE_STRING, Optional.empty())))
.setParameter(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE);
table.getStorageBuilder()
.setStorageFormat(fromHiveStorageFormat(PARQUET))
.setLocation(getTableDefaultLocation(
metastoreClient.getDatabase(tableName.getSchemaName()).orElseThrow(),
new HdfsContext(session.getIdentity()),
hdfsEnvironment,
tableName.getSchemaName(),
tableName.getTableName()).toString());
metastoreClient.createTable(identity, table.build(), NO_PRIVILEGES);

try {
// Verify that a table registered as an Iceberg table cannot be queried through the Hive connector
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.beginQuery(session);
assertThatThrownBy(() -> getTableHandle(metadata, tableName))
.hasMessage(format("Cannot query Iceberg table '%s'", tableName));
}

// Verify that handles for the hidden `$properties` and `$partitions` system tables cannot be obtained for Iceberg tables
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.beginQuery(session);
SchemaTableName propertiesTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$properties", tableName.getTableName()));
assertThat(metadata.getSystemTable(newSession(), propertiesTableName)).isEmpty();
SchemaTableName partitionsTableName = new SchemaTableName(tableName.getSchemaName(), format("%s$partitions", tableName.getTableName()));
assertThat(metadata.getSystemTable(newSession(), partitionsTableName)).isEmpty();
}
}
finally {
// Clean up: drop the metastore entry (deleteData=true) so the temporary table does not leak between tests
metastoreClient.dropTable(identity, tableName.getSchemaName(), tableName.getTableName(), true);
}
}

@Test
public void testUpdateBasicTableStatistics()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
new NodeVersion("test_version"),
new NoneHiveRedirectionsProvider(),
ImmutableSet.of(
new PartitionsSystemTableProvider(hivePartitionManager),
new PartitionsSystemTableProvider(hivePartitionManager, TESTING_TYPE_MANAGER),
new PropertiesSystemTableProvider()),
new DefaultHiveMaterializedViewMetadataFactory(),
SqlStandardAccessControlMetadata::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ public void testHideDeltaLakeTables()
{
throw new SkipException("not supported");
}

@Override
public void testDisallowQueryingOfIcebergTables()
{
throw new SkipException("not supported");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why disabled?

add assertThatThrownBy(super:: ...

Copy link
Contributor Author

@findinpath findinpath Jan 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While trying to run the test I receive

2022-01-11T03:30:02.708-0600 INFO Copying resource dir 'spark_bucketed_nation' to /var/folders/1q/y42hmc4s3yl38kp0t0q142_c0000gn/T/TestHiveInMemoryMetastore7495715074983273908


java.lang.IllegalArgumentException: Table directory does not exist

	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
	at io.trino.plugin.hive.metastore.thrift.InMemoryThriftMetastore.createTable(InMemoryThriftMetastore.java:190)
	at io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore.createTable(BridgingHiveMetastore.java:205)
	at io.trino.plugin.hive.AbstractTestHive.testDisallowQueryingOfIcebergTables(AbstractTestHive.java:2978)
	at io.trino.plugin.hive.TestHiveInMemoryMetastore.testDisallowQueryingOfIcebergTables(TestHiveInMemoryMetastore.java:68)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which directory?
This sounds like a test setup problem. Does it mean TestHiveInMemoryMetastore cannot run a test that uses createTable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the comment to contain the missing directory path.

Note that the similar test testHideDeltaLakeTables also throws a "not supported" SkipException.

https://github.com/trinodb/trino/blob/master/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveInMemoryMetastore.java#L59-L63

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ public void testDropTable()
String icebergTableName = "iceberg.default." + tableName;

createIcebergTable(icebergTableName, false);
onTrino().executeQuery("DROP TABLE " + hiveTableName);
assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist");
//TODO restore test assertions after adding redirection awareness to the DropTableTask
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have an issue to link to?

(nit: add space after //)

assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + hiveTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");
}

@Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
Expand Down Expand Up @@ -305,18 +305,11 @@ public void testAlterTableRename()

createIcebergTable(icebergTableName, false);

onTrino().executeQuery("ALTER TABLE " + hiveTableName + " RENAME TO " + tableName + "_new");
//TODO restore test assertions after adding redirection awareness to the RenameTableTask
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + hiveTableName + " RENAME TO " + tableName + "_new"))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");

assertQueryFailure(() -> onTrino().executeQuery("TABLE " + hiveTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + hiveTableName + "' does not exist");
assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist");

assertResultsEqual(
onTrino().executeQuery("TABLE " + icebergTableName + "_new"),
onTrino().executeQuery("TABLE " + hiveTableName + "_new"));

onTrino().executeQuery("DROP TABLE " + icebergTableName + "_new");
onTrino().executeQuery("DROP TABLE " + icebergTableName);
}

@Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
Expand All @@ -328,15 +321,9 @@ public void testAlterTableAddColumn()

createIcebergTable(icebergTableName, false);

onTrino().executeQuery("ALTER TABLE " + hiveTableName + " ADD COLUMN some_new_column double");

// TODO: ALTER TABLE succeeded, but new column was not added
Assertions.assertThat(onTrino().executeQuery("DESCRIBE " + icebergTableName).column(1))
.containsOnly("nationkey", "name", "regionkey", "comment");

assertResultsEqual(
onTrino().executeQuery("TABLE " + icebergTableName),
onTrino().executeQuery("SELECT * /*, NULL*/ FROM tpch.tiny.nation"));
//TODO restore test assertions after adding redirection awareness to the AddColumnTask
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + hiveTableName + " ADD COLUMN some_new_column double"))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");

onTrino().executeQuery("DROP TABLE " + icebergTableName);
}
Expand All @@ -353,11 +340,9 @@ public void testCommentTable()
assertTableComment("hive", "default", tableName).isNull();
assertTableComment("iceberg", "default", tableName).isNull();

onTrino().executeQuery("COMMENT ON TABLE " + hiveTableName + " IS 'This is my table, there are many like it but this one is mine'");

// TODO: COMMENT ON TABLE succeeded, but comment was not preserved
assertTableComment("hive", "default", tableName).isNull();
assertTableComment("iceberg", "default", tableName).isNull();
//TODO restore test assertions after adding redirection awareness to the CommentTask
assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE " + hiveTableName + " IS 'This is my table, there are many like it but this one is mine'"))
.hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Cannot query Iceberg table 'default." + tableName + "'");

onTrino().executeQuery("DROP TABLE " + icebergTableName);
}
Expand Down
Loading