Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcatMap operator without prefetch #997

Merged
merged 1 commit into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ default Multi<T> invoke(Runnable callback) {
* produced {@link Multi}. The flatten process makes sure that the items are not interleaved.
* </ul>
* <p>
* This method is equivalent to {@code multi.onItem().transformToMulti(mapper).concatenate()}.
* This method is equivalent to {@code multi.onItem().transformToMulti(mapper).concatenate(true)}.
*
* @param mapper the {@link Function} producing {@link Publisher} / {@link Multi} for each items emitted by the
* upstream {@link Multi}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.MultiConcatMapOp;
import io.smallrye.mutiny.operators.multi.MultiFlatMapOp;

/**
Expand Down Expand Up @@ -112,13 +113,41 @@ public Multi<O> merge(int concurrency) {
* (potentially a {@code Multi}). The mapper must not return {@code null}</li>
* <li>The items contained in each of the produced {@link Publisher} are then <strong>concatenated</strong> in the
* produced {@link Multi}. The returned object lets you configure the flattening process.</li>
* <li>If {@code prefetch} is set to {@code true}, {@code flatMap} operator is used with upstream prefetch of
* {@code requests} parameter configured previously.</li>
* <li>If {@code prefetch} is set to {@code false}, {@code concatMap} operator is used without prefetch,
* meaning that items are requested lazily from the upstream, one at a time.</li>
* </ul>
*
* @param prefetch whether the operator prefetches items from upstream, or not.
* @return the object to configure the {@code concatMap} operation.
*/
@CheckReturnValue
public Multi<O> concatenate(boolean prefetch) {
return Infrastructure
.onMultiCreation(prefetch ? new MultiFlatMapOp<>(upstream, mapper, collectFailureUntilCompletion, 1, requests)
: new MultiConcatMapOp<>(upstream, mapper, collectFailureUntilCompletion));
}

/**
* Produces a {@link Multi} containing the items from {@link Publisher} produced by the {@code mapper} for each
* item emitted by this {@link Multi}.
* <p>
* The operators behaves as follows:
* <ul>
* <li>for each item emitted by this {@link Multi}, the mapper is called and produces a {@link Publisher}
* (potentially a {@code Multi}). The mapper must not return {@code null}</li>
* <li>The items contained in each of the produced {@link Publisher} are then <strong>concatenated</strong> in the
* produced {@link Multi}. The returned object lets you configure the flattening process.</li>
* <li>This operator defaults to without prefetch, meaning that items are requested lazily from the upstream,
* one at a time.</li>
* </ul>
*
* @return the object to configure the {@code concatMap} operation.
*/
@CheckReturnValue
public Multi<O> concatenate() {
return Infrastructure.onMultiCreation(
new MultiFlatMapOp<>(upstream, mapper, collectFailureUntilCompletion, 1, requests));
return concatenate(false);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
package io.smallrye.mutiny.operators.multi;

import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber;

/**
* ConcatMap operator without prefetching items from the upstream.
* Requests are forwarded lazily to the upstream when:
* <ul>
* <li>First downstream request.</li>
* <li>The inner has no more outstanding requests.</li>
* <li>The inner completed without emitting items or with outstanding requests.</li>
* </ul>
*
* This operator can collect failures and postpone them until termination.
*
* @param <I> the upstream value type / input type
* @param <O> the output value type / produced type
*/
public class MultiConcatMapOp<I, O> extends AbstractMultiOperator<I, O> {

private final Function<? super I, ? extends Publisher<? extends O>> mapper;

private final boolean postponeFailurePropagation;

public MultiConcatMapOp(Multi<? extends I> upstream,
Function<? super I, ? extends Publisher<? extends O>> mapper,
boolean postponeFailurePropagation) {
super(upstream);
this.mapper = mapper;
this.postponeFailurePropagation = postponeFailurePropagation;
}

@Override
public void subscribe(MultiSubscriber<? super O> subscriber) {
if (subscriber == null) {
throw new NullPointerException("The subscriber must not be `null`");
}
ConcatMapMainSubscriber<I, O> sub = new ConcatMapMainSubscriber<>(subscriber,
mapper,
postponeFailurePropagation);

upstream.subscribe(Infrastructure.onMultiSubscription(upstream, sub));
}

public static final class ConcatMapMainSubscriber<I, O> implements MultiSubscriber<I>, Subscription, ContextSupport {

private static final int STATE_NEW = 0; // no request yet -- send first upstream request at this state
private static final int STATE_READY = 1; // first upstream request done, ready to receive items
private static final int STATE_EMITTING = 2; // received item from the upstream, subscribed to the inner
private static final int STATE_OUTER_TERMINATED = 3; // outer terminated, waiting for the inner to terminate
private static final int STATE_TERMINATED = 4; // inner and outer terminated
private static final int STATE_CANCELLED = 5; // cancelled
final AtomicInteger state = new AtomicInteger(STATE_NEW);

final MultiSubscriber<? super O> downstream;
final Function<? super I, ? extends Publisher<? extends O>> mapper;
private final boolean delayError;

final AtomicReference<Throwable> failures = new AtomicReference<>();

volatile Subscription upstream = null;
private static final AtomicReferenceFieldUpdater<ConcatMapMainSubscriber, Subscription> UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ConcatMapMainSubscriber.class, Subscription.class, "upstream");

final ConcatMapInner<O> inner;

ConcatMapMainSubscriber(
MultiSubscriber<? super O> downstream,
Function<? super I, ? extends Publisher<? extends O>> mapper,
boolean delayError) {
this.downstream = downstream;
this.mapper = mapper;
this.delayError = delayError;
this.inner = new ConcatMapInner<>(this);
}

@Override
public void request(long n) {
if (n > 0) {
if (state.compareAndSet(STATE_NEW, STATE_READY)) {
upstream.request(1);
// No outstanding requests from inner, forward the request to upstream
} else if (state.get() == STATE_READY && inner.requested() == 0) {
upstream.request(1);
}
inner.request(n);
} else {
downstream.onFailure(new IllegalArgumentException("Invalid requests, must be greater than 0"));
}
}

@Override
public void cancel() {
while (true) {
int state = this.state.get();
if (state == STATE_CANCELLED) {
return;
}
if (this.state.compareAndSet(state, STATE_CANCELLED)) {
if (state == STATE_OUTER_TERMINATED) {
inner.cancel();
} else {
inner.cancel();
upstream.cancel();
}
return;
}
}
}

@Override
public void onSubscribe(Subscription s) {
if (UPSTREAM_UPDATER.compareAndSet(this, null, s)) {
downstream.onSubscribe(this);
}
}

@Override
public void onItem(I item) {
if (!state.compareAndSet(STATE_READY, STATE_EMITTING)) {
return;
}

try {
Publisher<? extends O> p = mapper.apply(item);
if (p == null) {
throw new NullPointerException(ParameterValidation.MAPPER_RETURNED_NULL);
}

p.subscribe(inner);
} catch (Throwable e) {
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved
if (postponeFailure(e, upstream)) {
innerComplete(0L);
}
}
}

@Override
public void onFailure(Throwable t) {
if (postponeFailure(t, inner)) {
onCompletion();
}
}

@Override
public void onCompletion() {
while (true) {
int state = this.state.get();
if (state == STATE_NEW || state == STATE_READY) {
if (this.state.compareAndSet(state, STATE_TERMINATED)) {
terminateDownstream();
return;
}
} else if (state == STATE_EMITTING) {
if (this.state.compareAndSet(state, STATE_OUTER_TERMINATED)) {
return;
}
} else {
return;
}
}
}

public synchronized void tryEmit(O value) {
switch (state.get()) {
case STATE_EMITTING:
case STATE_OUTER_TERMINATED:
downstream.onItem(value);
break;
default:
break;
}
}

public void innerComplete(long emitted) {
while (true) {
int state = this.state.get();
if (state == STATE_EMITTING) {
if (this.state.compareAndSet(state, STATE_READY)) {
// Inner completed but there are outstanding requests from inner,
// Or the inner completed without producing any items
// Request new item from upstream
if (inner.requested() != 0L || emitted == 0) {
upstream.request(1);
}
return;
}
} else if (state == STATE_OUTER_TERMINATED) {
if (this.state.compareAndSet(state, STATE_TERMINATED)) {
terminateDownstream();
return;
}
} else {
return;
}
}
}

public void innerFailure(Throwable e, long emitted) {
if (postponeFailure(e, upstream)) {
innerComplete(emitted);
}
}

private boolean postponeFailure(Throwable e, Subscription subscription) {
if (e == null) {
return true;
}

Subscriptions.addFailure(failures, e);

if (delayError) {
return true;
}

while (true) {
int state = this.state.get();
if (state == STATE_CANCELLED || state == STATE_TERMINATED) {
return false;
} else {
if (this.state.compareAndSet(state, STATE_TERMINATED)) {
subscription.cancel();
synchronized (this) {
downstream.onFailure(failures.get());
}
return false;
}
}
}
}

private void terminateDownstream() {
Throwable ex = failures.get();
if (ex != null) {
downstream.onFailure(ex);
return;
}
downstream.onCompletion();
}

@Override
public Context context() {
if (downstream instanceof ContextSupport) {
return ((ContextSupport) downstream).context();
} else {
return Context.empty();
}
}

}

static final class ConcatMapInner<O> extends SwitchableSubscriptionSubscriber<O> {
private final ConcatMapMainSubscriber<?, O> parent;

long emitted;

/**
* Downstream passed as {@code null} to {@link SwitchableSubscriptionSubscriber} as accessors are not reachable.
* Effective downstream is {@code parent}.
*
* @param parent parent as downstream
*/
ConcatMapInner(ConcatMapMainSubscriber<?, O> parent) {
super(null);
this.parent = parent;
}

@Override
public void onItem(O item) {
emitted++;
parent.tryEmit(item);
}

@Override
public void onFailure(Throwable failure) {
long p = emitted;

if (p != 0L) {
emitted = 0L;
emitted(p);
}

parent.innerFailure(failure, p);
}

@Override
public void onCompletion() {
long p = emitted;

if (p != 0L) {
emitted = 0L;
emitted(p);
}

parent.innerComplete(p);
}

@Override
public Context context() {
return parent.context();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public void emitted(long n) {
drain();
}

public long requested() {
return requested;
}

@Override
public final void request(long n) {
if (n <= 0) {
Expand Down
Loading