From 60f850922e4cf341c23f64c4bfeb20e29f27c204 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 18 Sep 2013 13:07:05 +0800 Subject: [PATCH 1/2] Implemented the 'elementAt' and 'elementAtOrDefault' operators. see #41 --- rxjava-core/src/main/java/rx/Observable.java | 41 +++ .../java/rx/operators/OperationElementAt.java | 238 ++++++++++++++++++ 2 files changed, 279 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationElementAt.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d5d8fb82975..f20a4ec5e61 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -38,6 +38,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDistinct; +import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; @@ -4186,5 +4187,45 @@ private boolean isInternalImplementation(Object o) { Package p = o.getClass().getPackage(); // it can be null return p != null && p.getName().startsWith("rx.operators"); } + + /** + * Returns the element at a specified index in a sequence. + * + * @param index + * The zero-based index of the element to retrieve. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is greater than or equal to the number of elements in + * the source sequence. + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public Observable elementAt(int index) { + return create(OperationElementAt.elementAt(this, index)); + } + /** + * Returns the element at a specified index in a sequence or the default + * value if the index is out of range. + * + * @param index + * The zero-based index of the element to retrieve. + * @param defaultValue + * The default value. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence, or the default value if the + * index is outside the bounds of the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public Observable elementAtOrDefault(int index, T defaultValue) { + return create(OperationElementAt.elementAtOrDefault(this, index, + defaultValue)); + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationElementAt.java b/rxjava-core/src/main/java/rx/operators/OperationElementAt.java new file mode 100644 index 00000000000..7585650e19f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationElementAt.java @@ -0,0 +1,238 @@ +package rx.operators; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Iterator; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * Returns the element at a specified index in a sequence. + */ +public class OperationElementAt { + + /** + * Returns the element at a specified index in a sequence. + * + * @param source + * Observable sequence to return the element from. + * @param index + * The zero-based index of the element to retrieve. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is greater than or equal to the number of elements in + * the source sequence. + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public static OnSubscribeFunc elementAt( + Observable source, int index) { + return new ElementAt(source, index, null, false); + } + + /** + * Returns the element at a specified index in a sequence or the default + * value if the index is out of range. + * + * @param source + * Observable sequence to return the element from. + * @param index + * The zero-based index of the element to retrieve. + * @param defaultValue + * The default value. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence, or the default value if the + * index is outside the bounds of the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public static OnSubscribeFunc elementAtOrDefault( + Observable source, int index, T defaultValue) { + return new ElementAt(source, index, defaultValue, true); + } + + private static class ElementAt implements OnSubscribeFunc { + + private final Observable source; + private final int index; + private final boolean hasDefault; + private final T defaultValue; + + private ElementAt(Observable source, int index, + T defaultValue, boolean hasDefault) { + this.source = source; + this.index = index; + this.defaultValue = defaultValue; + this.hasDefault = hasDefault; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(source.subscribe(new Observer() { + + private AtomicInteger counter = new AtomicInteger(); + + @Override + public void onNext(T value) { + try { + int currentIndex = counter.getAndIncrement(); + if (currentIndex == index) { + observer.onNext(value); + observer.onCompleted(); + } else if (currentIndex > index) { + // this will work if the sequence is asynchronous, + // it will have no effect on a synchronous + // observable + subscription.unsubscribe(); + } + } catch (Throwable ex) { + observer.onError(ex); + // this will work if the sequence is asynchronous, it + // will have no effect on a synchronous observable + subscription.unsubscribe(); + } + + } + + @Override + public void onError(Throwable ex) { + observer.onError(ex); + } + + @Override + public void onCompleted() { + if (index < 0) { + observer.onError(new IndexOutOfBoundsException(index + + " is out of bounds")); + } else if (counter.get() <= index) { + if (hasDefault) { + observer.onNext(defaultValue); + observer.onCompleted(); + } else { + observer.onError(new IndexOutOfBoundsException( + index + " is out of bounds")); + } + } + } + })); + } + } + + public static class UnitTest { + + @Test + public void testElementAt() { + Observable w = Observable.from(1, 2); + Observable observable = Observable.create(elementAt(w, 1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testElementAtWithMinusIndex() { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAt(w, -1)); + + try { + Iterator iter = OperationToIterator + .toIterator(observable); + assertTrue(iter.hasNext()); + iter.next(); + fail("expect an IndexOutOfBoundsException when index is out of bounds"); + } catch (IndexOutOfBoundsException e) { + } + } + + @Test + public void testElementAtWithIndexOutOfBounds() + throws InterruptedException, ExecutionException { + Observable w = Observable.from(1, 2); + Observable observable = Observable.create(elementAt(w, 2)); + try { + Iterator iter = OperationToIterator + .toIterator(observable); + assertTrue(iter.hasNext()); + iter.next(); + fail("expect an IndexOutOfBoundsException when index is out of bounds"); + } catch (IndexOutOfBoundsException e) { + } + } + + @Test + public void testElementAtOrDefault() throws InterruptedException, + ExecutionException { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAtOrDefault(w, 1, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testElementAtOrDefaultWithIndexOutOfBounds() + throws InterruptedException, ExecutionException { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAtOrDefault(w, 2, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, never()).onNext(2); + verify(aObserver, times(1)).onNext(0); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testElementAtOrDefaultWithMinusIndex() { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAtOrDefault(w, -1, 0)); + + try { + Iterator iter = OperationToIterator + .toIterator(observable); + assertTrue(iter.hasNext()); + iter.next(); + fail("expect an IndexOutOfBoundsException when index is out of bounds"); + } catch (IndexOutOfBoundsException e) { + } + } + } +} From 152a69373cc8367ffea84968153769d8fe92fc2c Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 21 Sep 2013 13:39:13 -0700 Subject: [PATCH 2/2] Added missing license header --- .../src/main/java/rx/operators/OperationAny.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationAny.java b/rxjava-core/src/main/java/rx/operators/OperationAny.java index cc4cfd2a64c..a5f12c224f1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAny.java +++ b/rxjava-core/src/main/java/rx/operators/OperationAny.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import static org.mockito.Mockito.*;