Skip to content

Commit

Permalink
Merge pull request #831 from jponge/feature/uni-join-no-concurrency-l…
Browse files Browse the repository at this point in the history
…imit-on-first-to-terminate

Avoid `Uni.join().first(unis).toTerminate()` with a concurrency limit
  • Loading branch information
jponge authored Jan 30, 2022
2 parents 26e1499 + f7db080 commit 34fd96d
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,6 @@ public final <T> JoinFirstStrategy<T> first(List<Uni<T>> unis) {
*/
public interface JoinFirstStrategyTerminal<T> {

/**
* Forward the value or failure from the first {@link Uni} to terminate.
*
* @return a new {@link Uni}
*/
@CheckReturnValue
Uni<T> toTerminate();

/**
* Forward the value from the first {@link Uni} to terminate with a value.
* <p>
Expand Down Expand Up @@ -239,7 +231,9 @@ public JoinFirstStrategyTerminal<T> usingConcurrencyOf(int limit) {
}

/**
* {@inheritDoc}
* Forward the value or failure from the first {@link Uni} to terminate.
*
* @return a new {@link Uni}
*/
@CheckReturnValue
public Uni<T> toTerminate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
Expand Down Expand Up @@ -136,16 +135,12 @@ private void onFailure(Throwable failure) {
if (!cancelled.get()) {
failures.add(failure);
forwardSignalWhenCompleteOrSubscribeNext();
} else {
Infrastructure.handleDroppedException(failure);
}
break;
case FAIL_FAST:
if (cancelled.compareAndSet(false, true)) {
cancelSubscriptions();
subscriber.onFailure(failure);
} else {
Infrastructure.handleDroppedException(failure);
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
Expand All @@ -29,6 +28,7 @@ public UniJoinFirst(List<Uni<T>> unis, Mode mode, int concurrency) {
this.unis = unis;
this.mode = mode;
this.concurrency = concurrency;
assert (mode == Mode.FIRST_WITH_ITEM || (mode == Mode.FIRST_TO_EMIT && concurrency == -1)); // Invariant enforced by the caller DSL
}

@Override
Expand Down Expand Up @@ -118,25 +118,19 @@ private void onFailure(Throwable failure) {
if (cancelled.compareAndSet(false, true)) {
cancelSubscriptions();
subscriber.onFailure(failure);
} else {
Infrastructure.handleDroppedException(failure);
}
break;
case FIRST_WITH_ITEM:
failures.add(failure);
if (failures.size() == unis.size()) {
if (cancelled.compareAndSet(false, true)) {
subscriber.onFailure(new CompositeException(failures));
} else {
Infrastructure.handleDroppedException(failure);
}
} else if (concurrency != -1) {
int nextIndex = nextSubscriptionIndex.incrementAndGet();
if (nextIndex < unis.size()) {
trySubscribe(nextIndex);
}
} else {
Infrastructure.handleDroppedException(failure);
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,22 +469,6 @@ void joinAllItemsAndCollectSmokeTest() {
sub.assertCompleted().assertItem(Arrays.asList(1, 2, 3));
}

@Test
void joinFirstWithItemSmokeTest() {
Uni<Integer> a = Uni.createFrom().failure(new IOException("boom"));
Uni<Integer> b = Uni.createFrom().failure(new IOException("bam"));
Uni<Integer> c = Uni.createFrom().item(3);

Uni<Integer> uni = Uni.join().first(a, b, c).usingConcurrencyOf(1).withItem();

UniAssertSubscriber<Integer> sub = uni.subscribe().withSubscriber(UniAssertSubscriber.create());
sub.assertCompleted().assertItem(3);

uni = Uni.join().first(a, b, c).usingConcurrencyOf(1).toTerminate();
sub = uni.subscribe().withSubscriber(UniAssertSubscriber.create());
sub.assertFailedWith(IOException.class, "boom");
}

@ParameterizedTest(name = "poolSize={0}, delays={1}, limit={2}, minTime={3}")
@CsvSource(value = {
"4, 100, 1, 400",
Expand Down Expand Up @@ -589,34 +573,6 @@ void joinFirstWithItemCheckConcurrency(int poolSize, int delays, int limit, int
pool.shutdownNow();
}

@ParameterizedTest(name = "poolSize={0}, delays={1}, limit={2}, minTime={3}, shallFail={4}")
@CsvSource(value = {
"4, 100, 1, 200, true",
"4, 100, 16, 100, false"
})
void joinFirstToSignalCheckConcurrency(int poolSize, int delays, int limit, int minTime, boolean shallFail) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);

Uni<String> a = Uni.createFrom().future(() -> pool.schedule(() -> {
throw new RuntimeException("boom");
}, delays * 2L, TimeUnit.MILLISECONDS));
Uni<String> b = Uni.createFrom().future(() -> pool.schedule(() -> "b", delays, TimeUnit.MILLISECONDS));

Uni<String> uni = Uni.join().first(a, b).usingConcurrencyOf(limit).toTerminate();

long start = System.currentTimeMillis();
UniAssertSubscriber<String> sub = uni.subscribe().withSubscriber(UniAssertSubscriber.create());

if (shallFail) {
sub.awaitFailure().assertFailed();
} else {
sub.awaitItem().assertCompleted().assertItem("b");
}
assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(minTime);

pool.shutdownNow();
}

@Test
void joinAllShallNotSubscribeWhenCancelled() {
AtomicBoolean probe1 = new AtomicBoolean();
Expand Down

0 comments on commit 34fd96d

Please sign in to comment.