Support WASB scheme in ADLSFileIO #11830

Merged Jan 20, 2025 (5 commits)

Changes from 3 commits
AzureProperties.java
@@ -77,6 +77,15 @@ public Optional<Long> adlsWriteBlockSize() {
return Optional.ofNullable(adlsWriteBlockSize);
}

/**
* Applies configuration to the {@link DataLakeFileSystemClientBuilder} to provide the endpoint
* and credentials required to create an instance of the client.
*
* <p>Default credentials are provided via the {@link com.azure.identity.DefaultAzureCredential}.
*
* @param account the storage account host, used as the key to look up account-specific credentials such as SAS tokens
* @param builder the builder instance
*/
public void applyClientConfiguration(String account, DataLakeFileSystemClientBuilder builder) {
String sasToken = adlsSasTokens.get(account);
if (sasToken != null && !sasToken.isEmpty()) {
ADLSFileIO.java
@@ -111,7 +111,7 @@ DataLakeFileSystemClient client(ADLSLocation location) {
new DataLakeFileSystemClientBuilder().httpClient(HTTP);

location.container().ifPresent(clientBuilder::fileSystemName);
- azureProperties.applyClientConfiguration(location.storageAccount(), clientBuilder);
+ azureProperties.applyClientConfiguration(location.host(), clientBuilder);

return clientBuilder.buildClient();
}
ADLSLocation.java
@@ -30,18 +30,26 @@
*
* <p>Locations follow a URI-like structure to identify resources
*
- * <pre>{@code abfs[s]://[<container>@]<storage account host>/<file path>}</pre>
+ * <pre>{@code abfs[s]://[<container>@]<storageAccount>.dfs.core.windows.net/<path>}</pre>
*
* or
*
* <pre>{@code wasb[s]://<container>@<storageAccount>.blob.core.windows.net/<path>}</pre>
*
* <p>For compatibility, locations using the wasb scheme are also accepted but will use the Azure
* Data Lake Storage Gen2 REST APIs instead of the Blob Storage REST APIs.
*
* <p>See <a
* href="https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax">Azure
* Data Lake Storage URI</a>
*/
class ADLSLocation {
- private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");
+ private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$");

private final String storageAccount;
private final String container;
private final String path;
private final String host;

/**
* Creates a new ADLSLocation from a fully qualified URI.
@@ -55,17 +63,19 @@ class ADLSLocation {

ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location);

- String authority = matcher.group(1);
+ String authority = matcher.group(2);
String[] parts = authority.split("@", -1);
if (parts.length > 1) {
this.container = parts[0];
- this.storageAccount = parts[1];
+ this.host = parts[1];
+ this.storageAccount = host.split("\\.", -1)[0];
} else {
this.container = null;
- this.storageAccount = authority;
+ this.host = authority;
+ this.storageAccount = authority.split("\\.", -1)[0];
}

- String uriPath = matcher.group(2);
+ String uriPath = matcher.group(3);
this.path = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
}

@@ -83,4 +93,9 @@ public Optional<String> container() {
public String path() {
return path;
}

/** Returns ADLS host. */
public String host() {
return host;
}
}
ADLSFileIOTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.azure.adlsv2;

import static org.apache.iceberg.azure.AzureProperties.ADLS_SAS_TOKEN_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
@@ -32,13 +33,15 @@
import com.azure.core.http.rest.Response;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;
import com.azure.storage.file.datalake.models.PathItem;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.OffsetDateTime;
import java.util.Iterator;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
@@ -100,6 +103,21 @@ public void testGetClient() {
assertThat(client.exists()).isTrue();
}

@Test
public void testApplyClientConfigurationWithSas() {
AzureProperties props =
spy(
new AzureProperties(
ImmutableMap.of(
ADLS_SAS_TOKEN_PREFIX + "account.dfs.core.windows.net", "sasToken")));
ADLSFileIO io = spy(new ADLSFileIO(props));
String location = AZURITE_CONTAINER.location("path/to/file");
io.client(location);
verify(props)
.applyClientConfiguration(
eq("account.dfs.core.windows.net"), any(DataLakeFileSystemClientBuilder.class));
}

/** Azurite does not support ADLSv2 directory operations yet so use mocks here. */
@SuppressWarnings("unchecked")
@Test
ADLSLocationTest.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

public class ADLSLocationTest {
@@ -33,7 +34,18 @@ public void testLocationParsing(String scheme) {
String p1 = scheme + "://container@account.dfs.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(strings = {"wasb", "wasbs"})
public void testWasbLocationParsing(String scheme) {
String p1 = scheme + "://container@account.blob.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}
@@ -43,7 +55,7 @@ public void testEncodedString() {
String p1 = "abfs://[email protected]/path%20to%20file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path%20to%20file");
}
@@ -67,7 +79,7 @@ public void testNoContainer() {
String p1 = "abfs://account.dfs.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().isPresent()).isFalse();
assertThat(location.path()).isEqualTo("path/to/file");
}
@@ -77,7 +89,7 @@ public void testNoPath() {
String p1 = "abfs://[email protected]";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
}
@@ -89,4 +101,17 @@ public void testQuestionMarkInFileName(String path) {
ADLSLocation location = new ADLSLocation(fullPath);
assertThat(location.path()).contains(path);
}

@ParameterizedTest
@CsvSource({
"abfs://[email protected]/file.txt, account.dfs.core.windows.net",
"abfs://[email protected]/file.txt, account.dfs.core.usgovcloudapi.net",
"wasb://[email protected]/file.txt, account.blob.core.windows.net",
"abfs://account.dfs.core.windows.net/path, account.dfs.core.windows.net",
"wasb://account.blob.core.windows.net/path, account.blob.core.windows.net"
})
void testHost(String path, String expectedHost) {
ADLSLocation location = new ADLSLocation(path);
assertThat(location.host()).contains(expectedHost);
}
}
ResolvingFileIO.java
@@ -62,7 +62,9 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL,
"abfs", ADLS_FILE_IO_IMPL,
"abfss", ADLS_FILE_IO_IMPL);
"abfss", ADLS_FILE_IO_IMPL,
"wasb", ADLS_FILE_IO_IMPL,
"wasbs", ADLS_FILE_IO_IMPL);
Comment on lines +66 to +67
Contributor Author (mrcnc):

I would like to verify that updating this default behavior of ResolvingFileIO is not considered a breaking change, since it could potentially force clients using the WASB scheme with HadoopFileIO to update their configuration. For example, if clients are currently using SAS tokens in a core-site.xml file, then it seems they would need to either configure these in the AzureProperties for ADLSFileIO or explicitly configure HadoopFileIO as the implementation to get the same behavior.

Contributor:

Not completely sure I follow the concern: if a client is using WASB + HadoopFileIO, this means they explicitly configured it or are using a catalog which defaults to HadoopFileIO (e.g. the HadoopCatalog).

Adding this scheme mapping to ResolvingFileIO just makes it so that when ResolvingFileIO is configured or defaulted to, if a path with "wasb/wasbs" is encountered, we initialize the ADLS file IO. It shouldn't impact a user who has already either explicitly configured HadoopFileIO or is defaulting to that. Unless I'm missing something cc @danielcweeks @bryanck

Contributor Author (mrcnc):

I see the JdbcCatalog uses HadoopFileIO by default, so it wouldn't affect those users, but it would affect the REST Catalog, which defaults to ResolvingFileIO. Maybe this is ok but needs to be added to the release notes for 1.8.0? If not, we can remove it for now or activate this behavior explicitly with another setting?

Contributor:

I think it's actually important to update the mapping (so imo what's here is correct). I think ResolvingFileIO should always only fallback to HadoopFileIO as a last resort, and in this case we do want wasb/wasbs to resolve to ADLS so that any custom integrations that happens in Iceberg in the ADLSFileIO implementation can be taken advantage of. At the same time we should document it.

In general, I feel like we shouldn't design for the case where a user relies on ResolvingFileIO's fallback to HadoopFileIO.

Member:

The mapping should be updated imho. This change makes sense to me here.


private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);