-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Fix cancel/dispose upon upstream switch for some operators #6258
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #6258 +/- ##
============================================
+ Coverage 98.22% 98.28% +0.05%
- Complexity 6203 6210 +7
============================================
Files 667 667
Lines 44889 44898 +9
Branches 6216 6219 +3
============================================
+ Hits 44093 44126 +33
+ Misses 254 242 -12
+ Partials 542 530 -12
Continue to review full report at Codecov.
|
David, can you please allow 2-3 days window for review on this one? |
Sure Artem. |
child.onNext(t); | ||
public void onNext(Object t) { | ||
Subscription s = get(); | ||
if (s != SubscriptionHelper.CANCELLED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offtopic: I'm wondering if we should drop SubscriptionHelper.isCancelled()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to inline all of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR extends the
SubscriptionArbiter
to optionally allow or disallow cancelling the currentSubscription
if it is replaced by a new one. Some operators do not need to cancel the currentSubscription
:concat
,concatMap
,repeat
,repeatWhen
,retry
andretryWhen
.In addition
repeatWhen
andretryWhen
were cancelling when the handler sequence itself terminated. The code has been updated to disconnect the upstream upon the completion/failure but before signaling the handler.The Reactive Streams specification also disallows synchronous cancellation after the terminal event anyway.
Others may actually need to cancel, such as
Timeout
.Observable
s don't have a specific arbiter, they use theDisposableHelper
methods and the relevant ones were changed toreplace()
instead of the disposingset
call.Some tests actually checking if the dispose/cancel happens and had to be updated.
The
Flowable.delaySubscription(Publisher)
also usedSubscriptionArbiter
but it was unnecessary. The code has been replaced with a more apt deferred requesting scheme as the downstream requests need to be delayed until the main subscription happens, the other publisher is always consumed unbounded.Resolves: #6259