Skip to content

Commit

Permalink
Merge pull request ReactiveX#520 from zsxwing/first
Browse files Browse the repository at this point in the history
Fixed the blocking/non-blocking first
  • Loading branch information
benjchristensen committed Dec 23, 2013
2 parents 8abcfad + a810636 commit ce734f7
Show file tree
Hide file tree
Showing 12 changed files with 1,101 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def class ObservableTests {
assertEquals("one", s)
}

@Test(expected = IllegalStateException.class)
@Test(expected = IllegalArgumentException.class)
public void testSingle2() {
Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public class BasicKotlinTests {
assertEquals("default", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
}

[Test(expected = javaClass<IllegalStateException>())]
[Test(expected = javaClass<IllegalArgumentException>())]
public fun testSingle() {
assertEquals("one", Observable.from("one")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 })
Observable.from("one", "two")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }
Expand Down
163 changes: 139 additions & 24 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,12 @@
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
import rx.operators.OperationInterval;
import rx.operators.OperationJoin;
import rx.operators.OperationJoinPatterns;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
Expand All @@ -78,6 +76,7 @@
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
Expand Down Expand Up @@ -5091,37 +5090,107 @@ public Observable<T> skip(int num) {
return create(OperationSkip.skip(this, num));
}

/**
* If the Observable completes after emitting a single item, return an
* Observable containing that item. If it emits more than one item or no
* item, throw an IllegalArgumentException.
*
* @return an Observable containing the single item emitted by the source
* Observable that matches the predicate.
* @throws IllegalArgumentException
* if the source emits more than one item or no item
*/
public Observable<T> single() {
return create(OperationSingle.<T> single(this));
}

/**
* If the Observable completes after emitting a single item that matches a
* predicate, return an Observable containing that item. If it emits more
* than one such item or no item, throw an IllegalArgumentException.
*
* @param predicate
* a predicate function to evaluate items emitted by the source
* Observable
* @return an Observable containing the single item emitted by the source
* Observable that matches the predicate.
* @throws IllegalArgumentException
* if the source emits more than one item or no item matching
* the predicate
*/
public Observable<T> single(Func1<? super T, Boolean> predicate) {
return filter(predicate).single();
}

/**
* If the Observable completes after emitting a single item, return an
* Observable containing that item. If it's empty, return an Observable
* containing the defaultValue. If it emits more than one item, throw an
* IllegalArgumentException.
*
* @param defaultValue
* a default value to return if the Observable emits no item
* @return an Observable containing the single item emitted by the source
* Observable, or an Observable containing the defaultValue if no
* item.
* @throws IllegalArgumentException
* if the source emits more than one item
*/
public Observable<T> singleOrDefault(T defaultValue) {
return create(OperationSingle.<T> singleOrDefault(this, defaultValue));
}

/**
* If the Observable completes after emitting a single item that matches a
* predicate, return an Observable containing that item. If it emits no such
* item, return an Observable containing the defaultValue. If it emits more
* than one such item, throw an IllegalArgumentException.
*
* @param defaultValue
* a default value to return if the {@link Observable} emits no
* matching items
* @param predicate
* a predicate function to evaluate items emitted by the
* Observable
* @return an Observable containing the single item emitted by the source
* Observable that matches the predicate, or an Observable
* containing the defaultValue if no item matches the predicate
* @throws IllegalArgumentException
* if the source emits more than one item matching the predicate
*/
public Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return filter(predicate).singleOrDefault(defaultValue);
}

/**
* Returns an Observable that emits only the very first item emitted by the
* source Observable.
* source Observable, or an <code>IllegalArgumentException</code> if the source
* {@link Observable} is empty.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item emitted by the
* source Observable, or nothing if the source Observable completes
* without emitting a single item
* @return an Observable that emits only the very first item from the
* source, or an <code>IllegalArgumentException</code> if the source {@link Observable} is empty.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
*/
public Observable<T> first() {
return take(1);
return take(1).single();
}

/**
* Returns an Observable that emits only the very first item emitted by the
* source Observable that satisfies a given condition.
* source Observable that satisfies a given condition, or an <code>IllegalArgumentException</code>
* if no such items are emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstN.png">
*
* @param predicate the condition any source emitted item has to satisfy
* @return an Observable that emits only the very first item satisfying the
* given condition from the source, or nothing if the source
* Observable completes without emitting a single matching item
* given condition from the source, or an <code>IllegalArgumentException</code> if no such items are emitted.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
*/
public Observable<T> first(Func1<? super T, Boolean> predicate) {
return skipWhile(not(predicate)).take(1);
return takeFirst(predicate).single();
}

/**
Expand All @@ -5139,7 +5208,7 @@ public Observable<T> first(Func1<? super T, Boolean> predicate) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320.aspx">MSDN: Observable.FirstOrDefault</a>
*/
public Observable<T> firstOrDefault(T defaultValue) {
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
return take(1).singleOrDefault(defaultValue);
}

/**
Expand All @@ -5157,8 +5226,8 @@ public Observable<T> firstOrDefault(T defaultValue) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#firstordefault">RxJava Wiki: firstOrDefault()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759.aspx">MSDN: Observable.FirstOrDefault</a>
*/
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
public Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return takeFirst(predicate).singleOrDefault(defaultValue);
}

/**
Expand Down Expand Up @@ -5245,14 +5314,15 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item from the
* source, or none if the source Observable completes without
* source, or an empty Observable if the source Observable completes without
* emitting a single item
* @deprecated Use <code>take(1)</code> directly.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
* @see #first()
*/
@Deprecated
public Observable<T> takeFirst() {
return first();
return take(1);
}

/**
Expand All @@ -5263,14 +5333,13 @@ public Observable<T> takeFirst() {
*
* @param predicate the condition any source emitted item has to satisfy
* @return an Observable that emits only the very first item satisfying the
* given condition from the source, or none if the source Observable
* given condition from the source, or an empty Observable if the source Observable
* completes without emitting a single matching item
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
* @see #first(Func1)
*/
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
return first(predicate);
return filter(predicate).take(1);
}

/**
Expand Down Expand Up @@ -5725,7 +5794,7 @@ public <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? sup
public Observable<Boolean> isEmpty() {
return create(OperationAny.isEmpty(this));
}

/**
* Returns an Observable that emits the last item emitted by the source or
* notifies observers of an <code>IllegalArgumentException</code> if the
Expand All @@ -5738,7 +5807,53 @@ public Observable<Boolean> isEmpty() {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#last">RxJava Wiki: last()</a>
*/
public Observable<T> last() {
return create(OperationLast.last(this));
return takeLast(1).single();
}

/**
* Returns an Observable that emits only the last item emitted by the source
* Observable that satisfies a given condition, or an
* IllegalArgumentException if no such items are emitted.
*
* @param predicate
* the condition any source emitted item has to satisfy
* @return an Observable that emits only the last item satisfying the given
* condition from the source, or an IllegalArgumentException if no
* such items are emitted.
* @throws IllegalArgumentException
* if no such itmes are emmited
*/
public Observable<T> last(Func1<? super T, Boolean> predicate) {
return filter(predicate).takeLast(1).single();
}

/**
* Returns an Observable that emits only the last item emitted by the source
* Observable, or a default item if the source is empty.
*
* @param defaultValue
* the default item to emit if the source Observable is empty
* @return an Observable that emits only the last item from the source, or a
* default item if the source is empty
*/
public Observable<T> lastOrDefault(T defaultValue) {
return takeLast(1).singleOrDefault(defaultValue);
}

/**
* Returns an Observable that emits only the last item emitted by the source
* Observable that satisfies a given condition, or a default item otherwise.
*
* @param defaultValue
* the default item to emit if the source Observable doesn't emit
* anything that satisfies the given condition
* @param predicate
* the condition any source emitted item has to satisfy
* @return an Observable that emits only the last item from the source that
* satisfies the given condition, or a default item otherwise
*/
public Observable<T> lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return filter(predicate).takeLast(1).singleOrDefault(defaultValue);
}

/**
Expand Down
Loading

0 comments on commit ce734f7

Please sign in to comment.