diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 851506a85c..255d1af38d 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -177,14 +177,14 @@ public void call(Subscriber o) { @SuppressWarnings("unchecked") public Observable compose(Transformer transformer) { // Casting to Observable is type-safe because we know Observable is covariant. - return (Observable) transformer.call(this); + return (Observable) ((Transformer) transformer).call(this); } - + /** * Transformer function used by {@link #compose}. * @warn more complete description needed */ - public static interface Transformer extends Func1, Observable> { + public static interface Transformer extends Func1, Observable> { // cover for generics insanity } diff --git a/src/test/java/rx/CovarianceTest.java b/src/test/java/rx/CovarianceTest.java index 26f060b5d3..b269751bec 100644 --- a/src/test/java/rx/CovarianceTest.java +++ b/src/test/java/rx/CovarianceTest.java @@ -15,13 +15,21 @@ */ package rx; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; import org.junit.Test; import rx.Observable.Transformer; import rx.functions.Func1; import rx.functions.Func2; +import rx.observables.GroupedObservable; +import rx.observers.TestSubscriber; /** * Test super/extends of generics. @@ -59,14 +67,52 @@ public Integer call(Media t1, Media t2) { o2.toSortedList(SORT_FUNCTION); } - + @Test + public void testGroupByCompose() { + Observable movies = Observable.just(new HorrorMovie(), new ActionMovie(), new Movie()); + TestSubscriber ts = new TestSubscriber(); + movies.groupBy(new Func1>() { + + @Override + public Class call(Movie m) { + return m.getClass(); + } + + }).flatMap(new Func1, Movie>, Observable>() { + + @Override + public Observable call(GroupedObservable, Movie> g) { + return g.compose(new Transformer() { + + @Override + public Observable call(Observable m) { + return m.concatWith(Observable.just(new ActionMovie())); + } + + }).map(new Func1() { + + @Override + public String call(Movie m) { + return m.toString(); + } + + }); + } + + }).subscribe(ts); + ts.assertTerminalEvent(); + ts.assertNoErrors(); + // System.out.println(ts.getOnNextEvents()); + assertEquals(6, ts.getOnNextEvents().size()); + } + @Test public void testCovarianceOfCompose() { Observable movie = Observable.just(new HorrorMovie()); Observable movie2 = movie.compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new Movie()); } @@ -78,7 +124,7 @@ public void testCovarianceOfCompose2() { Observable movie = Observable. just(new HorrorMovie()); Observable movie2 = movie.compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new HorrorMovie()); } }); @@ -89,7 +135,7 @@ public void testCovarianceOfCompose3() { Observable movie = Observable.just(new HorrorMovie()); Observable movie2 = movie.compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new HorrorMovie()).map(new Func1() { @Override @@ -106,7 +152,7 @@ public void testCovarianceOfCompose4() { Observable movie = Observable.just(new HorrorMovie()); Observable movie2 = movie.compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return t1.map(new Func1() { @Override @@ -118,6 +164,52 @@ public HorrorMovie call(HorrorMovie horrorMovie) { }); } + @Test + public void testComposeWithDeltaLogic() { + List list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie()); + List list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie()); + Observable> movies = Observable.just(list1, list2); + movies.compose(deltaTransformer); + } + + static Transformer, Movie> deltaTransformer = new Transformer, Movie>() { + @Override + public Observable call(Observable> movieList) { + return movieList + .startWith(new ArrayList()) + .buffer(2, 1) + .skip(1) + .flatMap(calculateDelta); + } + }; + + static Func1>, Observable> calculateDelta = new Func1>, Observable>() { + public Observable call(List> listOfLists) { + if (listOfLists.size() == 1) { + return Observable.from(listOfLists.get(0)); + } else { + // diff the two + List newList = listOfLists.get(1); + List oldList = new ArrayList(listOfLists.get(0)); + + Set delta = new LinkedHashSet(); + delta.addAll(newList); + // remove all that match in old + delta.removeAll(oldList); + + // filter oldList to those that aren't in the newList + oldList.removeAll(newList); + + // for all left in the oldList we'll create DROP events + for (Movie old : oldList) { + delta.add(new Movie()); + } + + return Observable.from(delta); + } + }; + }; + /* * Most tests are moved into their applicable classes such as [Operator]Tests.java */ diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index 33b1e9da02..10a4a9ae2d 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -1110,7 +1110,7 @@ public void testCompose() { Observable.just(1, 2, 3).compose(new Transformer() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return t1.map(new Func1() { @Override