Skip to content

Commit

Permalink
Merge pull request #2455 from duncani/Issue#2191
Browse files Browse the repository at this point in the history
Fix for #2191 - OperatorMulticast fails to unsubscribe from source
  • Loading branch information
akarnokd committed Jan 21, 2015
2 parents 833c6be + ce76ea9 commit 0d96903
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void call() {
subscriptionIsNull = subscription == null;
}
if (!subscriptionIsNull)
source.unsafeSubscribe(subscription);
source.subscribe(subscription);
}
}
}
221 changes: 216 additions & 5 deletions src/test/java/rx/internal/operators/OperatorReplayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
package rx.internal.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.*;

import org.junit.Test;
import org.mockito.InOrder;


import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -465,4 +466,214 @@ public void call() {
assertEquals(2, effectCounter.get());
}
}


/**
* test the basic expectation of OperatorMulticast via replay
*/
@Test
public void testIssue2191_UnsubscribeSource() {
// setup mocks
Action1 sourceNext = mock(Action1.class);
Action0 sourceCompleted = mock(Action0.class);
Action0 sourceUnsubscribed = mock(Action0.class);
Observer spiedSubscriberBeforeConnect = mock(Observer.class);
Observer spiedSubscriberAfterConnect = mock(Observer.class);

// Observable under test
Observable<Integer> source = Observable.just(1,2);

ConnectableObservable<Integer> replay = source
.doOnNext(sourceNext)
.doOnUnsubscribe(sourceUnsubscribed)
.doOnCompleted(sourceCompleted)
.replay();

replay.subscribe(spiedSubscriberBeforeConnect);
replay.subscribe(spiedSubscriberBeforeConnect);
replay.connect();
replay.subscribe(spiedSubscriberAfterConnect);
replay.subscribe(spiedSubscriberAfterConnect);


// verify interactions
verify(sourceNext, times(1)).call(1);
verify(sourceNext, times(1)).call(2);
verify(sourceCompleted, times(1)).call();
verifyObserverMock(spiedSubscriberBeforeConnect, 2, 4);
verifyObserverMock(spiedSubscriberAfterConnect, 2, 4);

verify(sourceUnsubscribed, times(1)).call();

verifyNoMoreInteractions(sourceNext);
verifyNoMoreInteractions(sourceCompleted);
verifyNoMoreInteractions(sourceUnsubscribed);
verifyNoMoreInteractions(spiedSubscriberBeforeConnect);
verifyNoMoreInteractions(spiedSubscriberAfterConnect);

}

/**
* Specifically test interaction with a Scheduler with subscribeOn
*
* @throws Exception
*/
@Test
public void testIssue2191_SchedulerUnsubscribe() throws Exception {
// setup mocks
Action1 sourceNext = mock(Action1.class);
Action0 sourceCompleted = mock(Action0.class);
Action0 sourceUnsubscribed = mock(Action0.class);
final Scheduler mockScheduler = mock(Scheduler.class);
final Subscription mockSubscription = mock(Subscription.class);
Worker spiedWorker = workerSpy(mockSubscription);
Observer mockObserverBeforeConnect = mock(Observer.class);
Observer mockObserverAfterConnect = mock(Observer.class);

when(mockScheduler.createWorker()).thenReturn(spiedWorker);

// Observable under test
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3)
.doOnNext(sourceNext)
.doOnUnsubscribe(sourceUnsubscribed)
.doOnCompleted(sourceCompleted)
.subscribeOn(mockScheduler).replay();

replay.subscribe(mockObserverBeforeConnect);
replay.subscribe(mockObserverBeforeConnect);
replay.connect();
replay.subscribe(mockObserverAfterConnect);
replay.subscribe(mockObserverAfterConnect);

// verify interactions
verify(sourceNext, times(1)).call(1);
verify(sourceNext, times(1)).call(2);
verify(sourceNext, times(1)).call(3);
verify(sourceCompleted, times(1)).call();
verify(mockScheduler, times(1)).createWorker();
verify(spiedWorker, times(1)).schedule((Action0)notNull());
verifyObserverMock(mockObserverBeforeConnect, 2, 6);
verifyObserverMock(mockObserverAfterConnect, 2, 6);

verify(spiedWorker, times(1)).unsubscribe();
verify(sourceUnsubscribed, times(1)).call();

verifyNoMoreInteractions(sourceNext);
verifyNoMoreInteractions(sourceCompleted);
verifyNoMoreInteractions(sourceUnsubscribed);
verifyNoMoreInteractions(spiedWorker);
verifyNoMoreInteractions(mockSubscription);
verifyNoMoreInteractions(mockScheduler);
verifyNoMoreInteractions(mockObserverBeforeConnect);
verifyNoMoreInteractions(mockObserverAfterConnect);
}

/**
* Specifically test interaction with a Scheduler with subscribeOn
*
* @throws Exception
*/
@Test
public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception {
// setup mocks
Action1 sourceNext = mock(Action1.class);
Action0 sourceCompleted = mock(Action0.class);
Action1 sourceError = mock(Action1.class);
Action0 sourceUnsubscribed = mock(Action0.class);
final Scheduler mockScheduler = mock(Scheduler.class);
final Subscription mockSubscription = mock(Subscription.class);
Worker spiedWorker = workerSpy(mockSubscription);
Observer mockObserverBeforeConnect = mock(Observer.class);
Observer mockObserverAfterConnect = mock(Observer.class);

when(mockScheduler.createWorker()).thenReturn(spiedWorker);

// Observable under test
Func1<Integer, Integer> mockFunc = mock(Func1.class);
IllegalArgumentException illegalArgumentException = new IllegalArgumentException();
when(mockFunc.call(1)).thenReturn(1);
when(mockFunc.call(2)).thenThrow(illegalArgumentException);
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3).map(mockFunc)
.doOnNext(sourceNext)
.doOnUnsubscribe(sourceUnsubscribed)
.doOnCompleted(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();

replay.subscribe(mockObserverBeforeConnect);
replay.subscribe(mockObserverBeforeConnect);
replay.connect();
replay.subscribe(mockObserverAfterConnect);
replay.subscribe(mockObserverAfterConnect);

// verify interactions
verify(mockScheduler, times(1)).createWorker();
verify(spiedWorker, times(1)).schedule((Action0)notNull());
verify(sourceNext, times(1)).call(1);
verify(sourceError, times(1)).call(illegalArgumentException);
verifyObserver(mockObserverBeforeConnect, 2, 2, illegalArgumentException);
verifyObserver(mockObserverAfterConnect, 2, 2, illegalArgumentException);

verify(spiedWorker, times(1)).unsubscribe();
verify(sourceUnsubscribed, times(1)).call();

verifyNoMoreInteractions(sourceNext);
verifyNoMoreInteractions(sourceCompleted);
verifyNoMoreInteractions(sourceError);
verifyNoMoreInteractions(sourceUnsubscribed);
verifyNoMoreInteractions(spiedWorker);
verifyNoMoreInteractions(mockSubscription);
verifyNoMoreInteractions(mockScheduler);
verifyNoMoreInteractions(mockObserverBeforeConnect);
verifyNoMoreInteractions(mockObserverAfterConnect);
}

private static void verifyObserverMock(Observer mock, int numSubscriptions, int numItemsExpected) {
verify(mock, times(numItemsExpected)).onNext(notNull());
verify(mock, times(numSubscriptions)).onCompleted();
verifyNoMoreInteractions(mock);
}

private static void verifyObserver(Observer mock, int numSubscriptions, int numItemsExpected, Throwable error) {
verify(mock, times(numItemsExpected)).onNext(notNull());
verify(mock, times(numSubscriptions)).onError(error);
verifyNoMoreInteractions(mock);
}

public static Worker workerSpy(final Subscription mockSubscription) {
return spy(new InprocessWorker(mockSubscription));
}


private static class InprocessWorker extends Worker {
private final Subscription mockSubscription;
public boolean unsubscribed;

public InprocessWorker(Subscription mockSubscription) {
this.mockSubscription = mockSubscription;
}

@Override
public Subscription schedule(Action0 action) {
action.call();
return mockSubscription; // this subscription is returned but discarded
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
action.call();
return mockSubscription;
}

@Override
public void unsubscribe() {
unsubscribed = true;
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
}

}

0 comments on commit 0d96903

Please sign in to comment.