Skip to content

Commit

Permalink
Merge pull request #866 from benjchristensen/operator-touchup
Browse files Browse the repository at this point in the history
Update OperationScan to OperatorScan
  • Loading branch information
benjchristensen committed Feb 12, 2014
2 parents 6c578f7 + 13d293f commit 2cedb25
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 153 deletions.
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperatorScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
Expand Down Expand Up @@ -6140,7 +6140,7 @@ public final <U> Observable<T> sample(Observable<U> sampler) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
*/
public final Observable<T> scan(Func2<T, T, T> accumulator) {
return lift(OperationScan.scan(accumulator));
return lift(new OperatorScan<T, T>(accumulator));
}

/**
Expand All @@ -6167,7 +6167,7 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
*/
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(OperationScan.scan(initialValue, accumulator));
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
Expand Down
140 changes: 0 additions & 140 deletions rxjava-core/src/main/java/rx/operators/OperationScan.java

This file was deleted.

117 changes: 117 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorScan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.util.functions.Func2;

/**
* Returns an Observable that applies a function to the first item emitted by a source Observable,
* then feeds the result of that function along with the second item emitted by an Observable into
* the same function, and so on until all items have been emitted by the source Observable,
* emitting the result of each of these iterations.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/scan.png">
* <p>
* This sort of function is sometimes called an accumulator.
* <p>
* Note that when you pass a seed to <code>scan()</code> the resulting Observable will emit that
* seed as its first emitted item.
*/
public final class OperatorScan<R, T> implements Operator<R, T> {

private final R initialValue;
private final Func2<R, ? super T, R> accumulator;
// sentinel if we don't receive an initial value
private static final Object NO_INITIAL_VALUE = new Object();

/**
* Applies an accumulator function over an observable sequence and returns each intermediate
* result with the specified source and accumulator.
*
* @param sequence
* An observable sequence of elements to project.
* @param initialValue
* The initial (seed) accumulator value.
* @param accumulator
* An accumulator function to be invoked on each element from the sequence.
*
* @return An observable sequence whose elements are the result of accumulating the output from
* the list of Observables.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
* TAccumulate))</a>
*/
public OperatorScan(R initialValue, Func2<R, ? super T, R> accumulator) {
this.initialValue = initialValue;
this.accumulator = accumulator;
}

/**
* Applies an accumulator function over an observable sequence and returns each intermediate
* result with the specified source and accumulator.
*
* @param sequence
* An observable sequence of elements to project.
* @param accumulator
* An accumulator function to be invoked on each element from the sequence.
*
* @return An observable sequence whose elements are the result of accumulating the output from
* the list of Observables.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
*/
@SuppressWarnings("unchecked")
public OperatorScan(final Func2<R, ? super T, R> accumulator) {
this((R) NO_INITIAL_VALUE, accumulator);
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> observer) {
if (initialValue != NO_INITIAL_VALUE) {
observer.onNext(initialValue);
}
return new Subscriber<T>(observer) {
private R value = initialValue;

@SuppressWarnings("unchecked")
@Override
public void onNext(T value) {
if (this.value == NO_INITIAL_VALUE) {
// if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R
this.value = (R) value;
} else {
try {
this.value = accumulator.call(this.value, value);
} catch (Throwable e) {
observer.onError(e);
observer.unsubscribe();
}
}
observer.onNext(this.value);
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
}
4 changes: 2 additions & 2 deletions rxjava-core/src/test/java/rx/ReduceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import rx.CovarianceTest.HorrorMovie;
import rx.CovarianceTest.Movie;
import rx.operators.OperationScan;
import rx.operators.OperatorScan;
import rx.util.functions.Func2;

public class ReduceTests {
Expand Down Expand Up @@ -52,7 +52,7 @@ public Movie call(Movie t1, Movie t2) {
}
};

Observable<Movie> reduceResult = horrorMovies.lift(OperationScan.scan(chooseSecondMovie)).takeLast(1);
Observable<Movie> reduceResult = horrorMovies.scan(chooseSecondMovie).takeLast(1);

Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationScan.*;
import static rx.operators.OperatorScan.*;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -27,7 +27,7 @@
import rx.Observer;
import rx.util.functions.Func2;

public class OperationScanTest {
public class OperatorScanTest {

@Before
public void before() {
Expand All @@ -41,14 +41,14 @@ public void testScanIntegersWithInitialValue() {

Observable<Integer> observable = Observable.from(1, 2, 3);

Observable<String> m = observable.lift(scan("", new Func2<String, Integer, String>() {
Observable<String> m = observable.scan("", new Func2<String, Integer, String>() {

@Override
public String call(String s, Integer n) {
return s + n.toString();
}

}));
});
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -68,14 +68,14 @@ public void testScanIntegersWithoutInitialValue() {

Observable<Integer> observable = Observable.from(1, 2, 3);

Observable<Integer> m = observable.lift(scan(new Func2<Integer, Integer, Integer>() {
Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

}));
});
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -95,14 +95,14 @@ public void testScanIntegersWithoutInitialValueAndOnlyOneValue() {

Observable<Integer> observable = Observable.from(1);

Observable<Integer> m = observable.lift(scan(new Func2<Integer, Integer, Integer>() {
Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

}));
});
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand Down

0 comments on commit 2cedb25

Please sign in to comment.