From e2decbb9571eb8324b46153774b5e912bc09ce61 Mon Sep 17 00:00:00 2001 From: mmalhotra Date: Wed, 10 Apr 2024 17:09:50 -0700 Subject: [PATCH] Add support for the custom hive catalog --- .../main/sphinx/object-storage/metastores.md | 7 + .../DefaultThriftMetastoreClientFactory.java | 9 +- .../HttpThriftMetastoreClientFactory.java | 1 + .../thrift/ThriftHiveMetastoreClient.java | 122 ++++++++++--- .../thrift/ThriftMetastoreConfig.java | 15 ++ ...stHiveCustomCatalogConnectorSmokeTest.java | 170 ++++++++++++++++++ .../metastore/TestHiveMetastoreCatalogs.java | 165 +++++++++++++++++ .../thrift/TestThriftHiveMetastoreClient.java | 2 + .../thrift/TestThriftMetastoreConfig.java | 7 +- ...stingTokenAwareMetastoreClientFactory.java | 2 +- .../hive/TestHiveMetastoreClientFactory.java | 3 +- 11 files changed, 468 insertions(+), 35 deletions(-) create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCustomCatalogConnectorSmokeTest.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestHiveMetastoreCatalogs.java diff --git a/docs/src/main/sphinx/object-storage/metastores.md b/docs/src/main/sphinx/object-storage/metastores.md index ef8fac287ecf..b0420c04e8da 100644 --- a/docs/src/main/sphinx/object-storage/metastores.md +++ b/docs/src/main/sphinx/object-storage/metastores.md @@ -182,6 +182,13 @@ properties: * - `hive.metastore.thrift.txn-lock-max-wait` - Maximum time to wait to acquire hive transaction lock. - `10m` +* - `hive.metastore.thrift.catalog-name` + - Name of the Hive metastore catalog to use. A catalog is an abstraction + within Hive that enables various systems to connect to distinct, independent + catalogs stored in the same metastore. By default, the catalog name in Hive + metastore is `hive`. When this configuration property is not set, + the default catalog of the Hive metastore is accessed. 
+ - ::: Use the following configuration properties for HTTP client transport mode, so diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java index 32a96887dfb6..ec0d33783f82 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/DefaultThriftMetastoreClientFactory.java @@ -45,6 +45,7 @@ public class DefaultThriftMetastoreClientFactory private final int readTimeoutMillis; private final HiveMetastoreAuthentication metastoreAuthentication; private final String hostname; + private final Optional catalogName; private final MetastoreSupportsDateStatistics metastoreSupportsDateStatistics = new MetastoreSupportsDateStatistics(); private final AtomicInteger chosenGetTableAlternative = new AtomicInteger(Integer.MAX_VALUE); @@ -57,7 +58,8 @@ public DefaultThriftMetastoreClientFactory( Duration connectTimeout, Duration readTimeout, HiveMetastoreAuthentication metastoreAuthentication, - String hostname) + String hostname, + Optional catalogName) { this.sslContext = requireNonNull(sslContext, "sslContext is null"); this.socksProxy = requireNonNull(socksProxy, "socksProxy is null"); @@ -65,6 +67,7 @@ public DefaultThriftMetastoreClientFactory( this.readTimeoutMillis = toIntExact(readTimeout.toMillis()); this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null"); this.hostname = requireNonNull(hostname, "hostname is null"); + this.catalogName = requireNonNull(catalogName, "catalogName is null"); } @Inject @@ -84,7 +87,8 @@ public DefaultThriftMetastoreClientFactory( config.getConnectTimeout(), config.getReadTimeout(), metastoreAuthentication, - nodeManager.getCurrentNode().getHost()); + 
nodeManager.getCurrentNode().getHost(), + config.getCatalogName()); } @Override @@ -107,6 +111,7 @@ protected ThriftMetastoreClient create(TransportSupplier transportSupplier, Stri return new ThriftHiveMetastoreClient( transportSupplier, hostname, + catalogName, metastoreSupportsDateStatistics, true, chosenGetTableAlternative, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/HttpThriftMetastoreClientFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/HttpThriftMetastoreClientFactory.java index 6dbc5f3b3d83..52b0df41ca1b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/HttpThriftMetastoreClientFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/HttpThriftMetastoreClientFactory.java @@ -81,6 +81,7 @@ public ThriftMetastoreClient create(URI uri, Optional delegationToken) return new ThriftHiveMetastoreClient( () -> createHttpTransport(uri), hostname, + Optional.empty(), new MetastoreSupportsDateStatistics(), false, chosenGetTableAlternative, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreClient.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreClient.java index 843d368c0d2a..32217ba4b7b4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreClient.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreClient.java @@ -65,6 +65,7 @@ import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.metastore.thrift.MetastoreSupportsDateStatistics.DateStatisticsSupport; import io.trino.spi.connector.RelationType; +import jakarta.annotation.Nullable; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -76,6 +77,7 @@ import java.util.HashMap; import 
java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -103,6 +105,9 @@ public class ThriftHiveMetastoreClient implements ThriftMetastoreClient { private static final Logger log = Logger.get(ThriftHiveMetastoreClient.class); + private static final char CATALOG_DB_THRIFT_NAME_MARKER = '@'; + private static final String CATALOG_DB_SEPARATOR = "#"; + private static final String DB_EMPTY_MARKER = "!"; private final TransportSupplier transportSupplier; private TTransport transport; @@ -114,10 +119,12 @@ public class ThriftHiveMetastoreClient private final AtomicInteger chosenGetTableAlternative; private final AtomicInteger chosenAlterTransactionalTableAlternative; private final AtomicInteger chosenAlterPartitionsAlternative; + private final Optional catalogName; public ThriftHiveMetastoreClient( TransportSupplier transportSupplier, String hostname, + Optional catalogName, MetastoreSupportsDateStatistics metastoreSupportsDateStatistics, boolean metastoreSupportsTableMeta, AtomicInteger chosenGetTableAlternative, @@ -132,6 +139,7 @@ public ThriftHiveMetastoreClient( this.chosenGetTableAlternative = requireNonNull(chosenGetTableAlternative, "chosenGetTableAlternative is null"); this.chosenAlterTransactionalTableAlternative = requireNonNull(chosenAlterTransactionalTableAlternative, "chosenAlterTransactionalTableAlternative is null"); this.chosenAlterPartitionsAlternative = requireNonNull(chosenAlterPartitionsAlternative, "chosenAlterPartitionsAlternative is null"); + this.catalogName = requireNonNull(catalogName, "catalogName is null"); connect(); } @@ -162,6 +170,9 @@ private void disconnect() public List getAllDatabases() throws TException { + if (catalogName.isPresent()) { + return client.getDatabases(prependCatalogToDbName(catalogName, null)); + } return client.getAllDatabases(); } @@ -169,7 +180,7 @@ public List getAllDatabases() public Database getDatabase(String 
dbName) throws TException { - return client.getDatabase(dbName); + return client.getDatabase(prependCatalogToDbName(catalogName, dbName)); } @Override @@ -178,9 +189,10 @@ public List getTableMeta(String databaseName) { // TODO: remove this once Unity adds support for getTableMeta if (!metastoreSupportsTableMeta) { + String catalogDatabaseName = prependCatalogToDbName(catalogName, databaseName); Map tables = new HashMap<>(); - client.getTables(databaseName, ".*").forEach(name -> tables.put(name, new TableMeta(databaseName, name, RelationType.TABLE.toString()))); - client.getTablesByType(databaseName, ".*", VIRTUAL_VIEW.name()).forEach(name -> { + client.getTables(catalogDatabaseName, ".*").forEach(name -> tables.put(name, new TableMeta(databaseName, name, RelationType.TABLE.toString()))); + client.getTablesByType(catalogDatabaseName, ".*", VIRTUAL_VIEW.name()).forEach(name -> { TableMeta tableMeta = new TableMeta(databaseName, name, VIRTUAL_VIEW.name()); // This makes all views look like a Trino view, so that they are not filtered out during SHOW VIEWS tableMeta.setComments(PRESTO_VIEW_COMMENT); @@ -191,53 +203,57 @@ public List getTableMeta(String databaseName) if (databaseName.indexOf('*') >= 0 || databaseName.indexOf('|') >= 0) { // in this case we replace any pipes with a glob and then filter the output - return client.getTableMeta(databaseName.replace('|', '*'), "*", ImmutableList.of()).stream() + return client.getTableMeta(prependCatalogToDbName(catalogName, databaseName.replace('|', '*')), "*", ImmutableList.of()).stream() .filter(tableMeta -> tableMeta.getDbName().equals(databaseName)) .collect(toImmutableList()); } - return client.getTableMeta(databaseName, "*", ImmutableList.of()); + return client.getTableMeta(prependCatalogToDbName(catalogName, databaseName), "*", ImmutableList.of()); } @Override public void createDatabase(Database database) throws TException { - client.createDatabase(database); + client.createDatabase(catalogName.isEmpty() + ? 
database + : database.deepCopy().setCatalogName(catalogName.orElseThrow())); } @Override public void dropDatabase(String databaseName, boolean deleteData, boolean cascade) throws TException { - client.dropDatabase(databaseName, deleteData, cascade); + client.dropDatabase(prependCatalogToDbName(catalogName, databaseName), deleteData, cascade); } @Override public void alterDatabase(String databaseName, Database database) throws TException { - client.alterDatabase(databaseName, database); + client.alterDatabase(prependCatalogToDbName(catalogName, databaseName), database); } @Override public void createTable(Table table) throws TException { - client.createTable(table); + client.createTable(catalogName.isEmpty() + ? table + : table.deepCopy().setCatName(catalogName.orElseThrow())); } @Override public void dropTable(String databaseName, String name, boolean deleteData) throws TException { - client.dropTable(databaseName, name, deleteData); + client.dropTable(prependCatalogToDbName(catalogName, databaseName), name, deleteData); } @Override public void alterTableWithEnvironmentContext(String databaseName, String tableName, Table newTable, EnvironmentContext context) throws TException { - client.alterTableWithEnvironmentContext(databaseName, tableName, newTable, context); + client.alterTableWithEnvironmentContext(prependCatalogToDbName(catalogName, databaseName), tableName, newTable, context); } @Override @@ -249,17 +265,18 @@ public Table getTable(String databaseName, String tableName) chosenGetTableAlternative, () -> { GetTableRequest request = new GetTableRequest(databaseName, tableName); + catalogName.ifPresent(request::setCatName); request.setCapabilities(new ClientCapabilities(ImmutableList.of(ClientCapability.INSERT_ONLY_TABLES))); return client.getTableReq(request).getTable(); }, - () -> client.getTable(databaseName, tableName)); + () -> client.getTable(prependCatalogToDbName(catalogName, databaseName), tableName)); } @Override public List getFields(String 
databaseName, String tableName) throws TException { - return client.getFields(databaseName, tableName); + return client.getFields(prependCatalogToDbName(catalogName, databaseName), tableName); } @Override @@ -267,6 +284,7 @@ public List getTableColumnStatistics(String databaseName, S throws TException { TableStatsRequest tableStatsRequest = new TableStatsRequest(databaseName, tableName, columnNames); + catalogName.ifPresent(tableStatsRequest::setCatName); return client.getTableStatisticsReq(tableStatsRequest).getTableStats(); } @@ -279,6 +297,7 @@ public void setTableColumnStatistics(String databaseName, String tableName, List statistics, stats -> { ColumnStatisticsDesc statisticsDescription = new ColumnStatisticsDesc(true, databaseName, tableName); + catalogName.ifPresent(statisticsDescription::setCatName); ColumnStatistics request = new ColumnStatistics(statisticsDescription, stats); client.updateTableColumnStatistics(request); }); @@ -288,7 +307,7 @@ public void setTableColumnStatistics(String databaseName, String tableName, List public void deleteTableColumnStatistics(String databaseName, String tableName, String columnName) throws TException { - client.deleteTableColumnStatistics(databaseName, tableName, columnName); + client.deleteTableColumnStatistics(prependCatalogToDbName(catalogName, databaseName), tableName, columnName); } @Override @@ -296,6 +315,7 @@ public Map> getPartitionColumnStatistics(Strin throws TException { PartitionsStatsRequest partitionsStatsRequest = new PartitionsStatsRequest(databaseName, tableName, columnNames, partitionNames); + catalogName.ifPresent(partitionsStatsRequest::setCatName); return client.getPartitionsStatisticsReq(partitionsStatsRequest).getPartStats(); } @@ -308,6 +328,7 @@ public void setPartitionColumnStatistics(String databaseName, String tableName, statistics, stats -> { ColumnStatisticsDesc statisticsDescription = new ColumnStatisticsDesc(false, databaseName, tableName); + 
catalogName.ifPresent(statisticsDescription::setCatName); statisticsDescription.setPartName(partitionName); ColumnStatistics request = new ColumnStatistics(statisticsDescription, stats); client.updatePartitionColumnStatistics(request); @@ -318,7 +339,7 @@ public void setPartitionColumnStatistics(String databaseName, String tableName, public void deletePartitionColumnStatistics(String databaseName, String tableName, String partitionName, String columnName) throws TException { - client.deletePartitionColumnStatistics(databaseName, tableName, partitionName, columnName); + client.deletePartitionColumnStatistics(prependCatalogToDbName(catalogName, databaseName), tableName, partitionName, columnName); } private void setColumnStatistics(String objectName, List statistics, UnaryCall> saveColumnStatistics) @@ -373,49 +394,55 @@ private void setColumnStatistics(String objectName, List st public List getPartitionNames(String databaseName, String tableName) throws TException { - return client.getPartitionNames(databaseName, tableName, (short) -1); + return client.getPartitionNames(prependCatalogToDbName(catalogName, databaseName), tableName, (short) -1); } @Override public List getPartitionNamesFiltered(String databaseName, String tableName, List partitionValues) throws TException { - return client.getPartitionNamesPs(databaseName, tableName, partitionValues, (short) -1); + return client.getPartitionNamesPs(prependCatalogToDbName(catalogName, databaseName), tableName, partitionValues, (short) -1); } @Override public int addPartitions(List newPartitions) throws TException { - return client.addPartitions(newPartitions); + if (catalogName.isEmpty()) { + return client.addPartitions(newPartitions); + } + + return client.addPartitions(newPartitions.stream() + .map(partition -> partition.deepCopy().setCatName(catalogName.orElseThrow())) + .collect(toImmutableList())); } @Override public boolean dropPartition(String databaseName, String tableName, List partitionValues, boolean 
deleteData) throws TException { - return client.dropPartition(databaseName, tableName, partitionValues, deleteData); + return client.dropPartition(prependCatalogToDbName(catalogName, databaseName), tableName, partitionValues, deleteData); } @Override public void alterPartition(String databaseName, String tableName, Partition partition) throws TException { - client.alterPartition(databaseName, tableName, partition); + client.alterPartition(prependCatalogToDbName(catalogName, databaseName), tableName, partition); } @Override public Partition getPartition(String databaseName, String tableName, List partitionValues) throws TException { - return client.getPartition(databaseName, tableName, partitionValues); + return client.getPartition(prependCatalogToDbName(catalogName, databaseName), tableName, partitionValues); } @Override public List getPartitionsByNames(String databaseName, String tableName, List partitionNames) throws TException { - return client.getPartitionsByNames(databaseName, tableName, partitionNames); + return client.getPartitionsByNames(prependCatalogToDbName(catalogName, databaseName), tableName, partitionNames); } @Override @@ -429,7 +456,11 @@ public List listRoles(String principalName, PrincipalType principalType) public List listPrivileges(String principalName, PrincipalType principalType, HiveObjectRef hiveObjectRef) throws TException { - return client.listPrivileges(principalName, principalType, hiveObjectRef); + HiveObjectRef hiveObjectRefParam = catalogName.isEmpty() + ? 
hiveObjectRef + : hiveObjectRef.deepCopy().setCatName(catalogName.orElseThrow()); + + return client.listPrivileges(principalName, principalType, hiveObjectRefParam); } @Override @@ -658,12 +689,13 @@ public void alterPartitions(String dbName, String tableName, List par chosenAlterPartitionsAlternative, () -> { AlterPartitionsRequest request = new AlterPartitionsRequest(dbName, tableName, partitions); + catalogName.ifPresent(request::setCatName); request.setWriteId(writeId); client.alterPartitionsReq(request); return null; }, () -> { - client.alterPartitionsWithEnvironmentContext(dbName, tableName, partitions, new EnvironmentContext()); + client.alterPartitionsWithEnvironmentContext(prependCatalogToDbName(catalogName, dbName), tableName, partitions, new EnvironmentContext()); return null; }); } @@ -689,6 +721,7 @@ public void alterTransactionalTable(Table table, long transactionId, long writeI table.setWriteId(writeId); checkArgument(writeId >= table.getWriteId(), "The writeId supplied %s should be greater than or equal to the table writeId %s", writeId, table.getWriteId()); AlterTableRequest request = new AlterTableRequest(table.getDbName(), table.getTableName(), table); + catalogName.ifPresent(request::setCatName); request.setValidWriteIdList(getValidWriteIds(ImmutableList.of(format("%s.%s", table.getDbName(), table.getTableName())), transactionId)); request.setWriteId(writeId); request.setEnvironmentContext(environmentContext); @@ -706,35 +739,37 @@ public void alterTransactionalTable(Table table, long transactionId, long writeI public Function getFunction(String databaseName, String functionName) throws TException { - return client.getFunction(databaseName, functionName); + return client.getFunction(prependCatalogToDbName(catalogName, databaseName), functionName); } @Override public Collection getFunctions(String databaseName, String functionNamePattern) throws TException { - return client.getFunctions(databaseName, functionNamePattern); + return 
client.getFunctions(prependCatalogToDbName(catalogName, databaseName), functionNamePattern); } @Override public void createFunction(Function function) throws TException { - client.createFunction(function); + client.createFunction(catalogName.isEmpty() + ? function + : function.deepCopy().setCatName(catalogName.orElseThrow())); } @Override public void alterFunction(Function function) throws TException { - client.alterFunction(function.getDbName(), function.getFunctionName(), function); + client.alterFunction(prependCatalogToDbName(catalogName, function.getDbName()), function.getFunctionName(), function); } @Override public void dropFunction(String databaseName, String functionName) throws TException { - client.dropFunction(databaseName, functionName); + client.dropFunction(prependCatalogToDbName(catalogName, databaseName), functionName); } // Method needs to be final for @SafeVarargs to work @@ -841,4 +876,33 @@ public interface TransportSupplier TTransport createTransport() throws TTransportException; } + + /** + * Constructs the database name pattern, including the catalog name, that the Hive Thrift Server understands. + * Based on Hive's implementation. 
+ * + * @param catalogName hive catalog name + * @param databaseName database name + * @return string pattern that Hive Thrift Server understands + */ + private static String prependCatalogToDbName(Optional catalogName, @Nullable String databaseName) + { + if (catalogName.isEmpty()) { + return databaseName; + } + + StringBuilder catalogDatabaseName = new StringBuilder() + .append(CATALOG_DB_THRIFT_NAME_MARKER) + .append(catalogName.orElseThrow()) + .append(CATALOG_DB_SEPARATOR); + if (databaseName != null) { + if (databaseName.isEmpty()) { + catalogDatabaseName.append(DB_EMPTY_MARKER); + } + else { + catalogDatabaseName.append(databaseName); + } + } + return catalogDatabaseName.toString(); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java index e688c51d53b9..807d9da084c2 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java @@ -28,6 +28,7 @@ import jakarta.validation.constraints.NotNull; import java.io.File; +import java.util.Optional; import java.util.concurrent.TimeUnit; @DefunctConfig("hive.metastore.thrift.batch-fetch.enabled") @@ -47,6 +48,7 @@ public class ThriftMetastoreConfig private long delegationTokenCacheMaximumSize = 1000; private boolean deleteFilesOnDrop; private Duration maxWaitForTransactionLock = new Duration(10, TimeUnit.MINUTES); + private String catalogName; private boolean tlsEnabled; private File keystorePath; @@ -347,4 +349,17 @@ public ThriftMetastoreConfig setWriteStatisticsThreads(int writeStatisticsThread this.writeStatisticsThreads = writeStatisticsThreads; return this; } + + public Optional getCatalogName() + { + return Optional.ofNullable(catalogName); + } + + @Config("hive.metastore.thrift.catalog-name") + 
@ConfigDescription("Hive metastore thrift catalog name") + public ThriftMetastoreConfig setCatalogName(String catalogName) + { + this.catalogName = catalogName; + return this; + } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCustomCatalogConnectorSmokeTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCustomCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..8163b804486d --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCustomCatalogConnectorSmokeTest.java @@ -0,0 +1,170 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.hive;
+
+import io.trino.plugin.hive.containers.HiveHadoop;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.plugin.hive.metastore.Database;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
+import io.trino.spi.security.PrincipalType;
+import io.trino.testing.BaseConnectorSmokeTest;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static io.trino.plugin.hive.HiveMetadata.MODIFYING_NON_TRANSACTIONAL_TABLE_MESSAGE;
+import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA;
+import static io.trino.plugin.hive.TestingHiveUtils.getConnectorService;
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Connector smoke test for a Hive catalog configured with
+ * {@code hive.metastore.thrift.catalog-name} set to {@link #HIVE_CUSTOM_CATALOG},
+ * i.e. a metastore catalog other than the one used when the property is unset.
+ */
+public class TestHiveCustomCatalogConnectorSmokeTest
+        extends BaseConnectorSmokeTest
+{
+    // Name of the catalog row inserted into the metastore's CTLGS table
+    private static final String HIVE_CUSTOM_CATALOG = "custom";
+
+    /**
+     * Starts a Hive 3 + MinIO data lake, registers the custom catalog in the metastore
+     * database, and builds a query runner whose Hive connector targets that catalog.
+     */
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        String bucketName = "test-hive-metastore-catalog-smoke-test-" + randomNameSuffix();
+        HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName, HiveHadoop.HIVE3_IMAGE));
+        hiveMinioDataLake.start();
+
+        // Inserting into metastore's database directly because the Hive does not expose a way to create a custom catalog
+        hiveMinioDataLake.getHiveHadoop().runOnMetastore("INSERT INTO CTLGS VALUES (2, '%s', 'Custom catalog', 's3://%s/custom')".formatted(HIVE_CUSTOM_CATALOG, bucketName));
+
+        QueryRunner queryRunner = HiveQueryRunner.builder()
+                .addHiveProperty("hive.metastore", "thrift")
+                .addHiveProperty("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString())
+                .addHiveProperty("hive.metastore.thrift.catalog-name", HIVE_CUSTOM_CATALOG)
+                .addHiveProperty("fs.hadoop.enabled", "false")
+                .addHiveProperty("fs.native-s3.enabled", "true")
+                .addHiveProperty("s3.path-style-access", "true")
+                .addHiveProperty("s3.region", MINIO_REGION)
+                .addHiveProperty("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress())
+                .addHiveProperty("s3.aws-access-key", MINIO_ACCESS_KEY)
+                .addHiveProperty("s3.aws-secret-key", MINIO_SECRET_KEY)
+                .setCreateTpchSchemas(false) // Create the required tpch tables after the initialisation of the query runner
+                .build();
+
+        HiveMetastore metastore = getConnectorService(queryRunner, HiveMetastoreFactory.class)
+                .createMetastore(Optional.empty());
+        // NOTE(review): createDatabaseMetastoreObject appends "/" + name to the supplied base, so the
+        // location below resolves to s3://<bucket>/<schema>/<schema> - confirm the nested path is intended
+        metastore.createDatabase(createDatabaseMetastoreObject(TPCH_SCHEMA, Optional.of("s3://%s/%s".formatted(bucketName, TPCH_SCHEMA))));
+        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES);
+
+        return queryRunner;
+    }
+
+    @Override
+    protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+    {
+        return switch (connectorBehavior) {
+            case SUPPORTS_MULTI_STATEMENT_WRITES -> true;
+            case SUPPORTS_CREATE_MATERIALIZED_VIEW,
+                    SUPPORTS_RENAME_SCHEMA,
+                    SUPPORTS_TOPN_PUSHDOWN,
+                    SUPPORTS_TRUNCATE -> false;
+            default -> super.hasBehavior(connectorBehavior);
+        };
+    }
+
+    // The four overrides below expect the base test to fail with the
+    // "modifying non-transactional table" message instead of succeeding.
+    @Test
+    @Override
+    public void testRowLevelDelete()
+    {
+        assertThatThrownBy(super::testRowLevelDelete)
+                .hasMessage(MODIFYING_NON_TRANSACTIONAL_TABLE_MESSAGE);
+    }
+
+    @Test
+    @Override
+    public void testRowLevelUpdate()
+    {
+        assertThatThrownBy(super::testRowLevelUpdate)
+                .hasMessage(MODIFYING_NON_TRANSACTIONAL_TABLE_MESSAGE);
+    }
+
+    @Test
+    @Override
+    public void testUpdate()
+    {
+        assertThatThrownBy(super::testUpdate)
+                .hasMessage(MODIFYING_NON_TRANSACTIONAL_TABLE_MESSAGE);
+    }
+
+    @Test
+    @Override
+    public void testMerge()
+    {
+        assertThatThrownBy(super::testMerge)
+                .hasMessage(MODIFYING_NON_TRANSACTIONAL_TABLE_MESSAGE);
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeScalar("SHOW CREATE TABLE region"))
+                .isEqualTo("""
+                        CREATE TABLE hive.tpch.region (
+                           regionkey bigint,
+                           name varchar(25),
+                           comment varchar(152)
+                        )
+                        WITH (
+                           format = 'ORC'
+                        )""");
+    }
+
+    @Test
+    @Override
+    public void testCreateSchemaWithNonLowercaseOwnerName()
+    {
+        // Override because HivePrincipal's username is case-sensitive unlike TrinoPrincipal
+        assertThatThrownBy(super::testCreateSchemaWithNonLowercaseOwnerName)
+                .hasMessageContaining("Access Denied: Cannot create schema")
+                .hasStackTraceContaining("CREATE SCHEMA");
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        String schemaName = getSession().getSchema().orElseThrow();
+        assertQueryFails(
+                format("ALTER SCHEMA %s RENAME TO %s", schemaName, schemaName + randomNameSuffix()),
+                "Hive metastore does not support renaming schemas");
+    }
+
+    /**
+     * Builds a metastore {@link Database} owned by the {@code public} role.
+     *
+     * @param name database name; also appended to {@code locationBase} as the last path segment
+     * @param locationBase optional base URI; when present the location is {@code locationBase + "/" + name}
+     */
+    private static Database createDatabaseMetastoreObject(String name, Optional<String> locationBase)
+    {
+        return Database.builder()
+                .setLocation(locationBase.map(base -> base + "/" + name))
+                .setDatabaseName(name)
+                .setOwnerName(Optional.of("public"))
+                .setOwnerType(Optional.of(PrincipalType.ROLE))
+                .build();
+    }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestHiveMetastoreCatalogs.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestHiveMetastoreCatalogs.java new file mode 100644 index 000000000000..40c50e2406ce --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestHiveMetastoreCatalogs.java @@ -0,0 +1,165 @@ +/* + * Licensed under the Apache License, Version 2.0 (the
"License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.metastore;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.Session;
+import io.trino.plugin.hive.HiveQueryRunner;
+import io.trino.plugin.hive.containers.HiveHadoop;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.spi.security.PrincipalType;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static io.trino.plugin.hive.TestingHiveUtils.getConnectorService;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Verifies that Trino catalogs pointing at different Hive metastore catalogs
+ * (via {@code hive.metastore.thrift.catalog-name}) see disjoint schemas and tables,
+ * and that tables from both can be joined in a single query.
+ */
+public class TestHiveMetastoreCatalogs
+        extends AbstractTestQueryFramework
+{
+    // Trino catalog explicitly configured with catalog-name "hive"
+    private static final String TRINO_HIVE_CATALOG = "hive_catalog";
+    // Trino catalog configured with catalog-name HIVE_CUSTOM_CATALOG
+    private static final String TRINO_HIVE_CUSTOM_CATALOG = "hive_custom_catalog";
+    // Name of the catalog row inserted into the metastore's CTLGS table
+    private static final String HIVE_CUSTOM_CATALOG = "custom";
+
+    private String bucketName;
+
+    /**
+     * Starts a Hive 3 + MinIO data lake and creates three Trino catalogs against the same
+     * metastore: the query runner's own catalog with no catalog-name set, one with
+     * catalog-name "hive", and one with the custom catalog name.
+     */
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        this.bucketName = "test-hive-metastore-catalogs-" + randomNameSuffix();
+        HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName, HiveHadoop.HIVE3_IMAGE));
+        hiveMinioDataLake.start();
+
+        QueryRunner queryRunner = HiveQueryRunner.builder()
+                .setHiveProperties(buildHiveProperties(hiveMinioDataLake))
+                .setCreateTpchSchemas(false)
+                .build();
+
+        hiveMinioDataLake.getHiveHadoop().runOnMetastore("INSERT INTO CTLGS VALUES (2, '%s', 'Custom catalog', 's3://%s/custom')".formatted(HIVE_CUSTOM_CATALOG, bucketName));
+
+        // Explicit <String, String> witness: without it the chained builder is inferred as
+        // Builder<Object, Object> and the result is not a Map<String, String>
+        queryRunner.createCatalog(TRINO_HIVE_CUSTOM_CATALOG, "hive", ImmutableMap.<String, String>builder()
+                .put("hive.metastore.thrift.catalog-name", HIVE_CUSTOM_CATALOG)
+                .putAll(buildHiveProperties(hiveMinioDataLake))
+                .buildOrThrow());
+
+        queryRunner.createCatalog(TRINO_HIVE_CATALOG, "hive", ImmutableMap.<String, String>builder()
+                .put("hive.metastore.thrift.catalog-name", "hive") // HiveMetastore uses "hive" as the default catalog name
+                .putAll(buildHiveProperties(hiveMinioDataLake))
+                .buildOrThrow());
+
+        return queryRunner;
+    }
+
+    /**
+     * Returns the connector properties shared by every catalog in this test:
+     * thrift metastore endpoint plus native S3 access to the MinIO container.
+     */
+    private static Map<String, String> buildHiveProperties(HiveMinioDataLake hiveMinioDataLake)
+    {
+        return ImmutableMap.<String, String>builder()
+                .put("hive.metastore", "thrift")
+                .put("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString())
+                .put("fs.hadoop.enabled", "false")
+                .put("fs.native-s3.enabled", "true")
+                .put("s3.path-style-access", "true")
+                .put("s3.region", MINIO_REGION)
+                .put("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress())
+                .put("s3.aws-access-key", MINIO_ACCESS_KEY)
+                .put("s3.aws-secret-key", MINIO_SECRET_KEY)
+                .buildOrThrow();
+    }
+
+    @Test
+    public void testShowTables()
+    {
+        assertThat(query("SHOW SCHEMAS")).matches("VALUES VARCHAR 'default', VARCHAR 'information_schema'");
+
+        HiveMetastore metastore = getConnectorService(getQueryRunner(), HiveMetastoreFactory.class)
+                .createMetastore(Optional.empty());
+
+        String defaultCatalogSchema = "default_catalog_schema";
+        // NOTE(review): createDatabaseMetastoreObject appends "/" + name to the supplied base, so the
+        // location below resolves to s3://<bucket>/<schema>/<schema> - confirm the nested path is intended
+        metastore.createDatabase(createDatabaseMetastoreObject(defaultCatalogSchema, Optional.of("s3://%s/%s".formatted(bucketName, defaultCatalogSchema))));
+
+        Session defaultCatalogSession = testSessionBuilder()
+                .setCatalog("hive")
+                .setSchema(defaultCatalogSchema)
+                .build();
+        Session hiveCatalogSession = testSessionBuilder()
+                .setCatalog(TRINO_HIVE_CATALOG)
+                .setSchema(defaultCatalogSchema)
+                .build();
+        assertUpdate(defaultCatalogSession, "CREATE TABLE tabledefault (data integer)");
+        assertUpdate(defaultCatalogSession, "INSERT INTO tabledefault VALUES (1),(2),(3),(4)", 4);
+
+        String customCatalogSchema = "custom_catalog_schema";
+        assertUpdate(
+                testSessionBuilder()
+                        .setCatalog(TRINO_HIVE_CUSTOM_CATALOG)
+                        .build(),
+                "CREATE SCHEMA " + customCatalogSchema);
+        Session customCatalogSession = testSessionBuilder()
+                .setCatalog(TRINO_HIVE_CUSTOM_CATALOG)
+                .setSchema(customCatalogSchema)
+                .build();
+        assertUpdate(customCatalogSession, "CREATE TABLE tablecustom (data integer)");
+        assertUpdate(customCatalogSession, "INSERT INTO tablecustom VALUES (4),(5),(6),(7)", 4);
+
+        // schemas from the default Hive catalog are not visible in the custom Hive catalog and vice versa
+        assertThat(computeActual(defaultCatalogSession, "SHOW SCHEMAS").getOnlyColumn())
+                .containsOnly("default", "default_catalog_schema", "information_schema");
+        assertThat(computeActual(hiveCatalogSession, "SHOW SCHEMAS").getOnlyColumn())
+                .containsOnly("default", "default_catalog_schema", "information_schema");
+        assertThat(computeActual(customCatalogSession, "SHOW SCHEMAS").getOnlyColumn())
+                .containsOnly(customCatalogSchema, "information_schema");
+
+        // tables from the default Hive catalog are not visible in the custom Hive catalog and vice versa
+        assertThat(computeActual(defaultCatalogSession, "SHOW TABLES IN " + defaultCatalogSchema).getOnlyColumn())
+                .containsOnly("tabledefault");
+        assertThat(computeActual(hiveCatalogSession, "SHOW TABLES IN " + defaultCatalogSchema).getOnlyColumn())
+                .containsOnly("tabledefault");
+        assertThat(computeActual(customCatalogSession, "SHOW TABLES IN " + customCatalogSchema).getOnlyColumn())
+                .containsOnly("tablecustom");
+        assertThat((String) computeScalar(customCatalogSession, format("SHOW CREATE TABLE %s.tablecustom", customCatalogSchema)))
+                .isEqualTo("""
+                        CREATE TABLE hive_custom_catalog.custom_catalog_schema.tablecustom (
+                           data integer
+                        )
+                        WITH (
+                           format = 'ORC'
+                        )""");
+
+        // select query : join hive's and custom catalog's table
+        assertQuery("SELECT a.data from hive.default_catalog_schema.tabledefault a, hive_custom_catalog.custom_catalog_schema.tablecustom b WHERE a.data = b.data", "SELECT 4");
+
+        assertUpdate(defaultCatalogSession, "DROP SCHEMA " + defaultCatalogSchema + " CASCADE");
+        assertUpdate(customCatalogSession, "DROP SCHEMA " + customCatalogSchema + " CASCADE");
+    }
+
+    /**
+     * Builds a metastore {@link Database} owned by the {@code public} role.
+     *
+     * @param name database name; also appended to {@code locationBase} as the last path segment
+     * @param locationBase optional base URI; when present the location is {@code locationBase + "/" + name}
+     */
+    private static Database createDatabaseMetastoreObject(String name, Optional<String> locationBase)
+    {
+        return Database.builder()
+                .setLocation(locationBase.map(base -> base + "/" + name))
+                .setDatabaseName(name)
+                .setOwnerName(Optional.of("public"))
+                .setOwnerType(Optional.of(PrincipalType.ROLE))
+                .build();
+    }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftHiveMetastoreClient.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftHiveMetastoreClient.java index 9053766a9f0d..c469374be025 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftHiveMetastoreClient.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftHiveMetastoreClient.java @@ -19,6 +19,7 @@ import org.apache.thrift.transport.TTransport; import org.junit.jupiter.api.Test; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -44,6 +45,7 @@ public void testAlternativeCall() return new TTransportMock(); }, "dummy", + Optional.empty(), new
MetastoreSupportsDateStatistics(), true, new AtomicInteger(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java index afb72ddabdc8..b204f613a443 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java @@ -58,7 +58,8 @@ public void testDefaults() .setDeleteFilesOnDrop(false) .setMaxWaitForTransactionLock(new Duration(10, MINUTES)) .setAssumeCanonicalPartitionKeys(false) - .setWriteStatisticsThreads(20)); + .setWriteStatisticsThreads(20) + .setCatalogName(null)); } @Test @@ -90,6 +91,7 @@ public void testExplicitPropertyMappings() .put("hive.metastore.thrift.write-statistics-threads", "10") .put("hive.metastore.thrift.assume-canonical-partition-keys", "true") .put("hive.metastore.thrift.use-spark-table-statistics-fallback", "false") + .put("hive.metastore.thrift.catalog-name", "custom_catalog_name") .buildOrThrow(); ThriftMetastoreConfig expected = new ThriftMetastoreConfig() @@ -113,7 +115,8 @@ public void testExplicitPropertyMappings() .setMaxWaitForTransactionLock(new Duration(5, MINUTES)) .setAssumeCanonicalPartitionKeys(true) .setWriteStatisticsThreads(10) - .setUseSparkTableStatisticsFallback(false); + .setUseSparkTableStatisticsFallback(false) + .setCatalogName("custom_catalog_name"); assertFullMapping(properties, expected); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingTokenAwareMetastoreClientFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingTokenAwareMetastoreClientFactory.java index b34e1a7842ec..9d960e86d6ee 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingTokenAwareMetastoreClientFactory.java +++ 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingTokenAwareMetastoreClientFactory.java @@ -46,7 +46,7 @@ public TestingTokenAwareMetastoreClientFactory(Optional socksProxy, public TestingTokenAwareMetastoreClientFactory(Optional socksProxy, URI uri, Duration timeout, MetastoreClientAdapterProvider metastoreClientAdapterProvider) { - this.factory = new DefaultThriftMetastoreClientFactory(Optional.empty(), socksProxy, timeout, timeout, AUTHENTICATION, "localhost"); + this.factory = new DefaultThriftMetastoreClientFactory(Optional.empty(), socksProxy, timeout, timeout, AUTHENTICATION, "localhost", Optional.empty()); this.address = requireNonNull(uri, "uri is null"); this.metastoreClientAdapterProvider = requireNonNull(metastoreClientAdapterProvider, "metastoreClientAdapterProvider is null"); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveMetastoreClientFactory.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveMetastoreClientFactory.java index 17f2a687136e..6f59336cdf3a 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveMetastoreClientFactory.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveMetastoreClientFactory.java @@ -35,7 +35,8 @@ public final class TestHiveMetastoreClientFactory new Duration(10, SECONDS), new Duration(10, SECONDS), new NoHiveMetastoreAuthentication(), - "localhost"); + "localhost", + Optional.empty()); @Inject @Named("databases.hive.metastore.host")