-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce Subscription Object Allocation #1281
Reduce Subscription Object Allocation #1281
Conversation
Grrr, some unit tests are failing ... |
@Override | ||
public void unsubscribe() { | ||
synchronized (this) { | ||
if (unsubscribed || subscriptions == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the chain is never added, this won't set the chain to unsubscribed = true.
return; | ||
} else { | ||
newState = oldState.clear(); | ||
unsubscribe = new ArrayList<Subscription>(subscriptions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't clear the subscriptions, just copies it. You could just pass out the Set and replace it with null.
Set<Subscription> unsubscribe = null;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
}
unsubscribe = subscriptions;
subscriptions = null;
}
unsubscribeFromAll(unsubscribe);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
Anyone have a better name than |
if (oldState.isUnsubscribed) { | ||
return; | ||
synchronized (this) { | ||
if (unsubscribed || subscriptions == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the composite is empty, this will never set the unsubscribed flag.
|
I like |
Do you have a different take on how to solve #1204 based on the findings I've posted there? |
What the difference with |
For immediate results, using plain Set and List based composites is okay. We need to recheck the operators and use the appropriate one. For the schedulers, an MPSC-based queue still creates a lot of garbage; my SubscriptionQueue based on a ringbuffer avoids this issue at the expense of synchronization instead of atomics. For even less allocation on the new composites, we would need to implement a more light-weight HashSet (e.g., a linear probing hashset, although we would lose the benefits from the Java 8 HashMap enhancements). |
RxJava-pull-requests #1179 SUCCESS |
Aside from the extra dependency, Disruptor seems like it would be a pretty decent model for the SubscriptionQueue. It would be interesting to see if it'd perform well non-contending cases vs. the MPSC queue. |
Disruptor's RingBuffer is bounded so you'd need to either set it to a decent size or parametrize the schedulers. In addition, it is prone to deadlock because the reader may put new tasks into the queue in case of a recursive schedule. |
Agreed, if we can come up with one that is better.
Yes, I want to get those changes pulled in before we release.
We can evaluate the MPSC queue to replace these before releasing.
Yes, I only migrated the ones that directly injected a |
Talking with @headinthebox he was wondering if we should remove the I'm averse to a breaking change of that kind, even though we're pre-1.0 as we're so late in the game that breaking changes are really painful. What do you think? Anyone else have an opinion on this? The two signatures are: SubscriptionA {
public synchronized boolean isUnsubscribed();
public void add(final Subscription s)
public void remove(final Subscription s)
public void clear()
public void unsubscribe()
} SubscriptionB {
public synchronized boolean isUnsubscribed();
public void add(final Subscription s)
public void unsubscribe()
} The |
RxJava-pull-requests #1180 SUCCESS |
- significant reduction in object allocations - details on research available at ReactiveX#1204
RxJava-pull-requests #1181 SUCCESS |
I never used |
Wouldn't the only use case SubscriptionB be for an entirely linear Observable chain (does not use merge, zip, groupBy, ...)? |
Which is almost all the time. The |
Should we deprecate |
Merging ... then we can bike shed over naming before releasing 0.19. |
…performance Reduce Subscription Object Allocation
Changes
CompositeSubscription
and addedChainedSubscription
Atomic State Machine -> Mutation & Locks
The
CompositeSubscription
implementation was changed from using an atomic state machine to using locks. The state machine requires immutableState
that is swapped using CAS. This means an object allocation is needed each time.It now uses locks to protect mutable state so very few objects are created.
ChainedSubscription
The
CompositeSubscription
requires support of randomly removing aSubscription
via theremove
method. TheSubscriber
type does not expose this so can be optimized. There is now aChainedSubscription
that is used bySubscriber
instead. This allows using aLinkedList
orArrayList
rather thanHashSet
as random access is never needed. This provides a slight performance boost and reduces memory usage (1 minute test shows 16.5GB allocation forComposite
versus 14.4GC forChained
).Allocation Comparison
This shows Java Flight Recorder output from master without these changes:
This shows object allocation after changing
CompositeSubscription
:Throughput Comparison
Testing this code:
Rx 0.16.1
Rx 0.17.6 - using OnSubscribeFunc
Rx 0.17.6 - using OnSubscribe
Rx 0.18.3 - using OnSubscribe
Rx 0.19 master - CompositeSubscription state machine
Rx 0.19 master - CompositeSubscription with synchronized HashSet
Rx 0.19 master - ChainedSubscription with synchronized ArrayList
Rx 0.19 master - ChainedSubscription with synchronized LinkedList