Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergeDelayError & OnErrorFlatMap w/ Merge #1457

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 86 additions & 14 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.MissingBackpressureException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.ScalarSynchronousObservable;
Expand All @@ -33,17 +37,26 @@
* <p>
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/Netflix/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single
* {@code Observable}, by using the merge operation.
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
*
* @param <T>
* the type of the items emitted by both the source and merged {@code Observable}s
*/
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {

public OperatorMerge() {
this.delayErrors = false;
}

public OperatorMerge(boolean delayErrors) {
this.delayErrors = delayErrors;
}

private final boolean delayErrors;

@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
return new MergeSubscriber<T>(child);
return new MergeSubscriber<T>(child, delayErrors);

}

Expand All @@ -53,6 +66,8 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
private final MergeProducer<T> mergeProducer;
private int wip;
private boolean completed;
private final boolean delayErrors;
private ConcurrentLinkedQueue<Throwable> exceptions;

private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;

Expand All @@ -77,10 +92,11 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
* } </pre>
*/

public MergeSubscriber(Subscriber<? super T> actual) {
public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
super(actual);
this.actual = actual;
this.mergeProducer = new MergeProducer<T>(this);
this.delayErrors = delayErrors;
// decoupled the subscription chain because we need to decouple and control backpressure
actual.add(this);
actual.setProducer(mergeProducer);
Expand Down Expand Up @@ -337,8 +353,26 @@ public Boolean call(InnerSubscriber<T> s) {

@Override
public void onError(Throwable e) {
actual.onError(e);
unsubscribe();
if (delayErrors) {
synchronized (this) {
if (exceptions == null) {
exceptions = new ConcurrentLinkedQueue<Throwable>();
}
}
exceptions.add(e);
boolean sendOnComplete = false;
synchronized (this) {
wip--;
if (wip == 0 && completed) {
sendOnComplete = true;
}
}
if (sendOnComplete) {
drainAndComplete();
}
} else {
actual.onError(e);
}
}

@Override
Expand Down Expand Up @@ -372,7 +406,25 @@ void completeInner(InnerSubscriber<T> s) {

private void drainAndComplete() {
drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not
actual.onCompleted();
if (delayErrors) {
Queue<Throwable> es = null;
synchronized (this) {
es = exceptions;
}
if (es != null) {
if (es.isEmpty()) {
actual.onCompleted();
} else if (es.size() == 1) {
actual.onError(es.poll());
} else {
actual.onError(new CompositeException(es));
}
} else {
actual.onCompleted();
}
} else {
actual.onCompleted();
}
}

}
Expand Down Expand Up @@ -493,7 +545,12 @@ private void emit(T t, boolean complete) {
if (complete) {
parentSubscriber.completeInner(this);
} else {
parentSubscriber.actual.onNext(t);
try {
parentSubscriber.actual.onNext(t);
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
}
} else {
Expand All @@ -503,7 +560,12 @@ private void emit(T t, boolean complete) {
if (complete) {
parentSubscriber.completeInner(this);
} else {
parentSubscriber.actual.onNext(t);
try {
parentSubscriber.actual.onNext(t);
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
producer.REQUESTED.decrementAndGet(producer);
}
Expand Down Expand Up @@ -585,8 +647,13 @@ private int drainRequested() {
} else if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
} else {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
try {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
}
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, o));
}
}
}
Expand All @@ -604,8 +671,13 @@ private int drainAll() {
if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
} else {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
try {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
}
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, o));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.observers.SerializedSubscriber;
import rx.subscriptions.CompositeSubscription;

/**
* This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
* an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error
Expand All @@ -37,113 +28,15 @@
* This operation allows an Observer to receive all successfully emitted items from all of the
* source Observables without being interrupted by an error notification from one of them.
* <p>
* <em>Note:</em> If this is used on an Observable that never completes, it will never call
* {@code onError} and will effectively swallow errors.
* <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
*
* @param <T> the source and result value type
* @param <T>
* the source and result value type
*/
public final class OperatorMergeDelayError<T> implements Operator<T, Observable<? extends T>> {

@Override
public Subscriber<? super Observable<? extends T>> call(Subscriber<? super T> child) {
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
final CompositeSubscription csub = new CompositeSubscription();
child.add(csub);

return new MergeDelayErrorSubscriber<T>(s, csub);
}

static final class MergeDelayErrorSubscriber<T> extends Subscriber<Observable<? extends T>> {
final Subscriber<? super T> s;
final CompositeSubscription csub;
final ConcurrentLinkedQueue<Throwable> exceptions = new ConcurrentLinkedQueue<Throwable>();

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MergeDelayErrorSubscriber> WIP_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(MergeDelayErrorSubscriber.class, "wip");

public MergeDelayErrorSubscriber(Subscriber<? super T> s, CompositeSubscription csub) {
super(s);
this.s = s;
this.csub = csub;
this.wip = 1;
}

@Override
public void onNext(Observable<? extends T> t) {
WIP_UPDATER.incrementAndGet(this);

Subscriber<T> itemSub = new Subscriber<T>() {
/** Make sure terminal events are handled once to avoid wip problems. */
boolean once = true;
@Override
public void onNext(T t) {
// prevent misbehaving source to emit past the error
if (once) {
try {
s.onNext(t);
} catch (Throwable e) {
// in case the source doesn't properly handle exceptions
onError(e);
}
}
}

@Override
public void onError(Throwable e) {
if (once) {
once = false;
error(e);
}
}

@Override
public void onCompleted() {
if (once) {
once = false;
try {
complete();
} finally {
csub.remove(this);
}
}
}

};
csub.add(itemSub);

t.unsafeSubscribe(itemSub);
}

@Override
public void onError(Throwable e) {
error(e);
}

@Override
public void onCompleted() {
complete();
}
public final class OperatorMergeDelayError<T> extends OperatorMerge<T> {

void error(Throwable e) {
exceptions.add(e);
complete();
}

void complete() {
if (WIP_UPDATER.decrementAndGet(this) == 0) {
if (exceptions.isEmpty()) {
s.onCompleted();
} else
if (exceptions.size() > 1) {
s.onError(new CompositeException(exceptions));
} else {
s.onError(exceptions.peek());
}
exceptions.clear();
unsubscribe();
}
}
public OperatorMergeDelayError() {
super(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public void onError(final Throwable e) {
if (terminated) {
return;
}
terminated = true;
if (emitting) {
if (queue == null) {
queue = new FastList();
Expand All @@ -121,6 +120,9 @@ public void onError(final Throwable e) {
}
drainQueue(list);
actual.onError(e);
synchronized(this) {
emitting = false;
}
}

@Override
Expand Down
Loading