From d20bb4eca083f5d9f1e4a80b1a38c8e4a2b8d381 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 28 Jun 2024 09:43:59 +0900 Subject: [PATCH] Add support for reading from Unity catalog with Iceberg REST --- .github/workflows/ci.yml | 5 + .../main/sphinx/object-storage/metastores.md | 15 + plugin/trino-iceberg/pom.xml | 23 + .../rest/IcebergRestCatalogConfig.java | 15 + .../rest/TrinoIcebergRestCatalogFactory.java | 4 + .../catalog/rest/TrinoRestCatalog.java | 46 +- .../plugin/iceberg/IcebergQueryRunner.java | 12 +- .../rest/TestIcebergRestCatalogConfig.java | 3 + ...ergUnityRestCatalogConnectorSmokeTest.java | 645 ++++++++++++++++++ .../catalog/rest/TestTrinoRestCatalog.java | 11 +- 10 files changed, 765 insertions(+), 14 deletions(-) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a07e83d59b05..1de740eb0193 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -708,6 +708,11 @@ jobs: SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY: ${{ secrets.SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY }} SNOWFLAKE_EXTERNAL_VOLUME: ${{ vars.SNOWFLAKE_EXTERNAL_VOLUME }} SNOWFLAKE_CATALOG_S3_REGION: ${{ vars.SNOWFLAKE_CATALOG_S3_REGION }} + DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }} + DATABRICKS_LOGIN: token + DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} + DATABRICKS_UNITY_CATALOG_NAME: ${{ vars.DATABRICKS_UNITY_CATALOG_NAME }} + DATABRICKS_UNITY_JDBC_URL: ${{ vars.DATABRICKS_UNITY_JDBC_URL }} if: >- contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests') && (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '') diff --git a/docs/src/main/sphinx/object-storage/metastores.md b/docs/src/main/sphinx/object-storage/metastores.md index 331a85e80822..5e62e62664e2 100644 --- a/docs/src/main/sphinx/object-storage/metastores.md +++ b/docs/src/main/sphinx/object-storage/metastores.md @@ -470,6 +470,9 @@ following properties: * - `iceberg.rest-catalog.warehouse` - Warehouse identifier/location for the catalog (optional). Example: `s3://my_bucket/warehouse_location` +* - `iceberg.rest-catalog.namespace` + - The namespace to use with the REST catalog server. Example: + `main` * - `iceberg.rest-catalog.security` - The type of security to use (default: `NONE`). `OAUTH2` requires either a `token` or `credential`. Example: `OAUTH2` @@ -497,6 +500,18 @@ iceberg.catalog.type=rest iceberg.rest-catalog.uri=http://iceberg-with-rest:8181 ``` +`iceberg.security` must be `read_only` when connecting to Databricks Unity catalog +using an Iceberg REST catalog: + +```properties +connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=https://dbc-12345678-9999.cloud.databricks.com/api/2.1/unity-catalog/iceberg +iceberg.security=read_only +iceberg.rest-catalog.security=OAUTH2 +iceberg.rest-catalog.oauth2.token=*** +``` + The REST catalog supports [view management](sql-view-management) using the [Iceberg View specification](https://iceberg.apache.org/view-spec/). diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index b54c9d95138b..6c49beefdbc7 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -366,6 +366,19 @@ runtime + + org.apache.httpcomponents + httpclient + 4.5.14 + runtime + + + commons-logging + commons-logging + + + + org.apache.httpcomponents.client5 httpclient5 @@ -428,6 +441,13 @@ runtime + + com.databricks + databricks-jdbc + 2.6.36 + test + + io.airlift http-server @@ -697,6 +717,7 @@ iceberg-build.properties mozilla/public-suffix-list.txt mime.types + Log4j-charsets.properties @@ -732,6 +753,7 @@ **/Test*FailureRecoveryTest.java **/TestIcebergSnowflakeCatalogConnectorSmokeTest.java **/TestTrinoSnowflakeCatalog.java + **/TestIcebergUnityRestCatalogConnectorSmokeTest.java @@ -796,6 +818,7 @@ **/TestIcebergAbfsConnectorSmokeTest.java **/TestIcebergSnowflakeCatalogConnectorSmokeTest.java **/TestTrinoSnowflakeCatalog.java + **/TestIcebergUnityRestCatalogConnectorSmokeTest.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java index 3d515d1e186c..cac016467808 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java @@ -16,6 +16,7 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import jakarta.validation.constraints.NotNull; +import org.apache.iceberg.catalog.Namespace; import java.net.URI; import java.util.Optional; @@ -37,6 +38,7 @@ public enum SessionType private URI restUri; private Optional prefix = Optional.empty(); private Optional warehouse = Optional.empty(); + private Namespace namespace = Namespace.of(); private Security security = Security.NONE; private SessionType sessionType = SessionType.NONE; private boolean vendedCredentialsEnabled; @@ -83,6 +85,19 @@ public IcebergRestCatalogConfig setWarehouse(String warehouse) return this; } + public Namespace getNamespace() + { + return namespace; + } + + @Config("iceberg.rest-catalog.namespace") + @ConfigDescription("The namespace to use with the REST catalog server") + public IcebergRestCatalogConfig setNamespace(String namespace) + { + this.namespace = namespace == null ? Namespace.empty() : Namespace.of(namespace); + return this; + } + @NotNull public Security getSecurity() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java index d048bd82372b..f7113dcfbe0a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -28,6 +28,7 @@ import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TypeManager; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTSessionCatalog; @@ -49,6 +50,7 @@ public class TrinoIcebergRestCatalogFactory private final URI serverUri; private final Optional prefix; private final Optional warehouse; + private final Namespace namespace; private final SessionType sessionType; private final boolean vendedCredentialsEnabled; private final SecurityProperties securityProperties; @@ -75,6 +77,7 @@ public TrinoIcebergRestCatalogFactory( this.serverUri = restConfig.getBaseUri(); this.prefix = restConfig.getPrefix(); this.warehouse = restConfig.getWarehouse(); + this.namespace = restConfig.getNamespace(); this.sessionType = restConfig.getSessionType(); this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled(); this.securityProperties = requireNonNull(securityProperties, "securityProperties is null"); @@ -122,6 +125,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) catalogName, sessionType, credentials, + namespace, trinoVersion, typeManager, uniqueTableLocation); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index de9feeee9a83..a9923707370f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -69,6 +69,7 @@ import org.apache.iceberg.view.ViewRepresentation; import org.apache.iceberg.view.ViewVersion; +import java.util.Arrays; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -92,6 +93,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.joining; import static org.apache.iceberg.view.ViewProperties.COMMENT; public class TrinoRestCatalog @@ -106,6 +108,7 @@ public class TrinoRestCatalog private final TypeManager typeManager; private final SessionType sessionType; private final Map credentials; + private final Namespace namespace; private final String trinoVersion; private final boolean useUniqueTableLocation; @@ -118,6 +121,7 @@ public TrinoRestCatalog( CatalogName catalogName, SessionType sessionType, Map credentials, + Namespace namespace, String trinoVersion, TypeManager typeManager, boolean useUniqueTableLocation) @@ -126,6 +130,7 @@ public TrinoRestCatalog( this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.sessionType = requireNonNull(sessionType, "sessionType is null"); this.credentials = ImmutableMap.copyOf(requireNonNull(credentials, "credentials is null")); + this.namespace = requireNonNull(namespace, "namespace is null"); this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.useUniqueTableLocation = useUniqueTableLocation; @@ -134,14 +139,14 @@ public TrinoRestCatalog( @Override public boolean namespaceExists(ConnectorSession session, String namespace) { - return restSessionCatalog.namespaceExists(convert(session), Namespace.of(namespace)); + return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace)); } @Override public List listNamespaces(ConnectorSession session) { - return restSessionCatalog.listNamespaces(convert(session)).stream() - .map(Namespace::toString) + return restSessionCatalog.listNamespaces(convert(session), namespace).stream() + .map(this::toSchemaName) .collect(toImmutableList()); } @@ -164,7 +169,7 @@ public Map loadNamespaceMetadata(ConnectorSession session, Strin { try { // Return immutable metadata as direct modifications will not be reflected on the namespace - return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), Namespace.of(namespace))); + return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace))); } catch (NoSuchNamespaceException e) { throw new SchemaNotFoundException(namespace); @@ -362,7 +367,10 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName tableCache, schemaTableName, () -> { - BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); + Namespace namespace = toNamespace(schemaTableName.getSchemaName()); + TableIdentifier identifier = TableIdentifier.of(namespace, schemaTableName.getTableName()); + + BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), identifier); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName)); }); @@ -653,19 +661,39 @@ private void invalidateTableCache(SchemaTableName schemaTableName) tableCache.invalidate(schemaTableName); } - private static TableIdentifier toIdentifier(SchemaTableName schemaTableName) + private Namespace toNamespace(String schemaName) + { + if (namespace.isEmpty()) { + return Namespace.of(schemaName); + } + return Namespace.of(namespace + "." + schemaName); + } + + private String toSchemaName(Namespace namespace) { - return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + if (this.namespace.isEmpty()) { + return namespace.toString(); + } + return Arrays.stream(namespace.levels(), this.namespace.length(), namespace.length()) + .collect(joining(".")); + } + + private TableIdentifier toIdentifier(SchemaTableName schemaTableName) + { + if (namespace.isEmpty()) { + return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } + return TableIdentifier.of(Namespace.of(namespace + "." + schemaTableName.getSchemaName()), schemaTableName.getTableName()); } private List listNamespaces(ConnectorSession session, Optional namespace) { if (namespace.isEmpty()) { return listNamespaces(session).stream() - .map(Namespace::of) + .map(this::toNamespace) .collect(toImmutableList()); } - return ImmutableList.of(Namespace.of(namespace.get())); + return ImmutableList.of(toNamespace(namespace.get())); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 274646e6ac2c..257dd18ed406 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -43,7 +43,6 @@ import java.util.Optional; import java.util.Set; -import static com.google.common.base.Preconditions.checkState; import static io.airlift.testing.Closeables.closeAllSuppress; import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD; import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER; @@ -83,7 +82,7 @@ public static class Builder { private Optional metastoreDirectory = Optional.empty(); private ImmutableMap.Builder icebergProperties = ImmutableMap.builder(); - private Optional schemaInitializer = Optional.empty(); + private Optional schemaInitializer = Optional.of(SchemaInitializer.builder().build()); protected Builder() { @@ -133,12 +132,17 @@ public Builder setInitialTables(Iterable> initialTables) public Builder setSchemaInitializer(SchemaInitializer schemaInitializer) { - checkState(this.schemaInitializer.isEmpty(), "schemaInitializer is already set"); this.schemaInitializer = Optional.of(requireNonNull(schemaInitializer, "schemaInitializer is null")); amendSession(sessionBuilder -> sessionBuilder.setSchema(schemaInitializer.getSchemaName())); return self(); } + public Builder disableSchemaInitializer() + { + schemaInitializer = Optional.empty(); + return self(); + } + @Override public DistributedQueryRunner build() throws Exception @@ -155,7 +159,7 @@ public DistributedQueryRunner build() Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); queryRunner.installPlugin(new TestingIcebergPlugin(dataDir)); queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties.buildOrThrow()); - schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner); + schemaInitializer.ifPresent(initializer -> initializer.accept(queryRunner)); return queryRunner; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java index 0f26a254bda0..1bc938fa1357 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java @@ -31,6 +31,7 @@ public void testDefaults() .setBaseUri(null) .setPrefix(null) .setWarehouse(null) + .setNamespace(null) .setSessionType(IcebergRestCatalogConfig.SessionType.NONE) .setSecurity(IcebergRestCatalogConfig.Security.NONE) .setVendedCredentialsEnabled(false)); @@ -43,6 +44,7 @@ public void testExplicitPropertyMappings() .put("iceberg.rest-catalog.uri", "http://localhost:1234") .put("iceberg.rest-catalog.prefix", "dev") .put("iceberg.rest-catalog.warehouse", "test_warehouse_identifier") + .put("iceberg.rest-catalog.namespace", "main") .put("iceberg.rest-catalog.security", "OAUTH2") .put("iceberg.rest-catalog.session", "USER") .put("iceberg.rest-catalog.vended-credentials-enabled", "true") @@ -52,6 +54,7 @@ public void testExplicitPropertyMappings() .setBaseUri("http://localhost:1234") .setPrefix("dev") .setWarehouse("test_warehouse_identifier") + .setNamespace("main") .setSessionType(IcebergRestCatalogConfig.SessionType.USER) .setSecurity(IcebergRestCatalogConfig.Security.OAUTH2) .setVendedCredentialsEnabled(true); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..cc78244de346 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java @@ -0,0 +1,645 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.databricks.client.jdbc.Driver; +import com.google.common.collect.ImmutableMap; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import io.airlift.log.Logger; +import io.trino.filesystem.Location; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import io.trino.tpch.TpchTable; +import org.assertj.core.util.Files; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.temporal.ChronoUnit; +import java.util.Optional; +import java.util.Properties; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +@TestInstance(PER_CLASS) +@Execution(SAME_THREAD) // Don't run tests in parallel for mitigating rate limit of Databricks REST API +final class TestIcebergUnityRestCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private static final Logger log = Logger.get(TestIcebergUnityRestCatalogConnectorSmokeTest.class); + + private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder() + .handleIf(throwable -> throwable.getMessage().contains("HTTP Response code: 502")) + .withBackoff(1, 10, ChronoUnit.SECONDS) + .withMaxRetries(60) + .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) + .build(); + + private final File warehouseLocation; + private final String databricksHost; + private final String databricksLogin; + private final String databricksToken; + private final String databricksUnityJdbcUrl; + private final String databricksCatalogName; + private final String s3Region; + private final String s3AccessKey; + private final String s3SecretKey; + + public TestIcebergUnityRestCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + warehouseLocation = Files.newTemporaryFolder(); + + databricksHost = requireEnv("DATABRICKS_HOST"); + databricksLogin = requireEnv("DATABRICKS_LOGIN"); + databricksToken = requireEnv("DATABRICKS_TOKEN"); + databricksUnityJdbcUrl = requireEnv("DATABRICKS_UNITY_JDBC_URL") + ";EnableArrow=0"; + databricksCatalogName = requireEnv("DATABRICKS_UNITY_CATALOG_NAME"); + + s3Region = requireEnv("AWS_REGION"); + s3AccessKey = requireEnv("AWS_ACCESS_KEY_ID"); + s3SecretKey = requireEnv("AWS_SECRET_ACCESS_KEY"); + } + + private static String requireEnv(String variable) + { + return requireNonNull(System.getenv(variable), () -> "environment variable not set: " + variable); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_RENAME_MATERIALIZED_VIEW, + SUPPORTS_RENAME_SCHEMA -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); + + DistributedQueryRunner queryRunner = IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "rest") + .put("iceberg.security", "read_only") + .put("iceberg.rest-catalog.uri", "https://%s:443/api/2.1/unity-catalog/iceberg" .formatted(databricksHost)) + .put("iceberg.rest-catalog.namespace", databricksCatalogName) + .put("iceberg.rest-catalog.security", "OAUTH2") + .put("iceberg.rest-catalog.oauth2.token", databricksToken) + .put("iceberg.register-table-procedure.enabled", "true") + .put("fs.native-s3.enabled", "true") + .put("s3.region", s3Region) + .put("s3.aws-access-key", s3AccessKey) + .put("s3.aws-secret-key", s3SecretKey) + .buildOrThrow()) + .disableSchemaInitializer() + .build(); + + for (TpchTable table : REQUIRED_TPCH_TABLES) { + copyTpchTable(queryRunner, table.getTableName()); + } + + return queryRunner; + } + + private void copyTpchTable(DistributedQueryRunner queryRunner, String tableName) + { + if (isLoaded(queryRunner, tableName)) { + return; + } + + Properties properties = new Properties(); + properties.setProperty("user", databricksLogin); + properties.setProperty("password", databricksToken); + + try (Connection connection = new Driver().connect(databricksUnityJdbcUrl, properties); + Statement statement = connection.createStatement()) { + Failsafe.with(RETRY_POLICY).run(() -> statement.execute("DROP TABLE IF EXISTS main.tpch." + tableName)); + + String columns = switch (tableName) { + case "region" -> "r_regionkey AS regionkey, r_name AS name, r_comment AS comment"; + case "nation" -> "n_nationkey AS nationkey, n_name AS name, n_regionkey AS regionkey, n_comment AS comment"; + default -> throw new IllegalArgumentException("Unexpected table name: " + tableName); + }; + + String createTableSql = "CREATE TABLE main.tpch." + tableName + " " + + "USING DELTA " + + "TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'iceberg') AS " + + "SELECT " + columns + " FROM samples.tpch." + tableName; + + Failsafe.with(RETRY_POLICY).run(() -> statement.execute(createTableSql)); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private static boolean isLoaded(DistributedQueryRunner queryRunner, String tableName) + { + try { + long actual = (long) queryRunner.execute("SELECT COUNT(*) FROM " + tableName).getOnlyValue(); + long expected = (long) queryRunner.execute("SELECT COUNT(*) FROM tpch.tiny." + tableName).getOnlyValue(); + return actual == expected; + } + catch (Exception e) { + return false; + } + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + throw new UnsupportedOperationException(); + } + + @Override + protected String getMetadataLocation(String tableName) + { + throw new UnsupportedOperationException(); + } + + @Override + protected String schemaPath() + { + return format("%s/%s", warehouseLocation, getSession().getSchema()); + } + + @Override + protected boolean locationExists(String location) + { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean isFileSorted(Location path, String sortColumnName) + { + throw new UnsupportedOperationException(); + } + + @Override + protected void deleteDirectory(String location) + { + try { + deleteRecursively(Path.of(location), ALLOW_INSECURE); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Test + @Override // Overridden as the table location and column data type is different + public void testShowCreateTable() + { + assertThat((String) computeScalar("SHOW CREATE TABLE " + TpchTable.REGION.getTableName())) + .matches("" + + "CREATE TABLE iceberg.tpch.region \\(\n" + + " regionkey bigint,\n" + + " name varchar,\n" + + " comment varchar\n" + + "\\)\n" + + "WITH \\(\n" + + " format = 'PARQUET',\n" + + " format_version = 1,\n" + + " location = 's3://.*'\n" + + "\\)"); + } + + @Test + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testCommentView() + { + assertThatThrownBy(super::testCommentView) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testCommentViewColumn() + { + assertThatThrownBy(super::testCommentViewColumn) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testRenameTable() + { + assertThatThrownBy(super::testRenameTable) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testRenameTableAcrossSchemas() + { + assertThatThrownBy(super::testRenameTableAcrossSchemas) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testCreateTable() + { + assertThatThrownBy(super::testCreateTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateTableAsSelect() + { + assertThatThrownBy(super::testCreateTableAsSelect) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testUpdate() + { + assertThatThrownBy(super::testUpdate) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testInsert() + { + assertThatThrownBy(super::testInsert) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testHiddenPathColumn() + { + assertThatThrownBy(super::testHiddenPathColumn) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRowLevelDelete() + { + assertThatThrownBy(super::testRowLevelDelete) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDeleteAllDataFromTable() + { + assertThatThrownBy(super::testDeleteAllDataFromTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDeleteRowsConcurrently() + { + assertThatThrownBy(super::testDeleteRowsConcurrently) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateOrReplaceTable() + { + assertThatThrownBy(super::testCreateOrReplaceTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateOrReplaceTableChangeColumnNamesAndTypes() + { + assertThatThrownBy(super::testCreateOrReplaceTableChangeColumnNamesAndTypes) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithTableLocation() + { + assertThatThrownBy(super::testRegisterTableWithTableLocation) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithComments() + { + assertThatThrownBy(super::testRegisterTableWithComments) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRowLevelUpdate() + { + assertThatThrownBy(super::testRowLevelUpdate) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testMerge() + { + assertThatThrownBy(super::testMerge) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateSchema() + { + assertThatThrownBy(super::testCreateSchema) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateSchemaWithNonLowercaseOwnerName() + { + assertThatThrownBy(super::testCreateSchemaWithNonLowercaseOwnerName) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithShowCreateTable() + { + assertThatThrownBy(super::testRegisterTableWithShowCreateTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithReInsert() + { + assertThatThrownBy(super::testRegisterTableWithReInsert) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithDroppedTable() + { + assertThatThrownBy(super::testRegisterTableWithDroppedTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithDifferentTableName() + { + assertThatThrownBy(super::testRegisterTableWithDifferentTableName) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithMetadataFile() + { + assertThatThrownBy(super::testRegisterTableWithMetadataFile) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateTableWithTrailingSpaceInLocation() + { + assertThatThrownBy(super::testCreateTableWithTrailingSpaceInLocation) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testRegisterTableWithTrailingSpaceInLocation() + { + assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testUnregisterTable() + { + assertThatThrownBy(super::testUnregisterTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testUnregisterBrokenTable() + { + assertThatThrownBy(super::testUnregisterBrokenTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testUnregisterTableNotExistingTable() + { + assertThatThrownBy(super::testUnregisterTableNotExistingTable) + .hasStackTraceContaining("Table .* not found"); + } + + @Test + @Override + public void testUnregisterTableNotExistingSchema() + { + assertThatThrownBy(super::testUnregisterTableNotExistingSchema) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRepeatUnregisterTable() + { + assertThatThrownBy(super::testRepeatUnregisterTable) + .hasStackTraceContaining("Table .* not found"); + } + + @Test + @Override + public void testUnregisterTableAccessControl() + { + assertThatThrownBy(super::testUnregisterTableAccessControl) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateTableWithNonExistingSchemaVerifyLocation() + { + assertThatThrownBy(super::testCreateTableWithNonExistingSchemaVerifyLocation) + .hasStackTraceContaining("Access Denied"); + } + + @Test + @Override + public void testSortedNationTable() + { + assertThatThrownBy(super::testSortedNationTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testFileSortingWithLargerTable() + { + assertThatThrownBy(super::testFileSortingWithLargerTable) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDropTableWithMissingMetadataFile() + { + assertThatThrownBy(super::testDropTableWithMissingMetadataFile) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDropTableWithMissingSnapshotFile() + { + assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDropTableWithMissingManifestListFile() + { + assertThatThrownBy(super::testDropTableWithMissingManifestListFile) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDropTableWithMissingDataFile() + { + assertThatThrownBy(super::testDropTableWithMissingDataFile) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testDropTableWithNonExistentTableLocation() + { + assertThatThrownBy(super::testDropTableWithNonExistentTableLocation) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testMetadataTables() + { + assertThatThrownBy(super::testMetadataTables) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testPartitionFilterRequired() + { + assertThatThrownBy(super::testPartitionFilterRequired) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testTableChangesFunction() + { + assertThatThrownBy(super::testTableChangesFunction) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testRowLevelDeletesWithTableChangesFunction() + { + assertThatThrownBy(super::testRowLevelDeletesWithTableChangesFunction) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testCreateOrReplaceWithTableChangesFunction() + { + assertThatThrownBy(super::testCreateOrReplaceWithTableChangesFunction) + .hasMessageContaining("Access Denied"); + } + + @Test + @Override + public void testTruncateTable() + { + assertThatThrownBy(super::testTruncateTable) + .hasMessageContaining("Access Denied"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index ed54ed24fa21..84131fa30399 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -32,6 +32,7 @@ import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.type.TestingTypeManager; import io.trino.spi.type.VarcharType; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.rest.DelegatingRestSessionCatalog; import org.apache.iceberg.rest.RESTSessionCatalog; @@ -79,7 +80,15 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc restSessionCatalog.initialize(catalogName, properties); - return new TrinoRestCatalog(restSessionCatalog, new CatalogName(catalogName), NONE, ImmutableMap.of(), "test", new TestingTypeManager(), useUniqueTableLocations); + return new TrinoRestCatalog( + restSessionCatalog, + new CatalogName(catalogName), + NONE, + ImmutableMap.of(), + Namespace.empty(), + "test", + new TestingTypeManager(), + useUniqueTableLocations); } @Test