Skip to content

Commit

Permalink
Merge pull request #1264 from akarnokd/ObserveOnScheduleUnsubscribe
Browse files Browse the repository at this point in the history
ObserveOn scheduled unsubscription
  • Loading branch information
benjchristensen committed May 28, 2014
2 parents 78c250b + e1b4348 commit dd52daf
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
43 changes: 42 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package rx.operators;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TrampolineScheduler;
Expand Down Expand Up @@ -59,6 +61,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> observer;
private final Scheduler.Worker recursiveScheduler;
private final ScheduledUnsubscribe scheduledUnsubscribe;
final NotificationLite<T> on = NotificationLite.instance();
/** Guarded by this. */
private FastList queue = new FastList();
Expand All @@ -72,11 +75,15 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber
super(subscriber);
this.observer = subscriber;
this.recursiveScheduler = scheduler.createWorker();
subscriber.add(recursiveScheduler);
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
subscriber.add(scheduledUnsubscribe);
}

@Override
public void onNext(final T t) {
if (scheduledUnsubscribe.isUnsubscribed()) {
return;
}
synchronized (this) {
queue.add(on.next(t));
}
Expand All @@ -85,6 +92,9 @@ public void onNext(final T t) {

@Override
public void onCompleted() {
if (scheduledUnsubscribe.isUnsubscribed()) {
return;
}
synchronized (this) {
queue.add(on.completed());
}
Expand All @@ -93,6 +103,9 @@ public void onCompleted() {

@Override
public void onError(final Throwable e) {
if (scheduledUnsubscribe.isUnsubscribed()) {
return;
}
synchronized (this) {
queue.add(on.error(e));
}
Expand Down Expand Up @@ -153,4 +166,32 @@ public void add(Object o) {
size = s + 1;
}
}
static final class ScheduledUnsubscribe implements Subscription {
final Scheduler.Worker worker;
volatile int once;
static final AtomicIntegerFieldUpdater<ScheduledUnsubscribe> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once");

public ScheduledUnsubscribe(Scheduler.Worker worker) {
this.worker = worker;
}

@Override
public boolean isUnsubscribed() {
return once != 0;
}

@Override
public void unsubscribe() {
if (ONCE_UPDATER.getAndSet(this, 1) == 0) {
worker.schedule(new Action0() {
@Override
public void call() {
worker.unsubscribe();
}
});
}
}

}
}
24 changes: 24 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@

import org.junit.Test;
import org.mockito.InOrder;
import static org.mockito.Matchers.anyInt;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -365,4 +367,26 @@ private static int randomIntFrom0to100() {
x ^= (x << 4);
return Math.abs((int) x % 100);
}

@Test
public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
TestScheduler testScheduler = new TestScheduler();

Observable<Integer> source = Observable.concat(Observable.<Integer>error(new TestException()), Observable.just(1));


@SuppressWarnings("unchecked")
Observer<Integer> o = mock(Observer.class);
InOrder inOrder = inOrder(o);

source.observeOn(testScheduler).subscribe(o);

inOrder.verify(o, never()).onError(any(TestException.class));

testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

inOrder.verify(o).onError(any(TestException.class));
inOrder.verify(o, never()).onNext(anyInt());
inOrder.verify(o, never()).onCompleted();
}
}

0 comments on commit dd52daf

Please sign in to comment.