Confusion when using Backpressure operators #3751

Closed
troinine opened this issue Mar 13, 2016 · 5 comments

@troinine

Hey,

We have a use case in which a consumer might not be able to process items as fast as the source observable emits them. As I understand it, backpressure handling with either onBackpressureBuffer() or onBackpressureDrop() might be useful here. On overflow or drop, we would like to store the items in local storage and process them later, once the consumer can keep up with the input rate again. Our consumer is actually a remote REST call that might time out or be unavailable, in which case we retry.

Anyway, I tried several ways to address the problem but couldn't find a suitable one. To illustrate my tests, here is some code:

package io.reactivex;

import org.junit.Test;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class BackPressureTest {
    @Test
    public void testOnBackPressureDrop() throws InterruptedException {
        Observable<Integer> emitter = toObservable()
                .subscribeOn(Schedulers.newThread());

        emitter.onBackpressureDrop(i -> System.out.println("Dropped " + i))
                .observeOn(Schedulers.computation())
                .map(this::doWork)
                .doOnNext(i -> System.out.println("Output " + i))
                .toBlocking()
                .subscribe(new SingleItemSubscriber<>());
    }

    @Test
    public void testOnBackPressureBuffer() throws InterruptedException {
        Observable<Integer> emitter = toObservable()
                .subscribeOn(Schedulers.newThread());

        emitter.onBackpressureBuffer(2, () -> System.out.println("Overflow"))
                .observeOn(Schedulers.computation())
                .map(this::doWork)
                .doOnNext(i -> System.out.println("Output " + i))
                .toBlocking()
                .subscribe(i -> System.out.println("Subscriber received " + i));
    }

    private Observable<Integer> toObservable() {
        return Observable.create(subscriber -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("Emitting " + i);

                subscriber.onNext(i);

                try {
                    Thread.sleep(250);
                } catch (InterruptedException e) {
                    // Signal the error and stop: nothing may be emitted after onError.
                    subscriber.onError(e);
                    return;
                }
            }

            subscriber.onCompleted();
        });
    }

    private int doWork(int integer) {
        System.out.println("Consuming " + integer);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return integer;
    }

    private static class SingleItemSubscriber<T> extends Subscriber<T> {
        @Override
        public void onStart() {
            request(1);
        }

        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(T t) {
            System.out.println("Subscriber received " + t);
            request(1);
        }
    }
}

In testOnBackPressureDrop() I assumed that once the emitter had queued some items, it would start dropping them. However, it seems that the subscription to the backpressure operator receives a request for 128 items. Holding 128 items in memory is far too many for us, and we would like to control the size of the request.

In testOnBackPressureBuffer() I assumed that the emitter would overflow after emitting more than two items into the buffer.

However, in neither case do I see an overflow or dropped items. I also realized that on overflow, onBackpressureBuffer() makes the observable emit onError(). That wouldn't be an option for me, since I want the emitter to continue and I want to deal with the problem myself.

Could you please tell me what we are missing here? Or are we trying to do something that is not yet possible, e.g. is the API missing an operator like onBackPressureBufferAndDrop(int capacity, Action1 onDrop)?

I wrote my tests based on the documentation in https://github.com/ReactiveX/RxJava/wiki/Backpressure
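
For illustration, the behavior asked for above (a bounded buffer that hands rejected items to a callback instead of failing) might look roughly like this in plain Java. This is only a sketch under stated assumptions: DroppingBuffer is a hypothetical helper, not an RxJava operator, and it does not take part in RxJava's request protocol.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: a bounded FIFO buffer that passes
// rejected items to a callback instead of signalling an error.
final class DroppingBuffer<T> {
    private final int capacity;
    private final Consumer<? super T> onDrop;
    private final Queue<T> queue = new ArrayDeque<>();

    DroppingBuffer(int capacity, Consumer<? super T> onDrop) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.onDrop = onDrop;
    }

    // Buffers the item and returns true, or hands it to onDrop and returns
    // false when the buffer already holds `capacity` items.
    synchronized boolean offer(T item) {
        if (queue.size() >= capacity) {
            onDrop.accept(item);
            return false;
        }
        queue.add(item);
        return true;
    }

    // Returns the oldest buffered item, or null if the buffer is empty.
    synchronized T poll() {
        return queue.poll();
    }

    synchronized int size() {
        return queue.size();
    }
}
```

The onDrop callback receives the rejected item itself, so it could write the item to local storage for a later retry. Note that the callback runs while the buffer's lock is held, so a slow callback would stall producers.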

@akarnokd
Member

It seems your source doesn't emit enough values, so the default 128-element buffer in observeOn can hold all of them without triggering backpressure. The onBackpressureBuffer(int) behavior is expected: it gives room to bursty sources but fails on sustained backpressure to prompt the developer to reevaluate the flow.
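
To make the arithmetic concrete, here is a deliberately simplified model in plain Java, not RxJava itself. It assumes a single consumer that works on one item at a time and a bounded waiting area in front of it; an item that arrives while the waiting area is full is dropped. BackpressureModel and countDropped are hypothetical names, and the real observeOn request and replenish logic is more involved than this.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model (an assumption, not RxJava's implementation): counts how
// many items are dropped when a slow consumer sits behind a bounded buffer.
final class BackpressureModel {
    static int countDropped(int items, long emitIntervalMs, long workMs, int capacity) {
        Deque<Integer> waiting = new ArrayDeque<>();
        long consumerFreeAt = 0; // time at which the consumer finishes its current item
        int dropped = 0;
        for (int i = 0; i < items; i++) {
            long now = i * emitIntervalMs; // emission time of item i
            // The consumer picks up waiting items as soon as it becomes free.
            while (!waiting.isEmpty() && consumerFreeAt <= now) {
                waiting.poll();
                consumerFreeAt += workMs;
            }
            if (waiting.isEmpty() && consumerFreeAt <= now) {
                consumerFreeAt = now + workMs; // idle consumer takes the item directly
            } else if (waiting.size() < capacity) {
                waiting.add(i);                // consumer busy, room left in the buffer
            } else {
                dropped++;                     // consumer busy and buffer full
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Timings from the tests above: emit every 250 ms, 1000 ms of work per item.
        System.out.println(countDropped(10, 250, 1000, 128)); // prints 0
        System.out.println(countDropped(10, 250, 1000, 2));   // prints 5
    }
}
```

With only ten items the 128-slot case never fills up, which matches the observation that nothing is dropped. In this model a much longer run, for example 1000 items, does produce drops even with 128 slots.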

There is a PR in limbo that tries to address this buffer behavior by allowing dropping, but if you want to queue on disk, you have to write a custom operator.
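
As a rough idea of the storage half of such a custom operator, the sketch below parks overflowed integers in a text file and replays them later. It is an assumption-laden illustration only: DiskSpill is a hypothetical helper, it is not wired into any RxJava operator, and it ignores concerns such as crash safety and concurrent processes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not an RxJava operator: stores overflowed integers in a
// plain text file, one value per line, so they can be replayed later.
final class DiskSpill {
    private final Path file;

    DiskSpill(Path file) {
        this.file = file;
    }

    // Appends one item to the spill file, creating the file if needed.
    synchronized void store(int item) {
        try {
            Files.write(file,
                    Collections.singletonList(Integer.toString(item)),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns all parked items in arrival order and deletes the spill file.
    synchronized List<Integer> drain() {
        List<Integer> items = new ArrayList<>();
        try {
            if (Files.exists(file)) {
                for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                    if (!line.isEmpty()) {
                        items.add(Integer.parseInt(line));
                    }
                }
                Files.delete(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return items;
    }
}
```

A drop callback could call store(item), and a retry loop could call drain() once the remote consumer is reachable again.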

@davidmoten
Collaborator

I've bumped into the queueing on disk use case a few times but haven't implemented anything. I'll have a look (probably in a couple of weeks).

@troinine
Author

I see.

The challenge with the PR above is that the overflow function currently does not supply the item(s) that caused the overflow. It is just a void action (Action0). Changing that to e.g. Action1 would probably break the semantics of the original onBackpressureBuffer() API, so I guess a new API that accepts an Action1 would need a different name.

Based on @akarnokd's comment, we should not use the buffer backpressure. The current implementation of onBackpressureDrop(Action1<? super T> onDrop) seems close to what I'm after, but I would need to be able to control the size of the default internal buffer (128 now). We can of course write a custom operator, but I was wondering whether others might benefit from this if it were part of the Observable API. With the current implementation of drop you have very little control over when dropping starts, and in our case we can calculate a value for the buffer based on the characteristics and configuration of the application.

The term "buffer" in the API docs misled me a bit: I assumed that the capacity controls the size after which the source observable starts to overflow. Would it make sense to at least clarify the documentation and mention that there is an internal buffer which can actually hold more items than the overflow buffer capacity you specify?

Thanks for the quick reply!

@srvaroa
Contributor

srvaroa commented Mar 17, 2016

I've hit this a number of times and generally ended up turning most .observeOn() calls into .onBackpressureBuffer().observeOn(). I guess the ability to control the 128-slot buffer through an optional parameter to observeOn would be a nice addition.

@akarnokd
Member

@srvaroa PR welcome.

sebaslogen pushed a commit to sebaslogen/RxJava that referenced this issue Mar 28, 2016
The observeOn operator is backed by a small queue of 128 slots that may
overflow quickly on slow producers.  This could only be avoided by
adding a backpressure operator before the observeOn (not only
inconvenient, but also taking a perf. hit as it forces hops between two
queues).

This patch allows modifying the default queue size on the observeOn
operator.

Fixes: ReactiveX#3751
Signed-off-by: Galo Navarro <[email protected]>