-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
switchIfEmpty - fix backpressure bug and lost requests #2996
Conversation
@@ -38,33 +38,35 @@ public OperatorSwitchIfEmpty(Observable<? extends T> alternate) { | |||
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub); | |||
ssub.set(parent); | |||
child.add(ssub); | |||
child.setProducer(new Producer() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd instantiate the ProducerArbiter before parent
and supply it to SwitchIfEmptySubscriber
and child
, saving on this 'relaying' allocation. (Remark, our life would be much easier if we could implement Producer
in Subscriber
, but since request()
is protected final
, it is unoverridable :( )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah life would be easier if request()
was public. I wouldn't mind seeing it opened up, do you think it worth discussing in an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's too late for that. We will be careful in RxJava 2.0.
0202597
to
4f7926e
Compare
Made those changes, ta. |
4f7926e
to
c0309f0
Compare
Cleaned up a bit, made the parent subscriber class a static class and improved names |
child.onNext(t); | ||
} | ||
})); | ||
ssub.set(alternate.unsafeSubscribe(new AlternateSubscriber<T>(child, arbiter))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead:
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
ssub.set(as);
alternate.unsafeSubscribe(as);
c0309f0
to
81f575c
Compare
Well spotted, thanks. Updated. |
@Override | ||
public void onNext(T t) { | ||
child.onNext(t); | ||
arbiter.produced(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this produced(1)
is unnecessary: sure the arbiter's value will grow indefinitely, but there won't be any further setProducer
on it thus conserving the requested/delivered amount is less important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hey-you-delivered-more-than-requested check is useful though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, you need to add this to the other child.onNext() call as well.
81f575c
to
c4fcd91
Compare
Erk, thanks for noticing that. Updated. |
Great, thanks! |
switchIfEmpty - fix backpressure bug and lost requests
OperatorSwitchIfEmpty
suffered from these conditions:setProducer
calledsuper.setProducer
instead ofchild.setProducer
Bot the of the above problems are solved by using
ProducerArbiter
from @akarnokd.Included two unit tests that failed on previous code.