diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a4e2dd8ba0b1..2cd37bdf8f8c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -478,6 +478,15 @@ jobs:
         if: matrix.modules == 'plugin/trino-bigquery' && env.BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY != ''
         run: |
           $MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-case-insensitive-mapping -Dbigquery.credentials-key="${BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY}"
+      - name: Iceberg Glue Catalog Tests
+        env:
+          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESSKEY }}
+          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRETKEY }}
+          AWS_REGION: us-east-2
+          S3_BUCKET: presto-ci-test
+        if: contains(matrix.modules, 'plugin/trino-iceberg') && (env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '')
+        run: |
+          $MAVEN test ${MAVEN_TEST} -pl :trino-iceberg -P test-glue-catalog -Ds3.bucket=${S3_BUCKET}
       - name: Sanitize artifact name
         if: always()
         run: |
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java
index cee456280281..76d2df723946 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java
@@ -41,6 +41,7 @@
 import java.util.Base64;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.BiFunction;
 
@@ -126,12 +127,22 @@ private static CoralTableRedirectionResolver coralTableRedirectionResolver(
 
     public static boolean isPrestoView(Table table)
     {
-        return "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG));
+        return isPrestoView(table.getParameters());
+    }
+
+    public static boolean isPrestoView(Map<String, String> tableParameters)
+    {
+        return "true".equals(tableParameters.get(PRESTO_VIEW_FLAG));
     }
 
     public static boolean isHiveOrPrestoView(Table table)
     {
-        return table.getTableType().equals(TableType.VIRTUAL_VIEW.name());
+        return isHiveOrPrestoView(table.getTableType());
+    }
+
+    public static boolean isHiveOrPrestoView(String tableType)
+    {
+        return tableType.equals(TableType.VIRTUAL_VIEW.name());
     }
 
     public static boolean canDecodeView(Table table)
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
index 686aa0af1671..f1fbf517f496 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
@@ -202,7 +202,7 @@ public GlueHiveMetastore(
         this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats);
     }
 
-    private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
+    public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
     {
         ClientConfiguration clientConfig = new ClientConfiguration()
                 .withMaxConnections(config.getMaxGlueConnections())
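The new CI step above can also be reproduced locally against a personal AWS account. A minimal sketch, assuming valid AWS credentials are exported in the environment and that the bucket name is replaced with one you own; the test-glue-catalog profile and the s3.bucket property come from the pom.xml change below, and the command mirrors the CI step:

    # mirrors the "Iceberg Glue Catalog Tests" CI step
    export AWS_ACCESS_KEY_ID=...
    export AWS_SECRET_ACCESS_KEY=...
    export AWS_REGION=us-east-2
    ./mvnw test -pl :trino-iceberg -P test-glue-catalog -Ds3.bucket=<your-bucket>

Since these are cloud tests, they create real Glue and S3 resources in that account, so a disposable test account is advisable.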
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index ba6f2fab2741..98fdfce39e36 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -84,6 +84,21 @@
             <artifactId>units</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-glue</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
@@ -320,6 +335,9 @@
                         <excludes>
                             <exclude>**/TestIceberg*FailureRecoveryTest.java</exclude>
+                            <exclude>**/TestIcebergGlueCatalogConnectorSmokeTest.java</exclude>
+                            <exclude>**/TestTrinoGlueCatalogTest.java</exclude>
+                            <exclude>**/TestSharedGlueMetastore.java</exclude>
                         </excludes>
                     </configuration>
@@ -371,5 +389,24 @@
                 </plugins>
             </build>
         </profile>
+
+        <profile>
+            <id>test-glue-catalog</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <includes>
+                                <include>**/TestIcebergGlueCatalogConnectorSmokeTest.java</include>
+                                <include>**/TestTrinoGlueCatalogTest.java</include>
+                                <include>**/TestSharedGlueMetastore.java</include>
+                            </includes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 </project>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java
index 86327eff6684..9df0b5291bcc 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java
@@ -37,6 +37,7 @@ public enum IcebergErrorCode
     ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
     ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
     ICEBERG_COMMIT_ERROR(12, EXTERNAL),
+    ICEBERG_CATALOG_ERROR(13, EXTERNAL),
     /**/;
 
     private final ErrorCode errorCode;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
index a5f01028b063..8486bd0a0ba1 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
@@ -25,7 +25,6 @@
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
-import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
 import io.trino.spi.connector.ConnectorPageSinkProvider;
@@ -66,7 +65,6 @@ public void configure(Binder binder)
         configBinder(binder).bindConfig(ParquetReaderConfig.class);
         configBinder(binder).bindConfig(ParquetWriterConfig.class);
 
-        binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON);
         binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);
 
         jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
index 68cc6353be07..a22e7f961ac7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
@@ -18,7 +18,6 @@
 import io.airlift.slice.Slice;
 import io.airlift.slice.SliceUtf8;
 import io.airlift.slice.Slices;
-import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
 import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
 import io.trino.plugin.iceberg.catalog.TrinoCatalog;
@@ -125,10 +124,10 @@ public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table)
         return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
     }
 
-    public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
+    public static Table loadIcebergTable(TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session,
SchemaTableName table) { TableOperations operations = tableOperationsProvider.createTableOperations( - metastore, + catalog, session, table.getSchemaName(), table.getTableName(), @@ -138,14 +137,14 @@ public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperat } public static Table getIcebergTableWithMetadata( - HiveMetastore metastore, + TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table, TableMetadata tableMetadata) { IcebergTableOperations operations = tableOperationsProvider.createTableOperations( - metastore, + catalog, session, table.getSchemaName(), table.getTableName(), @@ -237,7 +236,7 @@ public static Optional getTableComment(Table table) return Optional.ofNullable(table.properties().get(TABLE_COMMENT)); } - private static String quotedTableName(SchemaTableName name) + public static String quotedTableName(SchemaTableName name) { return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java new file mode 100644 index 000000000000..23a35e5ef5d2 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog; + +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.iceberg.ColumnIdentity; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; + +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static org.apache.iceberg.TableMetadata.newTableMetadata; +import static org.apache.iceberg.Transactions.createTableTransaction; + +public abstract class AbstractTrinoCatalog + implements TrinoCatalog +{ + protected final IcebergTableOperationsProvider tableOperationsProvider; + private final boolean useUniqueTableLocation; + + protected AbstractTrinoCatalog( + IcebergTableOperationsProvider tableOperationsProvider, + boolean useUniqueTableLocation) + { + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.useUniqueTableLocation = useUniqueTableLocation; + } + + @Override + public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) + { + Table icebergTable = loadTable(session, schemaTableName); + if (comment.isEmpty()) { + icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); + } + else { + icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); + } + } + + @Override + public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional comment) + { + Table icebergTable = loadTable(session, schemaTableName); + icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + } + + protected Transaction newCreateTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + String location, + Map properties, + Optional owner) + { + TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); + TableOperations ops = tableOperationsProvider.createTableOperations( + this, + session, + schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + owner, + Optional.of(location)); + return createTableTransaction(schemaTableName.toString(), ops, metadata); + } + + protected String createNewTableName(String baseTableName) + { + String tableName = baseTableName; + if (useUniqueTableLocation) { + tableName += "-" + randomUUID().toString().replace("-", ""); + } + return tableName; + } + + protected void deleteTableDirectory( + ConnectorSession session, + SchemaTableName schemaTableName, + HdfsEnvironment hdfsEnvironment, + Path tableLocation) + { + try { + FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), tableLocation); + fileSystem.delete(tableLocation, true); + } + catch (IOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("Failed to delete directory 
%s of the table %s", tableLocation, schemaTableName), e); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java index a5a6f2010058..9a9dbf795bd2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Module; import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; @@ -26,13 +27,14 @@ import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule; +import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; - -import javax.inject.Inject; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; import java.util.Optional; import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; import static java.util.Objects.requireNonNull; @@ -52,16 +54,16 @@ protected void setup(Binder binder) { if (metastore.isPresent()) { binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore.get())); + binder.bind(MetastoreValidator.class).asEagerSingleton(); + install(new DecoratedHiveMetastoreModule()); binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); } else { bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule()); bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); - // TODO add support for Glue metastore + bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); } - - binder.bind(MetastoreValidator.class).asEagerSingleton(); - install(new DecoratedHiveMetastoreModule()); } public static class MetastoreValidator diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java index c4e970cab61b..fdf3c6b64c94 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.iceberg.catalog; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.connector.ConnectorSession; import java.util.Optional; @@ -21,7 +20,7 @@ public interface IcebergTableOperationsProvider { IcebergTableOperations createTableOperations( - HiveMetastore hiveMetastore, + TrinoCatalog catalog, ConnectorSession session, String database, String table, diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java index ffa1ad610fae..4b72584b183b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java @@ -13,85 +13,9 @@ */ package io.trino.plugin.iceberg.catalog; -import io.trino.plugin.base.CatalogName; -import io.trino.plugin.hive.HdfsEnvironment; -import io.trino.plugin.hive.HiveConfig; -import io.trino.plugin.hive.NodeVersion; -import io.trino.plugin.hive.metastore.HiveMetastoreFactory; -import io.trino.plugin.iceberg.CatalogType; -import io.trino.plugin.iceberg.IcebergConfig; -import io.trino.plugin.iceberg.IcebergSecurityConfig; -import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; -import io.trino.spi.TrinoException; import io.trino.spi.security.ConnectorIdentity; -import io.trino.spi.type.TypeManager; -import javax.inject.Inject; - -import java.util.Optional; - -import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; -import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM; -import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; -import static java.util.Objects.requireNonNull; - -public class TrinoCatalogFactory +public interface TrinoCatalogFactory { - private final CatalogName catalogName; - private final HiveMetastoreFactory metastoreFactory; - private final HdfsEnvironment hdfsEnvironment; - private final TypeManager typeManager; - private final IcebergTableOperationsProvider tableOperationsProvider; - private final String trinoVersion; - private final CatalogType catalogType; - private final boolean isUniqueTableLocation; - private final boolean isUsingSystemSecurity; - private final boolean deleteSchemaLocationsFallback; - - @Inject - public TrinoCatalogFactory( - IcebergConfig config, - CatalogName catalogName, - HiveMetastoreFactory metastoreFactory, - HdfsEnvironment hdfsEnvironment, - TypeManager typeManager, - IcebergTableOperationsProvider tableOperationsProvider, - NodeVersion nodeVersion, - IcebergSecurityConfig securityConfig, - HiveConfig hiveConfig) - { - this.catalogName = requireNonNull(catalogName, "catalogName is null"); - this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); - this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationProvider is null"); - this.trinoVersion = requireNonNull(nodeVersion, "trinoVersion is null").toString(); - requireNonNull(config, "config is null"); - this.catalogType = config.getCatalogType(); - this.isUniqueTableLocation = config.isUniqueTableLocation(); - this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM; - this.deleteSchemaLocationsFallback = requireNonNull(hiveConfig).isDeleteSchemaLocationsFallback(); - } - - public TrinoCatalog create(ConnectorIdentity identity) - { - switch (catalogType) { - case TESTING_FILE_METASTORE: - case HIVE_METASTORE: - return new TrinoHiveCatalog( - catalogName, - memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), 1000), - hdfsEnvironment, - typeManager, - tableOperationsProvider, - trinoVersion, - isUniqueTableLocation, - 
isUsingSystemSecurity, - deleteSchemaLocationsFallback); - case GLUE: - // TODO not supported yet - throw new TrinoException(NOT_SUPPORTED, "Unknown Trino Iceberg catalog type"); - } - throw new TrinoException(NOT_SUPPORTED, "Unsupported Trino Iceberg catalog type " + catalogType); - } + TrinoCatalog create(ConnectorIdentity identity); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java index 7fafc4a58c95..09f569a7962a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java @@ -14,10 +14,11 @@ package io.trino.plugin.iceberg.catalog.file; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.iceberg.FileIoProvider; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.ConnectorSession; import javax.inject.Inject; @@ -39,7 +40,7 @@ public FileMetastoreTableOperationsProvider(FileIoProvider fileIoProvider) @Override public IcebergTableOperations createTableOperations( - HiveMetastore hiveMetastore, + TrinoCatalog catalog, ConnectorSession session, String database, String table, @@ -48,7 +49,7 @@ public IcebergTableOperations createTableOperations( { return new FileMetastoreTableOperations( fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), - hiveMetastore, + ((TrinoHiveCatalog) catalog).getMetastore(), session, database, table, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java index 64cb309ea05e..d366ad8b994f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java @@ -16,8 +16,12 @@ import com.google.inject.Binder; import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.metastore.DecoratedHiveMetastoreModule; import io.trino.plugin.hive.metastore.file.FileMetastoreModule; +import io.trino.plugin.iceberg.catalog.IcebergCatalogModule.MetastoreValidator; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; public class IcebergFileMetastoreCatalogModule extends AbstractConfigurationAwareModule @@ -27,5 +31,8 @@ protected void setup(Binder binder) { install(new FileMetastoreModule()); binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(MetastoreValidator.class).asEagerSingleton(); + install(new DecoratedHiveMetastoreModule()); } } diff 
--git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java new file mode 100644 index 000000000000..dc6744b08fd2 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.CreateTableRequest; +import com.amazonaws.services.glue.model.EntityNotFoundException; +import com.amazonaws.services.glue.model.GetTableRequest; +import com.amazonaws.services.glue.model.Table; +import com.amazonaws.services.glue.model.TableInput; +import com.amazonaws.services.glue.model.UpdateTableRequest; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.UnknownTableTypeException; +import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.io.FileIO; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; +import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; +import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; + +public class GlueIcebergTableOperations + extends AbstractIcebergTableOperations +{ + private final AWSGlueAsync glueClient; + private final GlueMetastoreStats stats; + + protected GlueIcebergTableOperations( + AWSGlueAsync glueClient, + GlueMetastoreStats stats, + FileIO fileIo, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, session, database, table, owner, location); + this.glueClient = requireNonNull(glueClient, "glueClient is null"); + this.stats = requireNonNull(stats, "stats is null"); + } + + @Override + protected String getRefreshedLocation() 
+    {
+        Table table = getTable();
+
+        if (isPrestoView(table.getParameters()) && isHiveOrPrestoView(table.getTableType())) {
+            // this is a Presto Hive view, hence not a table
+            throw new TableNotFoundException(getSchemaTableName());
+        }
+        if (!isIcebergTable(table.getParameters())) {
+            throw new UnknownTableTypeException(getSchemaTableName());
+        }
+
+        String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
+        if (metadataLocation == null) {
+            throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION_PROP, getSchemaTableName()));
+        }
+        return metadataLocation;
+    }
+
+    @Override
+    protected void commitNewTable(TableMetadata metadata)
+    {
+        verify(version == -1, "commitNewTable called on a table which already exists");
+        String newMetadataLocation = writeNewMetadata(metadata, 0);
+        TableInput tableInput = getTableInput(tableName, owner, ImmutableMap.<String, String>builder()
+                .put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
+                .put(METADATA_LOCATION_PROP, newMetadataLocation)
+                .buildOrThrow());
+
+        CreateTableRequest createTableRequest = new CreateTableRequest()
+                .withDatabaseName(database)
+                .withTableInput(tableInput);
+        stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest));
+        shouldRefresh = true;
+    }
+
+    @Override
+    protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
+    {
+        String newMetadataLocation = writeNewMetadata(metadata, version + 1);
+        TableInput tableInput = getTableInput(tableName, owner, ImmutableMap.<String, String>builder()
+                .put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
+                .put(METADATA_LOCATION_PROP, newMetadataLocation)
+                .put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation)
+                .buildOrThrow());
+
+        Table table = getTable();
+
+        checkState(currentMetadataLocation != null, "No current metadata location for existing table");
+        String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
+        if (!currentMetadataLocation.equals(metadataLocation)) {
+            throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
+                    currentMetadataLocation, metadataLocation, getSchemaTableName());
+        }
+
+        UpdateTableRequest updateTableRequest = new UpdateTableRequest()
+                .withDatabaseName(database)
+                .withTableInput(tableInput);
+        stats.getUpdateTable().call(() -> glueClient.updateTable(updateTableRequest));
+        shouldRefresh = true;
+    }
+
+    private Table getTable()
+    {
+        try {
+            GetTableRequest getTableRequest = new GetTableRequest()
+                    .withDatabaseName(database)
+                    .withName(tableName);
+            return stats.getGetTable().call(() -> glueClient.getTable(getTableRequest).getTable());
+        }
+        catch (EntityNotFoundException e) {
+            throw new TableNotFoundException(getSchemaTableName(), e);
+        }
+    }
+}
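After a commit, the Glue table that backs an Iceberg table is little more than a pointer to the current metadata file. Assuming the usual values of the Iceberg BaseMetastoreTableOperations constants referenced above, the resulting table parameters look roughly like this (the paths are illustrative placeholders, not taken from the change):

    table_type                  = ICEBERG   (recognized case-insensitively by isIcebergTable)
    metadata_location           = s3://<bucket>/<table-dir>/metadata/00001-<uuid>.metadata.json
    previous_metadata_location  = s3://<bucket>/<table-dir>/metadata/00000-<uuid>.metadata.json

commitToExistingTable only swaps this pointer after re-reading the Glue table and verifying that metadata_location still matches the metadata the writer started from, so a concurrent committer fails with CommitFailedException instead of silently overwriting the other writer's snapshot.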
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperationsProvider.java
new file mode 100644
index 000000000000..3556a9e48d58
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperationsProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.glue;
+
+import com.amazonaws.services.glue.AWSGlueAsync;
+import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
+import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
+import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
+import io.trino.plugin.iceberg.FileIoProvider;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.spi.connector.ConnectorSession;
+import org.weakref.jmx.Flatten;
+import org.weakref.jmx.Managed;
+
+import javax.inject.Inject;
+
+import java.util.Optional;
+
+import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient;
+import static java.util.Objects.requireNonNull;
+
+public class GlueIcebergTableOperationsProvider
+        implements IcebergTableOperationsProvider
+{
+    private final FileIoProvider fileIoProvider;
+    private final AWSGlueAsync glueClient;
+    private final GlueMetastoreStats stats = new GlueMetastoreStats();
+
+    @Inject
+    public GlueIcebergTableOperationsProvider(FileIoProvider fileIoProvider, GlueHiveMetastoreConfig glueConfig)
+    {
+        this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
+        requireNonNull(glueConfig, "glueConfig is null");
+        this.glueClient = createAsyncGlueClient(glueConfig, Optional.empty(), stats.newRequestMetricsCollector());
+    }
+
+    @Managed
+    @Flatten
+    public GlueMetastoreStats getStats()
+    {
+        return stats;
+    }
+
+    @Override
+    public IcebergTableOperations createTableOperations(
+            TrinoCatalog catalog,
+            ConnectorSession session,
+            String database,
+            String table,
+            Optional<String> owner,
+            Optional<String> location)
+    {
+        return new GlueIcebergTableOperations(
+                glueClient,
+                stats,
+                fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()),
+                session,
+                database,
+                table,
+                owner,
+                location);
+    }
+}
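The provider above reuses the Hive connector's GlueHiveMetastoreConfig, so the Glue-backed Iceberg catalog is configured with the same hive.metastore.glue.* properties. A minimal catalog file sketch, assuming the standard property names from IcebergConfig and GlueHiveMetastoreConfig; the warehouse directory property is the one TrinoGlueCatalog below falls back to when a schema has no location:

    # etc/catalog/iceberg.properties
    connector.name=iceberg
    iceberg.catalog.type=glue
    hive.metastore.glue.region=us-east-2
    hive.metastore.glue.default-warehouse-dir=s3://<bucket>/warehouse

AWS credentials are resolved the same way as for the Glue Hive metastore, either from the default credentials chain or from the hive.metastore.glue.* credential properties.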
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java
new file mode 100644
index 000000000000..579be5e949fb
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.glue;
+
+import com.amazonaws.services.glue.model.TableInput;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
+
+public final class GlueIcebergUtil
+{
+    private GlueIcebergUtil() {}
+
+    public static TableInput getTableInput(String tableName, Optional<String> owner, Map<String, String> parameters)
+    {
+        return new TableInput()
+                .withName(tableName)
+                .withOwner(owner.orElse(null))
+                .withParameters(parameters)
+                // Iceberg does not distinguish managed and external tables, all tables are treated the same and marked as EXTERNAL
+                .withTableType(EXTERNAL_TABLE.name());
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java
new file mode 100644
index 000000000000..c085a431e4b4
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.glue;
+
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+public class IcebergGlueCatalogModule
+        extends AbstractConfigurationAwareModule
+{
+    @Override
+    protected void setup(Binder binder)
+    {
+        configBinder(binder).bindConfig(GlueHiveMetastoreConfig.class);
+        binder.bind(IcebergTableOperationsProvider.class).to(GlueIcebergTableOperationsProvider.class).in(Scopes.SINGLETON);
+        newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName();
+        binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON);
+        newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
new file mode 100644
index 000000000000..0dd21eac0794
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.AlreadyExistsException; +import com.amazonaws.services.glue.model.CreateDatabaseRequest; +import com.amazonaws.services.glue.model.CreateTableRequest; +import com.amazonaws.services.glue.model.Database; +import com.amazonaws.services.glue.model.DatabaseInput; +import com.amazonaws.services.glue.model.DeleteDatabaseRequest; +import com.amazonaws.services.glue.model.DeleteTableRequest; +import com.amazonaws.services.glue.model.EntityNotFoundException; +import com.amazonaws.services.glue.model.GetDatabaseRequest; +import com.amazonaws.services.glue.model.GetDatabasesRequest; +import com.amazonaws.services.glue.model.GetDatabasesResult; +import com.amazonaws.services.glue.model.GetTableRequest; +import com.amazonaws.services.glue.model.GetTablesRequest; +import com.amazonaws.services.glue.model.GetTablesResult; +import com.amazonaws.services.glue.model.TableInput; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.SchemaAlreadyExistsException; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.PrincipalType; +import io.trino.spi.security.TrinoPrincipal; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static 
io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogUtil.dropTableData; + +public class TrinoGlueCatalog + extends AbstractTrinoCatalog +{ + private final HdfsEnvironment hdfsEnvironment; + private final Optional defaultSchemaLocation; + private final AWSGlueAsync glueClient; + private final GlueMetastoreStats stats; + + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + public TrinoGlueCatalog( + HdfsEnvironment hdfsEnvironment, + IcebergTableOperationsProvider tableOperationsProvider, + AWSGlueAsync glueClient, + GlueMetastoreStats stats, + Optional defaultSchemaLocation, + boolean useUniqueTableLocation) + { + super(tableOperationsProvider, useUniqueTableLocation); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.glueClient = requireNonNull(glueClient, "glueClient is null"); + this.stats = requireNonNull(stats, "stats is null"); + this.defaultSchemaLocation = requireNonNull(defaultSchemaLocation, "defaultSchemaLocation is null"); + } + + @Override + public List listNamespaces(ConnectorSession session) + { + try { + return getPaginatedResults( + glueClient::getDatabases, + new GetDatabasesRequest(), + GetDatabasesRequest::setNextToken, + GetDatabasesResult::getNextToken, + stats.getGetDatabases()) + .map(GetDatabasesResult::getDatabaseList) + .flatMap(List::stream) + .map(com.amazonaws.services.glue.model.Database::getName) + .collect(toImmutableList()); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public void dropNamespace(ConnectorSession session, String namespace) + { + try { + stats.getDeleteDatabase().call(() -> + glueClient.deleteDatabase(new DeleteDatabaseRequest().withName(namespace))); + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + try { + GetDatabaseRequest getDatabaseRequest = new GetDatabaseRequest().withName(namespace); + Database database = stats.getGetDatabase().call(() -> + glueClient.getDatabase(getDatabaseRequest).getDatabase()); + ImmutableMap.Builder metadata = ImmutableMap.builder(); + if (database.getLocationUri() != null) { + metadata.put(LOCATION_PROPERTY, database.getLocationUri()); + } + if (database.getParameters() != null) { + metadata.putAll(database.getParameters()); + } + return metadata.buildOrThrow(); + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + return Optional.empty(); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + checkArgument(owner.getType() == PrincipalType.USER, "Owner type must be USER"); + checkArgument(owner.getName().equals(session.getUser()), "Explicit schema owner is not supported"); + + try { + stats.getCreateDatabase().call(() -> + glueClient.createDatabase(new CreateDatabaseRequest() + 
.withDatabaseInput(createDatabaseInput(namespace, properties)))); + } + catch (AlreadyExistsException e) { + throw new SchemaAlreadyExistsException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + private DatabaseInput createDatabaseInput(String namespace, Map properties) + { + DatabaseInput databaseInput = new DatabaseInput().withName(namespace); + Object location = properties.get(LOCATION_PROPERTY); + if (location != null) { + databaseInput.setLocationUri((String) location); + } + + return databaseInput; + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setNamespacePrincipal is not supported for Iceberg Glue catalogs"); + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for Iceberg Glue catalogs"); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + try { + List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + return namespaces.stream() + .flatMap(glueNamespace -> { + try { + return getPaginatedResults( + glueClient::getTables, + new GetTablesRequest().withDatabaseName(glueNamespace), + GetTablesRequest::setNextToken, + GetTablesResult::getNextToken, + stats.getGetTables()) + .map(GetTablesResult::getTableList) + .flatMap(List::stream) + .map(table -> new SchemaTableName(glueNamespace, table.getName())); + } + catch (EntityNotFoundException e) { + // Namespace may have been deleted + return Stream.empty(); + } + }) + .collect(toImmutableList()); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName table) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + table, + ignore -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + + return getIcebergTableWithMetadata( + this, + tableOperationsProvider, + session, + table, + metadata); + } + + @Override + public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) + { + BaseTable table = (BaseTable) loadTable(session, schemaTableName); + validateTableCanBeDropped(table); + try { + deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } + catch (AmazonServiceException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, e); + } + dropTableData(table.io(), table.operations().current()); + deleteTableDirectory(session, schemaTableName, hdfsEnvironment, new Path(table.location())); + } + + @Override + public Transaction newCreateTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + String location, + Map properties) + { + return newCreateTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + location, + properties, + Optional.of(session.getUser())); + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + boolean newTableCreated = false; + try { + GetTableRequest getTableRequest = new 
GetTableRequest() + .withDatabaseName(from.getSchemaName()) + .withName(from.getTableName()); + com.amazonaws.services.glue.model.Table table = stats.getGetTable().call(() -> glueClient.getTable(getTableRequest).getTable()); + TableInput tableInput = getTableInput(to.getTableName(), Optional.ofNullable(table.getOwner()), table.getParameters()); + CreateTableRequest createTableRequest = new CreateTableRequest() + .withDatabaseName(to.getSchemaName()) + .withTableInput(tableInput); + stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest)); + newTableCreated = true; + deleteTable(from.getSchemaName(), from.getTableName()); + } + catch (RuntimeException e) { + if (newTableCreated) { + try { + deleteTable(to.getSchemaName(), to.getTableName()); + } + catch (RuntimeException cleanupException) { + if (!cleanupException.equals(e)) { + e.addSuppressed(cleanupException); + } + } + } + throw e; + } + } + + private void deleteTable(String schema, String table) + { + stats.getDeleteTable().call(() -> + glueClient.deleteTable(new DeleteTableRequest() + .withDatabaseName(schema) + .withName(table))); + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + GetDatabaseRequest getDatabaseRequest = new GetDatabaseRequest() + .withName(schemaTableName.getSchemaName()); + String databaseLocation = stats.getGetDatabase().call(() -> + glueClient.getDatabase(getDatabaseRequest) + .getDatabase() + .getLocationUri()); + + String tableName = createNewTableName(schemaTableName.getTableName()); + + Path location; + if (databaseLocation == null) { + if (defaultSchemaLocation.isEmpty()) { + throw new TrinoException( + HIVE_DATABASE_LOCATION_ERROR, + format("Schema '%s' location cannot be determined. 
" + + "Either set the 'location' property when creating the schema, or set the 'hive.metastore.glue.default-warehouse-dir' " + + "catalog property.", + schemaTableName.getSchemaName())); + } + String schemaDirectoryName = schemaTableName.getSchemaName() + ".db"; + location = new Path(new Path(defaultSchemaLocation.get(), schemaDirectoryName), tableName); + } + else { + location = new Path(databaseLocation, tableName); + } + + return location.toString(); + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported for Iceberg Glue catalogs"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, "createView is not supported for Iceberg Glue catalogs"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for Iceberg Glue catalogs"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported for Iceberg Glue catalogs"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for Iceberg Glue catalogs"); + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + return ImmutableMap.of(); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + { + return Optional.empty(); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for Iceberg Glue catalogs"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for Iceberg Glue catalogs"); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg Glue catalogs"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java new file mode 100644 index 000000000000..47a805e843b9 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + 
* you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.security.ConnectorIdentity; +import org.weakref.jmx.Flatten; +import org.weakref.jmx.Managed; + +import javax.inject.Inject; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static java.util.Objects.requireNonNull; + +public class TrinoGlueCatalogFactory + implements TrinoCatalogFactory +{ + private final HdfsEnvironment hdfsEnvironment; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final Optional defaultSchemaLocation; + private final AWSGlueAsync glueClient; + private final boolean isUniqueTableLocation; + private final GlueMetastoreStats stats = new GlueMetastoreStats(); + + @Inject + public TrinoGlueCatalogFactory( + HdfsEnvironment hdfsEnvironment, + IcebergTableOperationsProvider tableOperationsProvider, + GlueHiveMetastoreConfig glueConfig, + IcebergConfig icebergConfig) + { + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + requireNonNull(glueConfig, "glueConfig is null"); + checkArgument(glueConfig.getCatalogId().isEmpty(), "catalogId configuration is not supported"); + this.defaultSchemaLocation = glueConfig.getDefaultWarehouseDir(); + this.glueClient = createAsyncGlueClient(glueConfig, Optional.empty(), stats.newRequestMetricsCollector()); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation(); + } + + @Managed + @Flatten + public GlueMetastoreStats getStats() + { + return stats; + } + + @Override + public TrinoCatalog create(ConnectorIdentity identity) + { + return new TrinoGlueCatalog(hdfsEnvironment, tableOperationsProvider, glueClient, stats, defaultSchemaLocation, isUniqueTableLocation); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java index 38422ae428ba..76cb147056a2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java @@ -14,11 +14,11 @@ package io.trino.plugin.iceberg.catalog.hms; 
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.thrift.ThriftMetastore; import io.trino.plugin.iceberg.FileIoProvider; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.spi.connector.ConnectorSession; import javax.inject.Inject; @@ -42,7 +42,7 @@ public HiveMetastoreTableOperationsProvider(FileIoProvider fileIoProvider, Thrif @Override public IcebergTableOperations createTableOperations( - HiveMetastore hiveMetastore, + TrinoCatalog catalog, ConnectorSession session, String database, String table, @@ -51,7 +51,7 @@ public IcebergTableOperations createTableOperations( { return new HiveMetastoreTableOperations( fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), - hiveMetastore, + ((TrinoHiveCatalog) catalog).getMetastore(), thriftMetastore, session, database, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java index 65ab21e6e990..018d6e4ae6ed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java @@ -16,8 +16,11 @@ import com.google.inject.Binder; import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.metastore.DecoratedHiveMetastoreModule; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreModule; +import io.trino.plugin.iceberg.catalog.IcebergCatalogModule.MetastoreValidator; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; public class IcebergHiveMetastoreCatalogModule extends AbstractConfigurationAwareModule @@ -27,5 +30,8 @@ protected void setup(Binder binder) { install(new ThriftMetastoreModule()); binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(MetastoreValidator.class).asEagerSingleton(); + install(new DecoratedHiveMetastoreModule()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 0e1d255afa48..40f0f8dddf18 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -28,14 +28,16 @@ import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; +import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HivePrincipal; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.util.HiveUtil; +import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; 
import io.trino.plugin.iceberg.IcebergUtil; +import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; -import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -59,7 +61,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import java.io.IOException; @@ -90,7 +91,6 @@ import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; -import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; @@ -106,23 +106,16 @@ import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; -import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.CatalogUtil.dropTableData; -import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; -import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; -import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; -import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; -import static org.apache.iceberg.Transactions.createTableTransaction; public class TrinoHiveCatalog - implements TrinoCatalog + extends AbstractTrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); private static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View"; @@ -140,9 +133,7 @@ public class TrinoHiveCatalog private final CachingHiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; - private final IcebergTableOperationsProvider tableOperationsProvider; private final String trinoVersion; - private final boolean useUniqueTableLocation; private final boolean isUsingSystemSecurity; private final boolean deleteSchemaLocationsFallback; @@ -160,17 +151,21 @@ public TrinoHiveCatalog( boolean isUsingSystemSecurity, boolean deleteSchemaLocationsFallback) { + super(tableOperationsProvider, useUniqueTableLocation); this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); - 
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); - this.useUniqueTableLocation = useUniqueTableLocation; this.isUsingSystemSecurity = isUsingSystemSecurity; this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; } + public HiveMetastore getMetastore() + { + return metastore; + } + @Override public List<String> listNamespaces(ConnectorSession session) { @@ -276,15 +271,14 @@ public Transaction newCreateTableTransaction( String location, Map<String, Object> properties) { - TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); - TableOperations ops = tableOperationsProvider.createTableOperations( - metastore, + return newCreateTableTransaction( session, - schemaTableName.getSchemaName(), - schemaTableName.getTableName(), - isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser()), - Optional.of(location)); - return createTableTransaction(schemaTableName.toString(), ops, metadata); + schemaTableName, + schema, + partitionSpec, + location, + properties, + isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser())); } @Override @@ -326,22 +320,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) // Use the Iceberg routine for dropping the table data because the data files // of the Iceberg table may be located in different locations dropTableData(table.io(), metadata); - deleteTableDirectory(session, metastoreTable); - } - - private void deleteTableDirectory(ConnectorSession session, io.trino.plugin.hive.metastore.Table metastoreTable) - { - Path tablePath = new Path(metastoreTable.getStorage().getLocation()); - try { - FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), tablePath); - fileSystem.delete(tablePath, true); - } - catch (IOException e) { - throw new TrinoException( - ICEBERG_FILESYSTEM_ERROR, - format("Failed to delete directory %s of the table %s.%s", tablePath, metastoreTable.getDatabaseName(), metastoreTable.getTableName()), - e); - } + deleteTableDirectory(session, schemaTableName, hdfsEnvironment, new Path(metastoreTable.getStorage().getLocation())); } @Override @@ -355,31 +334,23 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName { TableMetadata metadata = tableMetadataCache.computeIfAbsent( schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(metastore, tableOperationsProvider, session, schemaTableName)).operations().current()); + ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); - return getIcebergTableWithMetadata(metastore, tableOperationsProvider, session, schemaTableName, metadata); + return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @Override public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional<String> comment) { metastore.commentTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), comment); - Table icebergTable = loadTable(session, schemaTableName); - if (comment.isEmpty()) { - icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); - } - else { - icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); - } + super.updateTableComment(session, schemaTableName, comment); } @Override public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName,
ColumnIdentity columnIdentity, Optional<String> comment) { metastore.commentColumn(schemaTableName.getSchemaName(), schemaTableName.getTableName(), columnIdentity.getName(), comment); - - Table icebergTable = loadTable(session, schemaTableName); - icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + super.updateColumnComment(session, schemaTableName, columnIdentity, comment); } @Override @@ -387,10 +358,7 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch { Database database = metastore.getDatabase(schemaTableName.getSchemaName()) .orElseThrow(() -> new SchemaNotFoundException(schemaTableName.getSchemaName())); - String tableNameForLocation = schemaTableName.getTableName(); - if (useUniqueTableLocation) { - tableNameForLocation += "-" + randomUUID().toString().replace("-", ""); - } + String tableNameForLocation = createNewTableName(schemaTableName.getTableName()); return getTableDefaultLocation(database, new HdfsEnvironment.HdfsContext(session), hdfsEnvironment, schemaTableName.getSchemaName(), tableNameForLocation).toString(); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java new file mode 100644 index 000000000000..9310f0be102f --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.iceberg.catalog.hms; + +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergSecurityConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.security.ConnectorIdentity; +import io.trino.spi.type.TypeManager; + +import javax.inject.Inject; + +import java.util.Optional; + +import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; +import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM; +import static java.util.Objects.requireNonNull; + +public class TrinoHiveCatalogFactory + implements TrinoCatalogFactory +{ + private final CatalogName catalogName; + private final HiveMetastoreFactory metastoreFactory; + private final HdfsEnvironment hdfsEnvironment; + private final TypeManager typeManager; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final String trinoVersion; + private final boolean isUniqueTableLocation; + private final boolean isUsingSystemSecurity; + private final boolean deleteSchemaLocationsFallback; + + @Inject + public TrinoHiveCatalogFactory( + IcebergConfig config, + CatalogName catalogName, + HiveMetastoreFactory metastoreFactory, + HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, + IcebergTableOperationsProvider tableOperationsProvider, + NodeVersion nodeVersion, + IcebergSecurityConfig securityConfig, + HiveConfig hiveConfig) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationProvider is null"); + this.trinoVersion = requireNonNull(nodeVersion, "trinoVersion is null").toString(); + requireNonNull(config, "config is null"); + this.isUniqueTableLocation = config.isUniqueTableLocation(); + requireNonNull(securityConfig, "securityConfig is null"); + this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM; + requireNonNull(hiveConfig, "hiveConfig is null"); + this.deleteSchemaLocationsFallback = hiveConfig.isDeleteSchemaLocationsFallback(); + } + + @Override + public TrinoCatalog create(ConnectorIdentity identity) + { + return new TrinoHiveCatalog( + catalogName, + memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), 1000), + hdfsEnvironment, + typeManager, + tableOperationsProvider, + trinoVersion, + isUniqueTableLocation, + isUsingSystemSecurity, + deleteSchemaLocationsFallback); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..0d9e26534368 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -0,0 +1,153 @@ +/* + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreApiStats; +import io.trino.testing.QueryRunner; +import org.apache.iceberg.FileFormat; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Optional; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/* + * TestIcebergGlueCatalogConnectorSmokeTest currently uses AWS Default Credential Provider Chain, + * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default + * on ways to set your AWS credentials which will be needed to run this test. 
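+ * For example (illustrative only, any source supported by the default chain works), the standard AWS SDK + * environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION can be exported before + * running the test, or a profile can be configured in ~/.aws/credentials.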
+ */ +public class TestIcebergGlueCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private final String bucketName; + private final String schemaName; + + @Parameters("s3.bucket") + public TestIcebergGlueCatalogConnectorSmokeTest(String bucketName) + { + super(FileFormat.PARQUET); + this.bucketName = requireNonNull(bucketName, "bucketName is null"); + this.schemaName = "iceberg_smoke_test_" + randomTableSuffix(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createIcebergQueryRunner( + ImmutableMap.of(), + ImmutableMap.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.default-warehouse-dir", schemaPath()), + SchemaInitializer.builder() + .withClonedTpchTables(REQUIRED_TPCH_TABLES) + .withSchemaName(schemaName) + .build(), + Optional.empty()); + } + + @AfterClass(alwaysRun = true) + public void cleanup() + { + computeActual("SHOW TABLES").getMaterializedRows() + .forEach(table -> getQueryRunner().execute("DROP TABLE " + table.getField(0))); + getQueryRunner().execute("DROP SCHEMA IF EXISTS " + schemaName); + + // DROP TABLES should clean up any files, but clear the directory manually to be safe + AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); + + ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request() + .withBucketName(bucketName) + .withPrefix(schemaPath()); + List keysToDelete = getPaginatedResults( + s3::listObjectsV2, + listObjectsRequest, + ListObjectsV2Request::setContinuationToken, + ListObjectsV2Result::getNextContinuationToken, + new GlueMetastoreApiStats()) + .map(ListObjectsV2Result::getObjectSummaries) + .flatMap(objectSummaries -> objectSummaries.stream().map(S3ObjectSummary::getKey)) + .map(DeleteObjectsRequest.KeyVersion::new) + .collect(toImmutableList()); + + if (!keysToDelete.isEmpty()) { + s3.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keysToDelete)); + } + } + + @Test + @Override + public void testShowCreateTable() + { + assertThat((String) computeScalar("SHOW CREATE TABLE region")) + .isEqualTo(format("" + + "CREATE TABLE iceberg.%1$s.region (\n" + + " regionkey bigint,\n" + + " name varchar,\n" + + " comment varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'ORC',\n" + + " location = '%2$s/%1$s.db/region'\n" + + ")", + schemaName, + schemaPath())); + } + + @Test + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasStackTraceContaining("createView is not supported for Iceberg Glue catalogs"); + } + + @Test + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasStackTraceContaining("createMaterializedView is not supported for Iceberg Glue catalogs"); + } + + @Test + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasStackTraceContaining("renameNamespace is not supported for Iceberg Glue catalogs"); + } + + private String schemaPath() + { + return format("s3://%s/%s", bucketName, schemaName); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java index d81681d264c6..b220e30de602 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableSet; +import 
io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; @@ -26,8 +27,11 @@ import io.trino.plugin.hive.metastore.file.FileHiveMetastore; import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TestingTypeManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; @@ -38,13 +42,13 @@ import java.io.File; +import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; import static org.testng.Assert.assertEquals; public class TestIcebergMergeAppend extends AbstractTestQueryFramework { - private HiveMetastore metastore; - private HdfsEnvironment hdfsEnvironment; + private TrinoCatalog trinoCatalog; private IcebergTableOperationsProvider tableOperationsProvider; @Override @@ -53,10 +57,10 @@ protected QueryRunner createQueryRunner() throws Exception DistributedQueryRunner queryRunner = IcebergQueryRunner.createIcebergQueryRunner(); HdfsConfig hdfsConfig = new HdfsConfig(); HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); - hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data").toFile(); - metastore = new FileHiveMetastore( + HiveMetastore metastore = new FileHiveMetastore( new NodeVersion("testversion"), hdfsEnvironment, new MetastoreConfig(), @@ -64,6 +68,16 @@ protected QueryRunner createQueryRunner() throws Exception .setCatalogDirectory(baseDir.toURI().toString()) .setMetastoreUser("test")); tableOperationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); + trinoCatalog = new TrinoHiveCatalog( + new CatalogName("catalog"), + memoizeMetastore(metastore, 1000), + hdfsEnvironment, + new TestingTypeManager(), + tableOperationsProvider, + "trino-version", + false, + false, + false); return queryRunner; } @@ -72,7 +86,7 @@ protected QueryRunner createQueryRunner() throws Exception public void testInsertWithAppend() { assertUpdate("CREATE TABLE table_to_insert (_bigint BIGINT, _varchar VARCHAR)"); - Table table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + Table table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("tpch", "table_to_insert")); table.updateProperties() .set("commit.manifest.min-count-to-merge", "2") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java index 58d459761a38..f2076d80d679 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableSet; import io.trino.Session; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; @@ -27,9 +28,12 @@ import io.trino.plugin.hive.metastore.file.FileHiveMetastore; import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.plugin.tpch.TpchPlugin; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TestingTypeManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; @@ -48,6 +52,7 @@ import static io.trino.SystemSessionProperties.MAX_DRIVERS_PER_TASK; import static io.trino.SystemSessionProperties.TASK_CONCURRENCY; import static io.trino.SystemSessionProperties.TASK_WRITER_COUNT; +import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; import static io.trino.plugin.iceberg.DataFileRecord.toDataFileRecord; import static io.trino.testing.TestingSession.testSessionBuilder; import static org.testng.Assert.assertEquals; @@ -56,8 +61,7 @@ public class TestIcebergOrcMetricsCollection extends AbstractTestQueryFramework { - private HiveMetastore metastore; - private HdfsEnvironment hdfsEnvironment; + private TrinoCatalog trinoCatalog; private IcebergTableOperationsProvider tableOperationsProvider; @Override @@ -80,9 +84,9 @@ protected QueryRunner createQueryRunner() HdfsConfig hdfsConfig = new HdfsConfig(); HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); - hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); - metastore = new FileHiveMetastore( + HiveMetastore metastore = new FileHiveMetastore( new NodeVersion("test_version"), hdfsEnvironment, new MetastoreConfig(), @@ -90,6 +94,16 @@ protected QueryRunner createQueryRunner() .setCatalogDirectory(baseDir.toURI().toString()) .setMetastoreUser("test")); tableOperationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); + trinoCatalog = new TrinoHiveCatalog( + new CatalogName("catalog"), + memoizeMetastore(metastore, 1000), + hdfsEnvironment, + new TestingTypeManager(), + tableOperationsProvider, + "trino-version", + false, + false, + false); queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(metastore), Optional.empty())); queryRunner.createCatalog("iceberg", "iceberg"); @@ -106,7 +120,7 @@ protected QueryRunner createQueryRunner() public void testMetrics() { assertUpdate("create table no_metrics (c1 varchar, c2 varchar)"); - Table table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + Table table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("test_schema", "no_metrics")); // skip metrics for all columns 
table.updateProperties().set("write.metadata.metrics.default", "none").commit(); @@ -123,7 +137,7 @@ public void testMetrics() // keep c1 metrics assertUpdate("create table c1_metrics (c1 varchar, c2 varchar)"); - table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("test_schema", "c1_metrics")); table.updateProperties() .set("write.metadata.metrics.default", "none") @@ -141,7 +155,7 @@ public void testMetrics() // set c1 metrics mode to count assertUpdate("create table c1_metrics_count (c1 varchar, c2 varchar)"); - table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("test_schema", "c1_metrics_count")); table.updateProperties() .set("write.metadata.metrics.default", "none") @@ -159,7 +173,7 @@ public void testMetrics() // set c1 metrics mode to truncate(10) assertUpdate("create table c1_metrics_truncate (c1 varchar, c2 varchar)"); - table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("test_schema", "c1_metrics_truncate")); table.updateProperties() .set("write.metadata.metrics.default", "none") @@ -179,7 +193,7 @@ public void testMetrics() // keep both c1 and c2 metrics assertUpdate("create table c_metrics (c1 varchar, c2 varchar)"); - table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("test_schema", "c_metrics")); table.updateProperties() .set("write.metadata.metrics.column.c1", "full") @@ -196,7 +210,7 @@ public void testMetrics() // keep all metrics assertUpdate("create table metrics (c1 varchar, c2 varchar)"); - table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION, + table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION, new SchemaTableName("test_schema", "metrics")); table.updateProperties() .set("write.metadata.metrics.default", "full") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index f6953ab1a417..14f5142992ef 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -70,11 +70,13 @@ public void testGlueMetastore() { ConnectorFactory factory = getConnectorFactory(); - assertThatThrownBy(() -> factory.create( + factory.create( "test", - Map.of("iceberg.catalog.type", "glue"), - new TestingConnectorContext())) - .hasMessageMatching("(?s).*Explicit bindings are required and HiveMetastoreFactory .* is not explicitly bound.*"); + Map.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.region", "us-east-1"), + new TestingConnectorContext()) + .shutdown(); assertThatThrownBy(() -> factory.create( "test", @@ -83,6 +85,15 @@ public void testGlueMetastore() "hive.metastore.uri", 
"thrift://foo:1234"), new TestingConnectorContext())) .hasMessageContaining("Error: Configuration property 'hive.metastore.uri' was not used"); + + assertThatThrownBy(() -> factory.create( + "test", + Map.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.catalogid", "123", + "hive.metastore.glue.region", "us-east-1"), + new TestingConnectorContext())) + .hasMessageContaining("catalogId configuration is not supported"); } @Test @@ -99,6 +110,16 @@ public void testRecordingMetastore() "hive.metastore-recording-path", "/tmp"), new TestingConnectorContext()) .shutdown(); + + // recording with glue + assertThatThrownBy(() -> factory.create( + "test", + Map.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.region", "us-east-2", + "hive.metastore-recording-path", "/tmp"), + new TestingConnectorContext())) + .hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index b8d8c12067e4..33c998f0ec07 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.units.Duration; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; @@ -25,7 +26,9 @@ import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.SchemaTableName; @@ -56,9 +59,9 @@ import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; -import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; import static io.trino.spi.connector.Constraint.alwaysTrue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.testing.TestingConnectorSession.SESSION; @@ -72,8 +75,7 @@ public class TestIcebergSplitSource extends AbstractTestQueryFramework { private File metastoreDir; - private HiveMetastore metastore; - private IcebergTableOperationsProvider operationsProvider; + private TrinoCatalog catalog; @Override protected QueryRunner createQueryRunner() @@ -85,8 +87,18 @@ protected QueryRunner createQueryRunner() File tempDir = Files.createTempDirectory("test_iceberg_split_source").toFile(); this.metastoreDir = new File(tempDir, "iceberg_data"); - this.metastore = createTestingFileHiveMetastore(metastoreDir); - this.operationsProvider = new FileMetastoreTableOperationsProvider(new 
HdfsFileIoProvider(hdfsEnvironment)); + HiveMetastore metastore = createTestingFileHiveMetastore(metastoreDir); + IcebergTableOperationsProvider operationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); + this.catalog = new TrinoHiveCatalog( + new CatalogName("hive"), + memoizeMetastore(metastore, 1000), + hdfsEnvironment, + new TestingTypeManager(), + operationsProvider, + "test", + false, + false, + false); return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of(), ImmutableList.of(NATION), Optional.of(metastoreDir)); } @@ -113,7 +125,7 @@ public void testIncompleteDynamicFilterTimeout() TupleDomain.all(), ImmutableSet.of(), Optional.empty()); - Table nationTable = loadIcebergTable(metastore, operationsProvider, SESSION, schemaTableName); + Table nationTable = catalog.loadTable(SESSION, schemaTableName); IcebergSplitSource splitSource = new IcebergSplitSource( tableHandle, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 8f8047296c84..fd65ecd1e157 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; @@ -23,9 +24,13 @@ import io.trino.plugin.hive.HiveHdfsConfiguration; import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TestingTypeManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import org.apache.hadoop.fs.FileSystem; @@ -184,7 +189,17 @@ private void writeEqualityDeleteToNationTable(Table icebergTable) private Table updateTableToV2(String tableName) { IcebergTableOperationsProvider tableOperationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); - BaseTable table = (BaseTable) loadIcebergTable(metastore, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName)); + TrinoCatalog catalog = new TrinoHiveCatalog( + new CatalogName("hive"), + CachingHiveMetastore.memoizeMetastore(metastore, 1000), + hdfsEnvironment, + new TestingTypeManager(), + tableOperationsProvider, + "test", + false, + false, + false); + BaseTable table = (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName)); TableOperations operations = table.operations(); TableMetadata currentMetadata = operations.current(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestSharedGlueMetastore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestSharedGlueMetastore.java new file mode 100644 index 000000000000..57fdc7dd8e1a 
--- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestSharedGlueMetastore.java @@ -0,0 +1,233 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.log.Logger; +import io.trino.Session; +import io.trino.plugin.hive.HdfsConfig; +import io.trino.plugin.hive.HdfsConfigurationInitializer; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveHdfsConfiguration; +import io.trino.plugin.hive.TestingHivePlugin; +import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.glue.DefaultGlueColumnStatisticsProviderFactory; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.tpch.TpchPlugin; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.nio.file.Path; +import java.util.Optional; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; +import static io.trino.testing.QueryAssertions.copyTpchTables; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +/** + * Tests metadata operations on a schema which has a mix of Hive and Iceberg tables. 
+ * + * Requires AWS credentials, which can be provided any way supported by the DefaultProviderChain + * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default + */ +public class TestSharedGlueMetastore + extends AbstractTestQueryFramework +{ + private static final Logger LOG = Logger.get(TestSharedGlueMetastore.class); + private static final String HIVE_CATALOG = "hive"; + + private final String schema = "test_shared_glue_schema_" + randomTableSuffix(); + private Path dataDirectory; + private HiveMetastore glueMetastore; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session icebergSession = testSessionBuilder() + .setCatalog(ICEBERG_CATALOG) + .setSchema(schema) + .build(); + Session hiveSession = testSessionBuilder() + .setCatalog(HIVE_CATALOG) + .setSchema(schema) + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(icebergSession).build(); + + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"); + this.dataDirectory.toFile().deleteOnExit(); + + queryRunner.installPlugin(new IcebergPlugin()); + queryRunner.createCatalog( + ICEBERG_CATALOG, + "iceberg", + ImmutableMap.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.default-warehouse-dir", dataDirectory.toString())); + + HdfsConfig hdfsConfig = new HdfsConfig(); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment( + new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()), + hdfsConfig, + new NoHdfsAuthentication()); + this.glueMetastore = new GlueHiveMetastore( + hdfsEnvironment, + new GlueHiveMetastoreConfig(), + directExecutor(), + new DefaultGlueColumnStatisticsProviderFactory(new GlueHiveMetastoreConfig(), directExecutor(), directExecutor()), + Optional.empty(), + table -> true); + queryRunner.installPlugin(new TestingHivePlugin(glueMetastore)); + queryRunner.createCatalog(HIVE_CATALOG, "hive"); + queryRunner.createCatalog( + "hive_with_redirections", + "hive", + ImmutableMap.of("hive.iceberg-catalog-name", "iceberg")); + + queryRunner.execute("CREATE SCHEMA " + schema + " WITH (location = '" + dataDirectory.toString() + "')"); + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, icebergSession, ImmutableList.of(TpchTable.NATION)); + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, hiveSession, ImmutableList.of(TpchTable.REGION)); + + return queryRunner; + } + + @AfterClass(alwaysRun = true) + public void cleanup() + { + try { + if (glueMetastore != null) { + // Data is on the local disk and will be deleted by the deleteOnExit hook + glueMetastore.dropDatabase(schema, false); + } + } + catch (Exception e) { + LOG.error(e, "Failed to clean up Glue database: %s", schema); + } + } + + @Test + public void testReadInformationSchema() + { + assertThat(query("SELECT table_schema FROM hive.information_schema.tables WHERE table_name = 'region'")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + assertThat(query("SELECT table_schema FROM iceberg.information_schema.tables WHERE table_name = 'nation'")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + assertThat(query("SELECT table_schema FROM hive_with_redirections.information_schema.tables WHERE table_name = 'region'")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + assertThat(query("SELECT table_schema FROM 
hive_with_redirections.information_schema.tables WHERE table_name = 'nation'")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + + assertQuery("SELECT table_name, column_name from hive.information_schema.columns WHERE table_schema = '" + schema + "'", + "VALUES ('region', 'regionkey'), ('region', 'name'), ('region', 'comment')"); + assertQuery("SELECT table_name, column_name from iceberg.information_schema.columns WHERE table_schema = '" + schema + "'", + "VALUES ('nation', 'nationkey'), ('nation', 'name'), ('nation', 'regionkey'), ('nation', 'comment')"); + assertQuery("SELECT table_name, column_name from hive_with_redirections.information_schema.columns WHERE table_schema = '" + schema + "'", + "VALUES" + + "('region', 'regionkey'), ('region', 'name'), ('region', 'comment'), " + + "('nation', 'nationkey'), ('nation', 'name'), ('nation', 'regionkey'), ('nation', 'comment')"); + } + + @Test + public void testShowTables() + { + assertQuery("SHOW TABLES FROM iceberg." + schema, "VALUES 'region', 'nation'"); + assertQuery("SHOW TABLES FROM hive." + schema, "VALUES 'region', 'nation'"); + assertQuery("SHOW TABLES FROM hive_with_redirections." + schema, "VALUES 'region', 'nation'"); + + assertThatThrownBy(() -> query("SHOW CREATE TABLE iceberg." + schema + ".region")) + .hasMessageContaining("Not an Iceberg table"); + assertThatThrownBy(() -> query("SHOW CREATE TABLE hive." + schema + ".nation")) + .hasMessageContaining("Cannot query Iceberg table"); + + assertThatThrownBy(() -> query("DESCRIBE iceberg." + schema + ".region")) + .hasMessageContaining("Not an Iceberg table"); + assertThatThrownBy(() -> query("DESCRIBE hive." + schema + ".nation")) + .hasMessageContaining("Cannot query Iceberg table"); + } + + @Test + public void testShowSchemas() + { + assertThat(query("SHOW SCHEMAS FROM hive")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + assertThat(query("SHOW SCHEMAS FROM iceberg")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + assertThat(query("SHOW SCHEMAS FROM hive_with_redirections")) + .skippingTypesCheck() + .containsAll("VALUES '" + schema + "'"); + + String expectedHiveCreateSchema = "CREATE SCHEMA %s.%s\n" + + "AUTHORIZATION ROLE public\n" + + "WITH (\n" + + " location = '%s'\n" + + ")"; + + String showCreateHiveSchema = (String) computeActual("SHOW CREATE SCHEMA hive." + schema).getOnlyValue(); + assertEquals( + showCreateHiveSchema, + format(expectedHiveCreateSchema, "hive", schema, dataDirectory)); + String showCreateIcebergSchema = (String) computeActual("SHOW CREATE SCHEMA iceberg." + schema).getOnlyValue(); + assertEquals( + showCreateIcebergSchema, + format("CREATE SCHEMA iceberg.%s\n" + + "WITH (\n" + + " location = '%s'\n" + + ")", + schema, + dataDirectory)); + String showCreateHiveWithRedirectionsSchema = (String) computeActual("SHOW CREATE SCHEMA hive_with_redirections." + schema).getOnlyValue(); + assertEquals( + showCreateHiveWithRedirectionsSchema, + format(expectedHiveCreateSchema, "hive_with_redirections", schema, dataDirectory)); + } + + @Test + public void testSelect() + { + assertQuery("SELECT * FROM iceberg." + schema + ".nation", "SELECT * FROM nation"); + assertQuery("SELECT * FROM hive." + schema + ".region", "SELECT * FROM region"); + assertQuery("SELECT * FROM hive_with_redirections." + schema + ".nation", "SELECT * FROM nation"); + assertQuery("SELECT * FROM hive_with_redirections." + schema + ".region", "SELECT * FROM region"); + + assertThatThrownBy(() -> query("SELECT * FROM iceberg." 
+ schema + ".region")) + .hasMessageContaining("Not an Iceberg table"); + assertThatThrownBy(() -> query("SELECT * FROM hive." + schema + ".nation")) + .hasMessageContaining("Cannot query Iceberg table"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java new file mode 100644 index 000000000000..d0992325a322 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.log.Logger; +import io.trino.plugin.hive.HdfsConfig; +import io.trino.plugin.hive.HdfsConfigurationInitializer; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveHdfsConfiguration; +import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.glue.TrinoGlueCatalog; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.PrincipalType; +import io.trino.spi.security.TrinoPrincipal; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static org.testng.Assert.assertEquals; + +public class TestTrinoGlueCatalogTest + extends BaseTrinoCatalogTest +{ + private static final Logger LOG = Logger.get(TestTrinoGlueCatalogTest.class); + + @Override + protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) + { + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(new HiveHdfsConfiguration( + new HdfsConfigurationInitializer( + new HdfsConfig(), + ImmutableSet.of()), + ImmutableSet.of()), + new HdfsConfig(), + new NoHdfsAuthentication()); + return new TrinoGlueCatalog( + hdfsEnvironment, + new GlueIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), new GlueHiveMetastoreConfig()), + AWSGlueAsyncClientBuilder.defaultClient(), + new GlueMetastoreStats(), + Optional.empty(), + useUniqueTableLocations); + } + + @Test + public void testDefaultLocation() + throws IOException + { + Path tmpDirectory = Files.createTempDirectory("test_glue_catalog_default_location_"); + tmpDirectory.toFile().deleteOnExit(); + + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(new HiveHdfsConfiguration( + new 
HdfsConfigurationInitializer( + new HdfsConfig(), + ImmutableSet.of()), + ImmutableSet.of()), + new HdfsConfig(), + new NoHdfsAuthentication()); + TrinoCatalog catalogWithDefaultLocation = new TrinoGlueCatalog( + hdfsEnvironment, + new GlueIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), new GlueHiveMetastoreConfig()), + AWSGlueAsyncClientBuilder.defaultClient(), + new GlueMetastoreStats(), + Optional.of(tmpDirectory.toAbsolutePath().toString()), + false); + + String namespace = "test_default_location_" + randomTableSuffix(); + String table = "tableName"; + SchemaTableName schemaTableName = new SchemaTableName(namespace, table); + catalogWithDefaultLocation.createNamespace(SESSION, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + try { + File expectedSchemaDirectory = new File(tmpDirectory.toFile(), namespace + ".db"); + File expectedTableDirectory = new File(expectedSchemaDirectory, schemaTableName.getTableName()); + assertEquals(catalogWithDefaultLocation.defaultTableLocation(SESSION, schemaTableName), expectedTableDirectory.toPath().toAbsolutePath().toString()); + } + finally { + try { + catalogWithDefaultLocation.dropNamespace(SESSION, namespace); + } + catch (Exception e) { + LOG.warn("Failed to clean up namespace: %s", namespace); + } + } + } +}