Skip to content

Commit

Permalink
Merge pull request #378 from smallrye/fix/onCancellationPropagation
Browse files Browse the repository at this point in the history
Do not trigger onCancellation() when the stream has already completed
  • Loading branch information
cescoffier authored Dec 3, 2020
2 parents 16a831a + 9fddfee commit 3ef50de
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public MultiOnCancellationCallProcessor(MultiSubscriber<? super T> downstream) {
super(downstream);
}

@Override
public void onCompletion() {
supplierInvoked.set(true);
super.onCompletion();
}

@Override
public void cancel() {
execute().subscribe().with(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public MultiOnCancellationInvokeProcessor(MultiSubscriber<? super T> downstream)
super(downstream);
}

@Override
public void onCompletion() {
actionInvoked.set(true);
super.onCompletion();
}

@Override
public void cancel() {
if (actionInvoked.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,36 @@ public void testCancellationAfterOneItem() {
subscriber.cancel();
assertThat(counter.get()).isEqualTo(1);
}

@Test
public void testCancellationPropagatedBeforeCompletion() {
AtomicBoolean invokeCalled = new AtomicBoolean();
AtomicBoolean callCalled = new AtomicBoolean();

Multi<Integer> multi = Multi.createFrom().items(1, 2, 3)
.onCancellation().invoke(() -> invokeCalled.set(true))
.onCancellation().invoke(() -> callCalled.set(true));

AssertSubscriber<Integer> subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create());
subscriber.cancel();
subscriber.assertNotTerminated();
assertThat(invokeCalled.get()).isTrue();
assertThat(callCalled.get()).isTrue();
}

@Test
public void testCancellationNotPropagatedAfterCompletion() {
AtomicBoolean invokeCalled = new AtomicBoolean();
AtomicBoolean callCalled = new AtomicBoolean();

Multi<Integer> multi = Multi.createFrom().items(1, 2, 3)
.onCancellation().invoke(() -> invokeCalled.set(true))
.onCancellation().invoke(() -> callCalled.set(true));

AssertSubscriber<Integer> subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(69L));
subscriber.assertCompleted().assertItems(1, 2, 3);
subscriber.cancel();
assertThat(invokeCalled.get()).isFalse();
assertThat(callCalled.get()).isFalse();
}
}

0 comments on commit 3ef50de

Please sign in to comment.