Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Scan/Reduce/Collect Factory Ambiguity #1884

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 10 additions & 68 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3459,7 +3459,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
* <dd>{@code collect} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param state
* @param stateFactory
* the mutable data structure that will collect the items
* @param collector
* a function that accepts the {@code state} and an emitted item, and modifies {@code state}
Expand All @@ -3468,7 +3468,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
* into a single mutable data structure
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#collect">RxJava wiki: collect</a>
*/
public final <R> Observable<R> collect(R state, final Action2<R, ? super T> collector) {
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
Func2<R, T, R> accumulator = new Func2<R, T, R>() {

@Override
Expand All @@ -3478,7 +3478,14 @@ public final R call(R state, T value) {
}

};
return reduce(state, accumulator);

/*
* Discussion and confirmation of implementation at
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
*
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
*/
return lift(new OperatorScan<R, T>(stateFactory, accumulator)).last();
}

/**
Expand Down Expand Up @@ -5293,40 +5300,6 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
return scan(initialValue, accumulator).takeLast(1);
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a specified seed value, then feeds the result of that function along with the second item
* emitted by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
* them to a single {@code onNext}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, the
* result of which will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the output from the
* items emitted by the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return scan(initialValueFactory, accumulator).takeLast(1);
}


/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
* <p>
Expand Down Expand Up @@ -6359,37 +6332,6 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
* the source Observable into the same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
* <p>
* This sort of function is sometimes called an accumulator.
* <p>
* Note that the Observable that results from this method will emit the item returned from
* {@code initialValueFactory} as its first emitted item.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, whose
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
* next accumulator call
* @return an Observable that emits the item returned from {@code initialValueFactory} followed by the
* results of each call to the accumulator function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
*/
public final <R> Observable<R> scan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValueFactory, accumulator));
}

/**
* Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract
Expand Down
31 changes: 28 additions & 3 deletions src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
Expand Down Expand Up @@ -965,23 +966,47 @@ public void testRangeWithScheduler() {

@Test
public void testCollectToList() {
List<Integer> list = Observable.just(1, 2, 3).collect(new ArrayList<Integer>(), new Action2<List<Integer>, Integer>() {
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Action2<List<Integer>, Integer>() {

@Override
public void call(List<Integer> list, Integer v) {
list.add(v);
}
}).toBlocking().last();
});

List<Integer> list = o.toBlocking().last();

assertEquals(3, list.size());
assertEquals(1, list.get(0).intValue());
assertEquals(2, list.get(1).intValue());
assertEquals(3, list.get(2).intValue());

// test multiple subscribe
List<Integer> list2 = o.toBlocking().last();

assertEquals(3, list2.size());
assertEquals(1, list2.get(0).intValue());
assertEquals(2, list2.get(1).intValue());
assertEquals(3, list2.get(2).intValue());
}

@Test
public void testCollectToString() {
String value = Observable.just(1, 2, 3).collect(new StringBuilder(), new Action2<StringBuilder, Integer>() {
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {

@Override
public StringBuilder call() {
return new StringBuilder();
}

}, new Action2<StringBuilder, Integer>() {

@Override
public void call(StringBuilder sb, Integer v) {
Expand Down
11 changes: 7 additions & 4 deletions src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
Expand Down Expand Up @@ -269,22 +270,24 @@ public void onNext(Integer t) {
assertEquals(101, count.get());
}

/**
* This uses the public API collect which uses scan under the covers.
*/
@Test
public void testSeedFactory() {
Observable<List<Integer>> o = Observable.range(1, 10)
.scan(new Func0<List<Integer>>() {
.collect(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Func2<List<Integer>, Integer, List<Integer>>() {
}, new Action2<List<Integer>, Integer>() {

@Override
public List<Integer> call(List<Integer> list, Integer t2) {
public void call(List<Integer> list, Integer t2) {
list.add(t2);
return list;
}

}).takeLast(1);
Expand Down