Skip to content

Commit

Permalink
Add support for reading from Unity catalog with Iceberg REST
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 16, 2024
1 parent 8c0b623 commit b921d5e
Show file tree
Hide file tree
Showing 10 changed files with 742 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,11 @@ jobs:
SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY: ${{ secrets.SNOWFLAKE_CATALOG_S3_SECRET_ACCESS_KEY }}
SNOWFLAKE_EXTERNAL_VOLUME: ${{ vars.SNOWFLAKE_EXTERNAL_VOLUME }}
SNOWFLAKE_CATALOG_S3_REGION: ${{ vars.SNOWFLAKE_CATALOG_S3_REGION }}
DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
DATABRICKS_LOGIN: token
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_UNITY_CATALOG_NAME: ${{ vars.DATABRICKS_UNITY_CATALOG_NAME }}
DATABRICKS_UNITY_JDBC_URL: ${{ vars.DATABRICKS_UNITY_JDBC_URL }}
if: >-
contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests') &&
(env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '')
Expand Down
15 changes: 15 additions & 0 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ following properties:
* - `iceberg.rest-catalog.warehouse`
- Warehouse identifier/location for the catalog (optional). Example:
`s3://my_bucket/warehouse_location`
* - `iceberg.rest-catalog.namespace`
- Parent namespace to use with the REST catalog server (optional). When set,
schemas are resolved relative to this namespace. Example: `main`
* - `iceberg.rest-catalog.security`
- The type of security to use (default: `NONE`). `OAUTH2` requires either a
`token` or `credential`. Example: `OAUTH2`
Expand All @@ -505,6 +508,18 @@ iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://iceberg-with-rest:8181
```

The `iceberg.security` property must be set to `read_only` when connecting to
Databricks Unity Catalog through its Iceberg REST catalog endpoint:

```properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=https://dbc-12345678-9999.cloud.databricks.com/api/2.1/unity-catalog/iceberg
iceberg.security=read_only
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.token=***
```

The REST catalog supports [view management](sql-view-management)
using the [Iceberg View specification](https://iceberg.apache.org/view-spec/).

Expand Down
23 changes: 23 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down Expand Up @@ -417,6 +430,13 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-jdbc</artifactId>
<version>2.6.36</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
Expand Down Expand Up @@ -686,6 +706,7 @@
<ignoredResourcePattern>iceberg-build.properties</ignoredResourcePattern>
<ignoredResourcePattern>mozilla/public-suffix-list.txt</ignoredResourcePattern>
<ignoredResourcePattern>mime.types</ignoredResourcePattern>
<ignoredResourcePattern>Log4j-charsets.properties</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
Expand Down Expand Up @@ -721,6 +742,7 @@
<exclude>**/Test*FailureRecoveryTest.java</exclude>
<exclude>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</exclude>
<exclude>**/TestTrinoSnowflakeCatalog.java</exclude>
<exclude>**/TestIcebergUnityRestCatalogConnectorSmokeTest.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -785,6 +807,7 @@
<include>**/TestIcebergAbfsConnectorSmokeTest.java</include>
<include>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</include>
<include>**/TestTrinoSnowflakeCatalog.java</include>
<include>**/TestIcebergUnityRestCatalogConnectorSmokeTest.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;
import org.apache.iceberg.catalog.Namespace;

import java.net.URI;
import java.util.Optional;
Expand All @@ -37,6 +38,7 @@ public enum SessionType
private URI restUri;
private Optional<String> prefix = Optional.empty();
private Optional<String> warehouse = Optional.empty();
private Namespace namespace = Namespace.of();
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
Expand Down Expand Up @@ -83,6 +85,19 @@ public IcebergRestCatalogConfig setWarehouse(String warehouse)
return this;
}

/**
 * Returns the namespace configured for the REST catalog.
 *
 * @return the configured namespace; an empty namespace when the property was not set
 */
public Namespace getNamespace()
{
    return this.namespace;
}

/**
 * Sets the namespace to use with the REST catalog server.
 *
 * @param namespace the namespace name, or {@code null} to use the empty namespace
 * @return this config instance, for chaining
 */
@Config("iceberg.rest-catalog.namespace")
@ConfigDescription("The namespace to use with the REST catalog server")
public IcebergRestCatalogConfig setNamespace(String namespace)
{
    if (namespace == null) {
        // An unset property maps to the empty namespace rather than a null field
        this.namespace = Namespace.empty();
    }
    else {
        this.namespace = Namespace.of(namespace);
    }
    return this;
}

@NotNull
public Security getSecurity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTSessionCatalog;

Expand All @@ -44,6 +45,7 @@ public class TrinoIcebergRestCatalogFactory
private final URI serverUri;
private final Optional<String> prefix;
private final Optional<String> warehouse;
private final Namespace namespace;
private final SessionType sessionType;
private final boolean vendedCredentialsEnabled;
private final SecurityProperties securityProperties;
Expand All @@ -70,6 +72,7 @@ public TrinoIcebergRestCatalogFactory(
this.serverUri = restConfig.getBaseUri();
this.prefix = restConfig.getPrefix();
this.warehouse = restConfig.getWarehouse();
this.namespace = restConfig.getNamespace();
this.sessionType = restConfig.getSessionType();
this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled();
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
Expand Down Expand Up @@ -108,6 +111,6 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
icebergCatalog = icebergCatalogInstance;
}

return new TrinoRestCatalog(icebergCatalog, catalogName, sessionType, trinoVersion, typeManager, uniqueTableLocation);
return new TrinoRestCatalog(icebergCatalog, catalogName, namespace, sessionType, trinoVersion, typeManager, uniqueTableLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.iceberg.view.ViewRepresentation;
import org.apache.iceberg.view.ViewVersion;

import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
Expand All @@ -92,6 +93,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.view.ViewProperties.COMMENT;

public class TrinoRestCatalog
Expand All @@ -105,6 +107,7 @@ public class TrinoRestCatalog
private final CatalogName catalogName;
private final TypeManager typeManager;
private final SessionType sessionType;
private final Namespace namespace;
private final String trinoVersion;
private final boolean useUniqueTableLocation;

Expand All @@ -115,6 +118,7 @@ public class TrinoRestCatalog
public TrinoRestCatalog(
RESTSessionCatalog restSessionCatalog,
CatalogName catalogName,
Namespace namespace,
SessionType sessionType,
String trinoVersion,
TypeManager typeManager,
Expand All @@ -123,6 +127,7 @@ public TrinoRestCatalog(
this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.sessionType = requireNonNull(sessionType, "sessionType is null");
this.namespace = requireNonNull(namespace, "namespace is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useUniqueTableLocation = useUniqueTableLocation;
Expand All @@ -131,14 +136,14 @@ public TrinoRestCatalog(
@Override
public boolean namespaceExists(ConnectorSession session, String namespace)
{
return restSessionCatalog.namespaceExists(convert(session), Namespace.of(namespace));
return restSessionCatalog.namespaceExists(convert(session), toNamespace(namespace));
}

@Override
public List<String> listNamespaces(ConnectorSession session)
{
return restSessionCatalog.listNamespaces(convert(session)).stream()
.map(Namespace::toString)
return restSessionCatalog.listNamespaces(convert(session), namespace).stream()
.map(this::toSchemaName)
.collect(toImmutableList());
}

Expand All @@ -161,7 +166,7 @@ public Map<String, Object> loadNamespaceMetadata(ConnectorSession session, Strin
{
try {
// Return immutable metadata as direct modifications will not be reflected on the namespace
return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), Namespace.of(namespace)));
return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), toNamespace(namespace)));
}
catch (NoSuchNamespaceException e) {
throw new SchemaNotFoundException(namespace);
Expand Down Expand Up @@ -210,10 +215,10 @@ public List<TableInfo> listTables(ConnectorSession session, Optional<String> nam
ImmutableList.Builder<TableInfo> tables = ImmutableList.builder();
for (Namespace restNamespace : namespaces) {
listTableIdentifiers(restNamespace, () -> restSessionCatalog.listTables(sessionContext, restNamespace)).stream()
.map(id -> new TableInfo(SchemaTableName.schemaTableName(id.namespace().toString(), id.name()), TableInfo.ExtendedRelationType.TABLE))
.map(id -> new TableInfo(toSchemaTableName(id), TableInfo.ExtendedRelationType.TABLE))
.forEach(tables::add);
listTableIdentifiers(restNamespace, () -> restSessionCatalog.listViews(sessionContext, restNamespace)).stream()
.map(id -> new TableInfo(SchemaTableName.schemaTableName(id.namespace().toString(), id.name()), TableInfo.ExtendedRelationType.OTHER_VIEW))
.map(id -> new TableInfo(toSchemaTableName(id), TableInfo.ExtendedRelationType.OTHER_VIEW))
.forEach(tables::add);
}
return tables.build();
Expand Down Expand Up @@ -344,7 +349,10 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName
tableCache,
schemaTableName,
() -> {
BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName));
Namespace namespace = toNamespace(schemaTableName.getSchemaName());
TableIdentifier identifier = TableIdentifier.of(namespace, schemaTableName.getTableName());

BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), identifier);
// Creating a new base table is necessary to adhere to Trino's expectations for quoted table names
return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName));
});
Expand Down Expand Up @@ -635,9 +643,37 @@ private void invalidateTableCache(SchemaTableName schemaTableName)
tableCache.invalidate(schemaTableName);
}

private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
private Namespace toNamespace(String schemaName)
{
return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
if (namespace.isEmpty()) {
return Namespace.of(schemaName);
}
return Namespace.of(namespace + "." + schemaName);
}

/**
 * Converts a REST catalog namespace into a Trino schema name.
 * <p>
 * Without a configured parent namespace the namespace's string form is used as is.
 * Otherwise the leading levels covered by the configured namespace are dropped and
 * the remaining levels are joined with {@code "."}.
 *
 * @param namespace the namespace returned by the REST catalog
 * @return the schema name relative to the configured parent namespace
 */
private String toSchemaName(Namespace namespace)
{
    if (!this.namespace.isEmpty()) {
        String[] levels = namespace.levels();
        // Skip as many leading levels as the configured parent namespace has
        int parentLength = this.namespace.length();
        int totalLength = namespace.length();
        return Arrays.stream(levels, parentLength, totalLength).collect(joining("."));
    }
    return namespace.toString();
}

/**
 * Converts a REST catalog table identifier into a Trino {@link SchemaTableName}.
 * <p>
 * The schema part is derived with {@code toSchemaName}, so the configured parent
 * namespace is stripped by its actual length instead of assuming the schema is
 * always found at level index 1. This keeps schema names reported for tables and
 * views consistent with the names reported when listing namespaces.
 *
 * @param tableIdentifier the identifier returned by the REST catalog
 * @return the schema and table name as seen by Trino
 */
private SchemaTableName toSchemaTableName(TableIdentifier tableIdentifier)
{
    // toSchemaName already handles the "no parent namespace configured" case
    String schemaName = toSchemaName(tableIdentifier.namespace());
    return SchemaTableName.schemaTableName(schemaName, tableIdentifier.name());
}

/**
 * Converts a Trino schema/table name into a REST catalog table identifier.
 * <p>
 * When a parent namespace is configured, it is prepended to the schema name,
 * separated by {@code "."}, before the identifier is built.
 *
 * @param schemaTableName the Trino schema and table name
 * @return the identifier to pass to the REST catalog
 */
private TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
    String schemaName = schemaTableName.getSchemaName();
    String tableName = schemaTableName.getTableName();
    if (!namespace.isEmpty()) {
        Namespace qualified = Namespace.of(namespace + "." + schemaName);
        return TableIdentifier.of(qualified, tableName);
    }
    return TableIdentifier.of(schemaName, tableName);
}

private List<Namespace> listNamespaces(ConnectorSession session, Optional<String> namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
Expand Down Expand Up @@ -82,7 +81,7 @@ public static class Builder
{
private Optional<File> metastoreDirectory = Optional.empty();
private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
private Optional<SchemaInitializer> schemaInitializer = Optional.empty();
private Optional<SchemaInitializer> schemaInitializer = Optional.of(SchemaInitializer.builder().build());

protected Builder()
{
Expand Down Expand Up @@ -132,12 +131,17 @@ public Builder setInitialTables(Iterable<TpchTable<?>> initialTables)

public Builder setSchemaInitializer(SchemaInitializer schemaInitializer)
{
checkState(this.schemaInitializer.isEmpty(), "schemaInitializer is already set");
this.schemaInitializer = Optional.of(requireNonNull(schemaInitializer, "schemaInitializer is null"));
amendSession(sessionBuilder -> sessionBuilder.setSchema(schemaInitializer.getSchemaName()));
return self();
}

/**
 * Clears the schema initializer so that {@code build()} does not run one.
 *
 * @return this builder, for chaining
 */
public Builder disableSchemaInitializer()
{
    this.schemaInitializer = Optional.empty();
    return self();
}

@Override
public DistributedQueryRunner build()
throws Exception
Expand All @@ -150,7 +154,7 @@ public DistributedQueryRunner build()
Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));
queryRunner.installPlugin(new TestingIcebergPlugin(dataDir));
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties.buildOrThrow());
schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner);
schemaInitializer.ifPresent(initializer -> initializer.accept(queryRunner));

return queryRunner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public void testDefaults()
.setBaseUri(null)
.setPrefix(null)
.setWarehouse(null)
.setNamespace(null)
.setSessionType(IcebergRestCatalogConfig.SessionType.NONE)
.setSecurity(IcebergRestCatalogConfig.Security.NONE)
.setVendedCredentialsEnabled(false));
Expand All @@ -43,6 +44,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest-catalog.uri", "http://localhost:1234")
.put("iceberg.rest-catalog.prefix", "dev")
.put("iceberg.rest-catalog.warehouse", "test_warehouse_identifier")
.put("iceberg.rest-catalog.namespace", "main")
.put("iceberg.rest-catalog.security", "OAUTH2")
.put("iceberg.rest-catalog.session", "USER")
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
Expand All @@ -52,6 +54,7 @@ public void testExplicitPropertyMappings()
.setBaseUri("http://localhost:1234")
.setPrefix("dev")
.setWarehouse("test_warehouse_identifier")
.setNamespace("main")
.setSessionType(IcebergRestCatalogConfig.SessionType.USER)
.setSecurity(IcebergRestCatalogConfig.Security.OAUTH2)
.setVendedCredentialsEnabled(true);
Expand Down
Loading

0 comments on commit b921d5e

Please sign in to comment.