Skip to content

Commit

Permalink
Merge pull request ReactiveX#443 from ylecaillez/master
Browse files Browse the repository at this point in the history
OperationSwitch notify onComplete() too early.
  • Loading branch information
benjchristensen committed Oct 22, 2013
2 parents 2f842eb + 76c0daa commit 2dc5240
Showing 1 changed file with 175 additions and 44 deletions.
219 changes: 175 additions & 44 deletions rxjava-core/src/main/java/rx/operators/OperationSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -30,24 +34,30 @@
import rx.Observer;
import rx.Subscription;
import rx.concurrency.TestScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/**
* Transforms an Observable that emits Observables into a single Observable that emits the items
* emitted by the most recently published of those Observables.
* Transforms an Observable that emits Observables into a single Observable that
* emits the items emitted by the most recently published of those Observables.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
* <img width="640" src=
* "https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
*/
public final class OperationSwitch {

/**
* This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence which produces values from the most recently published
* {@link Observable}.
* This function transforms an {@link Observable} sequence of
* {@link Observable} sequences into a single {@link Observable} sequence
* which produces values from the most recently published {@link Observable}
* .
*
* @param sequences
* The {@link Observable} sequence consisting of {@link Observable} sequences.
* The {@link Observable} sequence consisting of
* {@link Observable} sequences.
* @return A {@link Func1} which does this transformation.
*/
public static <T> OnSubscribeFunc<T> switchDo(final Observable<? extends Observable<? extends T>> sequences) {
Expand All @@ -69,69 +79,113 @@ public Switch(Observable<? extends Observable<? extends T>> sequences) {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
SafeObservableSubscription subscription = new SafeObservableSubscription();
subscription.wrap(sequences.subscribe(new SwitchObserver<T>(observer, subscription)));
return subscription;
SafeObservableSubscription parent;
parent = new SafeObservableSubscription();

MultipleAssignmentSubscription child;
child = new MultipleAssignmentSubscription();

parent.wrap(sequences.subscribe(new SwitchObserver<T>(observer, parent, child)));

return new CompositeSubscription(parent, child);
}
}

private static class SwitchObserver<T> implements Observer<Observable<? extends T>> {

private final Observer<? super T> observer;
private final SafeObservableSubscription parent;
private final AtomicReference<Subscription> subsequence = new AtomicReference<Subscription>();
private final Object gate;
private final Observer<? super T> observer;
private final SafeObservableSubscription parent;
private final MultipleAssignmentSubscription child;
private long latest;
private boolean stopped;
private boolean hasLatest;

public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent) {
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent,
MultipleAssignmentSubscription child) {
this.observer = observer;
this.parent = parent;
}

@Override
public void onCompleted() {
unsubscribeFromSubSequence();
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
unsubscribeFromSubSequence();
observer.onError(e);
this.child = child;
this.gate = new Object();
}

@Override
public void onNext(Observable<? extends T> args) {
unsubscribeFromSubSequence();
final long id;
synchronized (gate) {
id = ++latest;
this.hasLatest = true;
}

subsequence.set(args.subscribe(new Observer<T>() {
final SafeObservableSubscription sub;
sub = new SafeObservableSubscription();
sub.wrap(args.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
// Do nothing.
public void onNext(T args) {
synchronized (gate) {
if (latest == id) {
SwitchObserver.this.observer.onNext(args);
}
}
}

@Override
public void onError(Throwable e) {
parent.unsubscribe();
observer.onError(e);
synchronized (gate) {
sub.unsubscribe();
if (latest == id) {
SwitchObserver.this.observer.onError(e);
SwitchObserver.this.parent.unsubscribe();
}
}
}

@Override
public void onNext(T args) {
observer.onNext(args);
public void onCompleted() {
synchronized (gate) {
sub.unsubscribe();
if (latest == id) {
SwitchObserver.this.hasLatest = false;
}

if (stopped) {
SwitchObserver.this.observer.onCompleted();
SwitchObserver.this.parent.unsubscribe();
}

}
}

}));

this.child.setSubscription(sub);
}

private void unsubscribeFromSubSequence() {
Subscription previousSubscription = subsequence.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
@Override
public void onError(Throwable e) {
synchronized (gate) {
this.observer.onError(e);
}

this.parent.unsubscribe();
}

@Override
public void onCompleted() {
synchronized (gate) {
this.stopped = true;
if (!this.hasLatest) {
this.observer.onCompleted();
this.parent.unsubscribe();
}
}
}

}

public static class UnitTest {

private TestScheduler scheduler;
private TestScheduler scheduler;
private Observer<String> observer;

@Before
Expand All @@ -141,6 +195,83 @@ public void before() {
observer = mock(Observer.class);
}

@Test
public void testSwitchWhenOuterCompleteBeforeInner() {
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
@Override
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 70, "one");
publishNext(observer, 100, "two");
publishCompleted(observer, 200);
return Subscriptions.empty();
}
}));
publishCompleted(observer, 60);

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(2)).onNext(anyString());
inOrder.verify(observer, times(1)).onCompleted();
}

@Test
public void testSwitchWhenInnerCompleteBeforeOuter() {
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
@Override
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 10, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 0, "one");
publishNext(observer, 10, "two");
publishCompleted(observer, 20);
return Subscriptions.empty();
}
}));

publishNext(observer, 100, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 0, "three");
publishNext(observer, 10, "four");
publishCompleted(observer, 20);
return Subscriptions.empty();
}
}));
publishCompleted(observer, 200);

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onCompleted();
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
inOrder.verify(observer, times(1)).onNext("four");

scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onNext(anyString());
inOrder.verify(observer, times(1)).onCompleted();
}

@Test
public void testSwitchWithComplete() {
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
Expand All @@ -149,7 +280,7 @@ public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 50, "one");
publishNext(observer, 60, "one");
publishNext(observer, 100, "two");
return Subscriptions.empty();
}
Expand Down Expand Up @@ -196,8 +327,8 @@ public Subscription onSubscribe(Observer<? super String> observer) {
verify(observer, never()).onError(any(Throwable.class));

scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onNext(anyString());
verify(observer, times(1)).onCompleted();
inOrder.verify(observer, times(1)).onNext("four");
verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

Expand Down

0 comments on commit 2dc5240

Please sign in to comment.