Skip to content

Commit

Permalink
Refactor testing utilities to add awaitNextItems and awaitItems
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Mar 6, 2021
1 parent ab1acf8 commit d2045da
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.NoSuchElementException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -60,7 +59,6 @@ public class AssertSubscriber<T> implements Subscriber<T> {
private final AtomicReference<Throwable> failure = new AtomicReference<>();

/**
* 1
* Whether the multi completed successfully.
*/
private final AtomicBoolean completed = new AtomicBoolean();
Expand Down Expand Up @@ -311,25 +309,50 @@ public AssertSubscriber<T> awaitNextItems(int number, Duration duration) {
"Expecting a next items, but a failure event has already being received: " + getFailure());
}
}
try {
awaitNextItemEvents(number, duration);
} catch (ItemTimeoutException e) {
if (hasCompleted()) {
throw new AssertionError("Expected " + number + " item event(s) in the last " + duration.toMillis()
+ " ms, but only " + e.received
+ " item event(s) have been received, however the completion event has been received.");
}
if (getFailure() != null) {
throw new AssertionError("Expected an item event in the last " + duration.toMillis() + " ms, but only "
+ e.received + " item event(s) have been received, however a failure event has been received: "
+ getFailure());
}

awaitNextItemEvents(number, duration);

return this;
}

/**
* Awaits for the subscriber to receive {@code number} items in total (including the ones received after calling
* this method).
* If not enough items have been received before the default timeout, an {@link AssertionError} is thrown.
*
* @param number the number of item to expect, must not be 0 or negative.
* @return this {@link AssertSubscriber}
*/
public AssertSubscriber<T> awaitItems(int number) {
return awaitItems(number, DEFAULT_TIMEOUT);
}

/**
* Awaits for the subscriber to receive {@code number} items in total (including the ones received after calling
* this method).
* If not enough items have been received before the given timeout, an {@link AssertionError} is thrown.
*
* @param number the number of item to expect, must not be 0 or negative.
* @param duration the timeout, must not be {@code null}
* @return this {@link AssertSubscriber}
*/
public AssertSubscriber<T> awaitItems(int number, Duration duration) {
if (items.size() > number) {
throw new AssertionError(
"Expected an item event in the last " + duration.toMillis() + " ms, but only " + e.received
+ " item event(s) have been received.");
"Expected the number of items to be " + number + ", but it's already " + items.size());
}

if (isCancelled() || hasCompleted() || getFailure() != null) {
if (items.size() != number) {
throw new AssertionError(
"Expected the number of items to be " + number + ", but received " + items.size()
+ " and we received a terminal event already");
}
return this;
}

awaitItemEvents(number, duration);

return this;
}

Expand Down Expand Up @@ -370,11 +393,6 @@ public AssertSubscriber<T> awaitCompletion(Duration duration) {
throw new AssertionError("Expected a completion event but got a failure: " + throwable);
}

if (isCancelled()) {
throw new AssertionError(
"Expected a completion event, but got a cancellation event instead");
}

// We have been interrupted.
return this;

Expand Down Expand Up @@ -447,10 +465,6 @@ public AssertSubscriber<T> awaitFailure(Consumer<Throwable> assertion, Duration
}

final Throwable throwable = failure.get();
if (throwable == null) {
throw new AssertionError("Expected a failure event, but didn't get a terminal event");
}

try {
assertion.accept(throwable);
return this;
Expand Down Expand Up @@ -505,29 +519,66 @@ private void awaitEvent(CountDownLatch latch, Duration duration) throws TimeoutE
}
}

private final List<CountDownLatch> eventListeners = new CopyOnWriteArrayList<>();
private final List<EventListener> eventListeners = new CopyOnWriteArrayList<>();

private void awaitNextItemEvents(int number, Duration duration) throws ItemTimeoutException {
CountDownLatch latch = new CountDownLatch(number);
eventListeners.add(latch);
private void awaitNextItemEvents(int number, Duration duration) {
NextItemTask<T> task = new NextItemTask<>(number, this);
int size = items.size();
try {
if (!latch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
throw new ItemTimeoutException(number - (int) latch.getCount(), number);
}
} catch (InterruptedException ex) {
task.future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
eventListeners.remove(latch);
} catch (ExecutionException e) {
// Terminal event received
int received = items.size() - size;
if (isCancelled()) {
throw new AssertionError(
"Expected " + number + " items, but received a cancellation event while waiting. Only " + received
+ " item(s) have been received.");
} else if (hasCompleted()) {
throw new AssertionError(
"Expected " + number + " items, but received a completion event while waiting. Only " + received
+ " item(s) have been received.");
} else {
throw new AssertionError(
"Expected " + number + " items, but received a failure event while waiting: " + getFailure() + ". Only "
+ received + " item(s) have been received.");
}
} catch (TimeoutException e) {
// Timeout
int received = items.size() - size;
throw new AssertionError(
"Expected " + number + " items in " + duration.toMillis() + " ms, but only received " + received
+ " items.");
}
}

static class ItemTimeoutException extends TimeoutException {
public final int received;
public final int expected;

public ItemTimeoutException(int received, int expected) {
this.received = received;
this.expected = expected;
private void awaitItemEvents(int expected, Duration duration) {
ItemTask<T> task = new ItemTask<>(expected, this);
try {
task.future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// Terminal event received
if (isCancelled()) {
throw new AssertionError(
"Expected " + expected + " items, but received a cancellation event while waiting. Only " + items.size()
+ " items have been received.");
} else if (hasCompleted()) {
throw new AssertionError(
"Expected " + expected + " items, but received a completion event while waiting. Only " + items.size()
+ " items have been received.");
} else {
throw new AssertionError(
"Expected " + expected + " items, but received a failure event while waiting: " + getFailure()
+ ". Only " + items.size() + " items have been received.");
}
} catch (TimeoutException e) {
// Timeout
throw new AssertionError(
"Expected " + expected + " items in " + duration.toMillis() + " ms, but only received "
+ items.size() + " items.");
}
}

Expand Down Expand Up @@ -564,6 +615,8 @@ public AssertSubscriber<T> cancel() {
shouldBeSubscribed(numberOfSubscription);
subscription.get().cancel();
cancelled = true;
Event ev = new Event(null, null, false, true);
eventListeners.forEach(l -> l.accept(ev));
return this;
}

Expand All @@ -581,16 +634,6 @@ public AssertSubscriber<T> request(long req) {
return this;
}

private void fire() {
for (CountDownLatch latch : eventListeners) {
try {
latch.countDown();
} catch (Exception e) {
// Ignored cancellation and other exceptions.
}
}
}

@Override
public void onSubscribe(Subscription s) {
numberOfSubscription++;
Expand All @@ -611,19 +654,24 @@ public void onSubscribe(Subscription s) {
@Override
public synchronized void onNext(T t) {
items.add(t);
fire();
Event ev = new Event(t, null, false, false);
eventListeners.forEach(l -> l.accept(ev));
}

@Override
public void onError(Throwable t) {
failure.set(t);
terminal.countDown();
Event ev = new Event(null, t, false, false);
eventListeners.forEach(l -> l.accept(ev));
}

@Override
public void onComplete() {
completed.set(true);
terminal.countDown();
Event ev = new Event(null, null, true, false);
eventListeners.forEach(l -> l.accept(ev));
}

/**
Expand Down Expand Up @@ -678,4 +726,107 @@ public boolean isCancelled() {
public boolean hasCompleted() {
return completed.get();
}

private void registerListener(EventListener listener) {
eventListeners.add(listener);
}

private void unregisterListener(EventListener listener) {
eventListeners.remove(listener);
}

private interface EventListener extends Consumer<Event> {
}

private static class Event {

private final Object item;
private final Throwable failure;
private final boolean completion;
private final boolean cancellation;

private Event(Object item, Throwable failure, boolean completion, boolean cancellation) {
this.item = item;
this.failure = failure;
this.completion = completion;
this.cancellation = cancellation;
}

public boolean isItem() {
return item != null;
}

public boolean isCancellation() {
return cancellation;
}

public boolean isFailure() {
return failure != null;
}

public boolean isCompletion() {
return completion;
}
}

private static class NextItemTask<T> {

private final int expected;
private final AssertSubscriber<T> subscriber;

public NextItemTask(int expected, AssertSubscriber<T> subscriber) {
this.expected = expected;
this.subscriber = subscriber;
}

public CompletableFuture<Void> future() {
CompletableFuture<Void> future = new CompletableFuture<>();
AtomicInteger count = new AtomicInteger(this.expected);

EventListener listener = event -> {
if (event.isItem()) {
if (count.decrementAndGet() == 0) {
future.complete(null);
}
} else if (event.isCancellation() || event.isFailure() || event.isCompletion()) {
future.completeExceptionally(
new NoSuchElementException("Received a terminal event while waiting for items"));
}
// Else wait for timeout or next event.
};
subscriber.registerListener(listener);
return future
.whenComplete((x, f) -> subscriber.unregisterListener(listener));
}
}

private static class ItemTask<T> {

private final int expected;
private final AssertSubscriber<T> subscriber;

public ItemTask(int expected, AssertSubscriber<T> subscriber) {
this.expected = expected;
this.subscriber = subscriber;
}

public CompletableFuture<Void> future() {
CompletableFuture<Void> future = new CompletableFuture<>();
EventListener listener = event -> {
if (event.isItem()) {
if (subscriber.items.size() >= expected) {
future.complete(null);
}
} else if (event.isCancellation() || event.isFailure() || event.isCompletion()) {
future.completeExceptionally(
new NoSuchElementException("Received a terminal event while waiting for items"));
}
// Else wait for timeout or next event.
};
subscriber.registerListener(listener);
return future
.whenComplete((x, f) -> subscriber.unregisterListener(listener));
}
}

}
Loading

0 comments on commit d2045da

Please sign in to comment.