Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: Add an operator to throttle data via controlling the requests going upstream #3781

Closed
wants to merge 1 commit into from

Conversation

abersnaze
Copy link
Contributor

Came across an interesting use case where someone needed a way to get an external signal (unknown to the subscriber) to throttle the data going through an observable chain.

@@ -1258,6 +1258,7 @@ public Completable toCompletable() {
final static Observable<Object> INSTANCE = create(new OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
subscriber.setProducer(ProducerHolder.INSTANCE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the producer is optional in 1.x and many other operators don't do it either; one has to work around it. Why add it just here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My unit test didn't work because I don't create a producer until setProducer is called called. If setProducer is optional then I'll probably have to refactor to use the producer arbiter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not about the arbiter, but when you trigger the subscription to the other subscriber. You'd probably have to use the frequent pattern:

MainSubscriber parent = new MainSubscriber(...)
ValveSubscriber valve = new ValveSubscriber(parent, ...);

child.add(parent);
child.add(valve);
child.setProducer(r -> parent.requestFromChild(r));

other.subscribe(valve);

return parent;

@@ -6705,6 +6706,26 @@ public final Boolean call(T t) {
}

/**
* Allow the an external signal control the amount of data being set through this Observable chain.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a typo there: "Allow the an"

@akarnokd
Copy link
Member

@abersnaze abersnaze force-pushed the valve branch 2 times, most recently from 8f03b8e to 6f8c25c Compare June 1, 2016 22:09
@akarnokd
Copy link
Member

Do you want to pursue this further?

@akarnokd akarnokd changed the title Add an operator to throttle data via controlling the requests going upstream 1.x: Add an operator to throttle data via controlling the requests going upstream Jun 19, 2016
@akarnokd
Copy link
Member

I'm in support for this functionality but the PR has test issues. Will you have time to fix it before 1.1.7? If not, would you want me to propose my take on it?

My first month goal is to have 0 open 1.x PRs as we reach 1.1.7.

@abersnaze
Copy link
Contributor Author

Not in its current form. I was thinking that this could be made much simpler if this was composed with the rebatch operator since most of the code was batching the initial MAX_LONG request.

Because this rebatch doesn't have any queuing it makes it difficult to come up with invariants to test.

@akarnokd
Copy link
Member

Okay then, closing this for now. Don't hesitate to post a follow-up PR once you are ready.

@akarnokd akarnokd closed this Jun 22, 2016
@ZacSweers
Copy link
Contributor

I kind of attempted the inverse of this and was going to release it separately. Rather than throttling requests to the provider, it throttles emissions from the stream and buffers them. Been busy with work but here's what it looked like - https://gist.github.com/hzsweers/91de6e2b87fa2542dd38034c4cee198c. Could open a separate PR here if you think it's something you'd want here.

@akarnokd
Copy link
Member

@hzsweers is it like a "burster", i.e., collect a window of values and then burst them out? Not sure about its general use. In addition, it has race conditions and potentially missed signals.

@ZacSweers
Copy link
Contributor

ZacSweers commented Jun 23, 2016

It wouldn't "burst" them out, per se. It just ensures a minimum time between emissions, normalizing a flow. I even attempted a marble diagram for it, hope this makes sense. Note that the first three emissions after normalizing are all equally spaced by a minimum window of time. It's admittedly a niche use case, but figured I'd mention it after seeing this discussion.

image

@ZacSweers
Copy link
Contributor

There is a potential for missed signals, but only in the event of backpressure (which I don't really know how to account for) or unsubscribing before the normalizer has been able to finish draining its queue.

@akarnokd
Copy link
Member

Yeah, this came up on Stackoverflow a couple of times. I can't find it but I answered such questions.

@davidmoten
Copy link
Collaborator

This is an interesting one @hzsweers. I have a use case somewhat similar in that I have an archived timestamped stream and I'd like to replay it sped up by a factor.

If you do want to make a Normalize operator like this check out @akarnokd's blog Advanced RxJava and check out operators like OperatorObserveOn and OperatorOnBackpressureBuffer. I'd be happy to have a go at this operator but also happy to assist your PR. @akarnokd has always been very helpful when I've attempted this sort of thing and I'm sure he'd chip in (actually he'd knock it up in 5 mins but there'd be no fun for the rest of us).

Actually @hzsweers couldn't the Normalize use case be sorted by zipping with Observable.interval?

@abersnaze abersnaze mentioned this pull request Jul 18, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants