Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObserveOn Error Propagation #1728

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.SynchronizedSubscription;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TrampolineScheduler;

Expand Down Expand Up @@ -97,12 +96,10 @@ public void request(long n) {
}

});
add(scheduledUnsubscribe);
child.add(recursiveScheduler);
child.add(this);

}

@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
Expand Down Expand Up @@ -160,53 +157,52 @@ public void call() {
}
}

// only execute this from schedule()
private void pollQueue() {
int emitted = 0;
while (true) {
do {
/*
* Set to 1 otherwise it could have grown very large while in the last poll loop
* and then we can end up looping all those times again here before exiting even once we've drained
*/
COUNTER_UPDATER.set(this, 1);

while (!scheduledUnsubscribe.isUnsubscribed()) {
if (REQUESTED.getAndDecrement(this) != 0) {
if (failure) {
// special handling to short-circuit an error propagation
Object o = queue.poll();
if (o == null) {
// nothing in queue
REQUESTED.incrementAndGet(this);
break;
} else {
if (failure) {
// completed so we will skip onNext if they exist and only emit terminal events
if (on.isError(o)) {
System.out.println("Error: " + o);
// only emit error
on.accept(child, o);
}
// completed so we will skip onNext if they exist and only emit terminal events
if (on.isError(o)) {
// only emit error
on.accept(child, o);
// we have emitted a terminal event so return (exit the loop we're in)
return;
}
} else {
if (REQUESTED.getAndDecrement(this) != 0) {
Object o = queue.poll();
if (o == null) {
// nothing in queue
REQUESTED.incrementAndGet(this);
break;
} else {
if (!on.accept(child, o)) {
// non-terminal event so let's increment count
emitted++;
}
}
} else {
// we hit the end ... so increment back to 0 again
REQUESTED.incrementAndGet(this);
break;
}
} else {
// we hit the end ... so increment back to 0 again
REQUESTED.incrementAndGet(this);
break;
}
}
long c = COUNTER_UPDATER.decrementAndGet(this);
if (c <= 0) {
// request the number of items that we emitted in this poll loop
if (emitted > 0) {
request(emitted);
}
break;
} else {
/*
* Set down to 1 and then iterate again.
* we lower it to 1 otherwise it could have grown very large while in the last poll loop
* and then we can end up looping all those times again here before existing even once we've drained
*/
COUNTER_UPDATER.set(this, 1);
// we now loop again, and if anything tries scheduling again after this it will increment and cause us to loop again after
}
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

// request the number of items that we emitted in this poll loop
if (emitted > 0) {
request(emitted);
}
}
}
Expand Down
70 changes: 69 additions & 1 deletion src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Ignore;
import org.junit.Test;
import org.mockito.InOrder;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
Expand All @@ -48,6 +50,7 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -391,7 +394,7 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
inOrder.verify(o, never()).onNext(anyInt());
inOrder.verify(o, never()).onCompleted();
}

@Test
public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() {
final TestScheduler testScheduler = new TestScheduler();
Expand Down Expand Up @@ -647,6 +650,71 @@ public void onNext(Long t) {
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
// we should emit the error without emitting the full buffer size
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
}

/**
* Make sure we get a MissingBackpressureException propagated through when we have a fast temporal (hot) producer.
*/
@Test
public void testHotOperatorBackpressure() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Observable.timer(0, 1, TimeUnit.MICROSECONDS)
.observeOn(Schedulers.computation())
.map(new Func1<Long, String>() {

@Override
public String call(Long t1) {
System.out.println(t1);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return t1 + " slow value";
}

}).subscribe(ts);

ts.awaitTerminalEvent();
System.out.println("Errors: " + ts.getOnErrorEvents());
assertEquals(1, ts.getOnErrorEvents().size());
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
}

@Test
public void testErrorPropagatesWhenNoOutstandingRequests() {
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MICROSECONDS)
.doOnEach(new Action1<Notification<? super Long>>() {

@Override
public void call(Notification<? super Long> n) {
// System.out.println("BEFORE " + n);
}

})
.observeOn(Schedulers.newThread())
.doOnEach(new Action1<Notification<? super Long>>() {

@Override
public void call(Notification<? super Long> n) {
// System.out.println("AFTER " + n);
}

});

TestSubscriber<Long> ts = new TestSubscriber<Long>();

Observable.combineLatest(timer, Observable.<Integer> never(), new Func2<Long, Integer, Long>() {

@Override
public Long call(Long t1, Integer t2) {
return t1;
}

}).take(RxRingBuffer.SIZE * 2).subscribe(ts);

ts.awaitTerminalEvent();
assertEquals(1, ts.getOnErrorEvents().size());
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
}

}