Skip to content

Commit

Permalink
Merge pull request #769 from smallrye/internal/fix-infra-test-flakyness
Browse files Browse the repository at this point in the history
Reduce tests flakiness, especially in CI / constrained environments
  • Loading branch information
jponge authored Dec 7, 2021
2 parents a62cb00 + 007b15d commit 6a72b91
Show file tree
Hide file tree
Showing 22 changed files with 87 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ public void testFlatMapRequestsWithEmissionOnExecutor() {
.request(1);

subscriber.awaitNextItem()
.request(2)
.awaitNextItems(2)
.assertItems("A", "B", "C");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.ScheduledExecutorService;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -18,7 +19,12 @@

public class MultiCreateFromTimePeriodTest {

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService executor;

@BeforeEach
public void prepare() {
executor = Executors.newScheduledThreadPool(1);
}

@AfterEach
public void cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testWithRequest0() {

@Test
public void testWithShutdownExecutor() {
ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ExecutorService executor = Executors.newScheduledThreadPool(1);
executor.shutdownNow();

AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testDurationValidity() {

@Test
public void testFailingOnTimeoutWithShutdownExecutor() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.shutdown();
AssertSubscriber<Object> subscriber = Multi.createFrom().nothing()
.ifNoItem().after(Duration.ofMillis(10)).on(executor).fail()
Expand All @@ -158,7 +158,7 @@ public void testFailingOnTimeoutWithShutdownExecutor() {

@Test
public void testFailingOnItemWithShutdownExecutor() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AssertSubscriber<Object> subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(10))
.ifNoItem().after(Duration.ofMillis(20)).on(executor).fail()
.subscribe().withSubscriber(AssertSubscriber.create(10));
Expand All @@ -172,7 +172,7 @@ public void testFailingOnItemWithShutdownExecutor() {

@Test
public void testFailingOnTimeoutWithImmediateCancellation() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AssertSubscriber<Object> subscriber = Multi.createFrom().nothing()
.ifNoItem().after(Duration.ofMillis(10)).on(executor).fail()
.subscribe().withSubscriber(new AssertSubscriber<>(1, true));
Expand All @@ -183,7 +183,7 @@ public void testFailingOnTimeoutWithImmediateCancellation() {

@Test
public void testFailingOnTimeoutWithCancellation() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AssertSubscriber<Object> subscriber = Multi.createFrom().nothing()
.ifNoItem().after(Duration.ofMillis(1000)).on(executor).fail()
.subscribe().withSubscriber(new AssertSubscriber<>(1, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testCancellationBeforeActionCompletes() {
AtomicBoolean onFailureEntered = new AtomicBoolean();
AtomicBoolean cancelled = new AtomicBoolean();

ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
AssertSubscriber<Integer> subscriber = failed
.emitOn(executor)
.onFailure().invoke(i -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
Expand All @@ -20,8 +21,14 @@

public class MultiOnItemFailWithTest {

private final MultiOnCancellationSpy<Integer> items = Spy.onCancellation(Multi.createFrom().items(1, 2));
private final MultiOnCancellationSpy<Integer> nothing = Spy.onCancellation(Multi.createFrom().nothing());
private MultiOnCancellationSpy<Integer> items;
private MultiOnCancellationSpy<Integer> nothing;

@BeforeEach
public void prepare() {
items = Spy.onCancellation(Multi.createFrom().items(1, 2));
nothing = Spy.onCancellation(Multi.createFrom().nothing());
}

@Test
public void testWithNullSupplierOrFunction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void testThatRunSubscriptionOnEmitRequestOnSubscribe() {

@Test
public void testRunSubscriptionOnShutdownExecutor() {
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.shutdownNow();

AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3)
Expand All @@ -301,7 +301,7 @@ public void testRunSubscriptionOnShutdownExecutor() {

@Test
public void testRunSubscriptionOnShutdownExecutorRequests() {
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);

AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3)
.runSubscriptionOn(executor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testDurationValidity() {

@Test
public void testFailingOnTimeoutWithShutdownExecutor() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.shutdown();
UniAssertSubscriber<Object> subscriber = Uni.createFrom().emitter(e -> {
// To nothing
Expand All @@ -146,7 +146,7 @@ public void testFailingOnTimeoutWithShutdownExecutor() {

@Test
public void testFailingOnTimeoutWithImmediateCancellation() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
UniAssertSubscriber<Object> subscriber = Uni.createFrom().emitter(e -> {
// To nothing
})
Expand All @@ -159,7 +159,7 @@ public void testFailingOnTimeoutWithImmediateCancellation() {

@Test
public void testFailingOnTimeoutWithCancellation() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
UniAssertSubscriber<Object> subscriber = Uni.createFrom().emitter(e -> {
// To nothing
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testThatMapperCanNotReturnNull() {
@Test
public void testThatMapperIsCalledOnTheRightExecutor() {
UniAssertSubscriber<Integer> subscriber = new UniAssertSubscriber<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicReference<String> threadName = new AtomicReference<>();
failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testWithTwoSubscribers() {
@Test
public void testThatCallbackIsCalledOnTheRightExecutorOnItem() {
UniAssertSubscriber<Integer> subscriber = new UniAssertSubscriber<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicReference<String> threadName = new AtomicReference<>();
one
Expand All @@ -189,7 +189,7 @@ public void testThatCallbackIsCalledOnTheRightExecutorOnItem() {
@Test
public void testThatCallbackIsCalledOnTheRightExecutorOnFailure() {
UniAssertSubscriber<Integer> subscriber = new UniAssertSubscriber<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicReference<String> threadName = new AtomicReference<>();
failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void testThatMapperCanReturnNull() {
@Test
public void testThatMapperIsCalledOnTheRightExecutorOnItem() {
UniAssertSubscriber<Integer> subscriber = new UniAssertSubscriber<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicReference<String> threadName = new AtomicReference<>();
one
Expand All @@ -160,7 +160,7 @@ public void testThatMapperIsCalledOnTheRightExecutorOnItem() {
@Test
public void testThatMapperIsCalledOnTheRightExecutorOnFailure() {
UniAssertSubscriber<Integer> subscriber = new UniAssertSubscriber<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicReference<String> threadName = new AtomicReference<>();
failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testThatMapperIsCalledWithNull() {
@Test
public void testThatMapperIsCalledOnTheRightExecutor() {
UniAssertSubscriber<Integer> subscriber = new UniAssertSubscriber<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicReference<String> threadName = new AtomicReference<>();
Uni.createFrom().item(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.ScheduledExecutorService;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

Expand All @@ -19,7 +20,12 @@

public class UniOrTest {

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
private ScheduledExecutorService executor;

@BeforeEach
public void prepare() {
executor = Executors.newScheduledThreadPool(4);
}

@AfterEach
public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testWithWithImmediateValue() {

@Test
public void testWithTimeout() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(1);
UniAssertSubscriber<Integer> subscriber = UniAssertSubscriber.create();

Uni.createFrom().item(() -> {
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testCancellation() {

@Test
public void testRejectedTask() {
ExecutorService pool = Executors.newSingleThreadExecutor();
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.shutdown();
AtomicBoolean called = new AtomicBoolean();
UniAssertSubscriber<Integer> subscriber = Uni.createFrom().<Integer> emitter(e -> called.set(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testCancellationWithImmediateValue() {

@Test
public void testCancellationWithAsyncValue() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
AtomicInteger value = new AtomicInteger(-1);
CompletableFuture<Integer> future = Uni.createFrom().item(1)
.onItem().delayIt().onExecutor(executor).by(Duration.ofMillis(100))
Expand All @@ -121,7 +121,7 @@ public void testCancellationWithAsyncValue() {

@Test
public void testWithAsyncValue() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
CompletableFuture<Integer> future = Uni.createFrom().item(1)
.emitOn(executor).subscribe().asCompletionStage();
await().until(future::isDone);
Expand All @@ -130,7 +130,7 @@ public void testWithAsyncValue() {

@Test
public void testWithAsyncVoidItem() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
CompletableFuture<Void> future = Uni.createFrom().voidItem().emitOn(executor)
.subscribe().asCompletionStage();
await().until(future::isDone);
Expand All @@ -139,7 +139,7 @@ public void testWithAsyncVoidItem() {

@Test
public void testWithAsyncNullItem() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
CompletableFuture<String> future = Uni.createFrom().<String> nullItem().emitOn(executor)
.subscribe().asCompletionStage();
await().until(future::isDone);
Expand All @@ -148,7 +148,7 @@ public void testWithAsyncNullItem() {

@Test
public void testWithAsyncFailure() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
CompletableFuture<Integer> future = Uni.createFrom().<Integer> failure(new IOException("boom"))
.emitOn(executor).subscribe().asCompletionStage();
await().until(future::isDone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testCancellationBetweenSubscriptionAndRequest() {
@Test
public void testCancellationBetweenRequestAndValue() {
// TODO This is a very broken implementation of "delay" - to be replace once delay is implemented
executor = Executors.newSingleThreadExecutor();
executor = Executors.newFixedThreadPool(1);
Publisher<Integer> publisher = Uni.createFrom().item(1).emitOn(executor).map(x -> {
try {
Thread.sleep(100);
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testCancellationAfterValue() {

@Test
public void testWithAsyncValue() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
Publisher<Integer> publisher = Uni.createFrom().item(1).emitOn(executor).convert().toPublisher();
assertThat(publisher).isNotNull();
int first = Flowable.fromPublisher(publisher).blockingFirst();
Expand All @@ -192,7 +192,7 @@ public void testWithAsyncValue() {

@Test
public void testWithAsyncNullValue() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);

Publisher<Integer> publisher = Uni.createFrom().item((Integer) null)
.emitOn(executor)
Expand All @@ -205,7 +205,7 @@ public void testWithAsyncNullValue() {
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
public void testWithAsyncFailure() {
executor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newScheduledThreadPool(1);
Publisher<Integer> publisher = Uni.createFrom().<Integer> failure(new IOException("boom"))
.emitOn(executor).convert().toPublisher();
assertThat(publisher).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.RepeatedTest;
Expand All @@ -34,7 +35,7 @@ public void testTheProcessorCanGetOnlyOneSubscriber() {
}

@RepeatedTest(100)
public void testWithMultithreadedUpstream() {
public void testWithMultithreadedUpstream() throws InterruptedException {
UnicastProcessor<String> processor = UnicastProcessor.create();
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
Expand All @@ -50,15 +51,19 @@ public void testWithMultithreadedUpstream() {
AssertSubscriber<Object> subscriber = AssertSubscriber.create(Long.MAX_VALUE);
processor.subscribe(subscriber);

await().until(() -> subscriber.getItems().size() == 5 * 1000);
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("The executor shall have terminated");
}

processor.onComplete();

assertThat(subscriber.getItems().size()).isEqualTo(5 * 1000);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 1000; j++) {
assertThat(subscriber.getItems()).contains(i + "-" + j);
}
}

executor.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void testOnCompleteOnErrorRace() {
Collections.shuffle(runnables);
runnables.forEach(r -> new Thread(r).start());

Awaitility.await().pollInSameThread().until(() -> subscriber.hasCompleted() || subscriber.getFailure() != null);
Awaitility.await().until(() -> subscriber.hasCompleted() || subscriber.getFailure() != null);
if (subscriber.hasCompleted()) {
subscriber.assertCompleted().assertHasNotReceivedAnyItem();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testSubscription() {
reference.set(s);
}));

await().pollInSameThread().until(() -> reference.get() != null);
await().until(() -> reference.get() != null);
assertThat(reference).hasValue("hey");
}

Expand Down
Loading

0 comments on commit 6a72b91

Please sign in to comment.