Skip to content

Commit

Permalink
Changes as per comments given
Browse files Browse the repository at this point in the history
  • Loading branch information
bhattmanish98 committed Jan 8, 2025
1 parent 5a29c44 commit 83fc86a
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;

import static java.net.HttpURLConnection.*;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM;
Expand Down Expand Up @@ -557,7 +560,7 @@ public AbfsRestOperation breakLease(final String path,
* @return AbfsClientRenameResult result of rename operation indicating the
* AbfsRest operation, rename recovery and incomplete metadata state failure.
*
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
* @throws IOException failure, excluding any recovery from overload failures.
*/
@Override
public AbfsClientRenameResult renamePath(final String source,
Expand All @@ -572,7 +575,7 @@ public AbfsClientRenameResult renamePath(final String source,
destination, sourceEtag, isAtomicRenameKey(source), tracingContext
);
incrementAbfsRenamePath();
if( blobRenameHandler.execute()) {
if (blobRenameHandler.execute()) {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
Expand Down Expand Up @@ -1426,6 +1429,7 @@ public boolean isAtomicRenameKey(String key) {
* Action to be taken when atomic-key is present on a getPathStatus path.
*
* @param path path of the pendingJson for the atomic path.
* @param pathLease lease on the path.
* @param tracingContext tracing context.
*
* @throws AzureBlobFileSystemException server error or the path is renamePending json file and action is taken.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public BlobDeleteHandler(final Path path,
*/
@Override
int getMaxConsumptionParallelism() {
return abfsClient.getAbfsConfiguration()
return getAbfsClient().getAbfsConfiguration()
.getBlobDeleteDirConsumptionParallelism();
}

Expand All @@ -83,7 +83,7 @@ int getMaxConsumptionParallelism() {
*/
private boolean deleteInternal(final Path path)
throws AzureBlobFileSystemException {
abfsClient.deleteBlobPath(path, null, tracingContext);
getAbfsClient().deleteBlobPath(path, null, tracingContext);
deleteCount.incrementAndGet();
return true;
}
Expand Down Expand Up @@ -151,7 +151,7 @@ private void ensurePathParentExist()
throws AzureBlobFileSystemException {
if (!path.isRoot() && !path.getParent().isRoot()) {
try {
abfsClient.createPath(path.getParent().toUri().getPath(),
getAbfsClient().createPath(path.getParent().toUri().getPath(),
false,
false,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public BlobRenameHandler(final String src,

@Override
int getMaxConsumptionParallelism() {
return abfsClient.getAbfsConfiguration()
return getAbfsClient().getAbfsConfiguration()
.getBlobRenameDirConsumptionParallelism();
}

Expand All @@ -137,7 +137,7 @@ public boolean execute() throws AzureBlobFileSystemException {
RenameAtomicity renameAtomicity = null;
if (pathInformation.getIsDirectory()
&& pathInformation.getIsImplicit()) {
AbfsRestOperation createMarkerOp = abfsClient.createPath(src.toUri().getPath(),
AbfsRestOperation createMarkerOp = getAbfsClient().createPath(src.toUri().getPath(),
false, false, null,
false, null, null, tracingContext);
pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
Expand Down Expand Up @@ -217,7 +217,7 @@ public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation)
new Path(src.getParent(), src.getName() + RenameAtomicity.SUFFIX),
tracingContext,
pathInformation.getETag(),
abfsClient);
getAbfsClient());
}

/** Takes a lease on the path.
Expand All @@ -230,8 +230,8 @@ public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation)
*/
private AbfsLease takeLease(final Path path, final String eTag)
throws AzureBlobFileSystemException {
AbfsLease lease = new AbfsLease(abfsClient, path.toUri().getPath(), false,
abfsClient.getAbfsConfiguration()
AbfsLease lease = new AbfsLease(getAbfsClient(), path.toUri().getPath(), false,
getAbfsClient().getAbfsConfiguration()
.getAtomicRenameLeaseRefreshDuration(),
eTag, tracingContext);
leases.add(lease);
Expand Down Expand Up @@ -460,7 +460,7 @@ private boolean renameInternal(final Path path,
boolean operated = false;
try {
copyPath(path, destinationPathForBlobPartOfRenameSrcDir, leaseId);
abfsClient.deleteBlobPath(path, leaseId, tracingContext);
getAbfsClient().deleteBlobPath(path, leaseId, tracingContext);
operated = true;
} finally {
if (abfsLease != null) {
Expand Down Expand Up @@ -489,7 +489,7 @@ private void copyPath(final Path src, final Path dst, final String leaseId)
throws AzureBlobFileSystemException {
String copyId;
try {
AbfsRestOperation copyPathOp = abfsClient.copyBlob(src, dst, leaseId,
AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst, leaseId,
tracingContext);
final String progress = copyPathOp.getResult()
.getResponseHeader(X_MS_COPY_STATUS);
Expand All @@ -500,10 +500,10 @@ private void copyPath(final Path src, final Path dst, final String leaseId)
.getResponseHeader(X_MS_COPY_ID);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
AbfsRestOperation dstPathStatus = abfsClient.getPathStatus(
AbfsRestOperation dstPathStatus = getAbfsClient().getPathStatus(
dst.toUri().getPath(),
tracingContext, null, false);
final String srcCopyPath = ROOT_PATH + abfsClient.getFileSystem()
final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem()
+ src.toUri().getPath();
if (dstPathStatus.getResult() != null && (srcCopyPath.equals(
getDstSource(dstPathStatus)))) {
Expand All @@ -512,7 +512,7 @@ private void copyPath(final Path src, final Path dst, final String leaseId)
}
throw ex;
}
final long pollWait = abfsClient.getAbfsConfiguration()
final long pollWait = getAbfsClient().getAbfsConfiguration()
.getBlobCopyProgressPollWaitMillis();
while (handleCopyInProgress(dst, tracingContext, copyId)
== BlobCopyProgress.PENDING) {
Expand Down Expand Up @@ -563,7 +563,7 @@ private String getDstSource(final AbfsRestOperation dstPathStatus) {
public BlobCopyProgress handleCopyInProgress(final Path dstPath,
final TracingContext tracingContext,
final String copyId) throws AzureBlobFileSystemException {
AbfsRestOperation op = abfsClient.getPathStatus(dstPath.toUri().getPath(),
AbfsRestOperation op = getAbfsClient().getPathStatus(dstPath.toUri().getPath(),
tracingContext, null, false);

if (op.getResult() != null && copyId.equals(
Expand Down Expand Up @@ -627,11 +627,11 @@ private PathInformation getPathInformation(Path path,
TracingContext tracingContext)
throws AzureBlobFileSystemException {
try {
AbfsRestOperation op = abfsClient.getPathStatus(path.toString(),
AbfsRestOperation op = getAbfsClient().getPathStatus(path.toString(),
tracingContext, null, true);

return new PathInformation(true,
abfsClient.checkIsDir(op.getResult()),
getAbfsClient().checkIsDir(op.getResult()),
extractEtagHeader(op.getResult()),
op.getResult() instanceof AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetFileStatus);
} catch (AzureBlobFileSystemException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class ListActionTaker {

private final Path path;

protected final AbfsBlobClient abfsClient;
private final AbfsBlobClient abfsClient;

private final TracingContext tracingContext;

Expand All @@ -80,6 +80,10 @@ public ListActionTaker(Path path,
getMaxConsumptionParallelism());
}

public AbfsBlobClient getAbfsClient() {
return abfsClient;
}

/** Get the maximum number of parallelism for consumption.
*
* @return the maximum number of parallelism for consumption.
Expand Down Expand Up @@ -134,7 +138,7 @@ private boolean takeAction(List<Path> paths)
*/
public boolean listRecursiveAndTakeAction()
throws AzureBlobFileSystemException {
AbfsConfiguration configuration = abfsClient.getAbfsConfiguration();
AbfsConfiguration configuration = getAbfsClient().getAbfsConfiguration();
Thread producerThread = null;
try {
ListBlobQueue listBlobQueue = createListBlobQueue(configuration);
Expand Down Expand Up @@ -223,7 +227,7 @@ protected String listAndEnqueue(final ListBlobQueue listBlobQueue,
}
final AbfsRestOperation op;
try {
op = abfsClient.listPath(path.toUri().getPath(),
op = getAbfsClient().listPath(path.toUri().getPath(),
true,
queueAvailableSizeForProduction, continuationToken,
tracingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public void copy(PathInformation pathInformation) {
this.isImplicit = pathInformation.getIsImplicit();
}

/** Setters and Getters */
/** Setters and Getters
*
* @return Boolean Values*/
public String getETag() {
return eTag;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void createRenamePendingJson(Path path, byte[] bytes)

List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
// PutBlockList on the path.
String blockList = "";//generateBlockListXml(blockIdList);
String blockList = ""; //generateBlockListXml(blockIdList);
abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
path.toUri().getPath(), true, null, null, eTag, null, tracingContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,9 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
Mockito.anyInt(), Mockito.nullable(String.class),
Mockito.nullable(TracingContext.class),
Mockito.anyBoolean());
doCallRealMethod().when((AbfsBlobClient) mockClient).getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class), Mockito.nullable(
ContextEncryptionAdapter.class), Mockito.anyBoolean());
doCallRealMethod().when((AbfsBlobClient) mockClient)
.getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class),
Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean());
}
when(mockClient.deletePath("/NonExistingPath", false, null,
tracingContext, fs.getIsNamespaceEnabled(tracingContext)))
Expand Down Expand Up @@ -348,6 +349,7 @@ public void deleteBlobDirParallelThreadToDeleteOnDifferentTracingContext()
private void assumeBlobClient() throws IOException {
assertTrue(getFileSystem().getAbfsClient() instanceof AbfsBlobClient);
}

/**
* Assert that deleting an implicit directory delete all the children of the
* folder.
Expand All @@ -366,8 +368,10 @@ public void testDeleteImplicitDir() throws Exception {
.describedAs("FileStatus of the deleted directory should not exist")
.isTrue();
Assertions.assertThat(!fs.exists(new Path("/testDir/dir1/file1")))
.describedAs("Child of a deleted directory should not be present");
.describedAs("Child of a deleted directory should not be present")
.isTrue();
}

/**
* Assert deleting an implicit directory, for which paginated list is required.
*/
Expand Down Expand Up @@ -401,6 +405,7 @@ public void testDeleteImplicitDirWithSingleListResults() throws Exception {
.describedAs("FileStatus of the deleted directory should not exist")
.isFalse();
}

/**
* Assert deleting of the only child of an implicit directory ensures that the
* parent directory's marker is present.
Expand All @@ -425,6 +430,24 @@ public void testDeleteExplicitDirInImplicitParentDir() throws Exception {
.describedAs("Parent Implicit directory should exist")
.isTrue();
}

/**
* Tests the scenario where a parallel blob deletion fails due to an access denied exception.
* This test simulates a failure while deleting blobs in a directory in parallel, where
* an exception is thrown when attempting to delete the blobs.
* <p>
* The test sets up a directory (`/testDir`) with multiple files (`file1`, `file2`, `file3`)
* and attempts to delete the directory using the `fs.delete` method with the recursive flag
* set to `true`. During the deletion process, the mock `deleteBlobPath` method of the
* `AbfsBlobClient` is set to throw an `AbfsRestOperationException` with a `HTTP_FORBIDDEN`
* status, simulating an access denial error. The test expects an `AccessDeniedException` to
* be thrown when the deletion is attempted.
* <p>
* The purpose of this test is to ensure that the system properly handles access denied errors
* when attempting to delete blobs in parallel.
*
* @throws Exception If an error occurs during the file system operations or mock setup.
*/
@Test
public void testDeleteParallelBlobFailure() throws Exception {
AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
Expand All @@ -448,12 +471,27 @@ public void testDeleteParallelBlobFailure() throws Exception {
fs.delete(new Path("/testDir"), true);
});
}

/**
* Tests the deletion of the root directory with the non-recursive option set to false.
* This test verifies that when attempting to delete the root directory (or any
* non-empty directory) without recursion, the operation should fail, returning
* false as it cannot delete a non-empty directory without recursion.
* <p>
* The test sets up a directory `/testDir` inside the root directory and then
* attempts to delete the root directory (`ROOT_PATH`) without the recursive flag.
* Since the root directory is not empty and the non-recursive option is set to false,
* the delete operation is expected to fail.
*
* @throws Exception If an error occurs during file system operations.
*/
@Test
public void testDeleteRootWithNonRecursion() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(new Path("/testDir"));
Assertions.assertThat(fs.delete(new Path(ROOT_PATH), false)).isFalse();
}

/**
* Assert that delete operation failure should stop List producer.
*/
Expand Down Expand Up @@ -484,7 +522,6 @@ public void testProducerStopOnDeleteFailure() throws Exception {
store.setClient(spiedClient);
Mockito.doReturn(store).when(fs).getAbfsStore();
final int[] deleteCallInvocation = new int[1];
deleteCallInvocation[0] = 0;
Mockito.doAnswer(answer -> {
throw new AbfsRestOperationException(HTTP_FORBIDDEN, "", "",
new Exception());
Expand All @@ -508,10 +545,9 @@ public void testProducerStopOnDeleteFailure() throws Exception {
return null;
});
final int[] listCallInvocation = new int[1];
listCallInvocation[0] = 0;
Mockito.doAnswer(answer -> {
if (listCallInvocation[0] == 1) {
while (deleteCallInvocation[0] == 0) ;
while (deleteCallInvocation[0] == 0) {}
}
listCallInvocation[0]++;
return answer.callRealMethod();
Expand All @@ -527,6 +563,7 @@ public void testProducerStopOnDeleteFailure() throws Exception {
.listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(),
Mockito.nullable(String.class), Mockito.any(TracingContext.class));
}

/**
* Test to assert that the CID in src marker delete contains the
* total number of blobs operated in the delete directory.
Expand All @@ -548,9 +585,7 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception {
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int iter = i;
Future future = executorService.submit(() -> {
return fs.create(new Path("/testDir/dir1/file" + iter));
});
Future future = executorService.submit(() -> fs.create(new Path("/testDir/dir1/file" + iter)));
futures.add(future);
}
for (Future future : futures) {
Expand Down
Loading

0 comments on commit 83fc86a

Please sign in to comment.