diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 5dd880001735d..93cf42287b08b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -75,7 +75,10 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; -import static java.net.HttpURLConnection.*; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM; @@ -557,7 +560,7 @@ public AbfsRestOperation breakLease(final String path, * @return AbfsClientRenameResult result of rename operation indicating the * AbfsRest operation, rename recovery and incomplete metadata state failure. * - * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures. + * @throws IOException failure, excluding any recovery from overload failures. */ @Override public AbfsClientRenameResult renamePath(final String source, @@ -572,7 +575,7 @@ public AbfsClientRenameResult renamePath(final String source, destination, sourceEtag, isAtomicRenameKey(source), tracingContext ); incrementAbfsRenamePath(); - if( blobRenameHandler.execute()) { + if (blobRenameHandler.execute()) { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); final List requestHeaders = createDefaultHeaders(); @@ -1426,6 +1429,7 @@ public boolean isAtomicRenameKey(String key) { * Action to be taken when atomic-key is present on a getPathStatus path. * * @param path path of the pendingJson for the atomic path. + * @param pathLease lease on the path. * @param tracingContext tracing context. * * @throws AzureBlobFileSystemException server error or the path is renamePending json file and action is taken. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java index fe929e5e4526b..c04c8dbdbb072 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java @@ -71,7 +71,7 @@ public BlobDeleteHandler(final Path path, */ @Override int getMaxConsumptionParallelism() { - return abfsClient.getAbfsConfiguration() + return getAbfsClient().getAbfsConfiguration() .getBlobDeleteDirConsumptionParallelism(); } @@ -83,7 +83,7 @@ int getMaxConsumptionParallelism() { */ private boolean deleteInternal(final Path path) throws AzureBlobFileSystemException { - abfsClient.deleteBlobPath(path, null, tracingContext); + getAbfsClient().deleteBlobPath(path, null, tracingContext); deleteCount.incrementAndGet(); return true; } @@ -151,7 +151,7 @@ private void ensurePathParentExist() throws AzureBlobFileSystemException { if (!path.isRoot() && !path.getParent().isRoot()) { try { - abfsClient.createPath(path.getParent().toUri().getPath(), + getAbfsClient().createPath(path.getParent().toUri().getPath(), false, false, null, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java index 4db632f4a4f6d..fd86ea0085887 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java @@ -120,7 +120,7 @@ public BlobRenameHandler(final String src, @Override int getMaxConsumptionParallelism() { - return abfsClient.getAbfsConfiguration() + return getAbfsClient().getAbfsConfiguration() .getBlobRenameDirConsumptionParallelism(); } @@ -137,7 +137,7 @@ public boolean execute() throws AzureBlobFileSystemException { RenameAtomicity renameAtomicity = null; if (pathInformation.getIsDirectory() && pathInformation.getIsImplicit()) { - AbfsRestOperation createMarkerOp = abfsClient.createPath(src.toUri().getPath(), + AbfsRestOperation createMarkerOp = getAbfsClient().createPath(src.toUri().getPath(), false, false, null, false, null, null, tracingContext); pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult())); @@ -217,7 +217,7 @@ public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation) new Path(src.getParent(), src.getName() + RenameAtomicity.SUFFIX), tracingContext, pathInformation.getETag(), - abfsClient); + getAbfsClient()); } /** Takes a lease on the path. @@ -230,8 +230,8 @@ public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation) */ private AbfsLease takeLease(final Path path, final String eTag) throws AzureBlobFileSystemException { - AbfsLease lease = new AbfsLease(abfsClient, path.toUri().getPath(), false, - abfsClient.getAbfsConfiguration() + AbfsLease lease = new AbfsLease(getAbfsClient(), path.toUri().getPath(), false, + getAbfsClient().getAbfsConfiguration() .getAtomicRenameLeaseRefreshDuration(), eTag, tracingContext); leases.add(lease); @@ -460,7 +460,7 @@ private boolean renameInternal(final Path path, boolean operated = false; try { copyPath(path, destinationPathForBlobPartOfRenameSrcDir, leaseId); - abfsClient.deleteBlobPath(path, leaseId, tracingContext); + getAbfsClient().deleteBlobPath(path, leaseId, tracingContext); operated = true; } finally { if (abfsLease != null) { @@ -489,7 +489,7 @@ private void copyPath(final Path src, final Path dst, final String leaseId) throws AzureBlobFileSystemException { String copyId; try { - AbfsRestOperation copyPathOp = abfsClient.copyBlob(src, dst, leaseId, + AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst, leaseId, tracingContext); final String progress = copyPathOp.getResult() .getResponseHeader(X_MS_COPY_STATUS); @@ -500,10 +500,10 @@ private void copyPath(final Path src, final Path dst, final String leaseId) .getResponseHeader(X_MS_COPY_ID); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { - AbfsRestOperation dstPathStatus = abfsClient.getPathStatus( + AbfsRestOperation dstPathStatus = getAbfsClient().getPathStatus( dst.toUri().getPath(), tracingContext, null, false); - final String srcCopyPath = ROOT_PATH + abfsClient.getFileSystem() + final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem() + src.toUri().getPath(); if (dstPathStatus.getResult() != null && (srcCopyPath.equals( getDstSource(dstPathStatus)))) { @@ -512,7 +512,7 @@ private void copyPath(final Path src, final Path dst, final String leaseId) } throw ex; } - final long pollWait = abfsClient.getAbfsConfiguration() + final long pollWait = getAbfsClient().getAbfsConfiguration() .getBlobCopyProgressPollWaitMillis(); while (handleCopyInProgress(dst, tracingContext, copyId) == BlobCopyProgress.PENDING) { @@ -563,7 +563,7 @@ private String getDstSource(final AbfsRestOperation dstPathStatus) { public BlobCopyProgress handleCopyInProgress(final Path dstPath, final TracingContext tracingContext, final String copyId) throws AzureBlobFileSystemException { - AbfsRestOperation op = abfsClient.getPathStatus(dstPath.toUri().getPath(), + AbfsRestOperation op = getAbfsClient().getPathStatus(dstPath.toUri().getPath(), tracingContext, null, false); if (op.getResult() != null && copyId.equals( @@ -627,11 +627,11 @@ private PathInformation getPathInformation(Path path, TracingContext tracingContext) throws AzureBlobFileSystemException { try { - AbfsRestOperation op = abfsClient.getPathStatus(path.toString(), + AbfsRestOperation op = getAbfsClient().getPathStatus(path.toString(), tracingContext, null, true); return new PathInformation(true, - abfsClient.checkIsDir(op.getResult()), + getAbfsClient().checkIsDir(op.getResult()), extractEtagHeader(op.getResult()), op.getResult() instanceof AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetFileStatus); } catch (AzureBlobFileSystemException e) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java index 9c41eb36a2367..0bfd25073b8b2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java @@ -55,7 +55,7 @@ public abstract class ListActionTaker { private final Path path; - protected final AbfsBlobClient abfsClient; + private final AbfsBlobClient abfsClient; private final TracingContext tracingContext; @@ -80,6 +80,10 @@ public ListActionTaker(Path path, getMaxConsumptionParallelism()); } + public AbfsBlobClient getAbfsClient() { + return abfsClient; + } + /** Get the maximum number of parallelism for consumption. * * @return the maximum number of parallelism for consumption. @@ -134,7 +138,7 @@ private boolean takeAction(List paths) */ public boolean listRecursiveAndTakeAction() throws AzureBlobFileSystemException { - AbfsConfiguration configuration = abfsClient.getAbfsConfiguration(); + AbfsConfiguration configuration = getAbfsClient().getAbfsConfiguration(); Thread producerThread = null; try { ListBlobQueue listBlobQueue = createListBlobQueue(configuration); @@ -223,7 +227,7 @@ protected String listAndEnqueue(final ListBlobQueue listBlobQueue, } final AbfsRestOperation op; try { - op = abfsClient.listPath(path.toUri().getPath(), + op = getAbfsClient().listPath(path.toUri().getPath(), true, queueAvailableSizeForProduction, continuationToken, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java index 95a733eae1cdd..b0548497d0372 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java @@ -55,7 +55,9 @@ public void copy(PathInformation pathInformation) { this.isImplicit = pathInformation.getIsImplicit(); } - /** Setters and Getters */ + /** Setters and Getters + * + * @return Boolean Values*/ public String getETag() { return eTag; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index bb03c61c5d8f7..31521e0ca671e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -230,7 +230,7 @@ void createRenamePendingJson(Path path, byte[] bytes) List blockIdList = new ArrayList<>(Collections.singleton(blockId)); // PutBlockList on the path. - String blockList = "";//generateBlockListXml(blockIdList); + String blockList = ""; //generateBlockListXml(blockIdList); abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8), path.toUri().getPath(), true, null, null, eTag, null, tracingContext); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index a5ac42eb0b25b..4886e3502ff46 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -301,8 +301,9 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception { Mockito.anyInt(), Mockito.nullable(String.class), Mockito.nullable(TracingContext.class), Mockito.anyBoolean()); - doCallRealMethod().when((AbfsBlobClient) mockClient).getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class), Mockito.nullable( - ContextEncryptionAdapter.class), Mockito.anyBoolean()); + doCallRealMethod().when((AbfsBlobClient) mockClient) + .getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class), + Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean()); } when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext, fs.getIsNamespaceEnabled(tracingContext))) @@ -348,6 +349,7 @@ public void deleteBlobDirParallelThreadToDeleteOnDifferentTracingContext() private void assumeBlobClient() throws IOException { assertTrue(getFileSystem().getAbfsClient() instanceof AbfsBlobClient); } + /** * Assert that deleting an implicit directory delete all the children of the * folder. @@ -366,8 +368,10 @@ public void testDeleteImplicitDir() throws Exception { .describedAs("FileStatus of the deleted directory should not exist") .isTrue(); Assertions.assertThat(!fs.exists(new Path("/testDir/dir1/file1"))) - .describedAs("Child of a deleted directory should not be present"); + .describedAs("Child of a deleted directory should not be present") + .isTrue(); } + /** * Assert deleting an implicit directory, for which paginated list is required. */ @@ -401,6 +405,7 @@ public void testDeleteImplicitDirWithSingleListResults() throws Exception { .describedAs("FileStatus of the deleted directory should not exist") .isFalse(); } + /** * Assert deleting of the only child of an implicit directory ensures that the * parent directory's marker is present. @@ -425,6 +430,24 @@ public void testDeleteExplicitDirInImplicitParentDir() throws Exception { .describedAs("Parent Implicit directory should exist") .isTrue(); } + + /** + * Tests the scenario where a parallel blob deletion fails due to an access denied exception. + * This test simulates a failure while deleting blobs in a directory in parallel, where + * an exception is thrown when attempting to delete the blobs. + *

+ * The test sets up a directory (`/testDir`) with multiple files (`file1`, `file2`, `file3`) + * and attempts to delete the directory using the `fs.delete` method with the recursive flag + * set to `true`. During the deletion process, the mock `deleteBlobPath` method of the + * `AbfsBlobClient` is set to throw an `AbfsRestOperationException` with a `HTTP_FORBIDDEN` + * status, simulating an access denial error. The test expects an `AccessDeniedException` to + * be thrown when the deletion is attempted. + *

+ * The purpose of this test is to ensure that the system properly handles access denied errors + * when attempting to delete blobs in parallel. + * + * @throws Exception If an error occurs during the file system operations or mock setup. + */ @Test public void testDeleteParallelBlobFailure() throws Exception { AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); @@ -448,12 +471,27 @@ public void testDeleteParallelBlobFailure() throws Exception { fs.delete(new Path("/testDir"), true); }); } + + /** + * Tests the deletion of the root directory with the non-recursive option set to false. + * This test verifies that when attempting to delete the root directory (or any + * non-empty directory) without recursion, the operation should fail, returning + * false as it cannot delete a non-empty directory without recursion. + *

+ * The test sets up a directory `/testDir` inside the root directory and then + * attempts to delete the root directory (`ROOT_PATH`) without the recursive flag. + * Since the root directory is not empty and the non-recursive option is set to false, + * the delete operation is expected to fail. + * + * @throws Exception If an error occurs during file system operations. + */ @Test public void testDeleteRootWithNonRecursion() throws Exception { AzureBlobFileSystem fs = getFileSystem(); fs.mkdirs(new Path("/testDir")); Assertions.assertThat(fs.delete(new Path(ROOT_PATH), false)).isFalse(); } + /** * Assert that delete operation failure should stop List producer. */ @@ -484,7 +522,6 @@ public void testProducerStopOnDeleteFailure() throws Exception { store.setClient(spiedClient); Mockito.doReturn(store).when(fs).getAbfsStore(); final int[] deleteCallInvocation = new int[1]; - deleteCallInvocation[0] = 0; Mockito.doAnswer(answer -> { throw new AbfsRestOperationException(HTTP_FORBIDDEN, "", "", new Exception()); @@ -508,10 +545,9 @@ public void testProducerStopOnDeleteFailure() throws Exception { return null; }); final int[] listCallInvocation = new int[1]; - listCallInvocation[0] = 0; Mockito.doAnswer(answer -> { if (listCallInvocation[0] == 1) { - while (deleteCallInvocation[0] == 0) ; + while (deleteCallInvocation[0] == 0) {} } listCallInvocation[0]++; return answer.callRealMethod(); @@ -527,6 +563,7 @@ public void testProducerStopOnDeleteFailure() throws Exception { .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); } + /** * Test to assert that the CID in src marker delete contains the * total number of blobs operated in the delete directory. @@ -548,9 +585,7 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { List futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { final int iter = i; - Future future = executorService.submit(() -> { - return fs.create(new Path("/testDir/dir1/file" + iter)); - }); + Future future = executorService.submit(() -> fs.create(new Path("/testDir/dir1/file" + iter))); futures.add(future); } for (Future future : futures) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 4aa437434b149..76e74d1f82c18 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -321,6 +321,22 @@ public void testPosixRenameDirectoryWhereDirectoryAlreadyThereOnDestination() assertFalse(fs.exists(new Path("testDir2/test1/test2/test3/file"))); } } + + /** + *

+   * Test to check behaviour of rename API if the destination directory is already
+   * there. The HNS call and the one for Blob endpoint should have same behaviour.
+   *
+   * /testDir2/test1/test2/test3 contains (/file)
+   * There is another path that exists: /testDir2/test4/test3
+   * On rename(/testDir2/test1/test2/test3, /testDir2/test4).
+   * 
+ * + * Expectation for HNS / Blob endpoint:
    + *
  1. Rename should fail
  2. + *
  3. No file should be transferred to destination directory
  4. + *
+ */ @Test public void testPosixRenameDirectoryWherePartAlreadyThereOnDestination() throws Exception { @@ -346,6 +362,7 @@ public void testPosixRenameDirectoryWherePartAlreadyThereOnDestination() assertFalse(fs.exists(new Path("testDir2/test1/test2/test3/file"))); assertFalse(fs.exists(new Path("testDir2/test1/test2/test3/file1"))); } + /** * Test that after completing rename for a directory which is enabled for * AtomicRename, the RenamePending JSON file is deleted. @@ -378,9 +395,11 @@ public void testRenamePendingJsonIsRemovedPostSuccessfulRename() Mockito.any(TracingContext.class)); assertTrue(fs.rename(new Path("hbase/test1/test2/test3"), new Path("hbase/test4"))); - assertTrue("RenamePendingJson should be deleted", - correctDeletePathCount[0] == 1); + assertEquals("RenamePendingJson should be deleted", + 1, + (int) correctDeletePathCount[0]); } + private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); Mockito.doReturn(store).when(fs).getAbfsStore(); @@ -388,6 +407,7 @@ private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) { Mockito.doReturn(client).when(store).getClient(); return client; } + /** * Test for a directory in /hbase directory. To simulate the crash of process, * test will throw an exception with 403 on a copy of one of the blob.
@@ -414,6 +434,7 @@ public void testHBaseHandlingForFailedRenameWithListRecovery() return null; }); } + /** * Test for a directory in /hbase directory. To simulate the crash of process, * test will throw an exception with 403 on a copy of one of the blob. The @@ -442,6 +463,7 @@ public void testHBaseHandlingForFailedRenameWithGetFileStatusRecovery() return null; }); } + private void crashRenameAndRecover(final AzureBlobFileSystem fs, AbfsBlobClient client, final String srcPath, @@ -452,7 +474,6 @@ private void crashRenameAndRecover(final AzureBlobFileSystem fs, fs2.setWorkingDirectory(new Path(ROOT_PATH)); client = (AbfsBlobClient) addSpyHooksOnClient(fs2); int[] renameJsonDeleteCounter = new int[1]; - renameJsonDeleteCounter[0] = 0; Mockito.doAnswer(answer -> { if ((ROOT_PATH + srcPath + SUFFIX) .equalsIgnoreCase(((Path) answer.getArgument(0)).toUri().getPath())) { @@ -476,6 +497,7 @@ private void crashRenameAndRecover(final AzureBlobFileSystem fs, assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file1"))); assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file1"))); } + private void crashRename(final AzureBlobFileSystem fs, final AbfsBlobClient client, final String srcPath) throws Exception { @@ -502,6 +524,7 @@ private void crashRename(final AzureBlobFileSystem fs, lease.free(); } } + /** * Simulates a scenario where HMaster in Hbase starts up and executes listStatus * API on the directory that has to be renamed by some other executor-machine. @@ -519,6 +542,7 @@ public void testHbaseListStatusBeforeRenamePendingFileAppendedWithIngressOnBlob( testRenamePreRenameFailureResolution(fs); testAtomicityRedoInvalidFile(fs); } + private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs) throws Exception { AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); @@ -554,6 +578,7 @@ private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs) .describedAs("Creation of RenamePendingJson should be attempted twice") .isEqualTo(2); } + private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs) throws Exception { AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); @@ -565,7 +590,6 @@ private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs) os.write("{".getBytes(StandardCharsets.UTF_8)); os.close(); int[] renameJsonDeleteCounter = new int[1]; - renameJsonDeleteCounter[0] = 0; Mockito.doAnswer(deleteAnswer -> { Path ansPath = deleteAnswer.getArgument(0); if (renameJson.toUri() @@ -587,6 +611,11 @@ private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs) Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); } + + /** + * Test to check the atomicity of rename operation. The rename operation should + * be atomic and should not leave any intermediate state. + */ @Test public void testRenameJsonDeletedBeforeRenameAtomicityCanDelete() throws Exception { @@ -601,7 +630,6 @@ public void testRenameJsonDeletedBeforeRenameAtomicityCanDelete() os.write("{}".getBytes(StandardCharsets.UTF_8)); os.close(); int[] renameJsonDeleteCounter = new int[1]; - renameJsonDeleteCounter[0] = 0; Mockito.doAnswer(deleteAnswer -> { Path ansPath = deleteAnswer.getArgument(0); if (renameJson.toUri() @@ -618,6 +646,23 @@ public void testRenameJsonDeletedBeforeRenameAtomicityCanDelete() new RenameAtomicity(renameJson, 2, getTestTracingContext(fs, true), null, client, null); } + + /** + * Tests the scenario where the rename operation is complete before the redo + * operation for atomicity, leading to a failure. This test verifies that the + * system correctly handles the case when a rename operation is attempted after + * the source path has already been deleted, which should result in an error. + *

+ * The test simulates a situation where a `renameJson` file is created for the + * rename operation, and the source path is deleted during the read process in + * the redo operation. The `redoRenameAtomicity` is then executed, and it is + * expected to fail with a `404` error, indicating that the source path no longer exists. + *

+ * The test ensures that the system can handle this error condition and return + * the correct response, preventing a potentially invalid or inconsistent state. + * + * @throws Exception If an error occurs during file system operations. + */ @Test public void testRenameCompleteBeforeRenameAtomicityRedo() throws Exception { final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); @@ -645,9 +690,7 @@ public void testRenameCompleteBeforeRenameAtomicityRedo() throws Exception { return bytes; }); AbfsRestOperationException ex = intercept(AbfsRestOperationException.class, - () -> { - redoRenameAtomicity.redo(); - }); + redoRenameAtomicity::redo); Assertions.assertThat(ex.getStatusCode()) .describedAs("RenameAtomicity redo should fail with 404") .isEqualTo(SOURCE_PATH_NOT_FOUND.getStatusCode()); @@ -655,6 +698,21 @@ public void testRenameCompleteBeforeRenameAtomicityRedo() throws Exception { .describedAs("RenameAtomicity redo should fail with 404") .isEqualTo(SOURCE_PATH_NOT_FOUND); } + + /** + * Tests the idempotency of the copyBlob operation during a rename when the + * destination already exists. This test simulates a scenario where the source + * blob is copied to the destination before the actual rename operation is invoked. + * It ensures that the copyBlob operation can handle idempotency issues and perform + * the rename successfully even when the destination is pre-created. + *

+ * The test verifies that the rename operation successfully copies the blob from + * the source to the destination, and the source is deleted, leaving only the + * destination file. This ensures that the system behaves correctly in scenarios + * where the destination path already contains the blob. + * + * @throws Exception If an error occurs during file system operations. + */ @Test public void testCopyBlobIdempotency() throws Exception { final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); @@ -690,6 +748,21 @@ public void testCopyBlobIdempotency() throws Exception { .describedAs("Destination should exist after rename") .isTrue(); } + + /** + * Tests the idempotency of the rename operation when the destination path is + * created by some other process before the rename operation. This test simulates + * the scenario where a source blob is renamed, and the destination path already + * exists due to actions from another process. It ensures that the rename operation + * behaves idempotently and correctly handles the case where the destination is + * pre-created. + *

+ * The test verifies that the rename operation fails (since the destination already + * exists), but the source path remains intact, and the blob copy operation is able + * to handle the idempotency issue. + * + * @throws IOException If an error occurs during file system operations. + */ @Test public void testRenameBlobIdempotencyWhereDstIsCreatedFromSomeOtherProcess() throws IOException { @@ -715,6 +788,15 @@ public void testRenameBlobIdempotencyWhereDstIsCreatedFromSomeOtherProcess() .describedAs("Source should exist after rename failure") .isTrue(); } + + /** + * Tests renaming a directory when the destination directory is missing a marker blob. + * This test involves creating multiple directories and files, deleting a blob (marker) in the + * destination directory, and renaming the source directory to the destination. + * It then verifies that the renamed directory exists at the expected destination path. + * + * @throws Exception If an error occurs during the file system operations or assertions. + */ @Test public void testRenameDirWhenMarkerBlobIsAbsentOnDstDir() throws Exception { AzureBlobFileSystem fs = getFileSystem(); @@ -730,6 +812,15 @@ public void testRenameDirWhenMarkerBlobIsAbsentOnDstDir() throws Exception { fs.rename(new Path("/test4"), new Path("/test1/test2")); assertTrue(fs.exists(new Path("/test1/test2/test4/test5"))); } + + /** + * Tests the renaming of a directory when the source directory does not have a marker file. + * This test creates a file within a source directory, deletes the source directory from the blob storage, + * creates a new target directory, and renames the source directory to the target location. + * It verifies that the renamed source directory exists in the target path. + * + * @throws Exception If an error occurs during the file system operations or assertions. + */ @Test public void testBlobRenameSrcDirHasNoMarker() throws Exception { AzureBlobFileSystem fs = getFileSystem(); @@ -742,6 +833,16 @@ public void testBlobRenameSrcDirHasNoMarker() throws Exception { fs.rename(new Path("/test1"), new Path("/test2")); assertTrue(fs.exists(new Path("/test2/test1"))); } + + /** + * Mocks the progress status for a copy blob operation. + * This method simulates a copy operation that is pending and not yet completed. + * It intercepts the `copyBlob` method and modifies its response to return a "COPY_STATUS_PENDING" + * status for the copy operation. + * + * @param spiedClient The {@link AbfsBlobClient} instance that is being spied on. + * @throws AzureBlobFileSystemException if the mock setup fails. + */ private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient spiedClient) throws AzureBlobFileSystemException { Mockito.doAnswer(answer -> { @@ -757,6 +858,20 @@ private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient spiedC .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); } + + /** + * Verifies the behavior of a blob copy operation that takes time to complete. + * The test ensures the following: + *

    + *
  • A file is created and a rename operation is initiated.
  • + *
  • The copy operation progress is mocked to simulate a time-consuming process.
  • + *
  • The rename operation triggers a call to handle the copy progress.
  • + *
  • The test checks that the file exists after the rename and that the + * `handleCopyInProgress` method is called exactly once.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testCopyBlobTakeTime() throws Exception { AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem()); @@ -777,6 +892,16 @@ public void testCopyBlobTakeTime() throws Exception { .handleCopyInProgress(Mockito.any(Path.class), Mockito.any(TracingContext.class), Mockito.any(String.class)); } + + /** + * Mocks the final status of a blob copy operation. + * This method ensures that when checking the status of a copy operation in progress, + * it returns the specified final status (e.g., success, failure, aborted). + * + * @param spiedClient The mocked Azure Blob client to apply the mock behavior. + * @param requiredCopyFinalStatus The final status of the copy operation to be returned + * (e.g., COPY_STATUS_FAILED, COPY_STATUS_ABORTED). + */ private void addMockForCopyOperationFinalStatus(final AbfsBlobClient spiedClient, final String requiredCopyFinalStatus) { AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient, @@ -807,6 +932,19 @@ private void addMockForCopyOperationFinalStatus(final AbfsBlobClient spiedClient return null; }); } + + /** + * Verifies the behavior when a blob copy operation takes time and eventually fails. + * The test ensures the following: + *
    + *
  • A file is created and a copy operation is initiated.
  • + *
  • The copy operation is mocked to eventually fail.
  • + *
  • The rename operation triggers an exception due to the failed copy.
  • + *
  • The test checks that the appropriate 'COPY_FAILED' error code and status code are returned.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testCopyBlobTakeTimeAndEventuallyFail() throws Exception { AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem()); @@ -815,8 +953,7 @@ public void testCopyBlobTakeTimeAndEventuallyFail() throws Exception { fileSystem); addMockForProgressStatusOnCopyOperation(spiedClient); fileSystem.create(new Path("/test1/file")); - final String requiredCopyFinalStatus = COPY_STATUS_FAILED; - addMockForCopyOperationFinalStatus(spiedClient, requiredCopyFinalStatus); + addMockForCopyOperationFinalStatus(spiedClient, COPY_STATUS_FAILED); AbfsRestOperationException ex = intercept(AbfsRestOperationException.class, () -> { fileSystem.rename(new Path("/test1/file"), new Path("/test1/file2")); @@ -828,6 +965,19 @@ public void testCopyBlobTakeTimeAndEventuallyFail() throws Exception { .describedAs("Expecting COPY_FAILED error code") .isEqualTo(COPY_BLOB_FAILED); } + + /** + * Verifies the behavior when a blob copy operation takes time and is eventually aborted. + * The test ensures the following: + *
    + *
  • A file is created and a copy operation is initiated.
  • + *
  • The copy operation is mocked to eventually be aborted.
  • + *
  • The rename operation triggers an exception due to the aborted copy.
  • + *
  • The test checks that the appropriate 'COPY_ABORTED' error code and status code are returned.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testCopyBlobTakeTimeAndEventuallyAborted() throws Exception { AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem()); @@ -836,8 +986,7 @@ public void testCopyBlobTakeTimeAndEventuallyAborted() throws Exception { fileSystem); addMockForProgressStatusOnCopyOperation(spiedClient); fileSystem.create(new Path("/test1/file")); - final String requiredCopyFinalStatus = COPY_STATUS_ABORTED; - addMockForCopyOperationFinalStatus(spiedClient, requiredCopyFinalStatus); + addMockForCopyOperationFinalStatus(spiedClient, COPY_STATUS_ABORTED); AbfsRestOperationException ex = intercept(AbfsRestOperationException.class, () -> { fileSystem.rename(new Path("/test1/file"), new Path("/test1/file2")); @@ -849,6 +998,19 @@ public void testCopyBlobTakeTimeAndEventuallyAborted() throws Exception { .describedAs("Expecting COPY_ABORTED error code") .isEqualTo(COPY_BLOB_ABORTED); } + + /** + * Verifies the behavior when a blob copy operation takes time and the destination blob + * is deleted during the process. The test ensures the following: + *
    + *
  • A source file is created and a copy operation is initiated.
  • + *
  • During the copy process, the destination file is deleted.
  • + *
  • The copy operation returns a pending status.
  • + *
  • The test checks that the destination file does not exist after the copy operation is interrupted.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testCopyBlobTakeTimeAndBlobIsDeleted() throws Exception { AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem()); @@ -874,6 +1036,18 @@ public void testCopyBlobTakeTimeAndBlobIsDeleted() throws Exception { assertFalse(fileSystem.rename(new Path(srcFile), new Path(dstFile))); assertFalse(fileSystem.exists(new Path(dstFile))); } + + /** + * Verifies the behavior when attempting to copy a blob after the source has been deleted + * in the Azure Blob FileSystem. The test ensures the following: + *
    + *
  • A source blob is created and then deleted.
  • + *
  • An attempt to copy the deleted source blob results in a 'not found' error.
  • + *
  • The test checks that the correct HTTP error (404 Not Found) is returned when copying a non-existent source.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testCopyAfterSourceHasBeenDeleted() throws Exception { AzureBlobFileSystem fs = getFileSystem(); @@ -896,6 +1070,19 @@ public void testCopyAfterSourceHasBeenDeleted() throws Exception { .describedAs("Source has to be not found at copy") .isEqualTo(HTTP_NOT_FOUND); } + + /** + * Verifies that parallel rename operations in the Azure Blob FileSystem fail when + * trying to perform an atomic rename with lease acquisition. The test ensures the following: + *
    + *
  • A directory is created and a rename operation is attempted.
  • + *
  • A parallel thread attempts to rename the directory while the lease is being acquired.
  • + *
  • The parallel rename operation should fail due to a lease conflict, triggering an exception.
  • + *
  • The test verifies that the expected conflict exception is thrown when attempting a parallel rename.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testParallelRenameForAtomicRenameShouldFail() throws Exception { Configuration config = getRawConfiguration(); @@ -914,7 +1101,7 @@ public void testParallelRenameForAtomicRenameShouldFail() throws Exception { Mockito.doAnswer(answer -> { AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); leaseAcquired.set(true); - while (!parallelThreadDone.get()) ; + while (!parallelThreadDone.get()) {} return op; }) .when(client) @@ -922,7 +1109,7 @@ public void testParallelRenameForAtomicRenameShouldFail() throws Exception { Mockito.nullable(String.class), Mockito.any(TracingContext.class)); new Thread(() -> { - while (!leaseAcquired.get()) ; + while (!leaseAcquired.get()) {} try { fs.rename(src, dst); } catch (Exception e) { @@ -937,11 +1124,23 @@ public void testParallelRenameForAtomicRenameShouldFail() throws Exception { } }).start(); fs.rename(src, dst); - while (!parallelThreadDone.get()) ; + while (!parallelThreadDone.get()) {} Assertions.assertThat(exceptionOnParallelRename.get()) .describedAs("Parallel rename should fail") .isTrue(); } + + /** + * Verifies the behavior of appending data to a blob during a rename operation in the + * Azure Blob FileSystem. The test ensures the following: + *
    + *
  • A file is created and data is appended to it while a rename operation is in progress.
  • + *
  • The append operation should fail due to the rename operation in progress.
  • + *
  • The test checks that the append operation is properly interrupted and fails as expected.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testAppendAtomicBlobDuringRename() throws Exception { AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); @@ -955,12 +1154,12 @@ public void testAppendAtomicBlobDuringRename() throws Exception { AtomicBoolean appendFailed = new AtomicBoolean(false); Mockito.doAnswer(answer -> { copyInProgress.set(true); - while (!outputStreamClosed.get()) ; + while (!outputStreamClosed.get()) {} return answer.callRealMethod(); }).when(client).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); new Thread(() -> { - while (!copyInProgress.get()) ; + while (!copyInProgress.get()) {} try { os.write(1); os.close(); @@ -975,6 +1174,19 @@ public void testAppendAtomicBlobDuringRename() throws Exception { .describedAs("Append should fail") .isTrue(); } + + /** + * Verifies the behavior of renaming a directory in the Azure Blob FileSystem when + * there is a neighboring directory with the same prefix. The test ensures the following: + *
    + *
  • Two directories with similar prefixes are created, along with files inside them.
  • + *
  • The rename operation moves one directory to a new location.
  • + *
  • Files in the renamed directory are moved, while files in the neighboring directory with the same prefix remain unaffected.
  • + *
  • Correct existence checks are performed to confirm the renamed directory and its files are moved, and the original directory is deleted.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testBlobRenameOfDirectoryHavingNeighborWithSamePrefix() throws Exception { @@ -996,6 +1208,20 @@ public void testBlobRenameOfDirectoryHavingNeighborWithSamePrefix() Assertions.assertThat(fs.exists(new Path("/testDir/dir/"))) .isFalse(); } + + /** + * Verifies the behavior of renaming a directory in the Azure Blob FileSystem when + * the `listPath` operation returns paginated results with one object per list. + * The test ensures the following: + *
    + *
  • A directory and its files are created.
  • + *
  • The `listPath` operation is mocked to return one file at a time in each paginated result.
  • + *
  • The rename operation successfully moves the directory and its files to a new location.
  • + *
  • All files are verified to exist in the new location after the rename.
  • + *
+ * + * @throws Exception if an error occurs during the test execution + */ @Test public void testBlobRenameWithListGivingPaginatedResultWithOneObjectPerList() throws Exception { @@ -1025,8 +1251,19 @@ public void testBlobRenameWithListGivingPaginatedResultWithOneObjectPerList() .isTrue(); } } + /** - * Assert that Rename operation failure should stop List producer. + * Verifies that the producer stops on a rename failure due to an access denial + * (HTTP_FORBIDDEN error) in the Azure Blob FileSystem. The test ensures the following: + *
    + *
  • Multiple file creation tasks are submitted concurrently.
  • + *
  • The rename operation is attempted but fails with an access denied exception.
  • + *
  • On failure, the list operation for the source directory is invoked at most twice.
  • + *
+ * The test simulates a failure scenario where the rename operation encounters an access + * denied error, and the list operation should stop after the failure. + * + * @throws Exception if an error occurs during the test execution */ @Test public void testProducerStopOnRenameFailure() throws Exception { @@ -1053,7 +1290,6 @@ public void testProducerStopOnRenameFailure() throws Exception { store.setClient(spiedClient); Mockito.doReturn(store).when(fs).getAbfsStore(); final int[] copyCallInvocation = new int[1]; - copyCallInvocation[0] = 0; Mockito.doAnswer(answer -> { throw new AbfsRestOperationException(HTTP_FORBIDDEN, "", "", new Exception()); @@ -1077,11 +1313,10 @@ public void testProducerStopOnRenameFailure() throws Exception { return null; }); final int[] listCallInvocation = new int[1]; - listCallInvocation[0] = 0; Mockito.doAnswer(answer -> { if (answer.getArgument(0).equals("/src")) { if (listCallInvocation[0] == 1) { - while (copyCallInvocation[0] == 0) ; + while (copyCallInvocation[0] == 0) {} } listCallInvocation[0]++; return getFileSystem().getAbfsClient().listPath(answer.getArgument(0), @@ -1103,6 +1338,18 @@ public void testProducerStopOnRenameFailure() throws Exception { + "Once consumption fails, listing would be stopped.") .isLessThanOrEqualTo(2); } + + /** + * Verifies the behavior of renaming a directory through the Azure Blob FileSystem + * when the source directory is deleted just before the rename operation is resumed. + * It ensures that: + *
    + *
  • No blobs are copied during the resume operation.
  • + *
+ * The test simulates a crash, deletes the source directory, and checks for the expected result. + * + * @throws Exception if an error occurs during the test execution + */ @Test public void testRenameResumeThroughListStatusWithSrcDirDeletedJustBeforeResume() throws Exception { @@ -1127,6 +1374,19 @@ public void testRenameResumeThroughListStatusWithSrcDirDeletedJustBeforeResume() .describedAs("No Copy on resume") .isEqualTo(0); } + + /** + * Verifies the behavior of renaming a directory through the Azure Blob FileSystem + * when the source directory's ETag changes just before the rename operation is resumed. + * It ensures that: + *
    + *
  • No blobs are copied during the resume operation.
  • + *
  • The pending rename JSON file is deleted.
  • + *
+ * The test simulates a crash, retries the operation, and checks for the expected results. + * + * @throws Exception if an error occurs during the test execution + */ @Test public void testRenameResumeThroughListStatusWithSrcDirETagChangedJustBeforeResume() throws Exception { @@ -1167,6 +1427,14 @@ public void testRenameResumeThroughListStatusWithSrcDirETagChangedJustBeforeResu .describedAs("RenamePendingJson should be deleted") .isEqualTo(1); } + + /** + * Test case to verify the behavior of renaming a directory through the Azure Blob + * FileSystem when the source directory's ETag changes just before the rename operation + * is resumed. This test specifically checks the following: + * + * @throws Exception if any errors occur during the test execution + */ @Test public void testRenameResumeThroughGetStatusWithSrcDirETagChangedJustBeforeResume() throws Exception { @@ -1209,6 +1477,7 @@ public void testRenameResumeThroughGetStatusWithSrcDirETagChangedJustBeforeResum .describedAs("RenamePendingJson should be deleted") .isEqualTo(1); } + /** * Test to assert that the CID in src marker blob copy and delete contains the * total number of blobs operated in the rename directory. @@ -1227,9 +1496,8 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() List futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { final int iter = i; - Future future = executorService.submit(() -> { - return fs.create(new Path("/testDir/dir1/file" + iter)); - }); + Future future = executorService.submit(() -> + fs.create(new Path("/testDir/dir1/file" + iter))); futures.add(future); } for (Future future : futures) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityTestUtils.java index 02fe0e7781a1c..97635a563fc0f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityTestUtils.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityTestUtils.java @@ -27,6 +27,9 @@ public class RenameAtomicityTestUtils { + public RenameAtomicityTestUtils() { + } + /** * Creates a spied object of {@link BlobRenameHandler} and {@link RenameAtomicity} * and adds mocked behavior to {@link RenameAtomicity#createRenamePendingJson(Path, byte[])}.