Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with various Fallback Handling #7272

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,18 @@ void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
}

/**
* Checks if the FixedSASTokenProvider is configured for the current account.
*
* @return true if the FixedSASTokenProvider is configured, false otherwise.
* @throws AzureBlobFileSystemException if there is an error in getting the SAS token provider.
*/
public boolean isFixedSASTokenProviderConfigured()
throws AzureBlobFileSystemException {
return getAuthType(this.accountName) == AuthType.SAS
&& getSASTokenProvider() instanceof FixedSASTokenProvider;
}

private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
String value = getPasswordString(key);
if (StringUtils.isBlank(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.UNAUTHORIZED_SAS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
Expand Down Expand Up @@ -234,6 +236,27 @@ public void initialize(URI uri, Configuration configuration)
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/**
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* Validates if the FixedSASTokenProvider is configured for non-HNS accounts.
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* If the namespace is not enabled and the FixedSASTokenProvider is not configured,
* the filesystem is closed and an InvalidConfigurationValueException is thrown.
*
* @throws InvalidConfigurationValueException if the namespace is not enabled and the FixedSASTokenProvider is not configured.
*/
try {
if (!getIsNamespaceEnabled(new TracingContext(initFSTracingContext))
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
&& !abfsConfiguration.isFixedSASTokenProviderConfigured()) {
close();
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
throw new InvalidConfigurationValueException(
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, UNAUTHORIZED_SAS);
}
} catch (AzureBlobFileSystemException ex) {
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("Failed to determine account type for auth type validation",
ex);
throw new InvalidConfigurationValueException(
FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
* Fail initialization of filesystem if the configs are provided. CPK is of
Expand Down Expand Up @@ -266,6 +289,7 @@ public void initialize(URI uri, Configuration configuration)
}
}
}
getAbfsStore().updateClientWithNamespaceInfo(new TracingContext(initFSTracingContext));

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
Expand Down Expand Up @@ -797,7 +821,7 @@ private FileStatus getFileStatus(final Path path,
Path qualifiedPath = makeQualified(path);

try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
return getAbfsStore().getFileStatus(qualifiedPath, tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

/**
* Updates the client with the namespace information.
*
* @param tracingContext the tracing context to be used for the operation
* @throws AzureBlobFileSystemException if an error occurs while updating the client
*/
public void updateClientWithNamespaceInfo(TracingContext tracingContext)
throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
AbfsClient.setIsNamespaceEnabled(isNamespaceEnabled);
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
Expand Down Expand Up @@ -635,14 +647,15 @@ public OutputStream createFile(final Path path,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);
createClient.getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);

String relativePath = getRelativePath(path);
boolean isAppendBlob = false;
Expand All @@ -660,9 +673,9 @@ public OutputStream createFile(final Path path,
}

final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (createClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path));
createClient.getEncryptionContextProvider(), getRelativePath(path));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
Expand All @@ -677,7 +690,7 @@ public OutputStream createFile(final Path path,
);

} else {
op = getClient().createPath(relativePath, true,
op = createClient.createPath(relativePath, true,
overwrite,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
Expand All @@ -689,15 +702,16 @@ public OutputStream createFile(final Path path,
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);

String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand All @@ -720,12 +734,12 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws IOException {
AbfsRestOperation op;

AbfsClient createClient = getClientHandler().getIngressClient();
try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = getClient().createPath(relativePath, true, false, permissions,
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext);

} catch (AbfsRestOperationException e) {
Expand All @@ -745,12 +759,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched befpre
op = getClient().createPath(relativePath, true, true, permissions,
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
Expand Down Expand Up @@ -778,22 +791,24 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param client AbfsClient.
* @param clientHandler AbfsClientHandler.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param eTag eTag of the file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClient client,
AbfsClientHandler clientHandler,
FileSystem.Statistics statistics,
String path,
long position,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
Expand All @@ -814,24 +829,38 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withEncryptionAdapter(contextEncryptionAdapter)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withClientHandler(clientHandler)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
createClient.getFileSystem(),
path,
permission,
umask,
Expand All @@ -841,7 +870,7 @@ public void createDirectory(final Path path, final FsPermission permission,
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = getClient().createPath(getRelativePath(path),
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
Expand Down Expand Up @@ -976,6 +1005,7 @@ public OutputStream openFileForWrite(final Path path,
overwrite);

String relativePath = getRelativePath(path);
AbfsClient writeClient = getClientHandler().getIngressClient();

final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, false, tracingContext, null);
Expand All @@ -1000,8 +1030,9 @@ public OutputStream openFileForWrite(final Path path,
}

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
final String eTag = extractEtagHeader(op.getResult());
final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (writeClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String encryptionContext = op.getResult()
.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
Expand All @@ -1010,7 +1041,7 @@ public OutputStream openFileForWrite(final Path path,
"File doesn't have encryptionContext.");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
writeClient.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
Expand All @@ -1020,10 +1051,11 @@ public OutputStream openFileForWrite(final Path path,
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
offset,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
public static final String BLOCK_BLOB_TYPE = "BlockBlob";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
public static final String APPEND_BLOCK = "appendblock";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

//Abfs Http Client Constants for Blob Endpoint APIs.

Expand Down Expand Up @@ -238,7 +240,7 @@ public static ApiVersion getCurrentVersion() {
public static final String PUT_BLOCK_LIST = "PutBlockList";

/**
* Value that differentiates categories of the http_status.
* Value that differentiates categories of the HTTP status.
* <pre>
* 100 - 199 : Informational responses
* 200 - 299 : Successful responses
Expand All @@ -249,6 +251,28 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

/**
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* XML version declaration for the block list.
*/
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>%n";

/**
* Start tag for the block list XML.
*/
public static final String BLOCK_LIST_START_TAG = "<BlockList>%n";

/**
* End tag for the block list XML.
*/
public static final String BLOCK_LIST_END_TAG = "</BlockList>%n";

/**
* Format string for the latest block in the block list XML.
* The placeholder will be replaced with the block identifier.
*/
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>%n";


/**
* List of configurations that are related to Customer-Provided-Keys.
* <ol>
Expand Down Expand Up @@ -279,6 +303,13 @@ public static ApiVersion getCurrentVersion() {
+ "non-hierarchical-namespace account:"
+ CPK_CONFIG_LIST;


/**
* Exception message on filesystem init if token-provider-auth-type configs are provided
*/
public static final String UNAUTHORIZED_SAS =
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
"Non-hierarchical namespace accounts only work with FixedSASTokenProvider.";

/**
* System property that define maximum number of cached-connection per fileSystem for
* ApacheHttpClient. JDK network library uses the same property to define maximum
Expand All @@ -289,6 +320,12 @@ public static ApiVersion getCurrentVersion() {
public static final String APACHE_IMPL = "Apache";
public static final String JDK_FALLBACK = "JDK_fallback";
public static final String KEEP_ALIVE_CACHE_CLOSED = "KeepAliveCache is closed";
public static final String DFS_FLUSH = "DFlush";
public static final String DFS_APPEND = "DAppend";
public static final String BLOB_FLUSH = "BFlush";
public static final String BLOB_APPEND = "BAppend";
public static final String FALLBACK_FLUSH = "FBFlush";
public static final String FALLBACK_APPEND = "FBAppend";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public final class FileSystemConfigurations {
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;

/**
* Length of the block ID used for appends.
rakeshadr marked this conversation as resolved.
Show resolved Hide resolved
*/
public static final int BLOCK_ID_LENGTH = 60;
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Buffer blocks to disk.
* Capacity is limited to available disk space.
Expand Down
Loading
Loading