Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable.compose Generics #1663

Closed
benjchristensen opened this issue Sep 4, 2014 · 34 comments
Closed

Observable.compose Generics #1663

benjchristensen opened this issue Sep 4, 2014 · 34 comments
Labels
Milestone

Comments

@benjchristensen
Copy link
Member

The generics still don't work on compose.

    public Observable<EurekaInstance> getInstanceEvents(String appName) {
        return Observable.
                create((Subscriber<? super EurekaInstance> subscriber) -> {
                    try {
                        logger.info("Fetching instance list for app: " + appName);
                        Application app = DiscoveryManager.getInstance().getDiscoveryClient().getApplication(appName);
                        if (app == null) {
                            subscriber.onError(new RuntimeException("App not found: " + appName));
                            return;
                        }
                        System.out.println("App: " + app);
                        List<InstanceInfo> instancesForApp = app.getInstances();
                        if (instancesForApp != null) {
                            logger.info("Received instance list for app: " + appName + " = " + instancesForApp.size());
                            for (InstanceInfo instance : instancesForApp) {
                                subscriber.onNext(EurekaInstance.create(instance));
                            }
                            subscriber.onCompleted();
                        } else {
                            subscriber.onError(new RuntimeException("Failed to retrieve instances for appName: " + appName));
                        }
                    } catch (Throwable e) {
                        subscriber.onError(e);
                    }
                })
                .subscribeOn(Schedulers.io())
                .compose(o -> o.map(t -> t)) // this won't let me do anything
                .repeatWhen(a -> a.flatMap(n -> Observable.timer(30, TimeUnit.SECONDS))); // repeat after 30 second delay
    }
@benjchristensen benjchristensen added this to the 1.0 milestone Sep 4, 2014
@benjchristensen
Copy link
Member Author

    public <R> Observable<R> compose(Transformer<? super T, R> transformer) {
        return transformer.call(this);
    }

    /**
     * Transformer function used by {@link #compose}.
     * @warn more complete description needed
     */
    public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<R>> {
        // cover for generics insanity
    }

This is what it is right now ... it's not right.

@benjchristensen
Copy link
Member Author

Either @headinthebox or @benjchristensen will buy a beer for whoever solves this!

@akarnokd
Copy link
Member

This compiles for me in Eclipse 4.4:

Observable<EurekaInstance> v = Observable.
    create((Subscriber<? super EurekaInstance> subscriber) -> {
// ...
    });
Observable<EurekaInstance> q = v.subscribeOn(Schedulers.io())
    .compose((Observable<? extends EurekaInstance> o) -> o.map(t -> t));
Observable<EurekaInstance> r = q.repeatWhen(a -> a.flatMap(
    n -> Observable.timer(30, TimeUnit.SECONDS))); // repeat after 30 second delay;

return r;

The sad thing is that the types need to be reinforced at more places and the chain split into several parts.

@zsxwing
Copy link
Member

zsxwing commented Sep 24, 2014

@benjchristensen could you try #1701?

@benjchristensen
Copy link
Member Author

This may be fixed via #1701 ... keeping open until fully confirming in systems using 1.0.0-rc.4

@benjchristensen
Copy link
Member Author

This is still hard to use ... I haven't figured this out for example:

    @Test
    public void testComposeWithDeltaLogic() {
        List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
        List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
        Observable<List<Movie>> movies = Observable.just(list1, list2);
        movies.compose(deltaTransformer);

    }

    static Transformer<Observable<? extends List<? super Movie>>, Observable<? super Movie>> deltaTransformer = new Transformer<Observable<? extends List<? super Movie>>, Observable<? super Movie>>() {

        @Override
        public Observable<? extends Observable<? super Movie>> call(Observable<? extends Observable<? extends List<? super Movie>>> movieList) {
            return movieList
                    .startWith(new ArrayList<Movie>())
                    .buffer(2, 1)
                    .skip(1)
                    .flatMap(calculateDelta);
        }


    };


    static Func1<List<List<Movie>>, Observable<Movie>> calculateDelta = new Func1<List<List<Movie>>, Observable<Movie>>() {

        public Observable<Movie> call(List<List<Movie>> listOfLists) {
            if (listOfLists.size() == 1) {
                return Observable.from(listOfLists.get(0));
            } else {
                // diff the two
                List<Movie> newList = listOfLists.get(1);
                List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));

                Set<Movie> delta = new LinkedHashSet<Movie>();
                delta.addAll(newList);
                // remove all that match in old
                delta.removeAll(oldList);

                // filter oldList to those that aren't in the newList
                oldList.removeAll(newList);

                // for all left in the oldList we'll create DROP events
                for (Movie old : oldList) {
                    delta.add(new Movie());
                }

                return Observable.from(delta);
            }
        };
    };

@benjchristensen
Copy link
Member Author

If we can't get compose working easily I think we should remove it from 1.0 and only bring it back if we can eventually get the generics happy.

@benjchristensen
Copy link
Member Author

@akarnokd @zsxwing What do you think about this method signature?

@zsxwing
Copy link
Member

zsxwing commented Oct 14, 2014

The following codes can work:

    @Test
    public void testComposeWithDeltaLogic() {
        List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
        List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
        Observable<List<Movie>> movies = Observable.just(list1, list2);
        movies.compose(deltaTransformer);

    }

    static Transformer<List<Movie>, Movie> deltaTransformer = new Transformer<List<Movie>, Movie>() {

        @Override
        public Observable<? extends Movie> call(Observable<? extends List<Movie>> o) {
            Observable<List<Movie>> movieList = (Observable<List<Movie>>)o;
            return movieList
                    .startWith(new ArrayList<Movie>())
                    .buffer(2, 1)
                    .skip(1)
                    .flatMap(calculateDelta);
        }

    };


    static Func1<List<List<Movie>>, Observable<Movie>> calculateDelta = new Func1<List<List<Movie>>, Observable<Movie>>() {

        public Observable<Movie> call(List<List<Movie>> listOfLists) {
            if (listOfLists.size() == 1) {
                return Observable.from(listOfLists.get(0));
            } else {
                // diff the two
                List<Movie> newList = listOfLists.get(1);
                List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));

                Set<Movie> delta = new LinkedHashSet<Movie>();
                delta.addAll(newList);
                // remove all that match in old
                delta.removeAll(oldList);

                // filter oldList to those that aren't in the newList
                oldList.removeAll(newList);

                // for all left in the oldList we'll create DROP events
                for (Movie old : oldList) {
                    delta.add(new Movie());
                }

                return Observable.from(delta);
            }
        };
    };

@zsxwing
Copy link
Member

zsxwing commented Oct 14, 2014

So the problem of Transformer is that Observable<? extends List<Movie>> o is hard to use. Cannot assign Observable<? extends List<Movie>> to Observable<List<Movie>> is painful in Java. I cannot find any way to fix it.

@akarnokd
Copy link
Member

In the operators publish and replay the signatures are of Func1<? super Observable<T>, ? extends Observable<R>> selector, so no nested variance. The same happens in multicast. The rationale is that the first ? super Observable<T> must be exact because the function is applied to the exact Observable<T> whereas the type of R comes from outside already and may carry its own variance.

@zsxwing
Copy link
Member

zsxwing commented Oct 14, 2014

@akarnokd so we should remove Transformer and change the signature to public <R> Observable<R> compose(Func1<? super Observable<T>, ? extends Observable<R>> transformer)?

@akarnokd
Copy link
Member

I guess in lift the use of Operator is a good way to convey the intent and meaning of the callback function to set it apart from regular stream processing. So if compose differs from publish and replay in a similar way, then lets keep it.

@benjchristensen
Copy link
Member Author

The reason for Operator in lift was to simplify the complex generics and allow the nested variance ... which works in that case.

For compose it has proven near impossible to use in any use case I've tried to use it for which means it's not very useful in its current state.

@davidmoten
Copy link
Collaborator

I've been using compose of late and +1 for Func1 as parameter instead
of Transformer. Is just a hassle especially for pre lambda code.
On 15 Oct 2014 05:39, "Ben Christensen" [email protected] wrote:

The reason for Operator in lift was to simplify the complex generics and
allow the nested variance ... which works in that case.

For compose it has proven near impossible to use in any use case I've
tried to use it for which means it's not very useful in its current state.


Reply to this email directly or view it on GitHub
#1663 (comment).

@benjchristensen
Copy link
Member Author

How would Transformer vs Func be any different? It's just a named Func to allow the nested variance, just like Operator.

@vadims
Copy link
Contributor

vadims commented Oct 15, 2014

We're using compose and it's very convenient!

  public static <T> Observable.Transformer<Optional<T>, T> ifAbsentThrow(Func0<RuntimeException> absentCase) {
    return source -> source.map(optional -> {
      if (optional.isPresent()) {
        return optional.get();
      }
      throw absentCase.call();
    });
  }

    userQueryObservables.findByUsername(username)
        .compose(ifAbsentThrow(() -> new UsernameNotFoundException(username)))
        .flatMap(user -> ...)

@benjchristensen
Copy link
Member Author

Would you recommend any signature changes based on your usage?

@vadims
Copy link
Contributor

vadims commented Oct 15, 2014

We've hit the issue described above in a couple of places, so we had to suppress the unchecked warning.

Have you considered creating a CovariantTransformer (for lack of a better name)?

    @SuppressWarnings("unchecked")
    public <R> Observable<R> compose(Transformer<T, ? extends R> transformer) {
        return (Observable<R>) transformer.call(this);
    }

    @SuppressWarnings("unchecked")
    public <R> Observable<R> compose(CovariantTransformer<? super T, ? extends R> transformer) {
        return (Observable<R>) transformer.call(this);
    }

    public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>> {
    }

    public static interface CovariantTransformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>> {
    }

then your example above would be:

    @Test
    public void testComposeWithDeltaLogic() {
        List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
        List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
        Observable<List<Movie>> movies = Observable.just(list1, list2);
        movies.compose(deltaTransformer);

    }

    static Transformer<List<Movie>, Movie> deltaTransformer = new Transformer<List<Movie>, Movie>() {

        @Override
        public Observable<? extends Movie> call(Observable<List<Movie>> movieList) {
            return movieList
                .startWith(new ArrayList<Movie>())
                .buffer(2, 1)
                .skip(1)
                .flatMap(calculateDelta);
        }
    };


    static Func1<List<List<Movie>>, Observable<Movie>> calculateDelta = new Func1<List<List<Movie>>, Observable<Movie>>() {

        public Observable<Movie> call(List<List<Movie>> listOfLists) {
            if (listOfLists.size() == 1) {
                return Observable.from(listOfLists.get(0));
            } else {
                // diff the two
                List<Movie> newList = listOfLists.get(1);
                List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));

                Set<Movie> delta = new LinkedHashSet<Movie>();
                delta.addAll(newList);
                // remove all that match in old
                delta.removeAll(oldList);

                // filter oldList to those that aren't in the newList
                oldList.removeAll(newList);

                // for all left in the oldList we'll create DROP events
                for (Movie old : oldList) {
                    delta.add(new Movie());
                }

                return Observable.from(delta);
            }
        };
    };

In CovarianceTest only one test case needed to be updated to use CovariantTransformer, the rest still use Transformer with the updated signature:

    @Test
    public void testCovarianceOfCompose() {
        Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
        Observable<Movie> movie2 = movie.compose(new CovariantTransformer<Movie, Movie>() {

            @Override
            public Observable<? extends Movie> call(Observable<? extends Movie> t1) {
                return Observable.just(new Movie());
            }

        });
    }

@benjchristensen
Copy link
Member Author

Have you considered creating a CovariantTransformer (for lack of a better name)?

I haven't and it concerns me somewhat because of type erasure and impact on dynamic languages as they would not be able to disambiguate between the two.

Are these issues just weaknesses with Java or are we doing something wrong?

@vadims
Copy link
Contributor

vadims commented Oct 16, 2014

What about #1762?

@benjchristensen
Copy link
Member Author

#1762 seems to work better. The following code using Java 8 compiles with #1762 but not with what is in trunk:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import rx.Observable;

public class ComposeExample2 {

    public static void main(String[] args) {
        List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
        List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
        Observable<List<Movie>> movies = Observable.just(list1, list2);
        Observable<Movie> compose = movies.compose(ComposeExample2::transform);
        compose.subscribe(System.out::println);
    }

    public static Observable<Movie> transform(Observable<List<Movie>> movieList) {
        return movieList
                .startWith(new ArrayList<Movie>())
                .buffer(2, 1)
                .skip(1)
                .flatMap(ComposeExample2::calculateDelta);
    }

    public static Observable<Movie> calculateDelta(List<List<Movie>> listOfLists) {
        if (listOfLists.size() == 1) {
            return Observable.from(listOfLists.get(0));
        } else {
            // diff the two
            List<Movie> newList = listOfLists.get(1);
            List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));

            Set<Movie> delta = new LinkedHashSet<Movie>();
            delta.addAll(newList);
            // remove all that match in old
            delta.removeAll(oldList);

            // filter oldList to those that aren't in the newList
            oldList.removeAll(newList);

            // for all left in the oldList we'll create DROP events
            for (Movie old : oldList) {
                delta.add(new Movie());
            }

            return Observable.from(delta);
        }
    };

    /*
     * Most tests are moved into their applicable classes such as [Operator]Tests.java
     */

    static class Media {
    }

    static class Movie extends Media {
    }

    static class HorrorMovie extends Movie {
    }

    static class ActionMovie extends Movie {
    }

    static class Album extends Media {
    }

    static class TVSeason extends Media {
    }

    static class Rating {
    }

    static class CoolRating extends Rating {
    }

    static class Result {
    }

    static class ExtendedResult extends Result {
    }

}

Thank you @vadims

@davidmoten
Copy link
Collaborator

Problem is the obvious candidate is a Func1 and the method cannot be passed
a Func1. I think it's unnecessary slowdown for confirming what the
signature of Transformer is and yes it's a Func1 under the covers. So
having used it I'd categorise it as more obfuscation then help. It also
precludes reuse of Func1s.
On 15 Oct 2014 07:58, "Ben Christensen" [email protected] wrote:

How would Transformer vs Func be any different? It's just a named Func to
allow the nested variance, just like Operator.


Reply to this email directly or view it on GitHub
#1663 (comment).

@benjchristensen
Copy link
Member Author

@davidmoten Are you saying you'd prefer this:

    public <R> Observable<R> compose(Func1<? super Observable<T>, ? extends Observable<? extends R>> transformer) {
        // Casting to Observable<R> is type-safe because we know Observable is covariant.
        return (Observable<R>) transformer.call(this);
    }

If we went that way then code like this:

    private static Transformer<? super String, String> appendWorldTransformer() {
        return o -> o.map(s -> s + " world!").finallyDo(() -> {
            System.out.println("  some side-effect");
        });
    }

would need to change to this:

    private static Func1<Observable<? super String>, Observable<? extends String>> appendWorldTransformer() {
        return o -> o.map(s -> s + " world!").finallyDo(() -> {
            System.out.println("  some side-effect");
        });
    }

Is that what you're suggesting?

In Java 8 a method like this works with either signature:

    public static Observable<Movie> transform(Observable<List<Movie>> movieList) {
        return movieList
                .startWith(new ArrayList<Movie>())
                .buffer(2, 1)
                .skip(1)
                .flatMap(ComposeExample2::calculateDelta);
    }

Originally the reason for the Transformer cover type was that we couldn't get the nested generics to work. The second reason is to make using it easier to use ... as figuring out nested variance is REALLY hard.

I still can't figure out how to make this code work using Func1:

        Observable<Movie> movie2 = movie.compose(new Func1<Observable<Movie>, Observable<? extends Movie>>() {

            @Override
            public Observable<? extends Movie> call(Observable<Movie> t1) {
                return Observable.just(new Movie());
            }


        });

Here is the error:

The method compose(Func1<? super Observable<CovarianceTest.HorrorMovie>,? extends 
Observable<? extends R>>) in the type Observable<CovarianceTest.HorrorMovie> is not 
applicable for the arguments (new Func1<Observable<CovarianceTest.Movie>,Observable<? extends 
CovarianceTest.Movie>>(){})

So if we're going to use Func1 can you please propose a modification of #1762 that works?

@benjchristensen
Copy link
Member Author

Take a look at #1770 (comment) for example of code that needs to be made easy to write with compose.

@benjchristensen
Copy link
Member Author

I don't have a strong preference to using Func1 or Transformer but I do want code that is usable by people without fighting for several minutes to figure out which incantation of super and extend they need to sprinkle in their signatures.

The type of code we should be able to easily write is shown above in #1663 (comment)

@benjchristensen
Copy link
Member Author

I'd like to make a decision on this soon and move forward. The best option I've seen so far is #1762

@benjchristensen
Copy link
Member Author

Anyone have a better alternative than #1762 that makes the code examples above work? If not I'm proceeding with it.

@davidmoten
Copy link
Collaborator

Re theFunc1 or Transformer, I'd suggest that the Func1 be used for the compose signature and whatever gets resolved for Transformer is essentially a helper class for this method (that might have a corresponding overload but I suspect we won't be able to do that?). I'll have a look at #1762 hopefully in a few hours.

@benjchristensen
Copy link
Member Author

that might have a corresponding overload but I suspect we won't be able to do that

No that can't be done because of type erasure.

I'd suggest that the Func1 be used for the compose signature and whatever gets resolved for Transformer is essentially a helper class for this method

One of the reasons we didn't do that for lift is because it is not discoverable and means the main public API that gets documented doesn't work correctly for co/contra-variance.

The other reason is that doing that seems to break some of the co/contra-variance use cases, such as this one:

screen shot 2014-10-16 at 10 44 07 pm

@benjchristensen
Copy link
Member Author

Just to be clear ... as I said earlier in the thread, if we can achieve co/contra-variance on the unit tests and make the example code (#1663 (comment)) work using just Func1 I'm open to that as I don't like cover types. We spent a lot of time on lift though and determined we needed a cover type for the same reasons. compose and lift are similar in purpose and signature.

@benjchristensen
Copy link
Member Author

I went with @vadims proposal as I have not seen anything else achieve the desired combination of usage simplicity and support for co/contra-variance. It was merged in #1776.

I am not releasing the next version for at least 24 hours so if anyone wishes to debate this further that is the window of time to offer an alternative that works with the committed unit tests and the Java 8 code shown above (#1663 (comment)).

Thank you everyone for your help in figuring this out and looking at various alternative. (Yet again I am reminded of how little I like working with co/contra-variant generics in Java.)

@zsxwing
Copy link
Member

zsxwing commented Oct 19, 2014

@benjchristensen could you take a look at #1778?

@benjchristensen
Copy link
Member Author

I will later today.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

5 participants