diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java index 0783c7ace..4c75165ac 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java @@ -4,11 +4,10 @@ import java.time.Duration; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.NoSuchElementException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -60,7 +59,6 @@ public class AssertSubscriber implements Subscriber { private final AtomicReference failure = new AtomicReference<>(); /** - * 1 * Whether the multi completed successfully. */ private final AtomicBoolean completed = new AtomicBoolean(); @@ -311,25 +309,50 @@ public AssertSubscriber awaitNextItems(int number, Duration duration) { "Expecting a next items, but a failure event has already being received: " + getFailure()); } } - try { - awaitNextItemEvents(number, duration); - } catch (ItemTimeoutException e) { - if (hasCompleted()) { - throw new AssertionError("Expected " + number + " item event(s) in the last " + duration.toMillis() - + " ms, but only " + e.received - + " item event(s) have been received, however the completion event has been received."); - } - if (getFailure() != null) { - throw new AssertionError("Expected an item event in the last " + duration.toMillis() + " ms, but only " - + e.received + " item event(s) have been received, however a failure event has been received: " - + getFailure()); - } + awaitNextItemEvents(number, duration); + + return this; + } + + /** + * Awaits for the subscriber to receive {@code number} items in total (including the ones received after calling + * this method). + * If not enough items have been received before the default timeout, an {@link AssertionError} is thrown. + * + * @param number the number of item to expect, must not be 0 or negative. + * @return this {@link AssertSubscriber} + */ + public AssertSubscriber awaitItems(int number) { + return awaitItems(number, DEFAULT_TIMEOUT); + } + + /** + * Awaits for the subscriber to receive {@code number} items in total (including the ones received after calling + * this method). + * If not enough items have been received before the given timeout, an {@link AssertionError} is thrown. + * + * @param number the number of item to expect, must not be 0 or negative. + * @param duration the timeout, must not be {@code null} + * @return this {@link AssertSubscriber} + */ + public AssertSubscriber awaitItems(int number, Duration duration) { + if (items.size() > number) { throw new AssertionError( - "Expected an item event in the last " + duration.toMillis() + " ms, but only " + e.received - + " item event(s) have been received."); + "Expected the number of items to be " + number + ", but it's already " + items.size()); + } + + if (isCancelled() || hasCompleted() || getFailure() != null) { + if (items.size() != number) { + throw new AssertionError( + "Expected the number of items to be " + number + ", but received " + items.size() + + " and we received a terminal event already"); + } + return this; } + awaitItemEvents(number, duration); + return this; } @@ -370,11 +393,6 @@ public AssertSubscriber awaitCompletion(Duration duration) { throw new AssertionError("Expected a completion event but got a failure: " + throwable); } - if (isCancelled()) { - throw new AssertionError( - "Expected a completion event, but got a cancellation event instead"); - } - // We have been interrupted. return this; @@ -447,10 +465,6 @@ public AssertSubscriber awaitFailure(Consumer assertion, Duration } final Throwable throwable = failure.get(); - if (throwable == null) { - throw new AssertionError("Expected a failure event, but didn't get a terminal event"); - } - try { assertion.accept(throwable); return this; @@ -505,29 +519,66 @@ private void awaitEvent(CountDownLatch latch, Duration duration) throws TimeoutE } } - private final List eventListeners = new CopyOnWriteArrayList<>(); + private final List eventListeners = new CopyOnWriteArrayList<>(); - private void awaitNextItemEvents(int number, Duration duration) throws ItemTimeoutException { - CountDownLatch latch = new CountDownLatch(number); - eventListeners.add(latch); + private void awaitNextItemEvents(int number, Duration duration) { + NextItemTask task = new NextItemTask<>(number, this); + int size = items.size(); try { - if (!latch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) { - throw new ItemTimeoutException(number - (int) latch.getCount(), number); - } - } catch (InterruptedException ex) { + task.future().get(duration.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } finally { - eventListeners.remove(latch); + } catch (ExecutionException e) { + // Terminal event received + int received = items.size() - size; + if (isCancelled()) { + throw new AssertionError( + "Expected " + number + " items, but received a cancellation event while waiting. Only " + received + + " item(s) have been received."); + } else if (hasCompleted()) { + throw new AssertionError( + "Expected " + number + " items, but received a completion event while waiting. Only " + received + + " item(s) have been received."); + } else { + throw new AssertionError( + "Expected " + number + " items, but received a failure event while waiting: " + getFailure() + ". Only " + + received + " item(s) have been received."); + } + } catch (TimeoutException e) { + // Timeout + int received = items.size() - size; + throw new AssertionError( + "Expected " + number + " items in " + duration.toMillis() + " ms, but only received " + received + + " items."); } } - static class ItemTimeoutException extends TimeoutException { - public final int received; - public final int expected; - - public ItemTimeoutException(int received, int expected) { - this.received = received; - this.expected = expected; + private void awaitItemEvents(int expected, Duration duration) { + ItemTask task = new ItemTask<>(expected, this); + try { + task.future().get(duration.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + // Terminal event received + if (isCancelled()) { + throw new AssertionError( + "Expected " + expected + " items, but received a cancellation event while waiting. Only " + items.size() + + " items have been received."); + } else if (hasCompleted()) { + throw new AssertionError( + "Expected " + expected + " items, but received a completion event while waiting. Only " + items.size() + + " items have been received."); + } else { + throw new AssertionError( + "Expected " + expected + " items, but received a failure event while waiting: " + getFailure() + + ". Only " + items.size() + " items have been received."); + } + } catch (TimeoutException e) { + // Timeout + throw new AssertionError( + "Expected " + expected + " items in " + duration.toMillis() + " ms, but only received " + + items.size() + " items."); } } @@ -564,6 +615,8 @@ public AssertSubscriber cancel() { shouldBeSubscribed(numberOfSubscription); subscription.get().cancel(); cancelled = true; + Event ev = new Event(null, null, false, true); + eventListeners.forEach(l -> l.accept(ev)); return this; } @@ -581,16 +634,6 @@ public AssertSubscriber request(long req) { return this; } - private void fire() { - for (CountDownLatch latch : eventListeners) { - try { - latch.countDown(); - } catch (Exception e) { - // Ignored cancellation and other exceptions. - } - } - } - @Override public void onSubscribe(Subscription s) { numberOfSubscription++; @@ -611,19 +654,24 @@ public void onSubscribe(Subscription s) { @Override public synchronized void onNext(T t) { items.add(t); - fire(); + Event ev = new Event(t, null, false, false); + eventListeners.forEach(l -> l.accept(ev)); } @Override public void onError(Throwable t) { failure.set(t); terminal.countDown(); + Event ev = new Event(null, t, false, false); + eventListeners.forEach(l -> l.accept(ev)); } @Override public void onComplete() { completed.set(true); terminal.countDown(); + Event ev = new Event(null, null, true, false); + eventListeners.forEach(l -> l.accept(ev)); } /** @@ -678,4 +726,107 @@ public boolean isCancelled() { public boolean hasCompleted() { return completed.get(); } + + private void registerListener(EventListener listener) { + eventListeners.add(listener); + } + + private void unregisterListener(EventListener listener) { + eventListeners.remove(listener); + } + + private interface EventListener extends Consumer { + } + + private static class Event { + + private final Object item; + private final Throwable failure; + private final boolean completion; + private final boolean cancellation; + + private Event(Object item, Throwable failure, boolean completion, boolean cancellation) { + this.item = item; + this.failure = failure; + this.completion = completion; + this.cancellation = cancellation; + } + + public boolean isItem() { + return item != null; + } + + public boolean isCancellation() { + return cancellation; + } + + public boolean isFailure() { + return failure != null; + } + + public boolean isCompletion() { + return completion; + } + } + + private static class NextItemTask { + + private final int expected; + private final AssertSubscriber subscriber; + + public NextItemTask(int expected, AssertSubscriber subscriber) { + this.expected = expected; + this.subscriber = subscriber; + } + + public CompletableFuture future() { + CompletableFuture future = new CompletableFuture<>(); + AtomicInteger count = new AtomicInteger(this.expected); + + EventListener listener = event -> { + if (event.isItem()) { + if (count.decrementAndGet() == 0) { + future.complete(null); + } + } else if (event.isCancellation() || event.isFailure() || event.isCompletion()) { + future.completeExceptionally( + new NoSuchElementException("Received a terminal event while waiting for items")); + } + // Else wait for timeout or next event. + }; + subscriber.registerListener(listener); + return future + .whenComplete((x, f) -> subscriber.unregisterListener(listener)); + } + } + + private static class ItemTask { + + private final int expected; + private final AssertSubscriber subscriber; + + public ItemTask(int expected, AssertSubscriber subscriber) { + this.expected = expected; + this.subscriber = subscriber; + } + + public CompletableFuture future() { + CompletableFuture future = new CompletableFuture<>(); + EventListener listener = event -> { + if (event.isItem()) { + if (subscriber.items.size() >= expected) { + future.complete(null); + } + } else if (event.isCancellation() || event.isFailure() || event.isCompletion()) { + future.completeExceptionally( + new NoSuchElementException("Received a terminal event while waiting for items")); + } + // Else wait for timeout or next event. + }; + subscriber.registerListener(listener); + return future + .whenComplete((x, f) -> subscriber.unregisterListener(listener)); + } + } + } diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java index 25aec0404..d3649c251 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java @@ -366,6 +366,12 @@ public void testAwaitCompletion() { assertThatThrownBy(() -> tmp.awaitCompletion(SMALL)) .isInstanceOf(AssertionError.class) .hasMessageContaining("completion").hasMessageContaining(SMALL.toMillis() + " ms"); + + // Failure instead of completion + assertThatThrownBy(() -> Multi.createFrom(). failure(new TestException()) + .onFailure().call(() -> smallDelay) + .subscribe().withSubscriber(AssertSubscriber.create(1)).awaitCompletion()).isInstanceOf(AssertionError.class) + .hasMessageContaining("failure"); } @Test @@ -407,6 +413,11 @@ public void testAwaitFailure() { AssertSubscriber tmp2 = subscriber; assertThatThrownBy(() -> tmp2.awaitFailure(SMALL)).isInstanceOf(AssertionError.class) .hasMessageContaining("failure").hasMessageContaining(SMALL.toMillis() + " ms"); + + // Completion instead of failure + assertThatThrownBy(() -> Multi.createFrom().items(1, 2) + .subscribe().withSubscriber(AssertSubscriber.create(2)) + .awaitFailure()).isInstanceOf(AssertionError.class).hasMessageContaining("completion"); } @Test @@ -423,13 +434,13 @@ public void testAwaitItem() { assertThatThrownBy(() -> Multi.createFrom().empty() .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItem()).isInstanceOf(AssertionError.class) - .hasMessageContaining("completion").hasMessageContaining("event"); + .hasMessageContaining("completion").hasMessageContaining("item"); // Already failed assertThatThrownBy(() -> Multi.createFrom().failure(new TestException()) .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItem()).isInstanceOf(AssertionError.class) - .hasMessageContaining("failure").hasMessageContaining("event") + .hasMessageContaining("failure").hasMessageContaining("item") .hasMessageContaining(TestException.class.getName()); // Completion instead of item @@ -437,16 +448,14 @@ public void testAwaitItem() { .onCompletion().call(() -> smallDelay) .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItem(SMALL.multipliedBy(2))).isInstanceOf(AssertionError.class) - .hasMessageContaining("completion").hasMessageContaining("item") - .hasMessageContaining(SMALL.multipliedBy(2).toMillis() + " ms"); + .hasMessageContaining("completion").hasMessageContaining("item"); // Failure instead of item assertThatThrownBy(() -> Multi.createFrom().failure(new TestException()) .onFailure().call(() -> smallDelay) .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItem(SMALL.multipliedBy(2))).isInstanceOf(AssertionError.class) - .hasMessageContaining("failure").hasMessageContaining("item") - .hasMessageContaining(SMALL.multipliedBy(2).toMillis() + " ms"); + .hasMessageContaining("failure").hasMessageContaining("item"); // Item Multi.createFrom().items(1) @@ -471,18 +480,18 @@ public void testAwaitItem() { } @Test - public void testAwaitItems() { + public void testAwaitNextItems() { // Already completed assertThatThrownBy(() -> Multi.createFrom().empty() .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItems(2)).isInstanceOf(AssertionError.class) - .hasMessageContaining("completion").hasMessageContaining("event"); + .hasMessageContaining("completion").hasMessageContaining("item"); // Already failed assertThatThrownBy(() -> Multi.createFrom().failure(new TestException()) .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItems(2)).isInstanceOf(AssertionError.class) - .hasMessageContaining("failure").hasMessageContaining("event") + .hasMessageContaining("failure").hasMessageContaining("item") .hasMessageContaining(TestException.class.getName()); // Completion instead of item @@ -490,16 +499,14 @@ public void testAwaitItems() { .onCompletion().call(() -> smallDelay) .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItems(2, SMALL.multipliedBy(2))).isInstanceOf(AssertionError.class) - .hasMessageContaining("completion").hasMessageContaining("item") - .hasMessageContaining(SMALL.multipliedBy(2).toMillis() + " ms"); + .hasMessageContaining("completion").hasMessageContaining("item").hasMessageContaining("0"); // Failure instead of item assertThatThrownBy(() -> Multi.createFrom().emitter(e -> e.emit(1).fail(new TestException())) .onFailure().call(() -> smallDelay) .subscribe().withSubscriber(AssertSubscriber.create(1)) .awaitNextItems(2, SMALL.multipliedBy(2))).isInstanceOf(AssertionError.class) - .hasMessageContaining("failure").hasMessageContaining("item") - .hasMessageContaining(SMALL.multipliedBy(2).toMillis() + " ms"); + .hasMessageContaining("failure").hasMessageContaining("item").hasMessageContaining("0"); // Item Multi.createFrom().items(1, 2, 3) @@ -526,11 +533,102 @@ public void testAwaitItems() { .hasMessageContaining(SMALL.toMillis() + " ms"); } + @Test + public void testAwaitItems() { + // Already completed + assertThatThrownBy(() -> Multi.createFrom().empty() + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .awaitItems(2)).isInstanceOf(AssertionError.class) + .hasMessageContaining("terminal").hasMessageContaining("item"); + + // Already failed + assertThatThrownBy(() -> Multi.createFrom().failure(new TestException()) + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .awaitItems(2)).isInstanceOf(AssertionError.class) + .hasMessageContaining("terminal").hasMessageContaining("item"); + + // Completion instead of item + assertThatThrownBy(() -> Multi.createFrom().emitter(e -> e.emit(1).complete()) + .onCompletion().call(() -> smallDelay) + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .awaitItems(2, SMALL.multipliedBy(2))).isInstanceOf(AssertionError.class) + .hasMessageContaining("completion").hasMessageContaining("item").hasMessageContaining("1"); + + // Failure instead of item + assertThatThrownBy(() -> Multi.createFrom().emitter(e -> e.emit(1).fail(new TestException())) + .onFailure().call(() -> smallDelay) + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .awaitItems(2, SMALL.multipliedBy(2))).isInstanceOf(AssertionError.class) + .hasMessageContaining("failure").hasMessageContaining("item").hasMessageContaining("1"); + + // Item + Multi.createFrom().items(1, 2, 3) + .onItem().call(() -> smallDelay) + .subscribe().withSubscriber(AssertSubscriber.create(3)) + .awaitItems(3, MEDIUM) + .assertItems(1, 2, 3) + .assertLastItem(3); + + // Timeout + assertThatThrownBy(() -> Multi.createFrom().emitter(e -> { + e.emit(1).emit(2); + try { + Thread.sleep(MEDIUM.toMillis()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + e.emit(3); + }) + .runSubscriptionOn(Infrastructure.getDefaultExecutor()) + .subscribe().withSubscriber(AssertSubscriber.create(3)) + .awaitItems(3, SMALL)).isInstanceOf(AssertionError.class) + .hasMessageContaining("item") + .hasMessageContaining(SMALL.toMillis() + " ms"); + + // Have received more items than expected. + assertThatThrownBy(() -> Multi.createFrom().items(1, 2, 3) + .subscribe().withSubscriber(AssertSubscriber.create(3)) + .awaitItems(2, SMALL)).isInstanceOf(AssertionError.class) + .hasMessageContaining("item").hasMessageContaining("2").hasMessageContaining("3"); + + // Already Cancelled + assertThatThrownBy(() -> Multi.createFrom().items(1, 2, 3) + .subscribe().withSubscriber(new AssertSubscriber<>(1, true)) + .awaitItems(2, SMALL)).isInstanceOf(AssertionError.class) + .hasMessageContaining("item").hasMessageContaining("2").hasMessageContaining("0") + .hasMessageContaining("terminal"); + + // Cancellation while waiting. + assertThatThrownBy(() -> { + AssertSubscriber subscriber = Multi.createFrom().nothing() + .subscribe().withSubscriber(new AssertSubscriber<>(1, true)); + subscriber + .run(() -> new Thread(() -> { + try { + Thread.sleep(SMALL.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + subscriber.cancel(); + }).start()) + .awaitItems(2, MEDIUM); + }).isInstanceOf(AssertionError.class) + .hasMessageContaining("item").hasMessageContaining("2").hasMessageContaining("0") + .hasMessageContaining("terminal"); + + // Already receive the right number + Multi.createFrom().items(1, 2, 3) + .subscribe().withSubscriber(AssertSubscriber.create(4)) + .awaitItems(3) + .cancel(); + } + @Test public void testAssertLast() { assertThatThrownBy( - () -> Multi.createFrom().empty().subscribe().withSubscriber(AssertSubscriber.create(1)).assertLastItem(1)) - .isInstanceOf(AssertionError.class); + () -> Multi.createFrom().empty().subscribe().withSubscriber(AssertSubscriber.create(1)) + .assertLastItem(1)) + .isInstanceOf(AssertionError.class); Multi.createFrom().items(1, 2, 3, 4) .subscribe().withSubscriber(AssertSubscriber.create(2)) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java index 5f4b57d12..05b5ca6b5 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java @@ -16,6 +16,7 @@ import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -869,7 +870,7 @@ public void testImmediateCancellationOnUpstream() { assertThat(subscriber.assertNotTerminated().isCancelled()).isTrue(); } - @Test + @RepeatedTest(10) public void testGroupByWithUpstreamFailure() { AtomicReference> emitter = new AtomicReference<>(); @@ -882,7 +883,7 @@ public void testGroupByWithUpstreamFailure() { .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); subscriber.assertSubscribed(); - subscriber.awaitNextItems(2); + await().until(() -> subscriber.getItems().size() == 2); AssertSubscriber s1 = subscriber.getItems().get(0).subscribe() .withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); s1.assertSubscribed();