Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x Add concatMapCompletable() to Observable #5649

Merged
merged 8 commits into from
Oct 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6251,6 +6251,52 @@ public final <R> Observable<R> concatMapEagerDelayError(Function<? super T, ? ex
return RxJavaPlugins.onAssembly(new ObservableConcatMapEager<T, R>(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, maxConcurrency, prefetch));
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them one at a time in
* order and waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a CompletableSource
* @return a Completable that signals {@code onComplete} when the upstream and all CompletableSources complete
* @since 2.1.6 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return concatMapCompletable(mapper, 2);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not we pass 8 here since it'll get overridden to 8 in the implementation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave it for now.

}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them one at a time in
* order and waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a CompletableSource
*
* @param capacityHint
* the number of upstream items expected to be buffered until the current CompletableSource, mapped from
* the current item, completes.
* @return a Completable that signals {@code onComplete} when the upstream and all CompletableSources complete
* @since 2.1.6 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, int capacityHint) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
return RxJavaPlugins.onAssembly(new ObservableConcatMapCompletable<T>(this, mapper, capacityHint));
}

/**
* Returns an Observable that concatenate each item emitted by the source ObservableSource with the values in an
* Iterable corresponding to that item that is generated by a selector.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.atomic.AtomicInteger;

public final class ObservableConcatMapCompletable<T> extends Completable {

final ObservableSource<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
final int bufferSize;

public ObservableConcatMapCompletable(ObservableSource<T> source,
Function<? super T, ? extends CompletableSource> mapper, int bufferSize) {
this.source = source;
this.mapper = mapper;
this.bufferSize = Math.max(8, bufferSize);
}
@Override
public void subscribeActual(CompletableObserver observer) {
source.subscribe(new SourceObserver<T>(observer, mapper, bufferSize));
}

static final class SourceObserver<T> extends AtomicInteger implements Observer<T>, Disposable {

private static final long serialVersionUID = 6893587405571511048L;
final CompletableObserver actual;
final SequentialDisposable sa;
final Function<? super T, ? extends CompletableSource> mapper;
final CompletableObserver inner;
final int bufferSize;

SimpleQueue<T> queue;

Disposable s;

volatile boolean active;

volatile boolean disposed;

volatile boolean done;

int sourceMode;

SourceObserver(CompletableObserver actual,
Function<? super T, ? extends CompletableSource> mapper, int bufferSize) {
this.actual = actual;
this.mapper = mapper;
this.bufferSize = bufferSize;
this.inner = new InnerObserver(actual, this);
this.sa = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;

actual.onSubscribe(this);

drain();
return;
}

if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;

actual.onSubscribe(this);

return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode == QueueDisposable.NONE) {
queue.offer(t);
}
drain();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
dispose();
actual.onError(t);
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
drain();
}

void innerComplete() {
active = false;
drain();
}

@Override
public boolean isDisposed() {
return disposed;
}

@Override
public void dispose() {
disposed = true;
sa.dispose();
s.dispose();

if (getAndIncrement() == 0) {
queue.clear();
}
}

void innerSubscribe(Disposable s) {
sa.update(s);
}

void drain() {
if (getAndIncrement() != 0) {
return;
}

for (;;) {
if (disposed) {
queue.clear();
return;
}
if (!active) {

boolean d = done;

T t;

try {
t = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
actual.onError(ex);
return;
}

boolean empty = t == null;

if (d && empty) {
disposed = true;
actual.onComplete();
return;
}

if (!empty) {
CompletableSource c;

try {
c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
actual.onError(ex);
return;
}

active = true;
c.subscribe(inner);
}
}

if (decrementAndGet() == 0) {
break;
}
}
}

static final class InnerObserver implements CompletableObserver {
final CompletableObserver actual;
final SourceObserver<?> parent;

InnerObserver(CompletableObserver actual, SourceObserver<?> parent) {
this.actual = actual;
this.parent = parent;
}

@Override
public void onSubscribe(Disposable s) {
parent.innerSubscribe(s);
}

@Override
public void onError(Throwable t) {
parent.dispose();
actual.onError(t);
}
@Override
public void onComplete() {
parent.innerComplete();
}
}
}
}
Loading