diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 73a11e421c..9336237c7e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1696,30 +1696,9 @@ public static Observable reduce(Observable sequence, Func2 ac } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject - * method that does a similar operation on lists. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable) - * - * @return an Observable that emits a single element that is the result of accumulating the - * output from applying the accumulator to the sequence of items emitted by the source - * Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Observable, Func2) */ public static Observable reduce(final Observable sequence, final Object accumulator) { @SuppressWarnings("rawtypes") @@ -1735,6 +1714,22 @@ public T call(T t1, T t2) { }); } + /** + * @see #reduce(Observable, Func2) + */ + public static Observable aggregate(Observable sequence, Func2 accumulator) { + return reduce(sequence, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Observable, Func2) + */ + public static Observable aggregate(Observable sequence, Object accumulator) { + return reduce(sequence, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1749,6 +1744,8 @@ public T call(T t1, T t2) { * * @param * the type item emitted by the source Observable + * @param + * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue @@ -1763,51 +1760,43 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { + public static Observable reduce(Observable sequence, R initialValue, Func2 accumulator) { return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject - * method that does a similar operation on lists. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param initialValue - * a seed passed into the first execution of the accumulator function - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable) - * @return an Observable that emits a single element that is the result of accumulating the - * output from applying the accumulator to the sequence of items emitted by the source - * Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Observable, Object, Func2) */ - public static Observable reduce(final Observable sequence, final T initialValue, final Object accumulator) { + public static Observable reduce(final Observable sequence, final R initialValue, final Object accumulator) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(accumulator); - return reduce(sequence, initialValue, new Func2() { - + return reduce(sequence, initialValue, new Func2() { @SuppressWarnings("unchecked") @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); + public R call(R r, T t) { + return (R) _f.call(r, t); } - }); } + /** + * @see #reduce(Observable, Object, Func2) + */ + public static Observable aggregate(Observable sequence, R initialValue, Func2 accumulator) { + return reduce(sequence, initialValue, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Observable, Object, Func2) + */ + public static Observable aggregate(Observable sequence, R initialValue, Object accumulator) { + return reduce(sequence, initialValue, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1832,23 +1821,9 @@ public static Observable scan(Observable sequence, Func2 accu } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations as its own sequence. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be emitted and used in the next accumulator call (if applicable) - * @return an Observable that emits a sequence of items that are the result of accumulating the - * output from the sequence emitted by the source Observable - * @see MSDN: Observable.Scan + * @see #scan(Observable, Func2) */ public static Observable scan(final Observable sequence, final Object accumulator) { @SuppressWarnings("rawtypes") @@ -1874,6 +1849,8 @@ public T call(T t1, T t2) { * * @param * the type item emitted by the source Observable + * @param + * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue @@ -1885,42 +1862,25 @@ public T call(T t1, T t2) { * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ - public static Observable scan(Observable sequence, T initialValue, Func2 accumulator) { + public static Observable scan(Observable sequence, R initialValue, Func2 accumulator) { return create(OperationScan.scan(sequence, initialValue, accumulator)); } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations as its own sequence. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param initialValue - * the initial (seed) accumulator value - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be emitted and used in the next accumulator call (if applicable) - * @return an Observable that emits a sequence of items that are the result of accumulating the - * output from the sequence emitted by the source Observable - * @see MSDN: Observable.Scan + * @see #scan(Observable, Object, Func2) */ - public static Observable scan(final Observable sequence, final T initialValue, final Object accumulator) { + public static Observable scan(final Observable sequence, final R initialValue, final Object accumulator) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(accumulator); - return scan(sequence, initialValue, new Func2() { + return scan(sequence, initialValue, new Func2() { @SuppressWarnings("unchecked") @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); + public R call(R r, T t) { + return (R) _f.call(r, t); } - }); } @@ -3227,31 +3187,30 @@ public Observable reduce(Func2 accumulator) { } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an - * inject method that does a similar operation on lists. - *

- * - * - * @param accumulator - * An accumulator function to be invoked on each element from the sequence, whose result - * will be used in the next accumulator call (if applicable). + * Used by dynamic languages. * - * @return an Observable that emits a single element from the result of accumulating the output - * from the list of Observables. - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Func2) */ public Observable reduce(Object accumulator) { return reduce(this, accumulator); } + /** + * @see #reduce(Func2) + */ + public Observable aggregate(Func2 accumulator) { + return aggregate(this, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Func2) + */ + public Observable aggregate(Object accumulator) { + return aggregate(this, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3276,37 +3235,35 @@ public Observable reduce(Object accumulator) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public Observable reduce(T initialValue, Func2 accumulator) { + public Observable reduce(R initialValue, Func2 accumulator) { return reduce(this, initialValue, accumulator); } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an - * inject method that does a similar operation on lists. - *

- * + * Used by dynamic languages. * - * @param initialValue - * The initial (seed) accumulator value. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable). - * @return an Observable that emits a single element from the result of accumulating the output - * from the list of Observables. - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Object, Func2) */ - public Observable reduce(T initialValue, Object accumulator) { + public Observable reduce(R initialValue, Object accumulator) { return reduce(this, initialValue, accumulator); } + /** + * @see #reduce(Object, Func2) + */ + public Observable aggregate(R initialValue, Func2 accumulator) { + return aggregate(this, initialValue, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Object, Func2) + */ + public Observable aggregate(R initialValue, Object accumulator) { + return aggregate(this, initialValue, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3358,23 +3315,9 @@ public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations. It emits the result of - * each of these iterations as a sequence from the returned Observable. This sort of function is - * sometimes called an accumulator. - *

- * - * - * @param accumulator - * An accumulator function to be invoked on each element from the sequence whose - * result will be sent via onNext and used in the next accumulator call - * (if applicable). - * - * @return an Observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see MSDN: Observable.Scan + * Used by dynamic languages. + * + * @see #scan(Func2) */ public Observable scan(final Object accumulator) { return scan(this, accumulator); @@ -3399,30 +3342,16 @@ public Observable scan(final Object accumulator) { * the list of Observables. * @see MSDN: Observable.Scan */ - public Observable scan(T initialValue, Func2 accumulator) { + public Observable scan(R initialValue, Func2 accumulator) { return scan(this, initialValue, accumulator); } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, then feeds the result of that function along with the - * third item into the same function, and so on, emitting the result of each of these - * iterations. This sort of function is sometimes called an accumulator. - *

- * - * - * @param initialValue - * The initial (seed) accumulator value. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence whose result - * will be sent via onNext and used in the next accumulator call (if - * applicable). - * @return an Observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see MSDN: Observable.Scan + * Used by dynamic languages. + * + * @see #scan(Object, Func2) */ - public Observable scan(final T initialValue, final Object accumulator) { + public Observable scan(final R initialValue, final Object accumulator) { return scan(this, initialValue, accumulator); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index 0614326f0c..94ecaaf6d0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -25,7 +25,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -43,8 +42,8 @@ public final class OperationScan { * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables. * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, TAccumulate)) */ - public static Func1, Subscription> scan(Observable sequence, T initialValue, Func2 accumulator) { - return new Accumulator(sequence, initialValue, accumulator); + public static Func1, Subscription> scan(Observable sequence, R initialValue, Func2 accumulator) { + return new Accumulator(sequence, initialValue, accumulator); } /** @@ -59,78 +58,107 @@ public static Func1, Subscription> scan(Observable sequence, * @see Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource)) */ public static Func1, Subscription> scan(Observable sequence, Func2 accumulator) { - return new Accumulator(sequence, null, accumulator); + return new AccuWithoutInitialValue(sequence, accumulator); } - private static class Accumulator implements Func1, Subscription> { + private static class AccuWithoutInitialValue implements Func1, Subscription> { private final Observable sequence; - private final T initialValue; - private Func2 accumlatorFunction; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - - private Accumulator(Observable sequence, T initialValue, Func2 accumulator) { + private final Func2 accumulatorFunction; + + private AccumulatingObserver accumulatingObserver; + + private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { this.sequence = sequence; - this.initialValue = initialValue; - this.accumlatorFunction = accumulator; + this.accumulatorFunction = accumulator; } - + + @Override public Subscription call(final Observer observer) { - - return subscription.wrap(sequence.subscribe(new Observer() { - private T acc = initialValue; - private boolean hasSentInitialValue = false; - - /** - * We must synchronize this because we can't allow - * multiple threads to execute the 'accumulatorFunction' at the same time because - * the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap - * - * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded - */ + return sequence.subscribe(new Observer() { + + // has to be synchronized so that the initial value is always sent only once. + @Override public synchronized void onNext(T value) { - if (acc == null) { - // we assume that acc is not allowed to be returned from accumulatorValue - // so it's okay to check null as being the state we initialize on - acc = value; - // this is all we do for this first value if we didn't have an initialValue - return; - } - if (!hasSentInitialValue) { - hasSentInitialValue = true; - observer.onNext(acc); - } - - try { - - acc = accumlatorFunction.call(acc, value); - if (acc == null) { - onError(new IllegalArgumentException("Null is an unsupported return value for an accumulator.")); - return; - } - observer.onNext(acc); - } catch (Exception ex) { - observer.onError(ex); - // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable - subscription.unsubscribe(); + if (accumulatingObserver == null) { + observer.onNext(value); + accumulatingObserver = new AccumulatingObserver(observer, value, accumulatorFunction); + } else { + accumulatingObserver.onNext(value); } } - - public void onError(Exception ex) { - observer.onError(ex); + + @Override + public void onError(Exception e) { + observer.onError(e); } - // synchronized because we access 'hasSentInitialValue' - public synchronized void onCompleted() { - // if only one sequence value existed, we send it without any accumulation - if (!hasSentInitialValue) { - observer.onNext(acc); - } + @Override + public void onCompleted() { observer.onCompleted(); } - })); + }); + } + } + + private static class Accumulator implements Func1, Subscription> { + private final Observable sequence; + private final R initialValue; + private final Func2 accumulatorFunction; + + private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { + this.sequence = sequence; + this.initialValue = initialValue; + this.accumulatorFunction = accumulator; + } + + @Override + public Subscription call(final Observer observer) { + observer.onNext(initialValue); + return sequence.subscribe(new AccumulatingObserver(observer, initialValue, accumulatorFunction)); } } + private static class AccumulatingObserver implements Observer { + private final Observer observer; + private final Func2 accumulatorFunction; + + private R acc; + + private AccumulatingObserver(Observer observer, R initialValue, Func2 accumulator) { + this.observer = observer; + this.accumulatorFunction = accumulator; + + this.acc = initialValue; + } + + /** + * We must synchronize this because we can't allow + * multiple threads to execute the 'accumulatorFunction' at the same time because + * the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap + * + * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded + */ + @Override + public synchronized void onNext(T value) { + try { + acc = accumulatorFunction.call(acc, value); + observer.onNext(acc); + } catch (Exception ex) { + observer.onError(ex); + } + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + } + public static class UnitTest { @Before @@ -141,28 +169,28 @@ public void before() { @Test public void testScanIntegersWithInitialValue() { @SuppressWarnings("unchecked") - Observer Observer = mock(Observer.class); + Observer observer = mock(Observer.class); Observable observable = Observable.toObservable(1, 2, 3); - Observable m = Observable.create(scan(observable, 0, new Func2() { + Observable m = Observable.create(scan(observable, "", new Func2() { @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; + public String call(String s, Integer n) { + return s + n.toString(); } })); - m.subscribe(Observer); - - verify(Observer, never()).onError(any(Exception.class)); - verify(Observer, times(1)).onNext(0); - verify(Observer, times(1)).onNext(1); - verify(Observer, times(1)).onNext(3); - verify(Observer, times(1)).onNext(6); - verify(Observer, times(4)).onNext(anyInt()); - verify(Observer, times(1)).onCompleted(); - verify(Observer, never()).onError(any(Exception.class)); + m.subscribe(observer); + + verify(observer, never()).onError(any(Exception.class)); + verify(observer, times(1)).onNext(""); + verify(observer, times(1)).onNext("1"); + verify(observer, times(1)).onNext("12"); + verify(observer, times(1)).onNext("123"); + verify(observer, times(4)).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); } @Test