PublishSubject ReSubscribe for publish().refCount() Behavior #426
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Allow
publish
+refCount
to support re-subscribing to the origin after decrementing the count to 0 then adding new Observers which increment count to 1+.The PublishSubject implementation was performing onError/onCompleted unsubscribe logic that was put in place long ago and I am now pretty sure it was wrong.
This was revealed while playing with
refCount
which intends on allowing a re-subscription to the source once new Observers arrive. PublishSubject was preventing that.The one use case that I'm still wondering about though is if someone subscribes to a PublishSubject after it has emitted onCompleted and isn't "restarted". That Observer would wait forever if it is a "single-shot" PublishSubject use case. I'm not sure if that's just a bad use and fits into the "don't do that" scenario, or if it's a legit issue that has a solution.
Right now this code is "thread-safe" in the visibility sense, but it's not atomic and could have race conditions between adding/removing Observers and event notifications. I don't think that's an issue as if someone is concurrently adding/removing it's always a race, but am not 100% sure if there's a use case I'm missing. This also assumes (as it always did) that someone is not invoking onNext concurrently as that would break the Rx contract.