Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with various Fallback Handling #7272

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
<suppress checks="MagicNumber"
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestAzureBlobFileSystemAppend.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void initialize(URI uri, Configuration configuration)
}
}
}
getAbfsStore().updateClientWithNamespaceInfo(new TracingContext(initFSTracingContext));

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
Expand Down Expand Up @@ -797,7 +798,7 @@ private FileStatus getFileStatus(final Path path,
Path qualifiedPath = makeQualified(path);

try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
return getAbfsStore().getFileStatus(qualifiedPath, tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

/**
* Updates the client with the namespace information.
*
* @param tracingContext the tracing context to be used for the operation
* @throws AzureBlobFileSystemException if an error occurs while updating the client
*/
public void updateClientWithNamespaceInfo(TracingContext tracingContext)
throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
AbfsClient.setIsNamespaceEnabled(isNamespaceEnabled);
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
Expand Down Expand Up @@ -635,14 +647,15 @@ public OutputStream createFile(final Path path,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);
createClient.getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);

String relativePath = getRelativePath(path);
boolean isAppendBlob = false;
Expand All @@ -660,9 +673,9 @@ public OutputStream createFile(final Path path,
}

final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (createClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path));
createClient.getEncryptionContextProvider(), getRelativePath(path));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
Expand All @@ -677,7 +690,7 @@ public OutputStream createFile(final Path path,
);

} else {
op = getClient().createPath(relativePath, true,
op = createClient.createPath(relativePath, true,
overwrite,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
Expand All @@ -689,15 +702,16 @@ public OutputStream createFile(final Path path,
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);

String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand All @@ -720,12 +734,12 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws IOException {
AbfsRestOperation op;

AbfsClient createClient = getClientHandler().getIngressClient();
try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = getClient().createPath(relativePath, true, false, permissions,
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext);

} catch (AbfsRestOperationException e) {
Expand All @@ -745,12 +759,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched befpre
op = getClient().createPath(relativePath, true, true, permissions,
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
Expand Down Expand Up @@ -778,22 +791,24 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param client AbfsClient.
* @param clientHandler AbfsClientHandler.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param eTag eTag of the file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClient client,
AbfsClientHandler clientHandler,
FileSystem.Statistics statistics,
String path,
long position,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
Expand All @@ -814,24 +829,38 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withEncryptionAdapter(contextEncryptionAdapter)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withClientHandler(clientHandler)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
createClient.getFileSystem(),
path,
permission,
umask,
Expand All @@ -841,7 +870,7 @@ public void createDirectory(final Path path, final FsPermission permission,
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = getClient().createPath(getRelativePath(path),
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
Expand Down Expand Up @@ -976,6 +1005,7 @@ public OutputStream openFileForWrite(final Path path,
overwrite);

String relativePath = getRelativePath(path);
AbfsClient writeClient = getClientHandler().getIngressClient();

final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, false, tracingContext, null);
Expand All @@ -1000,8 +1030,9 @@ public OutputStream openFileForWrite(final Path path,
}

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
final String eTag = extractEtagHeader(op.getResult());
final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (writeClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String encryptionContext = op.getResult()
.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
Expand All @@ -1010,7 +1041,7 @@ public OutputStream openFileForWrite(final Path path,
"File doesn't have encryptionContext.");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
writeClient.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
Expand All @@ -1020,10 +1051,11 @@ public OutputStream openFileForWrite(final Path path,
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
offset,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
public static final String BLOCK_BLOB_TYPE = "BlockBlob";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
public static final String APPEND_BLOCK = "appendblock";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

//Abfs Http Client Constants for Blob Endpoint APIs.

Expand Down Expand Up @@ -238,7 +240,7 @@ public static ApiVersion getCurrentVersion() {
public static final String PUT_BLOCK_LIST = "PutBlockList";

/**
* Value that differentiates categories of the http_status.
* Value that differentiates categories of the HTTP status.
* <pre>
* 100 - 199 : Informational responses
* 200 - 299 : Successful responses
Expand All @@ -249,6 +251,28 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

/**
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* XML version declaration for the block list.
*/
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>%n";

/**
* Start tag for the block list XML.
*/
public static final String BLOCK_LIST_START_TAG = "<BlockList>%n";

/**
* End tag for the block list XML.
*/
public static final String BLOCK_LIST_END_TAG = "</BlockList>%n";

/**
* Format string for the latest block in the block list XML.
* The placeholder will be replaced with the block identifier.
*/
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>%n";


/**
* List of configurations that are related to Customer-Provided-Keys.
* <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public final class FileSystemConfigurations {
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;

/**
* Length of the block ID used for appends.
rakeshadr marked this conversation as resolved.
Show resolved Hide resolved
*/
public static final int BLOCK_ID_LENGTH = 60;
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Buffer blocks to disk.
* Capacity is limited to available disk space.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,11 @@ public final class HttpHeaderConfigurations {
*/
public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5";

/**
* Http Request Header for denoting blob type.
* {@value}
*/
public static final String X_MS_BLOB_TYPE = "x-ms-blob-type";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Exception thrown when an invalid ingress service is encountered.
*
* <p>This exception is used to indicate that the ingress service being used
* is not valid or supported. It extends the {@link AbfsRestOperationException}
* to provide additional context about the error.</p>
*
* @see AbfsRestOperationException
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class InvalidIngressServiceException extends AbfsRestOperationException {

/**
* Constructs a new InvalidIngressServiceException with the specified details.
*
* @param statusCode the HTTP status code
* @param errorCode the error code
* @param errorMessage the error message
* @param innerException the inner exception
*/
public InvalidIngressServiceException(final int statusCode,
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
final String errorCode,
final String errorMessage,
final Exception innerException) {
super(statusCode, errorCode, errorMessage, innerException);
}
}
Loading
Loading