Skip to content

Commit

Permalink
Merge pull request ReactiveX#441 from zsxwing/issue-417
Browse files Browse the repository at this point in the history
Fixed the issue that 'take' does not call 'onError'
  • Loading branch information
benjchristensen committed Oct 22, 2013
2 parents 43a5f76 + e0b50fc commit 2f842eb
Showing 1 changed file with 72 additions and 4 deletions.
76 changes: 72 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,30 @@
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand Down Expand Up @@ -114,30 +124,47 @@ private class ItemObserver implements Observer<T> {
private final Observer<? super T> observer;

private final AtomicInteger counter = new AtomicInteger();
private volatile boolean hasEmitedError = false;

public ItemObserver(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
if (hasEmitedError) {
return;
}
if (counter.getAndSet(num) < num) {
observer.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (hasEmitedError) {
return;
}
if (counter.getAndSet(num) < num) {
observer.onError(e);
}
}

@Override
public void onNext(T args) {
if (hasEmitedError) {
return;
}
final int count = counter.incrementAndGet();
if (count <= num) {
observer.onNext(args);
try {
observer.onNext(args);
} catch (Throwable ex) {
hasEmitedError = true;
observer.onError(ex);
subscription.unsubscribe();
return;
}
if (count == num) {
observer.onCompleted();
}
Expand Down Expand Up @@ -184,6 +211,47 @@ public void testTake2() {
verify(aObserver, times(1)).onCompleted();
}

@Test(expected = IllegalArgumentException.class)
public void testTakeWithError() {
Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() {
public Integer call(Integer t1) {
throw new IllegalArgumentException("some error");
}
}).toBlockingObservable().single();
}

@Test
public void testTakeWithErrorHappeningInOnNext() {
Observable<Integer> w = Observable.from(1, 2, 3).take(2).map(new Func1<Integer, Integer>() {
public Integer call(Integer t1) {
throw new IllegalArgumentException("some error");
}
});

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
w.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(any(IllegalArgumentException.class));
inOrder.verifyNoMoreInteractions();
}

@Test
public void testTakeWithErrorHappeningInTheLastOnNext() {
Observable<Integer> w = Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() {
public Integer call(Integer t1) {
throw new IllegalArgumentException("some error");
}
});

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
w.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(any(IllegalArgumentException.class));
inOrder.verifyNoMoreInteractions();
}

@Test
public void testTakeDoesntLeakErrors() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>()
Expand Down

0 comments on commit 2f842eb

Please sign in to comment.