Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nessie: Support APIv2 client #22215

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ properties:
- Description
* - `iceberg.nessie-catalog.uri`
- Nessie API endpoint URI (required). Example:
`https://localhost:19120/api/v1`
`https://localhost:19120/api/v2`
wendigo marked this conversation as resolved.
Show resolved Hide resolved
* - `iceberg.nessie-catalog.ref`
- The branch/tag to use for Nessie. Defaults to `main`.
* - `iceberg.nessie-catalog.default-warehouse-dir`
Expand All @@ -566,12 +566,15 @@ properties:
* - `iceberg.nessie-catalog.authentication.token`
- The token to use with `BEARER` authentication. Example:
`SXVLUXUhIExFQ0tFUiEK`
* - `iceberg.nessie-catalog.client-api-version`
- Optional version of the Client API version to use. By default it is inferred from the `iceberg.nessie-catalog.uri` value.
Valid values are `V1` or `V2`.
:::

```text
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie-catalog.uri=https://localhost:19120/api/v1
iceberg.nessie-catalog.uri=https://localhost:19120/api/v2
iceberg.nessie-catalog.default-warehouse-dir=/tmp
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@

import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieCatalogConfig.Security.BEARER;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.function.Predicate.isEqual;
Expand All @@ -38,6 +41,12 @@ public enum Security
BEARER,
}

public enum ClientApiVersion
{
V1,
V2,
}

private String defaultReferenceName = "main";
private String defaultWarehouseDir;
private URI serverUri;
Expand All @@ -46,6 +55,8 @@ public enum Security
private boolean enableCompression = true;
private Security security;
private Optional<String> bearerToken = Optional.empty();
private Optional<ClientApiVersion> clientAPIVersion = Optional.empty();
private static final Pattern VERSION_PATTERN = Pattern.compile("/v(\\d+)$");

@NotNull
public String getDefaultReferenceName()
Expand Down Expand Up @@ -168,4 +179,33 @@ public boolean isMissingTokenForBearerAuth()
{
return getSecurity().filter(isEqual(BEARER)).isEmpty() || getBearerToken().isPresent();
}

public Optional<ClientApiVersion> getClientAPIVersion()
{
return clientAPIVersion;
}

@Config("iceberg.nessie-catalog.client-api-version")
@ConfigDescription("Client API version to use")
public IcebergNessieCatalogConfig setClientAPIVersion(ClientApiVersion version)
{
this.clientAPIVersion = Optional.ofNullable(version);
return this;
}

protected IcebergNessieCatalogConfig.ClientApiVersion inferVersionFromURI()
{
checkArgument(serverUri != null, "URI is not specified in the catalog properties");
// match for uri ending with /v1, /v2 etc
Matcher matcher = VERSION_PATTERN.matcher(serverUri.toString());
if (!matcher.find()) {
throw new IllegalArgumentException("URI doesn't end with the version: %s. Please configure `client-api-version` in the catalog properties explicitly.".formatted(serverUri));
}

return switch (matcher.group(1)) {
case "1" -> ClientApiVersion.V1;
case "2" -> ClientApiVersion.V2;
default -> throw new IllegalArgumentException("Unknown API version in the URI: " + matcher.group(1));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.client.NessieClientBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.api.NessieApiV2;
import org.projectnessie.client.auth.BearerAuthenticationProvider;

import static io.airlift.configuration.ConfigBinder.configBinder;
Expand Down Expand Up @@ -58,7 +59,14 @@ public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalog
icebergNessieCatalogConfig.getBearerToken()
.ifPresent(token -> builder.withAuthentication(BearerAuthenticationProvider.create(token)));

return new NessieIcebergClient(builder.build(NessieApiV1.class),
IcebergNessieCatalogConfig.ClientApiVersion clientApiVersion = icebergNessieCatalogConfig.getClientAPIVersion()
.orElseGet(icebergNessieCatalogConfig::inferVersionFromURI);
NessieApiV1 api = switch (clientApiVersion) {
case V1 -> builder.build(NessieApiV1.class);
case V2 -> builder.build(NessieApiV2.class);
};

return new NessieIcebergClient(api,
icebergNessieCatalogConfig.getDefaultReferenceName(),
null,
ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.inject.CreationException;
import io.airlift.bootstrap.ApplicationConfigurationException;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorFactory;
Expand Down Expand Up @@ -285,6 +286,7 @@ public void testNessieCatalog()
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
"iceberg.nessie-catalog.uri", "http://foo:1234",
"iceberg.nessie-catalog.client-api-version", "V1",
"bootstrap.quiet", "true"),
new TestingConnectorContext())
.shutdown();
Expand All @@ -301,6 +303,7 @@ public void testNessieCatalogWithBearerAuth()
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
"iceberg.nessie-catalog.uri", "http://foo:1234",
"iceberg.nessie-catalog.client-api-version", "V2",
"iceberg.nessie-catalog.authentication.type", "BEARER",
"iceberg.nessie-catalog.authentication.token", "someToken"),
new TestingConnectorContext())
Expand Down Expand Up @@ -343,6 +346,23 @@ public void testNessieCatalogWithNoAccessToken()
.hasMessageContaining("'iceberg.nessie-catalog.authentication.token' must be configured with 'iceberg.nessie-catalog.authentication.type' BEARER");
}

@Test
public void testNessieCatalogClientAPIVersion()
{
ConnectorFactory factory = getConnectorFactory();

assertThatThrownBy(() -> factory.create(
"test",
Map.of(
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.uri", "http://foo:1234",
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp"),
new TestingConnectorContext())
.shutdown())
.isInstanceOf(CreationException.class)
.hasMessageContaining("URI doesn't end with the version: http://foo:1234. Please configure `client-api-version` in the catalog properties explicitly.");
}

@Test
public void testSnowflakeCatalog()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,35 @@ public void testDefaults()
.setConnectionTimeout(new Duration(DEFAULT_CONNECT_TIMEOUT_MILLIS, MILLISECONDS))
.setReadTimeout(new Duration(DEFAULT_READ_TIMEOUT_MILLIS, MILLISECONDS))
.setSecurity(null)
.setBearerToken(null));
.setBearerToken(null)
.setClientAPIVersion(null));
}

@Test
public void testExplicitPropertyMapping()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("iceberg.nessie-catalog.default-warehouse-dir", "/tmp")
.put("iceberg.nessie-catalog.uri", "http://localhost:xxx/api/v1")
.put("iceberg.nessie-catalog.uri", "http://localhost:xxx/api/custom")
.put("iceberg.nessie-catalog.ref", "someRef")
.put("iceberg.nessie-catalog.enable-compression", "false")
.put("iceberg.nessie-catalog.connection-timeout", "2s")
.put("iceberg.nessie-catalog.read-timeout", "5m")
.put("iceberg.nessie-catalog.authentication.type", "BEARER")
.put("iceberg.nessie-catalog.authentication.token", "bearerToken")
.put("iceberg.nessie-catalog.client-api-version", "V1")
.buildOrThrow();

IcebergNessieCatalogConfig expected = new IcebergNessieCatalogConfig()
.setDefaultWarehouseDir("/tmp")
.setServerUri(URI.create("http://localhost:xxx/api/v1"))
.setServerUri(URI.create("http://localhost:xxx/api/custom"))
.setDefaultReferenceName("someRef")
.setCompressionEnabled(false)
.setConnectionTimeout(new Duration(2, TimeUnit.SECONDS))
.setReadTimeout(new Duration(5, TimeUnit.MINUTES))
.setSecurity(IcebergNessieCatalogConfig.Security.BEARER)
.setBearerToken("bearerToken");
.setBearerToken("bearerToken")
.setClientAPIVersion(IcebergNessieCatalogConfig.ClientApiVersion.V1);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.projectnessie.client.NessieClientBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.api.NessieApiV2;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -94,9 +94,9 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
TrinoFileSystemFactory fileSystemFactory = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS);
IcebergNessieCatalogConfig icebergNessieCatalogConfig = new IcebergNessieCatalogConfig()
.setServerUri(URI.create(nessieContainer.getRestApiUri()));
NessieApiV1 nessieApi = NessieClientBuilder.createClientBuilderFromSystemSettings()
NessieApiV2 nessieApi = NessieClientBuilder.createClientBuilderFromSystemSettings()
.withUri(nessieContainer.getRestApiUri())
.build(NessieApiV1.class);
.build(NessieApiV2.class);
NessieIcebergClient nessieClient = new NessieIcebergClient(nessieApi, icebergNessieCatalogConfig.getDefaultReferenceName(), null, ImmutableMap.of());
return new TrinoNessieCatalog(
new CatalogName("catalog_name"),
Expand All @@ -118,9 +118,9 @@ public void testDefaultLocation()
IcebergNessieCatalogConfig icebergNessieCatalogConfig = new IcebergNessieCatalogConfig()
.setDefaultWarehouseDir(tmpDirectory.toAbsolutePath().toString())
.setServerUri(URI.create(nessieContainer.getRestApiUri()));
NessieApiV1 nessieApi = NessieClientBuilder.createClientBuilderFromSystemSettings()
NessieApiV2 nessieApi = NessieClientBuilder.createClientBuilderFromSystemSettings()
.withUri(nessieContainer.getRestApiUri())
.build(NessieApiV1.class);
.build(NessieApiV2.class);
NessieIcebergClient nessieClient = new NessieIcebergClient(nessieApi, icebergNessieCatalogConfig.getDefaultReferenceName(), null, ImmutableMap.of());
TrinoCatalog catalogWithDefaultLocation = new TrinoNessieCatalog(
new CatalogName("catalog_name"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void start()

public String getRestApiUri()
{
return "http://" + getMappedHostAndPortForExposedPort(PORT) + "/api/v1";
return "http://" + getMappedHostAndPortForExposedPort(PORT) + "/api/v2";
}

public static class Builder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie-catalog.uri=http://nessie-server:19120/api/v1
iceberg.nessie-catalog.uri=http://nessie-server:19120/api/v2
iceberg.nessie-catalog.default-warehouse-dir=hdfs://hadoop-master:9000/user/hive/warehouse
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_test.catalog-impl=org.apache.iceberg.nessie.NessieCatalog
spark.sql.catalog.iceberg_test.uri=http://nessie-server:19120/api/v1
spark.sql.catalog.iceberg_test.uri=http://nessie-server:19120/api/v2
spark.sql.catalog.iceberg_test.authentication.type=NONE
spark.sql.catalog.iceberg_test.warehouse=hdfs://hadoop-master:9000/user/hive/warehouse
; disabling caching allows us to run spark queries interchangeably with trino's
Expand Down