Skip to content

Commit

Permalink
1.7.1rc0 cherry picks (#11593)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Nov 20, 2024
1 parent 5f7c992 commit 70da31d
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public Optional<Long> adlsWriteBlockSize() {
return Optional.ofNullable(adlsWriteBlockSize);
}

/**
* Applies configuration to the {@link DataLakeFileSystemClientBuilder} to provide the endpoint
* and credentials required to create an instance of the client.
*
* <p>The default endpoint is constructed in the form {@code
* https://{account}.dfs.core.windows.net} and default credentials are provided via the {@link
* com.azure.identity.DefaultAzureCredential}.
*
* @param account the service account name
* @param builder the builder instance
*/
public void applyClientConfiguration(String account, DataLakeFileSystemClientBuilder builder) {
String sasToken = adlsSasTokens.get(account);
if (sasToken != null && !sasToken.isEmpty()) {
Expand All @@ -93,7 +104,7 @@ public void applyClientConfiguration(String account, DataLakeFileSystemClientBui
if (connectionString != null && !connectionString.isEmpty()) {
builder.endpoint(connectionString);
} else {
builder.endpoint("https://" + account);
builder.endpoint("https://" + account + ".dfs.core.windows.net");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* This class represents a fully qualified location in Azure expressed as a URI.
* This class represents a fully qualified location to a file or directory in Azure Data Lake
* Storage Gen2 storage.
*
* <p>Locations follow the conventions used by Hadoop's Azure support, i.e.
 * <p>Locations follow a URI-like structure to identify resources
*
* <pre>{@code abfs[s]://[<container>@]<storage account host>/<file path>}</pre>
* <pre>{@code abfs[s]://[<container>@]<storageAccount>.dfs.core.windows.net/<path>}</pre>
*
* <p>See <a href="https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html">Hadoop Azure
* Support</a>
* or
*
* <pre>{@code wasb[s]://<container>@<storageAccount>.blob.core.windows.net/<path>}</pre>
*
* For compatibility, locations using the wasb scheme are also accepted but will use the Azure Data
* Lake Storage Gen2 REST APIs instead of the Blob Storage REST APIs.
*
* <p>See <a
* href="https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax">Azure
* Data Lake Storage URI</a>
*/
class ADLSLocation {
private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");
private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$");

private final String storageAccount;
private final String container;
Expand All @@ -53,19 +62,19 @@ class ADLSLocation {

ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location);

String authority = matcher.group(1);
String authority = matcher.group(2);
String[] parts = authority.split("@", -1);
if (parts.length > 1) {
this.container = parts[0];
this.storageAccount = parts[1];
String host = parts[1];
this.storageAccount = host.split("\\.", -1)[0];
} else {
this.container = null;
this.storageAccount = authority;
this.storageAccount = authority.split("\\.", -1)[0];
}

String uriPath = matcher.group(2);
uriPath = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
this.path = uriPath.split("\\?", -1)[0].split("#", -1)[0];
String uriPath = matcher.group(3);
this.path = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
}

/** Returns Azure storage account. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ public void testNoSasToken() {
@Test
public void testWithConnectionString() {
AzureProperties props =
new AzureProperties(ImmutableMap.of("adls.connection-string.account1", "http://endpoint"));
new AzureProperties(
ImmutableMap.of(
"adls.connection-string.account1", "https://account1.dfs.core.usgovcloudapi.net"));

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account1", clientBuilder);
verify(clientBuilder).endpoint("http://endpoint");
verify(clientBuilder).endpoint("https://account1.dfs.core.usgovcloudapi.net");
}

@Test
Expand All @@ -111,7 +113,7 @@ public void testNoMatchingConnectionString() {

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account1", clientBuilder);
verify(clientBuilder).endpoint("https://account1");
verify(clientBuilder).endpoint("https://account1.dfs.core.windows.net");
}

@Test
Expand All @@ -120,7 +122,7 @@ public void testNoConnectionString() {

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account", clientBuilder);
verify(clientBuilder).endpoint("https://account");
verify(clientBuilder).endpoint("https://account.dfs.core.windows.net");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ public void testLocationParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

/**
 * Verifies that wasb/wasbs locations are accepted and parsed like abfs locations:
 * the container comes from the user-info part and the storage account is the first
 * label of the authority host.
 */
@ParameterizedTest
@ValueSource(strings = {"wasb", "wasbs"})
public void testWasbLocationParsing(String scheme) {
  // NOTE(review): the committed URI was mangled by email redaction in this view;
  // reconstructed as container@account.dfs.core.windows.net to match sibling tests.
  String p1 = scheme + "://container@account.dfs.core.windows.net/path/to/file";
  ADLSLocation location = new ADLSLocation(p1);

  assertThat(location.storageAccount()).isEqualTo("account");
  assertThat(location.container().get()).isEqualTo("container");
  assertThat(location.path()).isEqualTo("path/to/file");
}
Expand All @@ -43,7 +54,7 @@ public void testEncodedString() {
String p1 = "abfs://[email protected]/path%20to%20file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path%20to%20file");
}
Expand All @@ -67,7 +78,7 @@ public void testNoContainer() {
String p1 = "abfs://account.dfs.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().isPresent()).isFalse();
assertThat(location.path()).isEqualTo("path/to/file");
}
Expand All @@ -77,28 +88,16 @@ public void testNoPath() {
String p1 = "abfs://[email protected]";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
}

@Test
public void testQueryAndFragment() {
String p1 = "abfs://[email protected]/path/to/file?query=foo#123";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@Test
public void testQueryAndFragmentNoPath() {
String p1 = "abfs://[email protected]?query=foo#123";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
/**
 * A '?' in the path must be kept as part of the file name (both raw and
 * percent-encoded forms), not treated as the start of a query string.
 */
@ParameterizedTest
@ValueSource(strings = {"file?.txt", "file%3F.txt"})
public void testQuestionMarkInFileName(String path) {
  // NOTE(review): authority reconstructed from the file's convention
  // (container@account.dfs.core.windows.net); redacted in this view.
  String fullPath = String.format("abfs://container@account.dfs.core.windows.net/%s", path);
  ADLSLocation location = new ADLSLocation(fullPath);
  assertThat(location.path()).contains(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL,
"abfs", ADLS_FILE_IO_IMPL,
"abfss", ADLS_FILE_IO_IMPL);
"abfss", ADLS_FILE_IO_IMPL,
"wasb", ADLS_FILE_IO_IMPL,
"wasbs", ADLS_FILE_IO_IMPL);

private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public List<Namespace> listNamespaces(SessionContext context, Namespace namespac

Map<String, String> queryParams = Maps.newHashMap();
if (!namespace.isEmpty()) {
queryParams.put("parent", RESTUtil.encodeNamespace(namespace));
queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
}

ImmutableList.Builder<Namespace> namespaces = ImmutableList.builder();
Expand Down
16 changes: 3 additions & 13 deletions core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,14 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class RESTUtil {
private static final char NAMESPACE_SEPARATOR = '\u001f';
public static final Joiner NAMESPACE_JOINER = Joiner.on(NAMESPACE_SEPARATOR);
public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR);
private static final String NAMESPACE_ESCAPED_SEPARATOR = "%1F";
private static final Joiner NAMESPACE_ESCAPED_JOINER = Joiner.on(NAMESPACE_ESCAPED_SEPARATOR);
private static final Splitter NAMESPACE_ESCAPED_SPLITTER =
Splitter.on(NAMESPACE_ESCAPED_SEPARATOR);

/**
* @deprecated since 1.7.0, will be made private in 1.8.0; use {@link
* RESTUtil#encodeNamespace(Namespace)} instead.
*/
@Deprecated public static final Joiner NAMESPACE_JOINER = Joiner.on(NAMESPACE_ESCAPED_SEPARATOR);

/**
* @deprecated since 1.7.0, will be made private in 1.8.0; use {@link
* RESTUtil#decodeNamespace(String)} instead.
*/
@Deprecated
public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_ESCAPED_SEPARATOR);

private RESTUtil() {}

public static String stripTrailingSlash(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ public <T extends RESTResponse> T handleRequest(
if (asNamespaceCatalog != null) {
Namespace ns;
if (vars.containsKey("parent")) {
ns = RESTUtil.decodeNamespace(vars.get("parent"));
ns =
Namespace.of(
RESTUtil.NAMESPACE_SPLITTER
.splitToStream(vars.get("parent"))
.toArray(String[]::new));
} else {
ns = Namespace.empty();
}
Expand Down
3 changes: 1 addition & 2 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') {
force 'org.xerial.snappy:snappy-java:1.1.10.7'
force 'org.apache.commons:commons-compress:1.27.1'
force 'org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.3.0'
force 'com.fasterxml.woodstox:woodstox-core:6.7.0'
}
}
}
Expand All @@ -96,7 +97,6 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') {
exclude group: 'org.slf4j'
exclude group: 'ch.qos.reload4j'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'com.fasterxml.woodstox'
exclude group: 'com.google.guava'
exclude group: 'com.google.protobuf'
exclude group: 'org.apache.curator'
Expand All @@ -105,7 +105,6 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') {
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
exclude group: 'org.apache.commons', module: 'commons-configuration2'
exclude group: 'org.apache.hadoop.thirdparty', module: 'hadoop-shaded-protobuf_3_7'
exclude group: 'org.codehaus.woodstox'
exclude group: 'org.eclipse.jetty'
}
implementation project(':iceberg-orc')
Expand Down

0 comments on commit 70da31d

Please sign in to comment.