Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging streams from Future-based subscriptions fails for RxJava >= 0.17 #1022

Closed
s-urbaniak opened this issue Apr 7, 2014 · 7 comments
Closed

Comments

@s-urbaniak
Copy link

Hi,

The merge() operation fails for me starting from RxJava>=0.17. I refactored to the new OnSubscribe idiom as follows:

public class ResponseOnSubscribe implements Observable.OnSubscribe<Response> {
...
    @Override
    public void call(Subscriber<? super Response> subscriber) {
        try {
            Future<Response> f = builder.execute(newAsyncHandler(subscriber));
            subscriber.add(Subscriptions.from(f));
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }
...
}

https://github.com/s-urbaniak/rxning/blob/master/src/main/java/org/urbaniak/ning/ResponseOnSubscribe.java

As you see I add create a Subscription from a future and add it the the subscriber.

I also provide a convenience factory method to create an Observable:

    public static Observable<Response> create(final AsyncHttpClient.BoundRequestBuilder builder) {
        return Observable.create(new ResponseOnSubscribe(builder));
    }

When I construct a merged Observable like so:

    @Test
    public void testMerge() {
        AsyncHttpClient client = new AsyncHttpClient();

        Observable<Response> obs1 = NingObservable
                .create(client.prepareGet("http://www.wikipedia.org"));

        Observable<Response> obs2 = NingObservable
                .create(client.prepareGet("http://www.wikipedia.de"));

        Observable<String> bodies = Observable
                .merge(obs1, obs2)
                .map(Responses.toString);

        bodies.toBlockingObservable().forEach(Actions.systemOut);
    }

I recognize the following behavior and exception:

java.util.concurrent.CancellationException
    at com.ning.http.client.providers.netty.NettyResponseFuture.cancel(NettyResponseFuture.java:177)
    at rx.subscriptions.Subscriptions$2.unsubscribe(Subscriptions.java:76)
    at rx.subscriptions.CompositeSubscription.unsubscribeFromAll(CompositeSubscription.java:175)
    at rx.subscriptions.CompositeSubscription.unsubscribe(CompositeSubscription.java:168)
    at rx.Subscriber.unsubscribe(Subscriber.java:59)
    at rx.subscriptions.CompositeSubscription.remove(CompositeSubscription.java:138)
    at rx.operators.OperatorMerge$1$InnerObserver.cleanup(OperatorMerge.java:103)
    at rx.operators.OperatorMerge$1$InnerObserver.onCompleted(OperatorMerge.java:85)
    at org.urbaniak.ning.ChunkedOnSubscribe$1.onCompleted(ChunkedOnSubscribe.java:47)
    at org.urbaniak.ning.ChunkedOnSubscribe$1.onCompleted(ChunkedOnSubscribe.java:38)

What happens (imho) is that CompositeSubscription.java:168 prematurely unsubsribes from any streams that are not completed yet.

I recognized this behavior in my code starting from #897

Am I missing something or is my assumption about the behavior of merge() fundamentally wrong or do I use the 0.17 facilities in a wrong way?

A minimal failing implementation is available at https://github.com/s-urbaniak/rxning for 0.17.x and a working version for 0.16.x at https://github.com/s-urbaniak/rxning/tree/0.16.1. A 'mvn test' will reveal the failing tests on the console.

Thanks for any pointers and thanks for this very great library!!!
Sergiusz

@benjchristensen
Copy link
Member

It will call unsubscribe on each Observable after it completes. This is what merge is supposed to do but wasn't before 0.17, which is why you're just now seeing this in your code.

You can see the code here: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorMerge.java#L100 and the issue that resulted in this change at #897

Without this memory leaks can occur.

If you look at the code you'll see that it only calls unsubscribe when onComplete or onError is received by calling childSubscriptions.remove(this) and unsubscribing only the currently terminated Observable, not all others.

I think the problem is that your code is emitting a Response object that is tied to the Future and that it fails even if cancelled "after" having received a successful response.

An Observable can (and should) be unsubscribed as soon as it emits onError or onCompleted. See section 4.3 of Rx Design Guidelines for more on this:

4.3. Assume resources are cleaned up after an OnError or OnCompleted
message
Paragraph 4.1 states that no more messages should arrive after an OnError or OnCompleted message. This makes it possible to cleanup any resource used by the subscription the moment an OnError or OnCompleted arrives. Cleaning up resources immediately will make sure that any side-effect occurs in a predictable fashion. It also makes sure that the runtime can reclaim these resources.

Thus, you should not emit anything to onNext that relies upon unsubscribe not having been called since you can't control the processing later in the sequence (it can be delayed, buffered, rescheduled etc) and should be pure data. In this particular example just emit the value of the Response rather than Response itself and it should work.

@s-urbaniak
Copy link
Author

Thanks a lot for your extensive answer and for taking your time! I refactored my code to emit a String instead of the original Response object but observed still the very same behavior. I was also aware of the Rx Design Guidelines and the design principle not to emit values that rely on possibly cleaned resources (the Future in this case) and did not see any references to the original Future in the Response object.

But you brought me on the right track by explaining the intended behavior of OperatorMerge and by looking at the stacktrace more carefully. The cause of the problem is a side-effect which became visible after the #897 optimizations. What happens is the following:

Assuming you have two Streams 1 and 2 being merged together:

  1. Stream 1 completes
  2. OperatorMerge unsubscribes from Stream 1 as per Automatic unsubscribing in OperatorMerge #897 earlier than in previous implementations
  3. The unsubscription operation is delegated to calling cancel() on the Future of Stream 1
  4. Now (unfortunately) whenever you call cancel() on the request Future the Apache ning library calls AsyncHandler#onThrowable() with an instance of a j.u.c.CancellationException as per
    https://github.com/AsyncHttpClient/async-http-client/blob/master/providers/netty/src/main/java/org/asynchttpclient/providers/netty/future/NettyResponseFuture.java#L168
  5. The AsyncHandler#onThrowable() callback propagates the CancellationException to Stream 1's Observer by calling obs#onError(Throwable)
  6. The underlying MergeOperator (correctly) propagates the thrown Exception and also finishes Stream 2 prematurely with an error.

My workaround unfortunately is to ignore a concrete CancellationException in the callback (and abusing Exceptions for control flow :-( ):

@Override
public void onThrowable(Throwable t) {
  if (!(t instanceof CancellationException)) {
    obs.onError(t);
  }
}

@benjchristensen
Copy link
Member

Am I reading that correctly that the Future always will emit an exception even if already successfully completed?

@s-urbaniak
Copy link
Author

That is the behavior I am observing. Here is the set of String messages gathered by a ConcurrentLinkedQueue replaying the stream of merged events without the instanceof check and Stream 1 ending before Stream 2:

onNext Stream 2
onNext Stream 1
onNext Stream 1
onNext Stream 2
onNext Stream 2
onNext Stream 1
onNext Stream 2
onNext Stream 1
onNext Stream 2
onNext Stream 1
onNext Stream 2
onCompleted Stream 1
unsubscribe: Stream 1 (thus calling Future.cancel())
onError Stream 1 (received java.util.concurrent.CancellationException because the Future was cancel()ed and propagating to onError)
unsubscribe: Stream 2 (unsubscribed due to error)

@s-urbaniak
Copy link
Author

To be more precise: I observe this behavior if and only if one cancel()s (via unsubscribe()) a ning request Future that is already completed (as is done in Observable#merge()).

If one unsubscribes to a Future that has not completed yet (i.e. by using Observable#take()) the CancellationException is not being emitted.

This behavior really is dependent on the library you use (in my case ning).

@benjchristensen
Copy link
Member

I'd suggest this is a bug with the Ning Future since it shouldn't do anything if cancel() is called if it has already received a successful response.

@s-urbaniak
Copy link
Author

Closing my issue as there is nothing to be done in RxJava.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants