diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 08e18dc96aea2..5a2a801cb2719 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -1613,8 +1613,8 @@ private void orchestrateBlobRenameDir(final Path source,
         getAbfsConfiguration().getBlobDirRenameMaxThread());
 
     if (blobList.getNextMarker() != null) {
-      new ListBlobProducer(listSrc,
-          client, listBlobQueue, blobList.getNextMarker(), tracingContext);
+      getListBlobProducer(listSrc, listBlobQueue, blobList.getNextMarker(),
+          tracingContext);
     } else {
       listBlobQueue.complete();
     }
@@ -1678,6 +1678,15 @@ private void orchestrateBlobRenameDir(final Path source,
     }
   }
 
+  @VisibleForTesting
+  ListBlobProducer getListBlobProducer(final String listSrc,
+      final ListBlobQueue listBlobQueue,
+      final String initNextMarker,
+      final TracingContext tracingContext) {
+    return new ListBlobProducer(listSrc,
+        client, listBlobQueue, initNextMarker, tracingContext);
+  }
+
   @VisibleForTesting
   AbfsBlobLease getBlobLease(final String source,
       final Integer blobLeaseOneMinuteDuration,
@@ -1743,6 +1752,7 @@ private void renameBlobDir(final Path source,
     } catch (InterruptedException | ExecutionException e) {
       LOG.error(String.format("rename from %s to %s failed", source,
           destination), e);
+      listBlobConsumer.fail();
       renameBlobExecutorService.shutdown();
       if (srcDirBlobLease != null) {
         srcDirBlobLease.free();
@@ -1931,8 +1941,8 @@ private void orchestrateBlobDirDeletion(final Path path,
         getAbfsConfiguration().getProducerQueueMaxSize(),
         getAbfsConfiguration().getBlobDirDeleteMaxThread());
     if (blobList.getNextMarker() != null) {
-      new ListBlobProducer(listSrc, client, queue,
-          blobList.getNextMarker(), tracingContext);
+      getListBlobProducer(listSrc, queue, blobList.getNextMarker(),
+          tracingContext);
     } else {
       queue.complete();
     }
@@ -2015,6 +2025,7 @@ private void deleteOnConsumedBlobs(final Path srcPath,
             }
             LOG.error(String.format("Deleting Path %s failed",
                 blobPropertyPathStr), ex);
+            consumer.fail();
             throw new RuntimeException(ex);
           }
         }));
@@ -2816,8 +2827,7 @@ public void redo(final Path destination, final Path src)
       listSrcBuilder.append(FORWARD_SLASH);
     }
     String listSrc = listSrcBuilder.toString();
-    new ListBlobProducer(listSrc, client, listBlobQueue, null,
-        tracingContext);
+    getListBlobProducer(listSrc, listBlobQueue, null, tracingContext);
     AbfsBlobLease abfsBlobLease = getBlobLease(src.toUri().getPath(),
         BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext);
     renameBlobDir(src, destination, tracingContext, listBlobQueue,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java
index 80017ab558a78..bf6981505e4ea 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java
@@ -41,4 +41,11 @@ public Boolean isCompleted() {
     return listBlobQueue.getIsCompleted() && listBlobQueue.size() == 0;
   }
+
+  /**
+   * Register consumer failure.
+   */
+  public void fail() {
+    listBlobQueue.consumptionFailed();
+  }
 
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java
index 577c51e9002fc..a198dc706c84a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
@@ -66,6 +67,7 @@ public class ListBlobProducer {
   private final TracingContext tracingContext;
 
   private String nextMarker;
+  private final Thread thread;
 
   public ListBlobProducer(final String src,
       final AbfsClient abfsClient,
@@ -78,7 +80,7 @@ public ListBlobProducer(final String src,
     this.listBlobQueue = listBlobQueue;
     listBlobQueue.setProducer(this);
     this.nextMarker = initNextMarker;
-    new Thread(() -> {
+    thread = new Thread(() -> {
       do {
         int maxResult = listBlobQueue.availableSize();
         if (maxResult == 0) {
@@ -97,7 +99,13 @@ public ListBlobProducer(final String src,
         if (nextMarker == null) {
           listBlobQueue.complete();
         }
-      } while(nextMarker != null);
-    }).start();
+      } while(nextMarker != null && !listBlobQueue.getConsumptionFailed());
+    });
+    thread.start();
+  }
+
+  @VisibleForTesting
+  public void waitForProcessCompletion() throws InterruptedException {
+    thread.join();
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java
index 3be6bac98db1f..3de0c85ec5000 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java
@@ -34,6 +34,7 @@ public class ListBlobQueue {
   private int totalConsumed = 0;
 
   private Boolean isCompleted = false;
+  private Boolean isConsumptionFailed = false;
 
   private AzureBlobFileSystemException failureFromProducer;
 
@@ -85,6 +86,14 @@ public void complete() {
     isCompleted = true;
   }
 
+  void consumptionFailed() {
+    isConsumptionFailed = true;
+  }
+
+  Boolean getConsumptionFailed() {
+    return isConsumptionFailed;
+  }
+
   public Boolean getIsCompleted() {
     return isCompleted;
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index c3bc1b0b2badc..84d4c7069ccd6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -27,6 +27,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +43,8 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer;
+import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue;
 import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
 import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
@@ -60,6 +64,7 @@
 
 import static java.net.HttpURLConnection.HTTP_OK;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_MKDIRS_FALLBACK_TO_DFS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_DELETE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME;
 import static org.mockito.ArgumentMatchers.any;
@@ -492,4 +497,88 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception {
 
     fs.delete(new Path(dirPathStr), true);
   }
+
+  @Test
+  public void testProducerStopOnDeleteFailure() throws Exception {
+    Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB);
+    Configuration configuration = Mockito.spy(getRawConfiguration());
+    AzureBlobFileSystem fs = Mockito.spy(
+        (AzureBlobFileSystem) FileSystem.get(configuration));
+
+    fs.mkdirs(new Path("/src"));
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < 20; i++) {
+      int iter = i;
+      Future future = executorService.submit(() -> {
+        try {
+          fs.create(new Path("/src/file" + iter));
+        } catch (IOException ex) {}
+      });
+      futureList.add(future);
+    }
+
+    for (Future future : futureList) {
+      future.get();
+    }
+
+    AbfsClient client = fs.getAbfsClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    store.setClient(spiedClient);
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+
+    ListBlobProducer[] producers = new ListBlobProducer[1];
+    Mockito.doAnswer(answer -> {
+      producers[0] = (ListBlobProducer) answer.callRealMethod();
+      return producers[0];
+    }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any(
+        ListBlobQueue.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+
+    AtomicInteger listCounter = new AtomicInteger(0);
+    AtomicBoolean hasConsumerStarted = new AtomicBoolean(false);
+
+    Mockito.doAnswer(answer -> {
+          String marker = answer.getArgument(0);
+          String prefix = answer.getArgument(1);
+          String delimiter = answer.getArgument(2);
+          TracingContext tracingContext = answer.getArgument(4);
+          int counter = listCounter.incrementAndGet();
+          if (counter > 1) {
+            while (!hasConsumerStarted.get()) {
+              Thread.sleep(1_000L);
+            }
+          }
+          Object result = client.getListBlobs(marker, prefix, delimiter, 1,
+              tracingContext);
+          return result;
+        })
+        .when(spiedClient)
+        .getListBlobs(Mockito.nullable(String.class),
+            Mockito.nullable(String.class), Mockito.nullable(String.class),
+            Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
+
+    Mockito.doAnswer(answer -> {
+          spiedClient.acquireBlobLease(
+              ((Path) answer.getArgument(0)).toUri().getPath(), -1,
+              answer.getArgument(2));
+          hasConsumerStarted.set(true);
+          return answer.callRealMethod();
+        })
+        .when(spiedClient)
+        .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+
+    intercept(Exception.class, () -> {
+      fs.delete(new Path("/src"), true);
+    });
+
+    producers[0].waitForProcessCompletion();
+
+    Mockito.verify(spiedClient, Mockito.atMost(3))
+        .getListBlobs(Mockito.nullable(String.class),
+            Mockito.nullable(String.class), Mockito.nullable(String.class),
+            Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index 7bcb5c103d44e..5e60a362b43b4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -61,12 +61,15 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil;
+import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer;
+import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue;
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_FALLBACK_TO_DFS;
@@ -2225,4 +2228,88 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF
         Mockito.any(TracingContext.class));
     return copied;
   }
+
+  @Test
+  public void testProducerStopOnRenameFailure() throws Exception {
+    assumeNonHnsAccountBlobEndpoint(getFileSystem());
+    Configuration configuration = Mockito.spy(getRawConfiguration());
+    AzureBlobFileSystem fs = Mockito.spy(
+        (AzureBlobFileSystem) FileSystem.get(configuration));
+
+    fs.mkdirs(new Path("/src"));
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < 20; i++) {
+      int iter = i;
+      Future future = executorService.submit(() -> {
+        try {
+          fs.create(new Path("/src/file" + iter));
+        } catch (IOException ex) {}
+      });
+      futureList.add(future);
+    }
+
+    for (Future future : futureList) {
+      future.get();
+    }
+
+    AbfsClient client = fs.getAbfsClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    store.setClient(spiedClient);
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+
+    ListBlobProducer[] producers = new ListBlobProducer[1];
+    Mockito.doAnswer(answer -> {
+      producers[0] = (ListBlobProducer) answer.callRealMethod();
+      return producers[0];
+    }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any(
+        ListBlobQueue.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+
+    AtomicInteger listCounter = new AtomicInteger(0);
+    AtomicBoolean hasConsumerStarted = new AtomicBoolean(false);
+
+    Mockito.doAnswer(answer -> {
+          String marker = answer.getArgument(0);
+          String prefix = answer.getArgument(1);
+          String delimiter = answer.getArgument(2);
+          TracingContext tracingContext = answer.getArgument(4);
+          int counter = listCounter.incrementAndGet();
+          if (counter > 1 && producers[0] != null) {
+            while (!hasConsumerStarted.get()) {
+              Thread.sleep(1_000L);
+            }
+          }
+          Object result = client.getListBlobs(marker, prefix, delimiter, 1,
+              tracingContext);
+          return result;
+        })
+        .when(spiedClient)
+        .getListBlobs(Mockito.nullable(String.class),
+            Mockito.nullable(String.class), Mockito.nullable(String.class),
+            Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
+
+    Mockito.doAnswer(answer -> {
+          spiedClient.acquireBlobLease(
+              ((Path) answer.getArgument(0)).toUri().getPath(), -1,
+              answer.getArgument(2));
+          hasConsumerStarted.set(true);
+          return answer.callRealMethod();
+        })
+        .when(spiedClient)
+        .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+
+    intercept(Exception.class, () -> {
+      fs.rename(new Path("/src"), new Path("/dst"));
+    });
+
+    producers[0].waitForProcessCompletion();
+
+    Mockito.verify(spiedClient, Mockito.atMost(3))
+        .getListBlobs(Mockito.nullable(String.class),
+            Mockito.nullable(String.class), Mockito.nullable(String.class),
+            Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java
index f3c4bd8d241d9..c3c750c9326cf 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.util.ArrayList;
 import java.util.List;
@@ -25,8 +26,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Assert;
@@ -41,7 +40,6 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
-import org.apache.hadoop.fs.azurebfs.services.BlobProperty;
 import org.apache.hadoop.fs.azurebfs.services.ListBlobConsumer;
 import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer;
 import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue;
@@ -179,4 +177,63 @@ public void testConsumerWhenProducerThrowException() throws Exception {
 
     Assert.assertTrue(exceptionCaught);
   }
+
+  @Test
+  public void testProducerStopOnConsumerFailure() throws Exception {
+    Configuration configuration = Mockito.spy(getRawConfiguration());
+    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        configuration);
+
+    fs.setWorkingDirectory(new Path("/"));
+    fs.mkdirs(new Path("/src"));
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < 20; i++) {
+      int iter = i;
+      Future future = executorService.submit(() -> {
+        try {
+          fs.create(new Path("/src/file" + iter));
+        } catch (IOException ex) {}
+      });
+      futureList.add(future);
+    }
+
+    for (Future future : futureList) {
+      future.get();
+    }
+
+    AbfsClient client = fs.getAbfsClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    fs.getAbfsStore().setClient(spiedClient);
+
+    ListBlobQueue queue = new ListBlobQueue(
+        getConfiguration().getProducerQueueMaxSize(),
+        getConfiguration().getProducerQueueMaxSize());
+    ListBlobConsumer consumer = new ListBlobConsumer(queue);
+
+    Mockito.doAnswer(answer -> {
+          String marker = answer.getArgument(0);
+          String prefix = answer.getArgument(1);
+          String delimiter = answer.getArgument(2);
+          TracingContext tracingContext = answer.getArgument(4);
+          Object result = client.getListBlobs(marker, prefix, delimiter, 1,
+              tracingContext);
+          consumer.fail();
+          return result;
+        })
+        .when(spiedClient)
+        .getListBlobs(Mockito.nullable(String.class),
+            Mockito.nullable(String.class), Mockito.nullable(String.class),
+            Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
+
+    ListBlobProducer producer = new ListBlobProducer("src/", spiedClient, queue,
+        null, Mockito.mock(
+        TracingContext.class));
+
+    producer.waitForProcessCompletion();
+    Mockito.verify(spiedClient, Mockito.times(1))
+        .getListBlobs(Mockito.nullable(String.class),
+            Mockito.nullable(String.class), Mockito.nullable(String.class),
+            Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
+  }
 }