Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with various Fallback Handling #7272

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review comments
anmolanmol1234 committed Jan 10, 2025
commit 2d605022c1357fa39b912a14b97efb39068f9f72
Original file line number Diff line number Diff line change
@@ -61,6 +61,4 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
<suppress checks="MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestAzureBlobFileSystemAppend.java"/>
</suppressions>
Original file line number Diff line number Diff line change
@@ -524,10 +524,10 @@ public boolean isDfsToBlobFallbackEnabled() {
public void validateConfiguredServiceType(boolean isHNSEnabled)
throws InvalidConfigurationValueException {
// TODO: [FnsOverBlob][HADOOP-19179] Remove this check when FNS over Blob is ready.
if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
"Blob Endpoint Support not yet available");
}
// if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {
anmolanmol1234 marked this conversation as resolved.
Show resolved Hide resolved
// throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
// "Blob Endpoint Support not yet available");
// }
if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(
FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Service Type Cannot be BLOB for HNS Account");
Original file line number Diff line number Diff line change
@@ -254,23 +254,23 @@ public static ApiVersion getCurrentVersion() {
/**
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* XML version declaration for the block list.
*/
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>%n";
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";

/**
* Start tag for the block list XML.
*/
public static final String BLOCK_LIST_START_TAG = "<BlockList>%n";
public static final String BLOCK_LIST_START_TAG = "<BlockList>\n";

/**
* End tag for the block list XML.
*/
public static final String BLOCK_LIST_END_TAG = "</BlockList>%n";
public static final String BLOCK_LIST_END_TAG = "</BlockList>\n";

/**
* Format string for the latest block in the block list XML.
* The placeholder will be replaced with the block identifier.
*/
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>%n";
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>\n";


/**
@@ -313,6 +313,12 @@ public static ApiVersion getCurrentVersion() {
public static final String APACHE_IMPL = "Apache";
public static final String JDK_FALLBACK = "JDK_fallback";
public static final String KEEP_ALIVE_CACHE_CLOSED = "KeepAliveCache is closed";
public static final String DFS_FLUSH = "DFlush";
public static final String DFS_APPEND = "DAppend";
public static final String BLOB_FLUSH = "BFlush";
public static final String BLOB_APPEND = "BAppend";
public static final String FALLBACK_FLUSH = "FBFlush";
public static final String FALLBACK_APPEND = "FBAppend";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
@@ -1387,7 +1387,7 @@ public boolean checkIsDir(AbfsHttpOperation result) {
public boolean checkUserError(int responseStatusCode) {
return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
&& responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
&& responseStatusCode != HTTP_CONFLICT);
&& responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
}

/**
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {

private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;

public static final int ONE_THOUSAND = 1000;
private static final int ONE_THOUSAND = 1000;

private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND;

Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
@@ -483,10 +484,6 @@ private synchronized void uploadCurrentBlock() throws IOException {
}
}

private synchronized AbfsPerfTracker getAbfsPerfTracker() {
return client.getAbfsPerfTracker();
}

/**
* Upload a block of data.
* This will take the block.
@@ -514,9 +511,10 @@ private void uploadBlockAsync(AbfsBlock blockToUpload,
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
final Future<Void> job =
executorService.submit(() -> {
AbfsPerfTracker tracker = getAbfsPerfTracker();
AbfsPerfTracker tracker =
client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
"writeCurrentBufferToService", APPEND_ACTION)) {
AppendRequestParameters.Mode
mode = APPEND_MODE;
if (isFlush & isClose) {
@@ -1042,7 +1040,7 @@ ListeningExecutorService getExecutorService() {
* @return The Azure Blob Storage client.
*/
@VisibleForTesting
synchronized AbfsClient getClient() {
AbfsClient getClient() {
return client;
}

Original file line number Diff line number Diff line change
@@ -93,11 +93,13 @@ protected synchronized AbfsBlock createBlockInternal(long position)
*/
private List<String> getBlockList(TracingContext tracingContext)
throws AzureBlobFileSystemException {
List<String> committedBlockIdList;
List<String> committedBlockIdList = new ArrayList<>();
AbfsBlobClient blobClient = abfsOutputStream.getClientHandler().getBlobClient();
final AbfsRestOperation op = blobClient
.getBlockList(abfsOutputStream.getPath(), tracingContext);
committedBlockIdList = op.getResult().getBlockIdList();
if (op != null && op.getResult() != null) {
committedBlockIdList = op.getResult().getBlockIdList();
}
return committedBlockIdList;
}

Original file line number Diff line number Diff line change
@@ -35,6 +35,10 @@
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_FLUSH;

public class AzureBlobIngressHandler extends AzureIngressHandler {

private static final Logger LOG = LoggerFactory.getLogger(
@@ -116,10 +120,9 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
BlobAppendRequestParameters blobParams = new BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
reqParams.setBlobParams(blobParams);
AbfsRestOperation op;
long threadId = Thread.currentThread().getId();
String threadIdStr = String.valueOf(threadId);
String threadIdStr = String.valueOf(Thread.currentThread().getId());
TracingContext tracingContextAppend = new TracingContext(tracingContext);
tracingContextAppend.setIngressHandler("BAppend T " + threadIdStr);
tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
try {
LOG.trace("Starting remote write for block with ID {} and offset {}",
@@ -172,7 +175,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
String blockListXml = generateBlockListXml(
blobBlockManager.getBlockIdList());
TracingContext tracingContextFlush = new TracingContext(tracingContext);
tracingContextFlush.setIngressHandler("BFlush");
tracingContextFlush.setIngressHandler(BLOB_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
LOG.trace("Flushing data at offset {} for path {}", offset, abfsOutputStream.getPath());
op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
@@ -279,7 +282,7 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
// Perform the upload within a performance tracking context.
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
blobClient.getAbfsPerfTracker(),
"writeCurrentBufferToService", "append")) {
"writeCurrentBufferToService", APPEND_ACTION)) {
LOG.trace("Writing current buffer to service at offset {} and path {}", offset, abfsOutputStream.getPath());
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE,
Original file line number Diff line number Diff line change
@@ -133,10 +133,10 @@ protected int getBlockSize() {
* Clears the active block.
*/
void clearActiveBlock() {
  // Check, log and reset under a single lock: the null check then observes
  // the same activeBlock value that is about to be cleared, and the debug
  // message is emitted at most once per call.
  synchronized (this) {
    if (activeBlock != null) {
      LOG.debug("Clearing active block");
    }
    activeBlock = null;
  }
}
Original file line number Diff line number Diff line change
@@ -28,6 +28,9 @@
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DFS_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DFS_FLUSH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;

/**
@@ -113,10 +116,9 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException {
TracingContext tracingContextAppend = new TracingContext(tracingContext);
long threadId = Thread.currentThread().getId();
String threadIdStr = String.valueOf(threadId);
String threadIdStr = String.valueOf(Thread.currentThread().getId());
if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
tracingContextAppend.setIngressHandler("DAppend T " + threadIdStr);
tracingContextAppend.setIngressHandler(DFS_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(
String.valueOf(blockToUpload.getOffset()));
}
@@ -171,7 +173,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
throws IOException {
TracingContext tracingContextFlush = new TracingContext(tracingContext);
if (tracingContextFlush.getIngressHandler().equals(EMPTY_STRING)) {
tracingContextFlush.setIngressHandler("DFlush");
tracingContextFlush.setIngressHandler(DFS_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
}
LOG.trace("Flushing data at offset {} and path {}", offset, abfsOutputStream.getPath());
@@ -215,7 +217,7 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
// Perform the upload within a performance tracking context.
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
dfsClient.getAbfsPerfTracker(),
"writeCurrentBufferToService", "append")) {
"writeCurrentBufferToService", APPEND_ACTION)) {
LOG.trace("Writing current buffer to service at offset {} and path {}", offset, abfsOutputStream.getPath());
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE,
@@ -242,7 +244,6 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
}
}

rakeshadr marked this conversation as resolved.
Show resolved Hide resolved

/**
* Gets the block manager.
*
Original file line number Diff line number Diff line change
@@ -34,6 +34,10 @@
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FALLBACK_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FALLBACK_FLUSH;

/**
* Handles the fallback mechanism for Azure Blob Ingress operations.
*/
@@ -106,9 +110,8 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
TracingContext tracingContextAppend = new TracingContext(tracingContext);
long threadId = Thread.currentThread().getId();
String threadIdStr = String.valueOf(threadId);
tracingContextAppend.setIngressHandler("FBAppend T " + threadIdStr);
String threadIdStr = String.valueOf(Thread.currentThread().getId());
tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
try {
op = super.remoteWrite(blockToUpload, uploadData, reqParams,
@@ -149,7 +152,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
}
try {
TracingContext tracingContextFlush = new TracingContext(tracingContext);
tracingContextFlush.setIngressHandler("FBFlush");
tracingContextFlush.setIngressHandler(FALLBACK_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
op = super.remoteFlush(offset, retainUncommitedData, isClose, leaseId,
tracingContextFlush);
@@ -221,7 +224,7 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
// Perform the upload within a performance tracking context.
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
getClient().getAbfsPerfTracker(),
"writeCurrentBufferToService", "append")) {
"writeCurrentBufferToService", APPEND_ACTION)) {
LOG.trace("Writing current buffer to service at offset {} and path {}", offset, abfsOutputStream.getPath());
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE,
Original file line number Diff line number Diff line change
@@ -627,15 +627,4 @@ protected void assumeBlobServiceType() {
Assume.assumeTrue("Blob service type is required for this test",
getAbfsServiceType() == AbfsServiceType.BLOB);
}

/**
 * Verify that the string form of the given path contains the domain name
 * matching the service type under test: the blob domain name when the
 * service type is BLOB, the dfs domain name otherwise.
 * @param path path whose string form is checked.
 */
protected void assertPathDns(Path path) {
  final String expectedDns;
  if (getAbfsServiceType() == AbfsServiceType.BLOB) {
    expectedDns = ABFS_BLOB_DOMAIN_NAME;
  } else {
    expectedDns = ABFS_DFS_DOMAIN_NAME;
  }
  final String actualPath = path.toString();
  Assertions.assertThat(actualPath).contains(expectedDns);
}
}
Original file line number Diff line number Diff line change
@@ -92,6 +92,7 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {

private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
private final String cpkSHAEncoded;
private static final String BLOCK_ID = "MF8tNDE1MjkzOTE4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";

private List<AzureBlobFileSystem> fileSystemsOpenedInTest = new ArrayList<>();

@@ -257,6 +258,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
AbfsClient client = fs.getAbfsClient();
AbfsClient ingressClient = fs.getAbfsStore().getClientHandler().getIngressClient();
AbfsClientUtils.setEncryptionContextProvider(client, ecp);
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
AbfsClientUtils.setEncryptionContextProvider(ingressClient, ecp);
if (isExceptionCase) {
LambdaTestUtils.intercept(IOException.class, () -> {
switch (operation) {
@@ -339,7 +341,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
} else {
return ingressClient.append(path, "val".getBytes(),
new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null,
true, new BlobAppendRequestParameters("MF8tNDE1MjkzOTE4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", null)),
true, new BlobAppendRequestParameters(BLOCK_ID, null)),
null, encryptionAdapter, getTestTracingContext(fs, false));
}
case SET_ACL:
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;

public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {

@@ -62,7 +63,7 @@ private int countDirectory(String path) {
return 0;
}
return (int) path.substring(index + getFileSystemName().length()).chars()
.filter(ch -> ch == '/').count();
.filter(ch -> ch == FORWARD_SLASH.charAt(0)).count();
}

/**
@@ -95,7 +96,7 @@ public void testAbfsHttpSendStatistics() throws IOException {
// 1 create request = 1 connection made and 1 send request
if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
expectedRequestsSent += (directory);
// Per directory we have 2 calls :- GetBlobProperties and PutBlob and 1 ListBlobs call (implicit check) for the path.
// Per directory, we have 2 calls :- GetBlobProperties and PutBlob and 1 ListBlobs call (implicit check) for the path.
expectedConnectionsMade += ((directory * 2) + 1);
} else {
expectedRequestsSent++;
@@ -154,6 +155,7 @@ public void testAbfsHttpSendStatistics() throws IOException {
// Operation: AbfsOutputStream close.
// Network Stats calculation: 1 flush (with close) is sent.
// 1 flush request = 1 connection and 1 send request
// Flush with no data is a no-op for blob endpoint, hence update only for dfs endpoint.
if (client instanceof AbfsDfsClient) {
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
expectedConnectionsMade++;
expectedRequestsSent++;
@@ -173,7 +175,8 @@ public void testAbfsHttpSendStatistics() throws IOException {
* create overwrite=false (will fail in this case as file is indeed present)
* + getFileStatus to fetch the file ETag
* + create overwrite=true
* = 3 connections and 2 send requests
* = 3 connections and 2 send requests in case of Dfs Client
* = 7 connections (5 GBP and 2 PutBlob calls) in case of Blob Client
*/
if (fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled()) {
if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
Loading