diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a07e83d59b05..1de740eb0193 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -708,6 +708,11 @@ jobs:
SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY: ${{ secrets.SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY }}
SNOWFLAKE_EXTERNAL_VOLUME: ${{ vars.SNOWFLAKE_EXTERNAL_VOLUME }}
SNOWFLAKE_CATALOG_S3_REGION: ${{ vars.SNOWFLAKE_CATALOG_S3_REGION }}
+ DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
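+          # The Databricks JDBC driver uses the literal user name "token" when authenticating with a personal access token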
+ DATABRICKS_LOGIN: token
+ DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+ DATABRICKS_UNITY_CATALOG_NAME: ${{ vars.DATABRICKS_UNITY_CATALOG_NAME }}
+ DATABRICKS_UNITY_JDBC_URL: ${{ vars.DATABRICKS_UNITY_JDBC_URL }}
if: >-
contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests') &&
(env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '')
diff --git a/docs/src/main/sphinx/object-storage/metastores.md b/docs/src/main/sphinx/object-storage/metastores.md
index 331a85e80822..5e62e62664e2 100644
--- a/docs/src/main/sphinx/object-storage/metastores.md
+++ b/docs/src/main/sphinx/object-storage/metastores.md
@@ -470,6 +470,9 @@ following properties:
* - `iceberg.rest-catalog.warehouse`
- Warehouse identifier/location for the catalog (optional). Example:
`s3://my_bucket/warehouse_location`
+* - `iceberg.rest-catalog.namespace`
+ - The namespace to use with the REST catalog server. Example:
+ `main`
* - `iceberg.rest-catalog.security`
- The type of security to use (default: `NONE`). `OAUTH2` requires either a
`token` or `credential`. Example: `OAUTH2`
@@ -497,6 +500,18 @@ iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://iceberg-with-rest:8181
```
+`iceberg.security` must be set to `read_only` when connecting to the Databricks
+Unity Catalog using an Iceberg REST catalog:
+
+```properties
+connector.name=iceberg
+iceberg.catalog.type=rest
+iceberg.rest-catalog.uri=https://dbc-12345678-9999.cloud.databricks.com/api/2.1/unity-catalog/iceberg
+iceberg.security=read_only
+iceberg.rest-catalog.security=OAUTH2
+iceberg.rest-catalog.oauth2.token=***
+```
+
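+When `iceberg.rest-catalog.namespace` is set, Trino resolves schema names beneath
+that namespace. For example, with `iceberg.rest-catalog.namespace=main`, the Trino
+schema `tpch` maps to the REST namespace `main.tpch`.
+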
The REST catalog supports [view management](sql-view-management)
using the [Iceberg View specification](https://iceberg.apache.org/view-spec/).
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index b54c9d95138b..6c49beefdbc7 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -366,6 +366,19 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.14</version>
+            <scope>runtime</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.httpcomponents.client5</groupId>
             <artifactId>httpclient5</artifactId>
@@ -428,6 +441,13 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.databricks</groupId>
+            <artifactId>databricks-jdbc</artifactId>
+            <version>2.6.36</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>http-server</artifactId>
@@ -697,6 +717,7 @@
                        <ignoredResourcePattern>iceberg-build.properties</ignoredResourcePattern>
                        <ignoredResourcePattern>mozilla/public-suffix-list.txt</ignoredResourcePattern>
                        <ignoredResourcePattern>mime.types</ignoredResourcePattern>
+                        <ignoredResourcePattern>Log4j-charsets.properties</ignoredResourcePattern>
@@ -732,6 +753,7 @@
                            <exclude>**/Test*FailureRecoveryTest.java</exclude>
                            <exclude>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</exclude>
                            <exclude>**/TestTrinoSnowflakeCatalog.java</exclude>
+                            <exclude>**/TestIcebergUnityRestCatalogConnectorSmokeTest.java</exclude>
@@ -796,6 +818,7 @@
                            <exclude>**/TestIcebergAbfsConnectorSmokeTest.java</exclude>
                            <exclude>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</exclude>
                            <exclude>**/TestTrinoSnowflakeCatalog.java</exclude>
+                            <exclude>**/TestIcebergUnityRestCatalogConnectorSmokeTest.java</exclude>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java
index 3d515d1e186c..cac016467808 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java
@@ -16,6 +16,7 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;
+import org.apache.iceberg.catalog.Namespace;
import java.net.URI;
import java.util.Optional;
@@ -37,6 +38,7 @@ public enum SessionType
private URI restUri;
private Optional<String> prefix = Optional.empty();
private Optional<String> warehouse = Optional.empty();
+ private Namespace namespace = Namespace.of();
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
@@ -83,6 +85,19 @@ public IcebergRestCatalogConfig setWarehouse(String warehouse)
return this;
}
+ public Namespace getNamespace()
+ {
+ return namespace;
+ }
+
+ @Config("iceberg.rest-catalog.namespace")
+ @ConfigDescription("The namespace to use with the REST catalog server")
+ public IcebergRestCatalogConfig setNamespace(String namespace)
+ {
+ this.namespace = namespace == null ? Namespace.empty() : Namespace.of(namespace);
+ return this;
+ }
+
@NotNull
public Security getSecurity()
{
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
index d048bd82372b..f7113dcfbe0a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
@@ -28,6 +28,7 @@
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTSessionCatalog;
@@ -49,6 +50,7 @@ public class TrinoIcebergRestCatalogFactory
private final URI serverUri;
private final Optional<String> prefix;
private final Optional<String> warehouse;
+ private final Namespace namespace;
private final SessionType sessionType;
private final boolean vendedCredentialsEnabled;
private final SecurityProperties securityProperties;
@@ -75,6 +77,7 @@ public TrinoIcebergRestCatalogFactory(
this.serverUri = restConfig.getBaseUri();
this.prefix = restConfig.getPrefix();
this.warehouse = restConfig.getWarehouse();
+ this.namespace = restConfig.getNamespace();
this.sessionType = restConfig.getSessionType();
this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled();
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
@@ -122,6 +125,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
catalogName,
sessionType,
credentials,
+ namespace,
trinoVersion,
typeManager,
uniqueTableLocation);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java
index de9feeee9a83..a9923707370f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java
@@ -69,6 +69,7 @@
import org.apache.iceberg.view.ViewRepresentation;
import org.apache.iceberg.view.ViewVersion;
+import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@@ -92,6 +93,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.view.ViewProperties.COMMENT;
public class TrinoRestCatalog
@@ -106,6 +108,7 @@ public class TrinoRestCatalog
private final TypeManager typeManager;
private final SessionType sessionType;
private final Map<String, String> credentials;
+ private final Namespace namespace;
private final String trinoVersion;
private final boolean useUniqueTableLocation;
@@ -118,6 +121,7 @@ public TrinoRestCatalog(
CatalogName catalogName,
SessionType sessionType,
Map<String, String> credentials,
+ Namespace namespace,
String trinoVersion,
TypeManager typeManager,
boolean useUniqueTableLocation)
@@ -126,6 +130,7 @@ public TrinoRestCatalog(
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.sessionType = requireNonNull(sessionType, "sessionType is null");
this.credentials = ImmutableMap.copyOf(requireNonNull(credentials, "credentials is null"));
+ this.namespace = requireNonNull(namespace, "namespace is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useUniqueTableLocation = useUniqueTableLocation;
@@ -134,14 +139,14 @@ public TrinoRestCatalog(
@Override
public boolean namespaceExists(ConnectorSession session, String namespace)
{
- return restSessionCatalog.namespaceExists(convert(session), Namespace.of(namespace));
+ return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace));
}
@Override
public List<String> listNamespaces(ConnectorSession session)
{
- return restSessionCatalog.listNamespaces(convert(session)).stream()
- .map(Namespace::toString)
+ return restSessionCatalog.listNamespaces(convert(session), namespace).stream()
+ .map(this::toSchemaName)
.collect(toImmutableList());
}
@@ -164,7 +169,7 @@ public Map loadNamespaceMetadata(ConnectorSession session, Strin
{
try {
// Return immutable metadata as direct modifications will not be reflected on the namespace
- return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), Namespace.of(namespace)));
+ return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace)));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
@@ -362,7 +367,10 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName
tableCache,
schemaTableName,
() -> {
- BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName));
+ Namespace namespace = toNamespace(schemaTableName.getSchemaName());
+ TableIdentifier identifier = TableIdentifier.of(namespace, schemaTableName.getTableName());
+
+ BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), identifier);
// Creating a new base table is necessary to adhere to Trino's expectations for quoted table names
return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName));
});
@@ -653,19 +661,39 @@ private void invalidateTableCache(SchemaTableName schemaTableName)
tableCache.invalidate(schemaTableName);
}
- private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
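+ // Resolve a Trino schema against the configured parent namespace, e.g. schema "tpch" becomes "main.tpch" when the namespace is "main"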
+ private Namespace toNamespace(String schemaName)
+ {
+ if (namespace.isEmpty()) {
+ return Namespace.of(schemaName);
+ }
+ return Namespace.of(namespace + "." + schemaName);
+ }
+
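+ // Strip the configured parent namespace levels so only the trailing schema name is exposed to Trino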
+ private String toSchemaName(Namespace namespace)
{
- return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+ if (this.namespace.isEmpty()) {
+ return namespace.toString();
+ }
+ return Arrays.stream(namespace.levels(), this.namespace.length(), namespace.length())
+ .collect(joining("."));
+ }
+
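+ // Qualify table identifiers with the parent namespace when one is configured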
+ private TableIdentifier toIdentifier(SchemaTableName schemaTableName)
+ {
+ if (namespace.isEmpty()) {
+ return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+ }
+ return TableIdentifier.of(Namespace.of(namespace + "." + schemaTableName.getSchemaName()), schemaTableName.getTableName());
}
private List<Namespace> listNamespaces(ConnectorSession session, Optional<String> namespace)
{
if (namespace.isEmpty()) {
return listNamespaces(session).stream()
- .map(Namespace::of)
+ .map(this::toNamespace)
.collect(toImmutableList());
}
- return ImmutableList.of(Namespace.of(namespace.get()));
+ return ImmutableList.of(toNamespace(namespace.get()));
}
}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
index 274646e6ac2c..257dd18ed406 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
@@ -43,7 +43,6 @@
import java.util.Optional;
import java.util.Set;
-import static com.google.common.base.Preconditions.checkState;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
@@ -83,7 +82,7 @@ public static class Builder
{
private Optional metastoreDirectory = Optional.empty();
private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
- private Optional<SchemaInitializer> schemaInitializer = Optional.empty();
+ private Optional<SchemaInitializer> schemaInitializer = Optional.of(SchemaInitializer.builder().build());
protected Builder()
{
@@ -133,12 +132,17 @@ public Builder setInitialTables(Iterable<TpchTable<?>> initialTables)
public Builder setSchemaInitializer(SchemaInitializer schemaInitializer)
{
- checkState(this.schemaInitializer.isEmpty(), "schemaInitializer is already set");
this.schemaInitializer = Optional.of(requireNonNull(schemaInitializer, "schemaInitializer is null"));
amendSession(sessionBuilder -> sessionBuilder.setSchema(schemaInitializer.getSchemaName()));
return self();
}
+ public Builder disableSchemaInitializer()
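+ // Skips creating the default schema; used by tests against read-only catalogs such as Databricks Unity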
+ {
+ schemaInitializer = Optional.empty();
+ return self();
+ }
+
@Override
public DistributedQueryRunner build()
throws Exception
@@ -155,7 +159,7 @@ public DistributedQueryRunner build()
Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));
queryRunner.installPlugin(new TestingIcebergPlugin(dataDir));
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties.buildOrThrow());
- schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner);
+ schemaInitializer.ifPresent(initializer -> initializer.accept(queryRunner));
return queryRunner;
}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java
index 0f26a254bda0..1bc938fa1357 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java
@@ -31,6 +31,7 @@ public void testDefaults()
.setBaseUri(null)
.setPrefix(null)
.setWarehouse(null)
+ .setNamespace(null)
.setSessionType(IcebergRestCatalogConfig.SessionType.NONE)
.setSecurity(IcebergRestCatalogConfig.Security.NONE)
.setVendedCredentialsEnabled(false));
@@ -43,6 +44,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest-catalog.uri", "http://localhost:1234")
.put("iceberg.rest-catalog.prefix", "dev")
.put("iceberg.rest-catalog.warehouse", "test_warehouse_identifier")
+ .put("iceberg.rest-catalog.namespace", "main")
.put("iceberg.rest-catalog.security", "OAUTH2")
.put("iceberg.rest-catalog.session", "USER")
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
@@ -52,6 +54,7 @@ public void testExplicitPropertyMappings()
.setBaseUri("http://localhost:1234")
.setPrefix("dev")
.setWarehouse("test_warehouse_identifier")
+ .setNamespace("main")
.setSessionType(IcebergRestCatalogConfig.SessionType.USER)
.setSecurity(IcebergRestCatalogConfig.Security.OAUTH2)
.setVendedCredentialsEnabled(true);
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
new file mode 100644
index 000000000000..cc78244de346
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.rest;
+
+import com.databricks.client.jdbc.Driver;
+import com.google.common.collect.ImmutableMap;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import io.airlift.log.Logger;
+import io.trino.filesystem.Location;
+import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.IcebergQueryRunner;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.tpch.TpchTable;
+import org.assertj.core.util.Files;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.parallel.Execution;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.Properties;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
+
+@TestInstance(PER_CLASS)
+@Execution(SAME_THREAD) // Run tests sequentially to avoid hitting the Databricks REST API rate limit
+final class TestIcebergUnityRestCatalogConnectorSmokeTest
+ extends BaseIcebergConnectorSmokeTest
+{
+ private static final Logger log = Logger.get(TestIcebergUnityRestCatalogConnectorSmokeTest.class);
+
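+ // Retry statements that fail with transient HTTP 502 responses from the Databricks endpoint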
+ private static final RetryPolicy<Object> RETRY_POLICY = RetryPolicy.builder()
+ .handleIf(throwable -> throwable.getMessage().contains("HTTP Response code: 502"))
+ .withBackoff(1, 10, ChronoUnit.SECONDS)
+ .withMaxRetries(60)
+ .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount()))
+ .build();
+
+ private final File warehouseLocation;
+ private final String databricksHost;
+ private final String databricksLogin;
+ private final String databricksToken;
+ private final String databricksUnityJdbcUrl;
+ private final String databricksCatalogName;
+ private final String s3Region;
+ private final String s3AccessKey;
+ private final String s3SecretKey;
+
+ public TestIcebergUnityRestCatalogConnectorSmokeTest()
+ {
+ super(new IcebergConfig().getFileFormat().toIceberg());
+ warehouseLocation = Files.newTemporaryFolder();
+
+ databricksHost = requireEnv("DATABRICKS_HOST");
+ databricksLogin = requireEnv("DATABRICKS_LOGIN");
+ databricksToken = requireEnv("DATABRICKS_TOKEN");
+ databricksUnityJdbcUrl = requireEnv("DATABRICKS_UNITY_JDBC_URL") + ";EnableArrow=0";
+ databricksCatalogName = requireEnv("DATABRICKS_UNITY_CATALOG_NAME");
+
+ s3Region = requireEnv("AWS_REGION");
+ s3AccessKey = requireEnv("AWS_ACCESS_KEY_ID");
+ s3SecretKey = requireEnv("AWS_SECRET_ACCESS_KEY");
+ }
+
+ private static String requireEnv(String variable)
+ {
+ return requireNonNull(System.getenv(variable), () -> "environment variable not set: " + variable);
+ }
+
+ @Override
+ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+ {
+ return switch (connectorBehavior) {
+ case SUPPORTS_CREATE_MATERIALIZED_VIEW,
+ SUPPORTS_RENAME_MATERIALIZED_VIEW,
+ SUPPORTS_RENAME_SCHEMA -> false;
+ default -> super.hasBehavior(connectorBehavior);
+ };
+ }
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE));
+
+ DistributedQueryRunner queryRunner = IcebergQueryRunner.builder()
+ .setBaseDataDir(Optional.of(warehouseLocation.toPath()))
+ .setIcebergProperties(
+ ImmutableMap.<String, String>builder()
+ .put("iceberg.file-format", format.name())
+ .put("iceberg.catalog.type", "rest")
+ .put("iceberg.security", "read_only")
+ .put("iceberg.rest-catalog.uri", "https://%s:443/api/2.1/unity-catalog/iceberg".formatted(databricksHost))
+ .put("iceberg.rest-catalog.namespace", databricksCatalogName)
+ .put("iceberg.rest-catalog.security", "OAUTH2")
+ .put("iceberg.rest-catalog.oauth2.token", databricksToken)
+ .put("iceberg.register-table-procedure.enabled", "true")
+ .put("fs.native-s3.enabled", "true")
+ .put("s3.region", s3Region)
+ .put("s3.aws-access-key", s3AccessKey)
+ .put("s3.aws-secret-key", s3SecretKey)
+ .buildOrThrow())
+ .disableSchemaInitializer()
+ .build();
+
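+ // The catalog is read-only through Trino, so the TPCH tables are loaded through the Databricks JDBC endpoint instead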
+ for (TpchTable<?> table : REQUIRED_TPCH_TABLES) {
+ copyTpchTable(queryRunner, table.getTableName());
+ }
+
+ return queryRunner;
+ }
+
+ private void copyTpchTable(DistributedQueryRunner queryRunner, String tableName)
+ {
+ if (isLoaded(queryRunner, tableName)) {
+ return;
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty("user", databricksLogin);
+ properties.setProperty("password", databricksToken);
+
+ try (Connection connection = new Driver().connect(databricksUnityJdbcUrl, properties);
+ Statement statement = connection.createStatement()) {
+ Failsafe.with(RETRY_POLICY).run(() -> statement.execute("DROP TABLE IF EXISTS main.tpch." + tableName));
+
+ String columns = switch (tableName) {
+ case "region" -> "r_regionkey AS regionkey, r_name AS name, r_comment AS comment";
+ case "nation" -> "n_nationkey AS nationkey, n_name AS name, n_regionkey AS regionkey, n_comment AS comment";
+ default -> throw new IllegalArgumentException("Unexpected table name: " + tableName);
+ };
+
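+ // Create a Delta table with UniForm enabled ('delta.universalFormat.enabledFormats' = 'iceberg') so Unity exposes it through the Iceberg REST API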
+ String createTableSql = "CREATE TABLE main.tpch." + tableName + " " +
+ "USING DELTA " +
+ "TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'iceberg') AS " +
+ "SELECT " + columns + " FROM samples.tpch." + tableName;
+
+ Failsafe.with(RETRY_POLICY).run(() -> statement.execute(createTableSql));
+ }
+ catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
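+ // A table is considered loaded when its row count matches the tpch.tiny source table, letting reruns skip the copy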
+ private static boolean isLoaded(DistributedQueryRunner queryRunner, String tableName)
+ {
+ try {
+ long actual = (long) queryRunner.execute("SELECT COUNT(*) FROM " + tableName).getOnlyValue();
+ long expected = (long) queryRunner.execute("SELECT COUNT(*) FROM tpch.tiny." + tableName).getOnlyValue();
+ return actual == expected;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ protected void dropTableFromMetastore(String tableName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected String getMetadataLocation(String tableName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected String schemaPath()
+ {
+ return format("%s/%s", warehouseLocation, getSession().getSchema());
+ }
+
+ @Override
+ protected boolean locationExists(String location)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean isFileSorted(Location path, String sortColumnName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void deleteDirectory(String location)
+ {
+ try {
+ deleteRecursively(Path.of(location), ALLOW_INSECURE);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Test
+ @Override // Overridden as the table location and column data type is different
+ public void testShowCreateTable()
+ {
+ assertThat((String) computeScalar("SHOW CREATE TABLE " + TpchTable.REGION.getTableName()))
+ .matches(""
+ + "CREATE TABLE iceberg.tpch.region \\(\n"
+ + " regionkey bigint,\n"
+ + " name varchar,\n"
+ + " comment varchar\n"
+ + "\\)\n"
+ + "WITH \\(\n"
+ + " format = 'PARQUET',\n"
+ + " format_version = 1,\n"
+ + " location = 's3://.*'\n"
+ + "\\)");
+ }
+
+ @Test
+ @Override
+ public void testView()
+ {
+ assertThatThrownBy(super::testView)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCommentView()
+ {
+ assertThatThrownBy(super::testCommentView)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCommentViewColumn()
+ {
+ assertThatThrownBy(super::testCommentViewColumn)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testMaterializedView()
+ {
+ assertThatThrownBy(super::testMaterializedView)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRenameSchema()
+ {
+ assertThatThrownBy(super::testRenameSchema)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRenameTable()
+ {
+ assertThatThrownBy(super::testRenameTable)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRenameTableAcrossSchemas()
+ {
+ assertThatThrownBy(super::testRenameTableAcrossSchemas)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateTable()
+ {
+ assertThatThrownBy(super::testCreateTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateTableAsSelect()
+ {
+ assertThatThrownBy(super::testCreateTableAsSelect)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testUpdate()
+ {
+ assertThatThrownBy(super::testUpdate)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testInsert()
+ {
+ assertThatThrownBy(super::testInsert)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testHiddenPathColumn()
+ {
+ assertThatThrownBy(super::testHiddenPathColumn)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRowLevelDelete()
+ {
+ assertThatThrownBy(super::testRowLevelDelete)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDeleteAllDataFromTable()
+ {
+ assertThatThrownBy(super::testDeleteAllDataFromTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDeleteRowsConcurrently()
+ {
+ assertThatThrownBy(super::testDeleteRowsConcurrently)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateOrReplaceTable()
+ {
+ assertThatThrownBy(super::testCreateOrReplaceTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateOrReplaceTableChangeColumnNamesAndTypes()
+ {
+ assertThatThrownBy(super::testCreateOrReplaceTableChangeColumnNamesAndTypes)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithTableLocation()
+ {
+ assertThatThrownBy(super::testRegisterTableWithTableLocation)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithComments()
+ {
+ assertThatThrownBy(super::testRegisterTableWithComments)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRowLevelUpdate()
+ {
+ assertThatThrownBy(super::testRowLevelUpdate)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testMerge()
+ {
+ assertThatThrownBy(super::testMerge)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateSchema()
+ {
+ assertThatThrownBy(super::testCreateSchema)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateSchemaWithNonLowercaseOwnerName()
+ {
+ assertThatThrownBy(super::testCreateSchemaWithNonLowercaseOwnerName)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithShowCreateTable()
+ {
+ assertThatThrownBy(super::testRegisterTableWithShowCreateTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithReInsert()
+ {
+ assertThatThrownBy(super::testRegisterTableWithReInsert)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithDroppedTable()
+ {
+ assertThatThrownBy(super::testRegisterTableWithDroppedTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithDifferentTableName()
+ {
+ assertThatThrownBy(super::testRegisterTableWithDifferentTableName)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithMetadataFile()
+ {
+ assertThatThrownBy(super::testRegisterTableWithMetadataFile)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateTableWithTrailingSpaceInLocation()
+ {
+ assertThatThrownBy(super::testCreateTableWithTrailingSpaceInLocation)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRegisterTableWithTrailingSpaceInLocation()
+ {
+ assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testUnregisterTable()
+ {
+ assertThatThrownBy(super::testUnregisterTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testUnregisterBrokenTable()
+ {
+ assertThatThrownBy(super::testUnregisterBrokenTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testUnregisterTableNotExistingTable()
+ {
+ assertThatThrownBy(super::testUnregisterTableNotExistingTable)
+ .hasStackTraceContaining("Table .* not found");
+ }
+
+ @Test
+ @Override
+ public void testUnregisterTableNotExistingSchema()
+ {
+ assertThatThrownBy(super::testUnregisterTableNotExistingSchema)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRepeatUnregisterTable()
+ {
+ assertThatThrownBy(super::testRepeatUnregisterTable)
+ .hasStackTraceContaining("Table .* not found");
+ }
+
+ @Test
+ @Override
+ public void testUnregisterTableAccessControl()
+ {
+ assertThatThrownBy(super::testUnregisterTableAccessControl)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateTableWithNonExistingSchemaVerifyLocation()
+ {
+ assertThatThrownBy(super::testCreateTableWithNonExistingSchemaVerifyLocation)
+ .hasStackTraceContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testSortedNationTable()
+ {
+ assertThatThrownBy(super::testSortedNationTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testFileSortingWithLargerTable()
+ {
+ assertThatThrownBy(super::testFileSortingWithLargerTable)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDropTableWithMissingMetadataFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingMetadataFile)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDropTableWithMissingSnapshotFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingSnapshotFile)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDropTableWithMissingManifestListFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingManifestListFile)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDropTableWithMissingDataFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingDataFile)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testDropTableWithNonExistentTableLocation()
+ {
+ assertThatThrownBy(super::testDropTableWithNonExistentTableLocation)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testMetadataTables()
+ {
+ assertThatThrownBy(super::testMetadataTables)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testPartitionFilterRequired()
+ {
+ assertThatThrownBy(super::testPartitionFilterRequired)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testTableChangesFunction()
+ {
+ assertThatThrownBy(super::testTableChangesFunction)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testRowLevelDeletesWithTableChangesFunction()
+ {
+ assertThatThrownBy(super::testRowLevelDeletesWithTableChangesFunction)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testCreateOrReplaceWithTableChangesFunction()
+ {
+ assertThatThrownBy(super::testCreateOrReplaceWithTableChangesFunction)
+ .hasMessageContaining("Access Denied");
+ }
+
+ @Test
+ @Override
+ public void testTruncateTable()
+ {
+ assertThatThrownBy(super::testTruncateTable)
+ .hasMessageContaining("Access Denied");
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
index ed54ed24fa21..84131fa30399 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
@@ -32,6 +32,7 @@
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TestingTypeManager;
import io.trino.spi.type.VarcharType;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.rest.DelegatingRestSessionCatalog;
import org.apache.iceberg.rest.RESTSessionCatalog;
@@ -79,7 +80,15 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc
restSessionCatalog.initialize(catalogName, properties);
- return new TrinoRestCatalog(restSessionCatalog, new CatalogName(catalogName), NONE, ImmutableMap.of(), "test", new TestingTypeManager(), useUniqueTableLocations);
+ return new TrinoRestCatalog(
+ restSessionCatalog,
+ new CatalogName(catalogName),
+ NONE,
+ ImmutableMap.of(),
+ Namespace.empty(),
+ "test",
+ new TestingTypeManager(),
+ useUniqueTableLocations);
}
@Test