From 5d52da39177dd59cca65f82f30ee5da3da153c7f Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 17 Jul 2023 22:29:09 -0700 Subject: [PATCH 1/6] consumer failure should stop producer --- .../hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 2 ++ .../hadoop/fs/azurebfs/services/ListBlobConsumer.java | 4 ++++ .../hadoop/fs/azurebfs/services/ListBlobProducer.java | 2 +- .../hadoop/fs/azurebfs/services/ListBlobQueue.java | 9 +++++++++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 08e18dc96aea2..1db74c009dafd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1743,6 +1743,7 @@ private void renameBlobDir(final Path source, } catch (InterruptedException | ExecutionException e) { LOG.error(String.format("rename from %s to %s failed", source, destination), e); + listBlobConsumer.fail(); renameBlobExecutorService.shutdown(); if (srcDirBlobLease != null) { srcDirBlobLease.free(); @@ -2015,6 +2016,7 @@ private void deleteOnConsumedBlobs(final Path srcPath, } LOG.error(String.format("Deleting Path %s failed", blobPropertyPathStr), ex); + consumer.fail(); throw new RuntimeException(ex); } })); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java index 80017ab558a78..4ec31a5073a15 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java @@ -41,4 +41,8 @@ public Boolean isCompleted() { return listBlobQueue.getIsCompleted() && listBlobQueue.size() == 0; } + + public void fail() { + listBlobQueue.consumptionFailed(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java index 577c51e9002fc..00e1fbdf712e2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java @@ -97,7 +97,7 @@ public ListBlobProducer(final String src, if (nextMarker == null) { listBlobQueue.complete(); } - } while(nextMarker != null); + } while(nextMarker != null && listBlobQueue.getConsumptionFailed()); }).start(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java index 3be6bac98db1f..5c226ae4ac01a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java @@ -34,6 +34,7 @@ public class ListBlobQueue { private int totalConsumed = 0; private Boolean isCompleted = false; + private Boolean isConsumptionFailed = false; private AzureBlobFileSystemException failureFromProducer; @@ -85,6 +86,14 @@ public void complete() { isCompleted = true; } + void consumptionFailed() { + isConsumptionFailed = true; + } + + public Boolean getConsumptionFailed() { + return isConsumptionFailed; + } + public Boolean getIsCompleted() { return isCompleted; } From 60bd7727ba7193cfb49f6bc1fc00adf92e57b74c Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 18 Jul 2023 20:08:04 -0700 Subject: [PATCH 2/6] condition change in producer loop --- .../apache/hadoop/fs/azurebfs/services/ListBlobProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java index 00e1fbdf712e2..7c40a829f2de7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java @@ -97,7 +97,7 @@ public ListBlobProducer(final String src, if (nextMarker == null) { listBlobQueue.complete(); } - } while(nextMarker != null && listBlobQueue.getConsumptionFailed()); + } while(nextMarker != null && !listBlobQueue.getConsumptionFailed()); }).start(); } } From 27f2e228f1d49fa7cb5d6060c7e37aff66f7ddc7 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 19 Jul 2023 02:19:44 -0700 Subject: [PATCH 3/6] testProducerStopOnConsumerFailure --- .../azurebfs/services/ListBlobProducer.java | 12 +++- .../fs/azurebfs/ITestListBlobProducer.java | 64 ++++++++++++++++++- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java index 7c40a829f2de7..a198dc706c84a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; @@ -66,6 +67,7 @@ public class ListBlobProducer { private final TracingContext tracingContext; private String nextMarker; + private final Thread thread; public ListBlobProducer(final String src, final AbfsClient abfsClient, @@ -78,7 +80,7 @@ public ListBlobProducer(final String src, this.listBlobQueue = listBlobQueue; listBlobQueue.setProducer(this); this.nextMarker = initNextMarker; - new Thread(() -> { + thread = new Thread(() -> { do { int maxResult = listBlobQueue.availableSize(); if (maxResult == 0) { @@ -98,6 +100,12 @@ public ListBlobProducer(final String src, listBlobQueue.complete(); } } while(nextMarker != null && !listBlobQueue.getConsumptionFailed()); - }).start(); + }); + thread.start(); + } + + @VisibleForTesting + public void waitForProcessCompletion() throws InterruptedException { + thread.join(); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java index f3c4bd8d241d9..dfcbafadd642b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.IOException; import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.List; @@ -25,8 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; @@ -41,7 +40,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; -import org.apache.hadoop.fs.azurebfs.services.BlobProperty; import org.apache.hadoop.fs.azurebfs.services.ListBlobConsumer; import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer; import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue; @@ -179,4 +177,64 @@ public void testConsumerWhenProducerThrowException() throws Exception { Assert.assertTrue(exceptionCaught); } + + @Test + public void testProducerStopOnConsumerFailure() throws Exception { + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "10"); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + configuration); + + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path("/src")); + ExecutorService executorService = Executors.newFixedThreadPool(10); + List futureList = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + int iter = i; + Future future = executorService.submit(() -> { + try { + fs.create(new Path("/src/file" + iter)); + } catch (IOException ex) {} + }); + futureList.add(future); + } + + for (Future future : futureList) { + future.get(); + } + + AbfsClient client = fs.getAbfsClient(); + AbfsClient spiedClient = Mockito.spy(client); + fs.getAbfsStore().setClient(spiedClient); + + ListBlobQueue queue = new ListBlobQueue( + getConfiguration().getProducerQueueMaxSize(), + getConfiguration().getProducerQueueMaxSize()); + ListBlobConsumer consumer = new ListBlobConsumer(queue); + + Mockito.doAnswer(answer -> { + String marker = answer.getArgument(0); + String prefix = answer.getArgument(1); + String delimiter = answer.getArgument(2); + TracingContext tracingContext = answer.getArgument(4); + Object result = client.getListBlobs(marker, prefix, delimiter, 1, + tracingContext); + consumer.fail(); + return result; + }) + .when(spiedClient) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + + ListBlobProducer producer = new ListBlobProducer("src/", spiedClient, queue, + null, Mockito.mock( + TracingContext.class)); + + producer.waitForProcessCompletion(); + Mockito.verify(spiedClient, Mockito.times(1)) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + } } From 168257e7a2a071132e8f760437a52d3c2e07fdf6 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 19 Jul 2023 04:12:25 -0700 Subject: [PATCH 4/6] test for rename producer failure --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 20 ++++-- .../fs/azurebfs/services/ListBlobQueue.java | 4 +- .../ITestAzureBlobFileSystemRename.java | 72 +++++++++++++++++++ .../fs/azurebfs/ITestListBlobProducer.java | 1 - 4 files changed, 88 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 1db74c009dafd..5a2a801cb2719 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1613,8 +1613,8 @@ private void orchestrateBlobRenameDir(final Path source, getAbfsConfiguration().getBlobDirRenameMaxThread()); if (blobList.getNextMarker() != null) { - new ListBlobProducer(listSrc, - client, listBlobQueue, blobList.getNextMarker(), tracingContext); + getListBlobProducer(listSrc, listBlobQueue, blobList.getNextMarker(), + tracingContext); } else { listBlobQueue.complete(); } @@ -1678,6 +1678,15 @@ private void orchestrateBlobRenameDir(final Path source, } } + @VisibleForTesting + ListBlobProducer getListBlobProducer(final String listSrc, + final ListBlobQueue listBlobQueue, + final String initNextMarker, + final TracingContext tracingContext) { + return new ListBlobProducer(listSrc, + client, listBlobQueue, initNextMarker, tracingContext); + } + @VisibleForTesting AbfsBlobLease getBlobLease(final String source, final Integer blobLeaseOneMinuteDuration, @@ -1932,8 +1941,8 @@ private void orchestrateBlobDirDeletion(final Path path, getAbfsConfiguration().getProducerQueueMaxSize(), getAbfsConfiguration().getBlobDirDeleteMaxThread()); if (blobList.getNextMarker() != null) { - new ListBlobProducer(listSrc, client, queue, - blobList.getNextMarker(), tracingContext); + getListBlobProducer(listSrc, queue, blobList.getNextMarker(), + tracingContext); } else { queue.complete(); } @@ -2818,8 +2827,7 @@ public void redo(final Path destination, final Path src) listSrcBuilder.append(FORWARD_SLASH); } String listSrc = listSrcBuilder.toString(); - new ListBlobProducer(listSrc, client, listBlobQueue, null, - tracingContext); + getListBlobProducer(listSrc, listBlobQueue, null, tracingContext); AbfsBlobLease abfsBlobLease = getBlobLease(src.toUri().getPath(), BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); renameBlobDir(src, destination, tracingContext, listBlobQueue, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java index 5c226ae4ac01a..b551955fb3726 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java @@ -86,11 +86,11 @@ public void complete() { isCompleted = true; } - void consumptionFailed() { + synchronized void consumptionFailed() { isConsumptionFailed = true; } - public Boolean getConsumptionFailed() { + synchronized Boolean getConsumptionFailed() { return isConsumptionFailed; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 7bcb5c103d44e..6f2440b8213a2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -61,12 +61,15 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsLease; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; +import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer; +import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_FALLBACK_TO_DFS; @@ -2225,4 +2228,73 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF Mockito.any(TracingContext.class)); return copied; } + + @Test + public void testProducerStopOnRenameFailure() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "1"); + AzureBlobFileSystem fs = Mockito.spy((AzureBlobFileSystem) FileSystem.get(configuration)); + + fs.mkdirs(new Path("/src")); + ExecutorService executorService = Executors.newFixedThreadPool(10); + List futureList = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + int iter = i; + Future future = executorService.submit(() -> { + try { + fs.create(new Path("/src/file" + iter)); + } catch (IOException ex) {} + }); + futureList.add(future); + } + + for (Future future : futureList) { + future.get(); + } + + AbfsClient client = fs.getAbfsClient(); + AbfsClient spiedClient = Mockito.spy(client); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + store.setClient(spiedClient); + Mockito.doReturn(store).when(fs).getAbfsStore(); + + ListBlobProducer[] producers = new ListBlobProducer[1]; + Mockito.doAnswer(answer -> { + producers[0] = (ListBlobProducer) answer.callRealMethod(); + return producers[0]; + }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any( + ListBlobQueue.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + String marker = answer.getArgument(0); + String prefix = answer.getArgument(1); + String delimiter = answer.getArgument(2); + TracingContext tracingContext = answer.getArgument(4); + Object result = client.getListBlobs(marker, prefix, delimiter, 1, + tracingContext); + return result; + }) + .when(spiedClient) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + spiedClient.acquireBlobLease(((Path)answer.getArgument(0)).toUri().getPath(), -1, answer.getArgument(2)); + return answer.callRealMethod(); + }).when(spiedClient).deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + intercept(Exception.class, () -> { + fs.rename(new Path("/src"), new Path("/dst")); + }); + + + + producers[0].waitForProcessCompletion(); + + Mockito.verify(spiedClient, Mockito.atMost(3)).getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java index dfcbafadd642b..c3c750c9326cf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java @@ -181,7 +181,6 @@ public void testConsumerWhenProducerThrowException() throws Exception { @Test public void testProducerStopOnConsumerFailure() throws Exception { Configuration configuration = Mockito.spy(getRawConfiguration()); - configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "10"); AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( configuration); From 8e993beb13d0ef3abe2f65a365d15293337fd1c0 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 19 Jul 2023 04:17:31 -0700 Subject: [PATCH 5/6] checkstyle basic refactor of renameTest class; add of test for producer stop in delete test class --- .../ITestAzureBlobFileSystemDelete.java | 78 +++++++++++++++++++ .../ITestAzureBlobFileSystemRename.java | 26 ++++--- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index c3bc1b0b2badc..f5e80bf570f97 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -41,6 +41,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer; +import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker; @@ -60,6 +62,7 @@ import static java.net.HttpURLConnection.HTTP_OK; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_MKDIRS_FALLBACK_TO_DFS; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_DELETE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; import static org.mockito.ArgumentMatchers.any; @@ -492,4 +495,79 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { fs.delete(new Path(dirPathStr), true); } + + @Test + public void testProducerStopOnDeleteFailure() throws Exception { + Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB); + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "1"); + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.get(configuration)); + + fs.mkdirs(new Path("/src")); + ExecutorService executorService = Executors.newFixedThreadPool(10); + List futureList = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + int iter = i; + Future future = executorService.submit(() -> { + try { + fs.create(new Path("/src/file" + iter)); + } catch (IOException ex) {} + }); + futureList.add(future); + } + + for (Future future : futureList) { + future.get(); + } + + AbfsClient client = fs.getAbfsClient(); + AbfsClient spiedClient = Mockito.spy(client); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + store.setClient(spiedClient); + Mockito.doReturn(store).when(fs).getAbfsStore(); + + ListBlobProducer[] producers = new ListBlobProducer[1]; + Mockito.doAnswer(answer -> { + producers[0] = (ListBlobProducer) answer.callRealMethod(); + return producers[0]; + }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any( + ListBlobQueue.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + String marker = answer.getArgument(0); + String prefix = answer.getArgument(1); + String delimiter = answer.getArgument(2); + TracingContext tracingContext = answer.getArgument(4); + Object result = client.getListBlobs(marker, prefix, delimiter, 1, + tracingContext); + return result; + }) + .when(spiedClient) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + spiedClient.acquireBlobLease( + ((Path) answer.getArgument(0)).toUri().getPath(), -1, + answer.getArgument(2)); + return answer.callRealMethod(); + }) + .when(spiedClient) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + intercept(Exception.class, () -> { + fs.delete(new Path("/src"), true); + }); + + producers[0].waitForProcessCompletion(); + + Mockito.verify(spiedClient, Mockito.atMost(3)) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 6f2440b8213a2..07332ea357913 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2234,7 +2234,8 @@ public void testProducerStopOnRenameFailure() throws Exception { assumeNonHnsAccountBlobEndpoint(getFileSystem()); Configuration configuration = Mockito.spy(getRawConfiguration()); configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "1"); - AzureBlobFileSystem fs = Mockito.spy((AzureBlobFileSystem) FileSystem.get(configuration)); + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.get(configuration)); fs.mkdirs(new Path("/src")); ExecutorService executorService = Executors.newFixedThreadPool(10); @@ -2264,7 +2265,8 @@ public void testProducerStopOnRenameFailure() throws Exception { producers[0] = (ListBlobProducer) answer.callRealMethod(); return producers[0]; }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any( - ListBlobQueue.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + ListBlobQueue.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { String marker = answer.getArgument(0); @@ -2281,20 +2283,24 @@ public void testProducerStopOnRenameFailure() throws Exception { Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { - spiedClient.acquireBlobLease(((Path)answer.getArgument(0)).toUri().getPath(), -1, answer.getArgument(2)); - return answer.callRealMethod(); - }).when(spiedClient).deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + spiedClient.acquireBlobLease( + ((Path) answer.getArgument(0)).toUri().getPath(), -1, + answer.getArgument(2)); + return answer.callRealMethod(); + }) + .when(spiedClient) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); intercept(Exception.class, () -> { fs.rename(new Path("/src"), new Path("/dst")); }); - - producers[0].waitForProcessCompletion(); - Mockito.verify(spiedClient, Mockito.atMost(3)).getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + Mockito.verify(spiedClient, Mockito.atMost(3)) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); } } From 5194e32b2859d3149f7798bb9af2a1764e0326b9 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 19 Jul 2023 06:14:24 -0700 Subject: [PATCH 6/6] listBlobQueue not to have synchronized methods --- .../fs/azurebfs/services/ListBlobConsumer.java | 3 +++ .../hadoop/fs/azurebfs/services/ListBlobQueue.java | 4 ++-- .../fs/azurebfs/ITestAzureBlobFileSystemDelete.java | 13 ++++++++++++- .../fs/azurebfs/ITestAzureBlobFileSystemRename.java | 11 ++++++++++- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java index 4ec31a5073a15..bf6981505e4ea 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java @@ -42,6 +42,9 @@ public Boolean isCompleted() { && listBlobQueue.size() == 0; } + /** + * Register consumer failure. + */ public void fail() { listBlobQueue.consumptionFailed(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java index b551955fb3726..3de0c85ec5000 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java @@ -86,11 +86,11 @@ public void complete() { isCompleted = true; } - synchronized void consumptionFailed() { + void consumptionFailed() { isConsumptionFailed = true; } - synchronized Boolean getConsumptionFailed() { + Boolean getConsumptionFailed() { return isConsumptionFailed; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index f5e80bf570f97..84d4c7069ccd6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -27,6 +27,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -500,7 +502,6 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { public void testProducerStopOnDeleteFailure() throws Exception { Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB); Configuration configuration = Mockito.spy(getRawConfiguration()); - configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "1"); AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.get(configuration)); @@ -535,11 +536,20 @@ public void testProducerStopOnDeleteFailure() throws Exception { ListBlobQueue.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger listCounter = new AtomicInteger(0); + AtomicBoolean hasConsumerStarted = new AtomicBoolean(false); + Mockito.doAnswer(answer -> { String marker = answer.getArgument(0); String prefix = answer.getArgument(1); String delimiter = answer.getArgument(2); TracingContext tracingContext = answer.getArgument(4); + int counter = listCounter.incrementAndGet(); + if (counter > 1) { + while (!hasConsumerStarted.get()) { + Thread.sleep(1_000L); + } + } Object result = client.getListBlobs(marker, prefix, delimiter, 1, tracingContext); return result; @@ -553,6 +563,7 @@ public void testProducerStopOnDeleteFailure() throws Exception { spiedClient.acquireBlobLease( ((Path) answer.getArgument(0)).toUri().getPath(), -1, answer.getArgument(2)); + hasConsumerStarted.set(true); return answer.callRealMethod(); }) .when(spiedClient) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 07332ea357913..5e60a362b43b4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2233,7 +2233,6 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF public void testProducerStopOnRenameFailure() throws Exception { assumeNonHnsAccountBlobEndpoint(getFileSystem()); Configuration configuration = Mockito.spy(getRawConfiguration()); - configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "1"); AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.get(configuration)); @@ -2268,11 +2267,20 @@ public void testProducerStopOnRenameFailure() throws Exception { ListBlobQueue.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger listCounter = new AtomicInteger(0); + AtomicBoolean hasConsumerStarted = new AtomicBoolean(false); + Mockito.doAnswer(answer -> { String marker = answer.getArgument(0); String prefix = answer.getArgument(1); String delimiter = answer.getArgument(2); TracingContext tracingContext = answer.getArgument(4); + int counter = listCounter.incrementAndGet(); + if (counter > 1 && producers[0] != null) { + while (!hasConsumerStarted.get()) { + Thread.sleep(1_000L); + } + } Object result = client.getListBlobs(marker, prefix, delimiter, 1, tracingContext); return result; @@ -2286,6 +2294,7 @@ public void testProducerStopOnRenameFailure() throws Exception { spiedClient.acquireBlobLease( ((Path) answer.getArgument(0)).toUri().getPath(), -1, answer.getArgument(2)); + hasConsumerStarted.set(true); return answer.callRealMethod(); }) .when(spiedClient)