From c63c76b5df000686be73d8fd0bbde210c85bee9b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 15 Nov 2014 11:02:14 -0800 Subject: [PATCH] Fix Scan/Reduce/Collect Factory Ambiguity This puts the seed factory on `collect` and removes it from `scan` and `reduce` due to ambiguity. See https://github.com/ReactiveX/RxJava/pull/1883 and https://github.com/ReactiveX/RxJava/issues/1881 --- src/main/java/rx/Observable.java | 78 +++---------------- src/test/java/rx/ObservableTests.java | 31 +++++++- .../internal/operators/OperatorScanTest.java | 11 ++- 3 files changed, 45 insertions(+), 75 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index e706cf5d2e..27a97b4b3d 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3459,7 +3459,7 @@ public final Observable cast(final Class klass) { *
{@code collect} does not operate by default on a particular {@link Scheduler}.
* * - * @param state + * @param stateFactory * the mutable data structure that will collect the items * @param collector * a function that accepts the {@code state} and an emitted item, and modifies {@code state} @@ -3468,7 +3468,7 @@ public final Observable cast(final Class klass) { * into a single mutable data structure * @see RxJava wiki: collect */ - public final Observable collect(R state, final Action2 collector) { + public final Observable collect(Func0 stateFactory, final Action2 collector) { Func2 accumulator = new Func2() { @Override @@ -3478,7 +3478,14 @@ public final R call(R state, T value) { } }; - return reduce(state, accumulator); + + /* + * Discussion and confirmation of implementation at + * https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532 + * + * It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty. + */ + return lift(new OperatorScan(stateFactory, accumulator)).last(); } /** @@ -5293,40 +5300,6 @@ public final Observable reduce(R initialValue, Func2 acc return scan(initialValue, accumulator).takeLast(1); } - /** - * Returns an Observable that applies a specified accumulator function to the first item emitted by a source - * Observable and a specified seed value, then feeds the result of that function along with the second item - * emitted by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole item. - *

- * - *

- * This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method - * that does a similar operation on lists. - *

- *
Backpressure Support:
- *
This operator does not support backpressure because by intent it will receive all values and reduce - * them to a single {@code onNext}.
- *
Scheduler:
- *
{@code reduce} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param initialValueFactory - * factory to produce the initial (seed) accumulator item each time the Observable is subscribed to - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source Observable, the - * result of which will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of accumulating the output from the - * items emitted by the source Observable - * @see RxJava wiki: reduce - * @see Wikipedia: Fold (higher-order function) - */ - public final Observable reduce(Func0 initialValueFactory, Func2 accumulator) { - return scan(initialValueFactory, accumulator).takeLast(1); - } - - /** * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely. *

@@ -6359,37 +6332,6 @@ public final Observable scan(Func2 accumulator) { public final Observable scan(R initialValue, Func2 accumulator) { return lift(new OperatorScan(initialValue, accumulator)); } - - /** - * Returns an Observable that applies a specified accumulator function to the first item emitted by a source - * Observable and a seed value, then feeds the result of that function along with the second item emitted by - * the source Observable into the same function, and so on until all items have been emitted by the source - * Observable, emitting the result of each of these iterations. - *

- * - *

- * This sort of function is sometimes called an accumulator. - *

- * Note that the Observable that results from this method will emit the item returned from - * {@code initialValueFactory} as its first emitted item. - *

- *
Scheduler:
- *
{@code scan} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param initialValueFactory - * factory to produce the initial (seed) accumulator item each time the Observable is subscribed to - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source Observable, whose - * result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the - * next accumulator call - * @return an Observable that emits the item returned from {@code initialValueFactory} followed by the - * results of each call to the accumulator function - * @see RxJava wiki: scan - */ - public final Observable scan(Func0 initialValueFactory, Func2 accumulator) { - return lift(new OperatorScan(initialValueFactory, accumulator)); - } /** * Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index 9dec4e616f..41aa61645b 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -50,6 +50,7 @@ import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.functions.Action2; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.observables.ConnectableObservable; @@ -965,23 +966,47 @@ public void testRangeWithScheduler() { @Test public void testCollectToList() { - List list = Observable.just(1, 2, 3).collect(new ArrayList(), new Action2, Integer>() { + Observable> o = Observable.just(1, 2, 3).collect(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + + }, new Action2, Integer>() { @Override public void call(List list, Integer v) { list.add(v); } - }).toBlocking().last(); + }); + + List list = o.toBlocking().last(); assertEquals(3, list.size()); assertEquals(1, list.get(0).intValue()); assertEquals(2, list.get(1).intValue()); assertEquals(3, list.get(2).intValue()); + + // test multiple subscribe + List list2 = o.toBlocking().last(); + + assertEquals(3, list2.size()); + assertEquals(1, list2.get(0).intValue()); + assertEquals(2, list2.get(1).intValue()); + assertEquals(3, list2.get(2).intValue()); } @Test public void testCollectToString() { - String value = Observable.just(1, 2, 3).collect(new StringBuilder(), new Action2() { + String value = Observable.just(1, 2, 3).collect(new Func0() { + + @Override + public StringBuilder call() { + return new StringBuilder(); + } + + }, new Action2() { @Override public void call(StringBuilder sb, Integer v) { diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index c382e4da20..a2361c1e6f 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -37,6 +37,7 @@ import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.functions.Action2; import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; @@ -269,22 +270,24 @@ public void onNext(Integer t) { assertEquals(101, count.get()); } + /** + * This uses the public API collect which uses scan under the covers. + */ @Test public void testSeedFactory() { Observable> o = Observable.range(1, 10) - .scan(new Func0>() { + .collect(new Func0>() { @Override public List call() { return new ArrayList(); } - }, new Func2, Integer, List>() { + }, new Action2, Integer>() { @Override - public List call(List list, Integer t2) { + public void call(List list, Integer t2) { list.add(t2); - return list; } }).takeLast(1);