Skip to content

Commit

Permalink
Merge pull request #1776 from benjchristensen/compose-generics
Browse files Browse the repository at this point in the history
Observable.compose Generics
  • Loading branch information
benjchristensen committed Oct 18, 2014
2 parents 502405d + 4e88e56 commit 26ffe5e
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ public void call(Subscriber<? super R> o) {
@SuppressWarnings("unchecked")
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
// Casting to Observable<R> is type-safe because we know Observable is covariant.
return (Observable<R>) transformer.call(this);
return (Observable<R>) ((Transformer<T, ? extends R>) transformer).call(this);
}

/**
* Transformer function used by {@link #compose}.
* @warn more complete description needed
*/
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>> {
public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>> {
// cover for generics insanity
}

Expand Down
102 changes: 97 additions & 5 deletions src/test/java/rx/CovarianceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
*/
package rx;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.junit.Test;

import rx.Observable.Transformer;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;
import rx.observers.TestSubscriber;

/**
* Test super/extends of generics.
Expand Down Expand Up @@ -59,14 +67,52 @@ public Integer call(Media t1, Media t2) {
o2.toSortedList(SORT_FUNCTION);
}


@Test
public void testGroupByCompose() {
Observable<Movie> movies = Observable.just(new HorrorMovie(), new ActionMovie(), new Movie());
TestSubscriber<String> ts = new TestSubscriber<String>();
movies.groupBy(new Func1<Movie, Class<? extends Movie>>() {

@Override
public Class<? extends Movie> call(Movie m) {
return m.getClass();
}

}).flatMap(new Func1<GroupedObservable<Class<? extends Movie>, Movie>, Observable<String>>() {

@Override
public Observable<String> call(GroupedObservable<Class<? extends Movie>, Movie> g) {
return g.compose(new Transformer<Movie, Movie>() {

@Override
public Observable<? extends Movie> call(Observable<Movie> m) {
return m.concatWith(Observable.just(new ActionMovie()));
}

}).map(new Func1<Movie, String>() {

@Override
public String call(Movie m) {
return m.toString();
}

});
}

}).subscribe(ts);
ts.assertTerminalEvent();
ts.assertNoErrors();
// System.out.println(ts.getOnNextEvents());
assertEquals(6, ts.getOnNextEvents().size());
}

@Test
public void testCovarianceOfCompose() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<Movie> movie2 = movie.compose(new Transformer<Movie, Movie>() {

@Override
public Observable<? extends Movie> call(Observable<? extends Movie> t1) {
public Observable<? extends Movie> call(Observable<Movie> t1) {
return Observable.just(new Movie());
}

Expand All @@ -78,7 +124,7 @@ public void testCovarianceOfCompose2() {
Observable<Movie> movie = Observable.<Movie> just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
@Override
public Observable<? extends HorrorMovie> call(Observable<? extends Movie> t1) {
public Observable<? extends HorrorMovie> call(Observable<Movie> t1) {
return Observable.just(new HorrorMovie());
}
});
Expand All @@ -89,7 +135,7 @@ public void testCovarianceOfCompose3() {
Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
@Override
public Observable<? extends HorrorMovie> call(Observable<? extends Movie> t1) {
public Observable<? extends HorrorMovie> call(Observable<Movie> t1) {
return Observable.just(new HorrorMovie()).map(new Func1<HorrorMovie, HorrorMovie>() {

@Override
Expand All @@ -106,7 +152,7 @@ public void testCovarianceOfCompose4() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<HorrorMovie, HorrorMovie>() {
@Override
public Observable<? extends HorrorMovie> call(Observable<? extends HorrorMovie> t1) {
public Observable<? extends HorrorMovie> call(Observable<HorrorMovie> t1) {
return t1.map(new Func1<HorrorMovie, HorrorMovie>() {

@Override
Expand All @@ -118,6 +164,52 @@ public HorrorMovie call(HorrorMovie horrorMovie) {
});
}

@Test
public void testComposeWithDeltaLogic() {
List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
Observable<List<Movie>> movies = Observable.just(list1, list2);
movies.compose(deltaTransformer);
}

static Transformer<List<Movie>, Movie> deltaTransformer = new Transformer<List<Movie>, Movie>() {
@Override
public Observable<Movie> call(Observable<List<Movie>> movieList) {
return movieList
.startWith(new ArrayList<Movie>())
.buffer(2, 1)
.skip(1)
.flatMap(calculateDelta);
}
};

static Func1<List<List<Movie>>, Observable<Movie>> calculateDelta = new Func1<List<List<Movie>>, Observable<Movie>>() {
public Observable<Movie> call(List<List<Movie>> listOfLists) {
if (listOfLists.size() == 1) {
return Observable.from(listOfLists.get(0));
} else {
// diff the two
List<Movie> newList = listOfLists.get(1);
List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));

Set<Movie> delta = new LinkedHashSet<Movie>();
delta.addAll(newList);
// remove all that match in old
delta.removeAll(oldList);

// filter oldList to those that aren't in the newList
oldList.removeAll(newList);

// for all left in the oldList we'll create DROP events
for (Movie old : oldList) {
delta.add(new Movie());
}

return Observable.from(delta);
}
};
};

/*
* Most tests are moved into their applicable classes such as [Operator]Tests.java
*/
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ public void testCompose() {
Observable.just(1, 2, 3).compose(new Transformer<Integer, String>() {

@Override
public Observable<String> call(Observable<? extends Integer> t1) {
public Observable<String> call(Observable<Integer> t1) {
return t1.map(new Func1<Integer, String>() {

@Override
Expand Down

0 comments on commit 26ffe5e

Please sign in to comment.