Skip to content

Commit

Permalink
Merge pull request #1281 from benjchristensen/composite-subscription-…
Browse files Browse the repository at this point in the history
…performance

Reduce Subscription Object Allocation
  • Loading branch information
benjchristensen committed May 29, 2014
2 parents da9aa93 + 15c385d commit 798fa7e
Show file tree
Hide file tree
Showing 8 changed files with 471 additions and 127 deletions.
13 changes: 10 additions & 3 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx;

import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -31,17 +32,23 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final CompositeSubscription cs;
private final ChainedSubscription cs;

protected Subscriber(CompositeSubscription cs) {
protected Subscriber(ChainedSubscription cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
this.cs = cs;
}

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new ChainedSubscription());
add(cs);
}

protected Subscriber() {
this(new CompositeSubscription());
this(new ChainedSubscription());
}

protected Subscriber(Subscriber<?> op) {
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand Down Expand Up @@ -55,7 +55,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> childObserver) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super(new CompositeSubscription());
super(new ChainedSubscription());
this.keySelector = keySelector;
this.childObserver = childObserver;
}
Expand Down
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new CompositeSubscription(), child, state);
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand Down Expand Up @@ -65,12 +65,12 @@ private static final class PivotSubscriber<K1, K2, T> extends Subscriber<Grouped
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final CompositeSubscription parentSubscription;
private final ChainedSubscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
this.child = child;
Expand Down Expand Up @@ -158,7 +158,7 @@ public void onNext(T t) {
private static final class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final CompositeSubscription parentSubscription;
private final ChainedSubscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
/** Indicates a terminal state. */
volatile int completed;
Expand All @@ -167,7 +167,7 @@ private static final class GroupState<K1, K2, T> {
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");

public GroupState(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,7 +40,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
final ChainedSubscription parent = new ChainedSubscription();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
final ChainedSubscription parentSubscription = new ChainedSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
Expand Down
109 changes: 109 additions & 0 deletions rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import rx.Subscription;
import rx.exceptions.CompositeException;

/**
* Subscription that represents a group of Subscriptions that are unsubscribed together.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public final class ChainedSubscription implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;

public ChainedSubscription() {
}

public ChainedSubscription(final Subscription... subscriptions) {
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
}

@Override
public synchronized boolean isUnsubscribed() {
return unsubscribed;
}

public void add(final Subscription s) {
Subscription unsubscribe = null;
synchronized (this) {
if (unsubscribed) {
unsubscribe = s;
} else {
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
}
subscriptions.add(s);
}
}
if (unsubscribe != null) {
// call after leaving the synchronized block so we're not holding a lock while executing this
unsubscribe.unsubscribe();
}
}

@Override
public void unsubscribe() {
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
}
// we will only get here once
unsubscribeFromAll(subscriptions);
}

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
if (es != null) {
if (es.size() == 1) {
Throwable t = es.get(0);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new CompositeException(
"Failed to unsubscribe to 1 or more subscriptions.", es);
}
} else {
throw new CompositeException(
"Failed to unsubscribe to 2 or more subscriptions.", es);
}
}
}
}
Loading

0 comments on commit 798fa7e

Please sign in to comment.