CombineLatestDelayError emits error before all inner observables emit result #4414

Closed
UMFsimke opened this issue Aug 23, 2016 · 10 comments
@UMFsimke
Contributor

Recently I came across the following issue while using the combineLatestDelayError operator. This code was tested on Android, on a Nexus 5x running OS 6.0.1.

Observable<Boolean> errorObservable = Observable.create(subscriber -> {
    subscriber.onError(new NullPointerException());
});

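// Explicit type witness: without it the chained delay()/doOnNext() calls infer Observable<Object>.
Observable<Boolean> timeoutObservable = Observable.<Boolean>create(subscriber -> {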
    subscriber.onNext(true);
    subscriber.onCompleted();
})
.delay(5, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.doOnNext(result -> Log.d("Combine", "Delay emitted"));

Observable<Object> zippedObservable = Observable.zip(
        Observable.just(true), timeoutObservable,
        (flag1, flag2) -> {
            Log.d("Combine", "On Zip emitted");
            return new Object();
        });

return Observable.combineLatestDelayError(
        Arrays.asList(errorObservable, zippedObservable),
        result -> {
            Boolean flag1 = (Boolean) result[0];
            Object object = result[1];
            Log.d("Combine", "Result of errorObservable: " + flag1);
            Log.d("Combine", "Result of zippedObservable: " + object);
            return null;
        }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                Log.d("Combine", "OnCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("Combine", "OnError");
            }

            @Override
            public void onNext(Object o) {
                Log.d("Combine", "OnNext");
            }
        });

This code simulates callbacks using delays, for example the Android Pay availability callback.

The problem arises once this code is run. The log will be:

OnError
Delay emitted
On Zip emitted

This contradicts the docs, which state that the expected behavior should be:

Delay emitted
On Zip emitted
OnError

The expected result does happen if you add subscriber.onNext(true) to the errorObservable before subscriber.onError...

@akarnokd
Member

Your errorObservable doesn't emit any value and since combineLatestDelayError won't ever be able to combine values, it terminates with the error immediately.
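
The rule described here can be modelled in plain Java without RxJava. This is a toy sketch, not the operator's actual implementation: the class `CombineLatestModel` and its methods are hypothetical names, and it assumes, based only on the behavior reported in this thread, that a source ending without ever emitting stops the operator at once, while an error from a source that did emit is held until every source has ended.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the rule described in this thread; NOT RxJava's implementation. */
class CombineLatestModel {
    private final boolean[] hasValue;   // has source i emitted at least once?
    private final boolean[] done;       // has source i terminated?
    private final List<String> log = new ArrayList<>();
    private boolean sawError;
    private boolean terminated;

    CombineLatestModel(int sourceCount) {
        hasValue = new boolean[sourceCount];
        done = new boolean[sourceCount];
    }

    void onNext(int index) {
        if (terminated || done[index]) return;
        hasValue[index] = true;
        // A combined value is only possible once every source has a latest value.
        if (all(hasValue)) log.add("OnNext");
    }

    void onError(int index) { finish(index, true); }

    void onCompleted(int index) { finish(index, false); }

    private void finish(int index, boolean error) {
        if (terminated || done[index]) return;
        done[index] = true;
        sawError |= error;
        // Assumption from the thread: a source that ended without a value makes
        // combining impossible, so the model terminates immediately.
        if (!hasValue[index] || all(done)) {
            terminated = true;
            log.add(sawError ? "OnError" : "OnCompleted");
        }
    }

    private static boolean all(boolean[] flags) {
        for (boolean f : flags) if (!f) return false;
        return true;
    }

    List<String> log() { return log; }
}
```

Feeding the model the events from the report (source 0 errors without a value, then source 1 emits and completes) logs only `OnError`. If source 0 emits once before failing, the log becomes `OnNext`, `OnError`, which matches the observation that adding subscriber.onNext(true) restores the expected order.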

@UMFsimke
Contributor Author

Docs state:

Concatenates the Observable sequence of Observables into a single sequence by subscribing to each inner Observable, one after the other, one at a time and delays any errors till the all inner and the outer Observables terminate.

In the diff:
https://github.com/ReactiveX/RxJava/pull/3759/files#diff-c3bbc6e9e497930d46361b0b8140431cR1163

So the emphasis is on:

and delays any errors till the ALL inner and the outer Observables terminate.

In this scenario the second observable has not finished, yet the error is not delayed.

What if you want to make two API calls regardless of the result of the previous one and combine their results? I understand that there are always other solutions and approaches, but I believe that either the docs are incorrect or this is a potential bug, since the operator should behave as documented.

@akarnokd
Member

The documentation is incorrect as there is no "outer Observable" with this operator. Otherwise, it's an undocumented corner case that needs a revisit. Note however that even if the operator delayed the error to the very end, you wouldn't see any combined values ever (but perhaps the errors of the others).
I'll see what it takes to make the operator wait out all events.

@akarnokd
Member

Working out the proper rules is more complicated than it looks. For example, what should happen if you combineLatestDelayError(empty(), error(x), func2) or combineLatestDelayError(error(x), empty(), just(1), func3)?

@UMFsimke
Contributor Author

For combineLatestDelayError(empty(), error(x), func2), it makes sense that empty() terminates and then onError is invoked?

For the second, I would say empty() terminates, then just(1), and then onError is invoked?

I'm just following up with an idea; it's not that I expect it to work that way.

@akarnokd
Member

combineLatestDelayError can wait out all terminal events to make sure errors are reported afterwards. The problem is that if you write combineLatest(empty(), just()) and combineLatestDelayError(empty(), just()), they no longer behave the same way.

@akarnokd
Member

/cc @zsxwing

@akarnokd akarnokd self-assigned this Sep 22, 2016
@akarnokd
Member

Sorry for the delay. Once RC3 is released, I'll look into it and work out the details.

@akarnokd
Member

akarnokd commented Jan 5, 2017

I've considered many alternatives and can give you a reasonable workaround: apply cache() on the source Observable which you want to be consumed entirely even if the combineLatest cancels its consumer.

@akarnokd
Member

I spent a considerable amount of time on this and was unable to come up with a reasonable way of changing combineLatest inside RxJava. All I can offer is to write you a custom operator that awaits all sources to terminate in some fashion, regardless of whether they have emitted values or not.
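
For illustration only, the "await all sources" idea could be sketched in plain Java as follows. This is not the custom operator offered above and it uses no RxJava types: `AwaitAllCombiner`, its methods, and the string terminal markers are all hypothetical. It assumes the simplest possible rule, namely that the terminal event is reported only after every source has ended, with the first error winning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: combine latest values, report the terminal event only after ALL sources end. */
class AwaitAllCombiner<R> {
    private final Object[] latest;
    private final boolean[] hasValue;
    private final boolean[] done;
    private final Function<Object[], R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private Throwable firstError;
    private int remaining;
    private String terminal; // stays null while at least one source is still running

    AwaitAllCombiner(int sourceCount, Function<Object[], R> combiner) {
        this.latest = new Object[sourceCount];
        this.hasValue = new boolean[sourceCount];
        this.done = new boolean[sourceCount];
        this.remaining = sourceCount;
        this.combiner = combiner;
    }

    synchronized void onNext(int index, Object value) {
        if (done[index]) return;
        latest[index] = value;
        hasValue[index] = true;
        for (boolean present : hasValue) {
            if (!present) return; // some source has no value yet, nothing to combine
        }
        emitted.add(combiner.apply(latest.clone()));
    }

    synchronized void onError(int index, Throwable error) { finish(index, error); }

    synchronized void onCompleted(int index) { finish(index, null); }

    private void finish(int index, Throwable error) {
        if (done[index]) return;
        done[index] = true;
        if (error != null && firstError == null) firstError = error;
        // Unlike the short-circuiting rule, wait for every source before terminating.
        if (--remaining == 0) terminal = (firstError != null) ? "onError" : "onCompleted";
    }

    synchronized List<R> emitted() { return emitted; }

    synchronized String terminal() { return terminal; }
}
```

With the events from the original report, this sketch holds the error until the second source completes and still emits no combined value, in line with the earlier remark that no combined values would ever be seen. It also shows the divergence mentioned above: for an empty source paired with a source that emits, it waits for both to end instead of finishing as soon as the empty one completes.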
