Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable.compose Generics #1776

Merged
merged 2 commits into from
Oct 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ public void call(Subscriber<? super R> o) {
@SuppressWarnings("unchecked")
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
// Casting to Observable<R> is type-safe because we know Observable is covariant.
return (Observable<R>) transformer.call(this);
return (Observable<R>) ((Transformer<T, ? extends R>) transformer).call(this);
}

/**
* Transformer function used by {@link #compose}.
* @warn more complete description needed
*/
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>> {
public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>> {
// cover for generics insanity
}

Expand Down
102 changes: 97 additions & 5 deletions src/test/java/rx/CovarianceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
*/
package rx;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.junit.Test;

import rx.Observable.Transformer;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;
import rx.observers.TestSubscriber;

/**
* Test super/extends of generics.
Expand Down Expand Up @@ -59,14 +67,52 @@ public Integer call(Media t1, Media t2) {
o2.toSortedList(SORT_FUNCTION);
}


@Test
public void testGroupByCompose() {
Observable<Movie> movies = Observable.just(new HorrorMovie(), new ActionMovie(), new Movie());
TestSubscriber<String> ts = new TestSubscriber<String>();
movies.groupBy(new Func1<Movie, Class<? extends Movie>>() {

@Override
public Class<? extends Movie> call(Movie m) {
return m.getClass();
}

}).flatMap(new Func1<GroupedObservable<Class<? extends Movie>, Movie>, Observable<String>>() {

@Override
public Observable<String> call(GroupedObservable<Class<? extends Movie>, Movie> g) {
return g.compose(new Transformer<Movie, Movie>() {

@Override
public Observable<? extends Movie> call(Observable<Movie> m) {
return m.concatWith(Observable.just(new ActionMovie()));
}

}).map(new Func1<Movie, String>() {

@Override
public String call(Movie m) {
return m.toString();
}

});
}

}).subscribe(ts);
ts.assertTerminalEvent();
ts.assertNoErrors();
// System.out.println(ts.getOnNextEvents());
assertEquals(6, ts.getOnNextEvents().size());
}

@Test
public void testCovarianceOfCompose() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<Movie> movie2 = movie.compose(new Transformer<Movie, Movie>() {

@Override
public Observable<? extends Movie> call(Observable<? extends Movie> t1) {
public Observable<? extends Movie> call(Observable<Movie> t1) {
return Observable.just(new Movie());
}

Expand All @@ -78,7 +124,7 @@ public void testCovarianceOfCompose2() {
Observable<Movie> movie = Observable.<Movie> just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
@Override
public Observable<? extends HorrorMovie> call(Observable<? extends Movie> t1) {
public Observable<? extends HorrorMovie> call(Observable<Movie> t1) {
return Observable.just(new HorrorMovie());
}
});
Expand All @@ -89,7 +135,7 @@ public void testCovarianceOfCompose3() {
Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
@Override
public Observable<? extends HorrorMovie> call(Observable<? extends Movie> t1) {
public Observable<? extends HorrorMovie> call(Observable<Movie> t1) {
return Observable.just(new HorrorMovie()).map(new Func1<HorrorMovie, HorrorMovie>() {

@Override
Expand All @@ -106,7 +152,7 @@ public void testCovarianceOfCompose4() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<HorrorMovie, HorrorMovie>() {
@Override
public Observable<? extends HorrorMovie> call(Observable<? extends HorrorMovie> t1) {
public Observable<? extends HorrorMovie> call(Observable<HorrorMovie> t1) {
return t1.map(new Func1<HorrorMovie, HorrorMovie>() {

@Override
Expand All @@ -118,6 +164,52 @@ public HorrorMovie call(HorrorMovie horrorMovie) {
});
}

@Test
public void testComposeWithDeltaLogic() {
List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
Observable<List<Movie>> movies = Observable.just(list1, list2);
movies.compose(deltaTransformer);
}

static Transformer<List<Movie>, Movie> deltaTransformer = new Transformer<List<Movie>, Movie>() {
@Override
public Observable<Movie> call(Observable<List<Movie>> movieList) {
return movieList
.startWith(new ArrayList<Movie>())
.buffer(2, 1)
.skip(1)
.flatMap(calculateDelta);
}
};

static Func1<List<List<Movie>>, Observable<Movie>> calculateDelta = new Func1<List<List<Movie>>, Observable<Movie>>() {
public Observable<Movie> call(List<List<Movie>> listOfLists) {
if (listOfLists.size() == 1) {
return Observable.from(listOfLists.get(0));
} else {
// diff the two
List<Movie> newList = listOfLists.get(1);
List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));

Set<Movie> delta = new LinkedHashSet<Movie>();
delta.addAll(newList);
// remove all that match in old
delta.removeAll(oldList);

// filter oldList to those that aren't in the newList
oldList.removeAll(newList);

// for all left in the oldList we'll create DROP events
for (Movie old : oldList) {
delta.add(new Movie());
}

return Observable.from(delta);
}
};
};

/*
* Most tests are moved into their applicable classes such as [Operator]Tests.java
*/
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ public void testCompose() {
Observable.just(1, 2, 3).compose(new Transformer<Integer, String>() {

@Override
public Observable<String> call(Observable<? extends Integer> t1) {
public Observable<String> call(Observable<Integer> t1) {
return t1.map(new Func1<Integer, String>() {

@Override
Expand Down