Skip to content

Commit

Permalink
Merge pull request apache#84 from saxenapranav/ABFS_3.3.2_dev_procude…
Browse files Browse the repository at this point in the history
…r_improvements

Stop producer thread when consumer fails due to irrecoverable exception.
  • Loading branch information
saxenapranav authored Jul 25, 2023
2 parents 6ab5f9a + 5194e32 commit e858497
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,8 @@ private void orchestrateBlobRenameDir(final Path source,
getAbfsConfiguration().getBlobDirRenameMaxThread());

if (blobList.getNextMarker() != null) {
new ListBlobProducer(listSrc,
client, listBlobQueue, blobList.getNextMarker(), tracingContext);
getListBlobProducer(listSrc, listBlobQueue, blobList.getNextMarker(),
tracingContext);
} else {
listBlobQueue.complete();
}
Expand Down Expand Up @@ -1678,6 +1678,15 @@ private void orchestrateBlobRenameDir(final Path source,
}
}

/**
 * Factory for the {@link ListBlobProducer} that enumerates blobs under
 * {@code listSrc} into the given queue. Extracted as an instance method so
 * tests can intercept producer creation (e.g. with a Mockito spy) and hold a
 * reference to the created producer.
 *
 * @param listSrc source prefix to list under.
 * @param listBlobQueue queue the producer fills and the consumer drains.
 * @param initNextMarker continuation marker to resume listing from; may be null.
 * @param tracingContext tracing context propagated to the listing calls.
 * @return the started producer.
 */
@VisibleForTesting
ListBlobProducer getListBlobProducer(final String listSrc,
    final ListBlobQueue listBlobQueue,
    final String initNextMarker,
    final TracingContext tracingContext) {
  final ListBlobProducer producer = new ListBlobProducer(listSrc, client,
      listBlobQueue, initNextMarker, tracingContext);
  return producer;
}

@VisibleForTesting
AbfsBlobLease getBlobLease(final String source,
final Integer blobLeaseOneMinuteDuration,
Expand Down Expand Up @@ -1743,6 +1752,7 @@ private void renameBlobDir(final Path source,
} catch (InterruptedException | ExecutionException e) {
LOG.error(String.format("rename from %s to %s failed", source,
destination), e);
listBlobConsumer.fail();
renameBlobExecutorService.shutdown();
if (srcDirBlobLease != null) {
srcDirBlobLease.free();
Expand Down Expand Up @@ -1931,8 +1941,8 @@ private void orchestrateBlobDirDeletion(final Path path,
getAbfsConfiguration().getProducerQueueMaxSize(),
getAbfsConfiguration().getBlobDirDeleteMaxThread());
if (blobList.getNextMarker() != null) {
new ListBlobProducer(listSrc, client, queue,
blobList.getNextMarker(), tracingContext);
getListBlobProducer(listSrc, queue, blobList.getNextMarker(),
tracingContext);
} else {
queue.complete();
}
Expand Down Expand Up @@ -2015,6 +2025,7 @@ private void deleteOnConsumedBlobs(final Path srcPath,
}
LOG.error(String.format("Deleting Path %s failed",
blobPropertyPathStr), ex);
consumer.fail();
throw new RuntimeException(ex);
}
}));
Expand Down Expand Up @@ -2816,8 +2827,7 @@ public void redo(final Path destination, final Path src)
listSrcBuilder.append(FORWARD_SLASH);
}
String listSrc = listSrcBuilder.toString();
new ListBlobProducer(listSrc, client, listBlobQueue, null,
tracingContext);
getListBlobProducer(listSrc, listBlobQueue, null, tracingContext);
AbfsBlobLease abfsBlobLease = getBlobLease(src.toUri().getPath(),
BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext);
renameBlobDir(src, destination, tracingContext, listBlobQueue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public Boolean isCompleted() {
return listBlobQueue.getIsCompleted()
&& listBlobQueue.size() == 0;
}

/**
 * Register consumer failure.
 * Flags the backing queue so the producer thread, which checks the flag on
 * each listing iteration, stops enumerating further blobs.
 */
public void fail() {
listBlobQueue.consumptionFailed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

Expand Down Expand Up @@ -66,6 +67,7 @@ public class ListBlobProducer {
private final TracingContext tracingContext;

private String nextMarker;
private final Thread thread;

public ListBlobProducer(final String src,
final AbfsClient abfsClient,
Expand All @@ -78,7 +80,7 @@ public ListBlobProducer(final String src,
this.listBlobQueue = listBlobQueue;
listBlobQueue.setProducer(this);
this.nextMarker = initNextMarker;
new Thread(() -> {
thread = new Thread(() -> {
do {
int maxResult = listBlobQueue.availableSize();
if (maxResult == 0) {
Expand All @@ -97,7 +99,13 @@ public ListBlobProducer(final String src,
if (nextMarker == null) {
listBlobQueue.complete();
}
} while(nextMarker != null);
}).start();
} while(nextMarker != null && !listBlobQueue.getConsumptionFailed());
});
thread.start();
}

/**
 * Blocks the calling thread until this producer's worker thread terminates.
 * Test-only hook used to assert that the producer actually stops (e.g. after
 * a consumer failure) rather than racing the assertion against it.
 *
 * @throws InterruptedException if the waiting thread is interrupted while
 * joining the producer thread.
 */
@VisibleForTesting
public void waitForProcessCompletion() throws InterruptedException {
thread.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ListBlobQueue {
private int totalConsumed = 0;

private Boolean isCompleted = false;
private Boolean isConsumptionFailed = false;

private AzureBlobFileSystemException failureFromProducer;

Expand Down Expand Up @@ -85,6 +86,14 @@ public void complete() {
isCompleted = true;
}

/**
 * Records that the consumer hit an irrecoverable failure, so the producer
 * should stop listing further blobs.
 * NOTE(review): this flag is set from a consumer thread and read by the
 * producer thread, but the backing {@code Boolean} field is not declared
 * volatile and these accessors are unsynchronized — confirm visibility is
 * guaranteed elsewhere, or make the field volatile.
 */
void consumptionFailed() {
isConsumptionFailed = true;
}

/**
 * @return whether the consumer has registered a failure; polled by the
 * producer loop to decide whether to continue listing.
 */
Boolean getConsumptionFailed() {
return isConsumptionFailed;
}

public Boolean getIsCompleted() {
return isCompleted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -41,6 +43,8 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer;
import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
Expand All @@ -60,6 +64,7 @@
import static java.net.HttpURLConnection.HTTP_OK;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_MKDIRS_FALLBACK_TO_DFS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_DELETE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -492,4 +497,88 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception {

fs.delete(new Path(dirPathStr), true);
}

/**
 * Verifies that the ListBlobProducer stops listing once the delete consumer
 * registers an irrecoverable failure: every blob delete is made to fail via a
 * competing lease, and afterwards the producer must have issued only a small
 * number of list calls instead of enumerating all created blobs.
 */
@Test
public void testProducerStopOnDeleteFailure() throws Exception {
  Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB);
  Configuration configuration = Mockito.spy(getRawConfiguration());
  AzureBlobFileSystem fs = Mockito.spy(
      (AzureBlobFileSystem) FileSystem.get(configuration));

  // Seed /src with 20 files, created in parallel to keep setup fast.
  fs.mkdirs(new Path("/src"));
  ExecutorService executorService = Executors.newFixedThreadPool(10);
  try {
    List<Future<?>> futureList = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      int iter = i;
      futureList.add(executorService.submit(() -> {
        try {
          fs.create(new Path("/src/file" + iter));
        } catch (IOException ignored) {
          // Best-effort setup; a missing file only shrinks the listing.
        }
      }));
    }
    for (Future<?> future : futureList) {
      future.get();
    }
  } finally {
    // Fix: previously the executor was never shut down, leaking its ten
    // worker threads for the rest of the test JVM.
    executorService.shutdown();
  }

  // Spy store/client so producer creation and listing can be intercepted.
  AbfsClient client = fs.getAbfsClient();
  AbfsClient spiedClient = Mockito.spy(client);
  AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
  store.setClient(spiedClient);
  Mockito.doReturn(store).when(fs).getAbfsStore();

  // Capture the producer so its worker thread can be joined at the end.
  ListBlobProducer[] producers = new ListBlobProducer[1];
  Mockito.doAnswer(answer -> {
    producers[0] = (ListBlobProducer) answer.callRealMethod();
    return producers[0];
  }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any(
      ListBlobQueue.class), Mockito.nullable(String.class),
      Mockito.any(TracingContext.class));

  AtomicInteger listCounter = new AtomicInteger(0);
  AtomicBoolean hasConsumerStarted = new AtomicBoolean(false);

  // Force page size 1 and stall the producer's listings after the first page
  // until the consumer has started, so the consumer's failure flag is set
  // while the producer is still mid-enumeration.
  Mockito.doAnswer(answer -> {
    String marker = answer.getArgument(0);
    String prefix = answer.getArgument(1);
    String delimiter = answer.getArgument(2);
    TracingContext tracingContext = answer.getArgument(4);
    int counter = listCounter.incrementAndGet();
    // Fix: guard on producers[0] != null (as the rename twin of this test
    // already does) so a pre-producer listing cannot spin forever waiting
    // for a consumer that will never start.
    if (counter > 1 && producers[0] != null) {
      while (!hasConsumerStarted.get()) {
        Thread.sleep(1_000L);
      }
    }
    return client.getListBlobs(marker, prefix, delimiter, 1, tracingContext);
  })
      .when(spiedClient)
      .getListBlobs(Mockito.nullable(String.class),
          Mockito.nullable(String.class), Mockito.nullable(String.class),
          Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));

  // Make every blob deletion fail: grab a competing lease on the blob first,
  // then let the real delete hit the lease conflict.
  Mockito.doAnswer(answer -> {
    spiedClient.acquireBlobLease(
        ((Path) answer.getArgument(0)).toUri().getPath(), -1,
        answer.getArgument(2));
    hasConsumerStarted.set(true);
    return answer.callRealMethod();
  })
      .when(spiedClient)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));

  intercept(Exception.class, () -> {
    fs.delete(new Path("/src"), true);
  });

  // The producer thread must terminate once the consumer registers failure.
  producers[0].waitForProcessCompletion();

  // A stopped producer lists at most a few single-blob pages instead of all
  // 20+ blobs under /src.
  Mockito.verify(spiedClient, Mockito.atMost(3))
      .getListBlobs(Mockito.nullable(String.class),
          Mockito.nullable(String.class), Mockito.nullable(String.class),
          Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil;
import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer;
import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue;
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_FALLBACK_TO_DFS;
Expand Down Expand Up @@ -2225,4 +2228,88 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF
Mockito.any(TracingContext.class));
return copied;
}

/**
 * Verifies that the ListBlobProducer stops listing once the rename consumer
 * registers an irrecoverable failure: every blob delete (part of the
 * copy-then-delete rename) is made to fail via a competing lease, and
 * afterwards the producer must have issued only a small number of list calls
 * instead of enumerating all created blobs.
 */
@Test
public void testProducerStopOnRenameFailure() throws Exception {
  assumeNonHnsAccountBlobEndpoint(getFileSystem());
  Configuration configuration = Mockito.spy(getRawConfiguration());
  AzureBlobFileSystem fs = Mockito.spy(
      (AzureBlobFileSystem) FileSystem.get(configuration));

  // Seed /src with 20 files, created in parallel to keep setup fast.
  fs.mkdirs(new Path("/src"));
  ExecutorService executorService = Executors.newFixedThreadPool(10);
  try {
    List<Future<?>> futureList = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      int iter = i;
      futureList.add(executorService.submit(() -> {
        try {
          fs.create(new Path("/src/file" + iter));
        } catch (IOException ignored) {
          // Best-effort setup; a missing file only shrinks the listing.
        }
      }));
    }
    for (Future<?> future : futureList) {
      future.get();
    }
  } finally {
    // Fix: previously the executor was never shut down, leaking its ten
    // worker threads for the rest of the test JVM.
    executorService.shutdown();
  }

  // Spy store/client so producer creation and listing can be intercepted.
  AbfsClient client = fs.getAbfsClient();
  AbfsClient spiedClient = Mockito.spy(client);
  AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
  store.setClient(spiedClient);
  Mockito.doReturn(store).when(fs).getAbfsStore();

  // Capture the producer so its worker thread can be joined at the end.
  ListBlobProducer[] producers = new ListBlobProducer[1];
  Mockito.doAnswer(answer -> {
    producers[0] = (ListBlobProducer) answer.callRealMethod();
    return producers[0];
  }).when(store).getListBlobProducer(Mockito.anyString(), Mockito.any(
      ListBlobQueue.class), Mockito.nullable(String.class),
      Mockito.any(TracingContext.class));

  AtomicInteger listCounter = new AtomicInteger(0);
  AtomicBoolean hasConsumerStarted = new AtomicBoolean(false);

  // Force page size 1 and stall the producer's listings after the first page
  // until the consumer has started, so the consumer's failure flag is set
  // while the producer is still mid-enumeration.
  Mockito.doAnswer(answer -> {
    String marker = answer.getArgument(0);
    String prefix = answer.getArgument(1);
    String delimiter = answer.getArgument(2);
    TracingContext tracingContext = answer.getArgument(4);
    int counter = listCounter.incrementAndGet();
    if (counter > 1 && producers[0] != null) {
      while (!hasConsumerStarted.get()) {
        Thread.sleep(1_000L);
      }
    }
    return client.getListBlobs(marker, prefix, delimiter, 1, tracingContext);
  })
      .when(spiedClient)
      .getListBlobs(Mockito.nullable(String.class),
          Mockito.nullable(String.class), Mockito.nullable(String.class),
          Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));

  // Make every blob deletion fail: grab a competing lease on the blob first,
  // then let the real delete hit the lease conflict.
  Mockito.doAnswer(answer -> {
    spiedClient.acquireBlobLease(
        ((Path) answer.getArgument(0)).toUri().getPath(), -1,
        answer.getArgument(2));
    hasConsumerStarted.set(true);
    return answer.callRealMethod();
  })
      .when(spiedClient)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));

  intercept(Exception.class, () -> {
    fs.rename(new Path("/src"), new Path("/dst"));
  });

  // The producer thread must terminate once the consumer registers failure.
  producers[0].waitForProcessCompletion();

  // A stopped producer lists at most a few single-blob pages instead of all
  // 20+ blobs under /src.
  Mockito.verify(spiedClient, Mockito.atMost(3))
      .getListBlobs(Mockito.nullable(String.class),
          Mockito.nullable(String.class), Mockito.nullable(String.class),
          Mockito.nullable(Integer.class), Mockito.any(TracingContext.class));
}
}
Loading

0 comments on commit e858497

Please sign in to comment.