Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter breaks backpressure #3910

Closed
boris-petrov opened this issue May 5, 2016 · 2 comments
Closed

Filter breaks backpressure #3910

boris-petrov opened this issue May 5, 2016 · 2 comments
Labels

Comments

@boris-petrov
Copy link

The following code:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscription;

public class Test {
    public static void main(String[] arguments) throws InterruptedException {
        new Test().test();
    }

    void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Subscription subscription = createBackpressuredObservable()
            .filter(item -> true)
            .mergeWith(createBackpressuredObservable())
            .subscribe(
                item -> {},
                e -> {
                    e.printStackTrace();
                    latch.countDown();
                },
                latch::countDown);
        latch.await(7, TimeUnit.SECONDS);
        subscription.unsubscribe();
    }

    private Observable<Integer> createBackpressuredObservable() {
        return Observable
            .<Integer> create(subscriber -> {
                new Thread(() -> {
                    while (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(0);
                    }
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }
                }).start();
            })
            .onBackpressureBuffer();
    }
}

Works with RxJava version 1.1.3, but breaks in 1.1.4 with a rx.exceptions.MissingBackpressureException. If you remove the filter on line 16, it works fine on both versions. I'm not sure I understand everything about how backpressure works, but I think this is a bug. Please correct me if I'm wrong.

As a side note, the documentation on backpressure, how it is propagated and which operators use reactive pull backpressure internally is really scarce and difficult to follow.

@akarnokd akarnokd added the Bug label May 5, 2016
@akarnokd
Copy link
Member

akarnokd commented May 5, 2016

Thanks for reporting. Fix in #3912.

@akarnokd
Copy link
Member

akarnokd commented May 5, 2016

Fix released with 1.1.5. Should be available from maven within a few hours.

@akarnokd akarnokd closed this as completed May 5, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants