Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit Uni.join().{all,first}() concurrency #825

Merged
merged 6 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions implementation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
119 changes: 98 additions & 21 deletions implementation/src/main/java/io/smallrye/mutiny/groups/UniJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.uni.builders.UniJoinAll;
import io.smallrye.mutiny.operators.uni.builders.UniJoinFirst;
Expand Down Expand Up @@ -68,39 +69,77 @@ public final <T> JoinAllStrategy<T> all(List<Uni<T>> unis) {
return new JoinAllStrategy<>(unis);
}

/**
* Terminal interface for {@link UniJoin#all(List)}
*
* @param <T> the type of the {@link Uni} values
*/
public interface JoinAllStrategyTerminal<T> {

/**
* Wait for all {@link Uni} references to terminate, and collect all failures in a
* {@link io.smallrye.mutiny.CompositeException}.
*
* @return a new {@link Uni}
*/
@CheckReturnValue
Uni<List<T>> andCollectFailures();

/**
* Immediately forward the first failure from any of the {@link Uni}, and cancel the remaining {@link Uni}
* subscriptions, ignoring eventual subsequent failures.
*
* @return a new {@link Uni}
*/
@CheckReturnValue
Uni<List<T>> andFailFast();
}

/**
* Defines how to deal with failures while joining {@link Uni} references with {@link UniJoin#all(List)}.
*
* @param <T> the type of the {@link Uni} values
*/
public static class JoinAllStrategy<T> {
public static class JoinAllStrategy<T> implements JoinAllStrategyTerminal<T> {

private final List<Uni<T>> unis;
private int concurrency = -1;

private JoinAllStrategy(List<Uni<T>> unis) {
this.unis = unis;
}

/**
* Wait for all {@link Uni} references to terminate, and collect all failures in a
* {@link io.smallrye.mutiny.CompositeException}.
* Limit the number of concurrent upstream subscriptions.
* <p>
* When not specified all upstream {@link Uni} are being subscribed when the joining {@link Uni} is subscribed.
* <p>
* Setting a limit is useful when you have a large number of {@link Uni} to join and their simultaneous
* subscriptions might overwhelm resources (e.g., database connections, etc).
*
* @return a new {@link Uni}
* @param limit the concurrency limit, must be strictly positive
* @return an object to conclude the join strategy
*/
@CheckReturnValue
public JoinAllStrategyTerminal<T> usingConcurrencyOf(int limit) {
this.concurrency = ParameterValidation.positive(limit, "limit");
return this;
}

/**
* {@inheritDoc}
*/
@CheckReturnValue
public Uni<List<T>> andCollectFailures() {
return Infrastructure.onUniCreation(new UniJoinAll<>(unis, UniJoinAll.Mode.COLLECT_FAILURES));
return Infrastructure.onUniCreation(new UniJoinAll<>(unis, UniJoinAll.Mode.COLLECT_FAILURES, concurrency));
}

/**
* Immediately forward the first failure from any of the {@link Uni}, and cancel the remaining {@link Uni}
* subscriptions, ignoring eventual subsequent failures.
*
* @return a new {@link Uni}
* {@inheritDoc}
*/
@CheckReturnValue
public Uni<List<T>> andFailFast() {
return Infrastructure.onUniCreation(new UniJoinAll<>(unis, UniJoinAll.Mode.FAIL_FAST));
return Infrastructure.onUniCreation(new UniJoinAll<>(unis, UniJoinAll.Mode.FAIL_FAST, concurrency));
}
}

Expand Down Expand Up @@ -141,40 +180,78 @@ public final <T> JoinFirstStrategy<T> first(List<Uni<T>> unis) {
return new JoinFirstStrategy<>(unis);
}

/**
* Terminal interface for {@link UniJoin#first(List)}
*
* @param <T> the type of the {@link Uni} values
*/
public interface JoinFirstStrategyTerminal<T> {

/**
* Forward the value or failure from the first {@link Uni} to terminate.
*
* @return a new {@link Uni}
*/
@CheckReturnValue
Uni<T> toTerminate();

/**
* Forward the value from the first {@link Uni} to terminate with a value.
* <p>
* When all {@link Uni} references fail then failures are collected into a {@link CompositeException},
* which is then forwarded by the returned {@link Uni}.
*
* @return a new {@link Uni}
*/
@CheckReturnValue
Uni<T> withItem();
}

/**
* Defines how to deal with failures while joining {@link Uni} references with {@link UniJoin#first(List)}}.
*
* @param <T> the type of the {@link Uni} values
*/
public static class JoinFirstStrategy<T> {
public static class JoinFirstStrategy<T> implements JoinFirstStrategyTerminal<T> {

private final List<Uni<T>> unis;
private int concurrency = -1;

private JoinFirstStrategy(List<Uni<T>> unis) {
this.unis = unis;
}

/**
* Forward the value or failure from the first {@link Uni} to terminate.
* Limit the number of concurrent upstream subscriptions.
* <p>
* When not specified all upstream {@link Uni} are being subscribed when the joining {@link Uni} is subscribed.
* <p>
* Setting a limit is useful when you have a large number of {@link Uni} to join and their simultaneous
* subscriptions might overwhelm resources (e.g., database connections, etc).
*
* @return a new {@link Uni}
* @param limit the concurrency limit, must be strictly positive
* @return an object to conclude the join strategy
*/
@CheckReturnValue
public JoinFirstStrategyTerminal<T> usingConcurrencyOf(int limit) {
this.concurrency = ParameterValidation.positive(limit, "limit");
return this;
}

/**
* {@inheritDoc}
*/
@CheckReturnValue
public Uni<T> toTerminate() {
return Infrastructure.onUniCreation(new UniJoinFirst<>(unis, UniJoinFirst.Mode.FIRST_TO_EMIT));
return Infrastructure.onUniCreation(new UniJoinFirst<>(unis, UniJoinFirst.Mode.FIRST_TO_EMIT, concurrency));
}

/**
* Forward the value from the first {@link Uni} to terminate with a value.
* <p>
* When all {@link Uni} references fail then failures are collected into a {@link CompositeException},
* which is then forwarded by the returned {@link Uni}.
*
* @return a new {@link Uni}
* {@inheritDoc}
*/
@CheckReturnValue
public Uni<T> withItem() {
return Infrastructure.onUniCreation(new UniJoinFirst<>(unis, UniJoinFirst.Mode.FIRST_WITH_ITEM));
return Infrastructure.onUniCreation(new UniJoinFirst<>(unis, UniJoinFirst.Mode.FIRST_WITH_ITEM, concurrency));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ public enum Mode {

private final List<Uni<T>> unis;
private final Mode mode;
private final int concurrency;

public UniJoinAll(List<Uni<T>> unis, Mode mode) {
public UniJoinAll(List<Uni<T>> unis, Mode mode, int concurrency) {
this.unis = unis;
this.mode = mode;
this.concurrency = concurrency;
}

@Override
Expand All @@ -45,22 +47,33 @@ private class UniJoinAllSubscription implements UniSubscription {

private final AtomicReferenceArray<T> items = new AtomicReferenceArray<>(unis.size());
private final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
private final AtomicInteger signalsCount = new AtomicInteger();
private final AtomicInteger completionSignalsCount = new AtomicInteger();

private AtomicInteger nextSubscriptionIndex;

public UniJoinAllSubscription(UniSubscriber<? super List<T>> subscriber) {
this.subscriber = subscriber;
}

public void triggerSubscriptions() {
for (int i = 0; i < unis.size() && !cancelled.get(); i++) {
int index = i;
Uni<? extends T> uni = unis.get(i);
uni.onSubscription()
.invoke(subscription -> subscriptions.set(index, subscription))
.subscribe().with(subscriber.context(), item -> this.onItem(index, item), this::onFailure);
int limit;
if (concurrency != -1) {
limit = Math.min(concurrency, unis.size());
nextSubscriptionIndex = new AtomicInteger(concurrency - 1);
} else {
limit = unis.size();
}
jponge marked this conversation as resolved.
Show resolved Hide resolved
for (int index = 0; index < limit && !cancelled.get(); index++) {
performSubscription(index, unis.get(index));
}
}

private void performSubscription(int index, Uni<? extends T> uni) {
uni.onSubscription()
.invoke(subscription -> this.onSubscribe(index, subscription))
.subscribe().with(subscriber.context(), item -> this.onItem(index, item), this::onFailure);
}

@Override
public void cancel() {
cancelled.set(true);
Expand All @@ -76,15 +89,23 @@ private void cancelSubscriptions() {
}
}

private void onSubscribe(int index, UniSubscription subscription) {
if (!cancelled.get()) {
subscriptions.set(index, subscription);
} else {
subscription.cancel();
}
}

private void onItem(int index, T item) {
if (!cancelled.get()) {
items.set(index, item);
forwardSignalWhenComplete();
forwardSignalWhenCompleteOrSubscribeNext();
}
}

private void forwardSignalWhenComplete() {
if (signalsCount.incrementAndGet() == unis.size()) {
private void forwardSignalWhenCompleteOrSubscribeNext() {
if (completionSignalsCount.incrementAndGet() == unis.size()) {
cancelled.set(true);
if (failures.isEmpty()) {
ArrayList<T> result = new ArrayList<>(unis.size());
Expand All @@ -95,6 +116,11 @@ private void forwardSignalWhenComplete() {
} else {
subscriber.onFailure(new CompositeException(failures));
}
} else if (concurrency != -1 && !cancelled.get()) {
int nextIndex = nextSubscriptionIndex.incrementAndGet();
cescoffier marked this conversation as resolved.
Show resolved Hide resolved
if (nextIndex < unis.size()) {
performSubscription(nextIndex, unis.get(nextIndex));
}
}
}

Expand All @@ -103,7 +129,7 @@ private void onFailure(Throwable failure) {
case COLLECT_FAILURES:
if (!cancelled.get()) {
failures.add(failure);
forwardSignalWhenComplete();
forwardSignalWhenCompleteOrSubscribeNext();
} else {
Infrastructure.handleDroppedException(failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

import io.smallrye.mutiny.CompositeException;
Expand All @@ -22,10 +23,12 @@ public enum Mode {

private final List<Uni<T>> unis;
private final Mode mode;
private final int concurrency;

public UniJoinFirst(List<Uni<T>> unis, Mode mode) {
public UniJoinFirst(List<Uni<T>> unis, Mode mode, int concurrency) {
this.unis = unis;
this.mode = mode;
this.concurrency = concurrency;
}

@Override
Expand All @@ -42,17 +45,37 @@ private class UniJoinFirstSubscription implements UniSubscription {
private final AtomicReferenceArray<UniSubscription> subscriptions = new AtomicReferenceArray<>(unis.size());
private final AtomicBoolean cancelled = new AtomicBoolean();

private AtomicInteger nextSubscriptionIndex;

public UniJoinFirstSubscription(UniSubscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

public void triggerSubscriptions() {
for (int i = 0; i < unis.size() && !cancelled.get(); i++) {
int index = i;
Uni<? extends T> uni = unis.get(i);
uni.onSubscription()
.invoke(subscription -> subscriptions.set(index, subscription))
.subscribe().with(subscriber.context(), this::onItem, this::onFailure);
int limit;
if (concurrency != -1) {
limit = Math.min(concurrency, unis.size());
nextSubscriptionIndex = new AtomicInteger(concurrency - 1);
} else {
limit = unis.size();
}
for (int i = 0; i < limit && !cancelled.get(); i++) {
performSubscription(i);
}
}

private void performSubscription(int index) {
Uni<? extends T> uni = unis.get(index);
uni.onSubscription()
.invoke(subscription -> this.onSubscribe(index, subscription))
.subscribe().with(subscriber.context(), this::onItem, this::onFailure);
}

private void onSubscribe(int index, UniSubscription subscription) {
if (!cancelled.get()) {
subscriptions.set(index, subscription);
} else {
subscription.cancel();
}
}

Expand Down Expand Up @@ -96,6 +119,11 @@ private void onFailure(Throwable failure) {
if (failures.size() == unis.size()) {
cancelled.set(true);
subscriber.onFailure(new CompositeException(failures));
} else if (concurrency != -1) {
int nextIndex = nextSubscriptionIndex.incrementAndGet();
if (nextIndex < unis.size()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here - we need to check for cancellation before doing the subscription.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jponge it needs to check for cancellation too.

performSubscription(nextIndex);
}
}
} else {
Infrastructure.handleDroppedException(failure);
Expand Down
Loading