Skip to content

Commit

Permalink
Merge pull request ReactiveX#399 from benjchristensen/pull-390-elemen…
Browse files Browse the repository at this point in the history
…t-at

Merge Pull ReactiveX#390 - elementAt
  • Loading branch information
benjchristensen committed Sep 21, 2013
2 parents 8129a47 + 152a693 commit 49d1cad
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 1 deletion.
41 changes: 40 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationAny;
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
Expand All @@ -39,6 +40,7 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
Expand Down Expand Up @@ -78,7 +80,6 @@
import rx.operators.OperationZip;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.operators.OperationAny;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -3139,6 +3140,44 @@ public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector, Com
return create(OperationDistinct.distinct(this, keySelector, equalityComparator));
}

/**
* Returns the element at a specified index in a sequence.
*
* @param index
* The zero-based index of the element to retrieve.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is greater than or equal to the number of elements in the source sequence.
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public Observable<T> elementAt(int index) {
return create(OperationElementAt.elementAt(this, index));
}

/**
* Returns the element at a specified index in a sequence or the default
* value if the index is out of range.
*
* @param index
* The zero-based index of the element to retrieve.
* @param defaultValue
* The default value.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence, or the default value if the
* index is outside the bounds of the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public Observable<T> elementAtOrDefault(int index, T defaultValue) {
return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue));
}

/**
* Returns an {@link Observable} that emits <code>true</code> if any element of the source {@link Observable} satisfies
* the given condition, otherwise <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.
Expand Down
15 changes: 15 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAny.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Mockito.*;
Expand Down
247 changes: 247 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationElementAt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Returns the element at a specified index in a sequence.
*/
public class OperationElementAt {

/**
* Returns the element at a specified index in a sequence.
*
* @param source
* Observable sequence to return the element from.
* @param index
* The zero-based index of the element to retrieve.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is greater than or equal to the number of elements in
* the source sequence.
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public static <T> OnSubscribeFunc<T> elementAt(Observable<? extends T> source, int index) {
return new ElementAt<T>(source, index, null, false);
}

/**
* Returns the element at a specified index in a sequence or the default
* value if the index is out of range.
*
* @param source
* Observable sequence to return the element from.
* @param index
* The zero-based index of the element to retrieve.
* @param defaultValue
* The default value.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence, or the default value if the
* index is outside the bounds of the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public static <T> OnSubscribeFunc<T> elementAtOrDefault(Observable<? extends T> source, int index, T defaultValue) {
return new ElementAt<T>(source, index, defaultValue, true);
}

private static class ElementAt<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> source;
private final int index;
private final boolean hasDefault;
private final T defaultValue;

private ElementAt(Observable<? extends T> source, int index,
T defaultValue, boolean hasDefault) {
this.source = source;
this.index = index;
this.defaultValue = defaultValue;
this.hasDefault = hasDefault;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(source.subscribe(new Observer<T>() {

private AtomicInteger counter = new AtomicInteger();

@Override
public void onNext(T value) {
try {
int currentIndex = counter.getAndIncrement();
if (currentIndex == index) {
observer.onNext(value);
observer.onCompleted();
} else if (currentIndex > index) {
// this will work if the sequence is asynchronous,
// it will have no effect on a synchronous observable
subscription.unsubscribe();
}
} catch (Throwable ex) {
observer.onError(ex);
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}

}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
if (index < 0) {
observer.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
} else if (counter.get() <= index) {
if (hasDefault) {
observer.onNext(defaultValue);
observer.onCompleted();
} else {
observer.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
}
}
}
}));
}
}

public static class UnitTest {

@Test
public void testElementAt() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(elementAt(w, 1));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onError(
any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtWithMinusIndex() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAt(w, -1));

try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}

@Test
public void testElementAtWithIndexOutOfBounds()
throws InterruptedException, ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(elementAt(w, 2));
try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}

@Test
public void testElementAtOrDefault() throws InterruptedException,
ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, 1, 0));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtOrDefaultWithIndexOutOfBounds()
throws InterruptedException, ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, 2, 0));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, never()).onNext(2);
verify(aObserver, times(1)).onNext(0);
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtOrDefaultWithMinusIndex() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, -1, 0));

try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}
}
}

0 comments on commit 49d1cad

Please sign in to comment.