Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large CompositeSubscription performance improvements #1145

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package rx.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
Expand All @@ -36,40 +39,78 @@ public final class CompositeSubscription implements Subscription {
private static final State CLEAR_STATE;
/** Unsubscribed empty state. */
private static final State CLEAR_STATE_UNSUBSCRIBED;
/** Set mode threshold count. */
private static final int SET_MODE_THRESHOLD = 16;
/** Array mode threshold count. */
private static final int ARRAY_MODE_THRESHOLD = SET_MODE_THRESHOLD * 3 / 4;
static {
Subscription[] s0 = new Subscription[0];
Object[] s0 = new Object[0];
CLEAR_STATE = new State(false, s0);
CLEAR_STATE_UNSUBSCRIBED = new State(true, s0);
}

private static final class State {
final boolean isUnsubscribed;
final Subscription[] subscriptions;
final Object[] subscriptions;
final Set<Object> subscriptionSet;

State(boolean u, Subscription[] s) {
State(boolean u, Object[] s) {
this.isUnsubscribed = u;
this.subscriptions = s;
this.subscriptionSet = null;
}
State(Object[] s, Subscription add) {
this.isUnsubscribed = false;
this.subscriptions = null;
this.subscriptionSet = new HashSet<Object>(s.length * 6 / 5);
for (Object o : s) {
this.subscriptionSet.add(o);
}
this.subscriptionSet.add(add);
}
State(Set<Object> s) {
this.isUnsubscribed = false;
this.subscriptions = null;
this.subscriptionSet = s;
}

State unsubscribe() {
return CLEAR_STATE_UNSUBSCRIBED;
}

State add(Subscription s) {
if (subscriptions == null) {
synchronized (subscriptionSet) {
subscriptionSet.add(s);
return this;
}
}
int idx = subscriptions.length;
Subscription[] newSubscriptions = new Subscription[idx + 1];
if (idx == SET_MODE_THRESHOLD) {
return new State(subscriptions, s);
}
Object[] newSubscriptions = new Object[idx + 1];
System.arraycopy(subscriptions, 0, newSubscriptions, 0, idx);
newSubscriptions[idx] = s;
return new State(isUnsubscribed, newSubscriptions);
}

State remove(Subscription s) {
if (subscriptions == null) {
synchronized (subscriptionSet) {
subscriptionSet.remove(s);
if (subscriptionSet.size() == ARRAY_MODE_THRESHOLD) {
return new State(isUnsubscribed, subscriptionSet.toArray(new Object[subscriptionSet.size()]));
}
return this;
}
}
if ((subscriptions.length == 1 && subscriptions[0].equals(s)) || subscriptions.length == 0) {
return clear();
}
Subscription[] newSubscriptions = new Subscription[subscriptions.length - 1];
Object[] newSubscriptions = new Object[subscriptions.length - 1];
int idx = 0;
for (Subscription _s : subscriptions) {
for (Object _s : subscriptions) {
if (!_s.equals(s)) {
// was not in this composite
if (idx == newSubscriptions.length) {
Expand All @@ -84,7 +125,7 @@ State remove(Subscription s) {
}
// subscription appeared more than once
if (idx < newSubscriptions.length) {
Subscription[] newSub2 = new Subscription[idx];
Object[] newSub2 = new Object[idx];
System.arraycopy(newSubscriptions, 0, newSub2, 0, idx);
return new State(isUnsubscribed, newSub2);
}
Expand All @@ -94,14 +135,33 @@ State remove(Subscription s) {
State clear() {
return isUnsubscribed ? CLEAR_STATE_UNSUBSCRIBED : CLEAR_STATE;
}
void unsubscribeAll() {
if (subscriptions == null) {
unsubscribeFromAll(subscriptionSet);
} else {
unsubscribeFromAll(subscriptions);
}
}
/* test support.*/ int size() {
if (subscriptions == null) {
synchronized (subscriptionSet) {
return subscriptionSet.size();
}
}
return subscriptions.length;
}
}

public CompositeSubscription() {
state.set(CLEAR_STATE);
}

public CompositeSubscription(final Subscription... subscriptions) {
state.set(new State(false, subscriptions));
if (subscriptions.length > 0) {
state.set(new State(false, subscriptions));
} else {
state.set(CLEAR_STATE);
}
}

@Override
Expand Down Expand Up @@ -150,7 +210,7 @@ public void clear() {
}
} while (!state.compareAndSet(oldState, newState));
// if we cleared successfully we then need to call unsubscribe on all previous
unsubscribeFromAll(oldState.subscriptions);
oldState.unsubscribeAll();
}

@Override
Expand All @@ -165,14 +225,17 @@ public void unsubscribe() {
newState = oldState.unsubscribe();
}
} while (!state.compareAndSet(oldState, newState));
unsubscribeFromAll(oldState.subscriptions);
oldState.unsubscribeAll();
}

private static void unsubscribeFromAll(Subscription[] subscriptions) {
private static void unsubscribeFromAll(Object[] subscriptions) {
unsubscribeFromAll(Arrays.asList(subscriptions));
}
private static void unsubscribeFromAll(Iterable<Object> subscriptions) {
final List<Throwable> es = new ArrayList<Throwable>();
for (Subscription s : subscriptions) {
for (Object s : subscriptions) {
try {
s.unsubscribe();
((Subscription)s).unsubscribe();
} catch (Throwable e) {
es.add(e);
}
Expand All @@ -192,4 +255,7 @@ private static void unsubscribeFromAll(Subscription[] subscriptions) {
}
}
}
/* Test support. */ int size() {
return state.get().size();
}
}