Skip to content

Commit

Permalink
Add support for reading from Unity catalog with Iceberg REST
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Sep 24, 2024
1 parent 1b9b55a commit d20bb4e
Show file tree
Hide file tree
Showing 10 changed files with 765 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,11 @@ jobs:
SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY: ${{ secrets.SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY }}
SNOWFLAKE_EXTERNAL_VOLUME: ${{ vars.SNOWFLAKE_EXTERNAL_VOLUME }}
SNOWFLAKE_CATALOG_S3_REGION: ${{ vars.SNOWFLAKE_CATALOG_S3_REGION }}
DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
DATABRICKS_LOGIN: token
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_UNITY_CATALOG_NAME: ${{ vars.DATABRICKS_UNITY_CATALOG_NAME }}
DATABRICKS_UNITY_JDBC_URL: ${{ vars.DATABRICKS_UNITY_JDBC_URL }}
if: >-
contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests') &&
(env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '')
Expand Down
15 changes: 15 additions & 0 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ following properties:
* - `iceberg.rest-catalog.warehouse`
- Warehouse identifier/location for the catalog (optional). Example:
`s3://my_bucket/warehouse_location`
* - `iceberg.rest-catalog.namespace`
- The namespace to use with the REST catalog server. Example:
`main`
* - `iceberg.rest-catalog.security`
- The type of security to use (default: `NONE`). `OAUTH2` requires either a
`token` or `credential`. Example: `OAUTH2`
Expand Down Expand Up @@ -497,6 +500,18 @@ iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://iceberg-with-rest:8181
```

`iceberg.security` must be `read_only` when connecting to Databricks Unity catalog
using an Iceberg REST catalog:

```properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=https://dbc-12345678-9999.cloud.databricks.com/api/2.1/unity-catalog/iceberg
iceberg.security=read_only
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.token=***
```

The REST catalog supports [view management](sql-view-management)
using the [Iceberg View specification](https://iceberg.apache.org/view-spec/).

Expand Down
23 changes: 23 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down Expand Up @@ -428,6 +441,13 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-jdbc</artifactId>
<version>2.6.36</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
Expand Down Expand Up @@ -697,6 +717,7 @@
<ignoredResourcePattern>iceberg-build.properties</ignoredResourcePattern>
<ignoredResourcePattern>mozilla/public-suffix-list.txt</ignoredResourcePattern>
<ignoredResourcePattern>mime.types</ignoredResourcePattern>
<ignoredResourcePattern>Log4j-charsets.properties</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
Expand Down Expand Up @@ -732,6 +753,7 @@
<exclude>**/Test*FailureRecoveryTest.java</exclude>
<exclude>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</exclude>
<exclude>**/TestTrinoSnowflakeCatalog.java</exclude>
<exclude>**/TestIcebergUnityRestCatalogConnectorSmokeTest.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -796,6 +818,7 @@
<include>**/TestIcebergAbfsConnectorSmokeTest.java</include>
<include>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</include>
<include>**/TestTrinoSnowflakeCatalog.java</include>
<include>**/TestIcebergUnityRestCatalogConnectorSmokeTest.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;
import org.apache.iceberg.catalog.Namespace;

import java.net.URI;
import java.util.Optional;
Expand All @@ -37,6 +38,7 @@ public enum SessionType
private URI restUri;
private Optional<String> prefix = Optional.empty();
private Optional<String> warehouse = Optional.empty();
private Namespace namespace = Namespace.of();
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
Expand Down Expand Up @@ -83,6 +85,19 @@ public IcebergRestCatalogConfig setWarehouse(String warehouse)
return this;
}

public Namespace getNamespace()
{
return namespace;
}

@Config("iceberg.rest-catalog.namespace")
@ConfigDescription("The namespace to use with the REST catalog server")
public IcebergRestCatalogConfig setNamespace(String namespace)
{
this.namespace = namespace == null ? Namespace.empty() : Namespace.of(namespace);
return this;
}

@NotNull
public Security getSecurity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTSessionCatalog;

Expand All @@ -49,6 +50,7 @@ public class TrinoIcebergRestCatalogFactory
private final URI serverUri;
private final Optional<String> prefix;
private final Optional<String> warehouse;
private final Namespace namespace;
private final SessionType sessionType;
private final boolean vendedCredentialsEnabled;
private final SecurityProperties securityProperties;
Expand All @@ -75,6 +77,7 @@ public TrinoIcebergRestCatalogFactory(
this.serverUri = restConfig.getBaseUri();
this.prefix = restConfig.getPrefix();
this.warehouse = restConfig.getWarehouse();
this.namespace = restConfig.getNamespace();
this.sessionType = restConfig.getSessionType();
this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled();
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
Expand Down Expand Up @@ -122,6 +125,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
catalogName,
sessionType,
credentials,
namespace,
trinoVersion,
typeManager,
uniqueTableLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.iceberg.view.ViewRepresentation;
import org.apache.iceberg.view.ViewVersion;

import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
Expand All @@ -92,6 +93,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.view.ViewProperties.COMMENT;

public class TrinoRestCatalog
Expand All @@ -106,6 +108,7 @@ public class TrinoRestCatalog
private final TypeManager typeManager;
private final SessionType sessionType;
private final Map<String, String> credentials;
private final Namespace namespace;
private final String trinoVersion;
private final boolean useUniqueTableLocation;

Expand All @@ -118,6 +121,7 @@ public TrinoRestCatalog(
CatalogName catalogName,
SessionType sessionType,
Map<String, String> credentials,
Namespace namespace,
String trinoVersion,
TypeManager typeManager,
boolean useUniqueTableLocation)
Expand All @@ -126,6 +130,7 @@ public TrinoRestCatalog(
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.sessionType = requireNonNull(sessionType, "sessionType is null");
this.credentials = ImmutableMap.copyOf(requireNonNull(credentials, "credentials is null"));
this.namespace = requireNonNull(namespace, "namespace is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useUniqueTableLocation = useUniqueTableLocation;
Expand All @@ -134,14 +139,14 @@ public TrinoRestCatalog(
@Override
public boolean namespaceExists(ConnectorSession session, String namespace)
{
return restSessionCatalog.namespaceExists(convert(session), Namespace.of(namespace));
return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace));
}

@Override
public List<String> listNamespaces(ConnectorSession session)
{
return restSessionCatalog.listNamespaces(convert(session)).stream()
.map(Namespace::toString)
return restSessionCatalog.listNamespaces(convert(session), namespace).stream()
.map(this::toSchemaName)
.collect(toImmutableList());
}

Expand All @@ -164,7 +169,7 @@ public Map<String, Object> loadNamespaceMetadata(ConnectorSession session, Strin
{
try {
// Return immutable metadata as direct modifications will not be reflected on the namespace
return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), Namespace.of(namespace)));
return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace)));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
Expand Down Expand Up @@ -362,7 +367,10 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName
tableCache,
schemaTableName,
() -> {
BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName));
Namespace namespace = toNamespace(schemaTableName.getSchemaName());
TableIdentifier identifier = TableIdentifier.of(namespace, schemaTableName.getTableName());

BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), identifier);
// Creating a new base table is necessary to adhere to Trino's expectations for quoted table names
return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName));
});
Expand Down Expand Up @@ -653,19 +661,39 @@ private void invalidateTableCache(SchemaTableName schemaTableName)
tableCache.invalidate(schemaTableName);
}

private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
private Namespace toNamespace(String schemaName)
{
if (namespace.isEmpty()) {
return Namespace.of(schemaName);
}
return Namespace.of(namespace + "." + schemaName);
}

private String toSchemaName(Namespace namespace)
{
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
if (this.namespace.isEmpty()) {
return namespace.toString();
}
return Arrays.stream(namespace.levels(), this.namespace.length(), namespace.length())
.collect(joining("."));
}

private TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
if (namespace.isEmpty()) {
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
return TableIdentifier.of(Namespace.of(namespace + "." + schemaTableName.getSchemaName()), schemaTableName.getTableName());
}

private List<Namespace> listNamespaces(ConnectorSession session, Optional<String> namespace)
{
if (namespace.isEmpty()) {
return listNamespaces(session).stream()
.map(Namespace::of)
.map(this::toNamespace)
.collect(toImmutableList());
}

return ImmutableList.of(Namespace.of(namespace.get()));
return ImmutableList.of(toNamespace(namespace.get()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
Expand Down Expand Up @@ -83,7 +82,7 @@ public static class Builder
{
private Optional<File> metastoreDirectory = Optional.empty();
private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
private Optional<SchemaInitializer> schemaInitializer = Optional.empty();
private Optional<SchemaInitializer> schemaInitializer = Optional.of(SchemaInitializer.builder().build());

protected Builder()
{
Expand Down Expand Up @@ -133,12 +132,17 @@ public Builder setInitialTables(Iterable<TpchTable<?>> initialTables)

public Builder setSchemaInitializer(SchemaInitializer schemaInitializer)
{
checkState(this.schemaInitializer.isEmpty(), "schemaInitializer is already set");
this.schemaInitializer = Optional.of(requireNonNull(schemaInitializer, "schemaInitializer is null"));
amendSession(sessionBuilder -> sessionBuilder.setSchema(schemaInitializer.getSchemaName()));
return self();
}

public Builder disableSchemaInitializer()
{
schemaInitializer = Optional.empty();
return self();
}

@Override
public DistributedQueryRunner build()
throws Exception
Expand All @@ -155,7 +159,7 @@ public DistributedQueryRunner build()
Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));
queryRunner.installPlugin(new TestingIcebergPlugin(dataDir));
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties.buildOrThrow());
schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner);
schemaInitializer.ifPresent(initializer -> initializer.accept(queryRunner));

return queryRunner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public void testDefaults()
.setBaseUri(null)
.setPrefix(null)
.setWarehouse(null)
.setNamespace(null)
.setSessionType(IcebergRestCatalogConfig.SessionType.NONE)
.setSecurity(IcebergRestCatalogConfig.Security.NONE)
.setVendedCredentialsEnabled(false));
Expand All @@ -43,6 +44,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest-catalog.uri", "http://localhost:1234")
.put("iceberg.rest-catalog.prefix", "dev")
.put("iceberg.rest-catalog.warehouse", "test_warehouse_identifier")
.put("iceberg.rest-catalog.namespace", "main")
.put("iceberg.rest-catalog.security", "OAUTH2")
.put("iceberg.rest-catalog.session", "USER")
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
Expand All @@ -52,6 +54,7 @@ public void testExplicitPropertyMappings()
.setBaseUri("http://localhost:1234")
.setPrefix("dev")
.setWarehouse("test_warehouse_identifier")
.setNamespace("main")
.setSessionType(IcebergRestCatalogConfig.SessionType.USER)
.setSecurity(IcebergRestCatalogConfig.Security.OAUTH2)
.setVendedCredentialsEnabled(true);
Expand Down
Loading

0 comments on commit d20bb4e

Please sign in to comment.