Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Cancel in-progress async operations when the pipeline is aborted. (#1205
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ajsutton authored Apr 4, 2019
1 parent 4fce251 commit cf7a13a
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void finalize(final WritePipe<O> outputPipe) {
}
}

@Override
public void abort() {
inProgress.forEach(future -> future.cancel(true));
}

private void outputNextCompletedTask(final WritePipe<O> outputPipe) {
try {
waitForAnyFutureToComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public boolean hasMore() {
return input.hasMore();
}

@Override
public boolean isAborted() {
return input.isAborted();
}

@Override
public List<T> get() {
final T firstItem = input.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public boolean isOpen() {
return !closed.get() && !aborted.get();
}

@Override
public boolean isAborted() {
return aborted.get();
}

/**
* Get the number of items that can be queued inside this pipe.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public void run() {
while (inputPipe.hasMore()) {
processor.processNextInput(inputPipe, outputPipe);
}
if (inputPipe.isAborted()) {
processor.abort();
}
processor.finalize(outputPipe);
outputPipe.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ interface Processor<I, O> {
void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe);

default void finalize(final WritePipe<O> outputPipe) {}

default void abort() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public interface ReadPipe<T> {
*/
boolean hasMore();

/**
* Determines if this pipeline this pipe is a part of has been aborted.
*
* @return true if the pipeline has been aborted, otherwise false.
*/
boolean isAborted();

/**
* Get and remove the next item from this pipe. This method will block until the next item is
* available but may still return <code>null</code> if the pipe is closed or the thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ public void shouldOutputRemainingInProgressTasksWhenFinalizing() {
verify(writePipe).put("b");
}

@Test
public void shouldCancelInProgressTasksWhenAborted() {
final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1).thenReturn(task2);

processor.processNextInput(readPipe, writePipe);
processor.processNextInput(readPipe, writePipe);

processor.abort();

assertThat(task1).isCancelled();
assertThat(task2).isCancelled();
}

@Test
public void shouldInterruptThreadWhenFutureCompletes() {
// Ensures that if we're waiting for the next input we wake up and output completed tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void shouldNotHaveMoreWhenAbortedEvenIfNotEmpty() {
pipe.abort();

assertThat(pipe.hasMore()).isFalse();
assertThat(pipe.isAborted()).isTrue();
}

@Test
Expand All @@ -79,6 +80,7 @@ public void shouldNotWaitToReachMaximumSizeBeforeReturningBatch() {
public void shouldNotBeOpenAfterAbort() {
pipe.abort();
assertThat(pipe.isOpen()).isFalse();
assertThat(pipe.isAborted()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,14 @@ public void shouldFinalizeSingleStepStageAndCloseOutputPipeWhenInputCloses() {
verifyNoMoreInteractions(singleStep);
assertThat(outputPipe.isOpen()).isFalse();
}

@Test
public void shouldAbortProcessorIfReadPipeIsAborted() {
inputPipe.abort();
stage.run();

verify(singleStep).abort();
verify(singleStep).finalize(outputPipe);
assertThat(outputPipe.isOpen()).isFalse();
}
}

0 comments on commit cf7a13a

Please sign in to comment.