From 42f5311d9017ac06debc428b88ecde11f281dc46 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 29 May 2014 11:49:17 -0700 Subject: [PATCH] Reduce Subscription Object Allocation - significant reduction in object allocations - details on research available at https://github.com/Netflix/RxJava/issues/1204 --- rxjava-core/src/main/java/rx/Subscriber.java | 13 +- .../java/rx/operators/OperatorGroupBy.java | 4 +- .../main/java/rx/operators/OperatorPivot.java | 12 +- .../main/java/rx/operators/OperatorTake.java | 4 +- .../rx/operators/OperatorUnsubscribeOn.java | 4 +- .../rx/subscriptions/ChainedSubscription.java | 109 +++++++ .../subscriptions/CompositeSubscription.java | 165 ++++------ .../ChainedSubscriptionTest.java | 286 ++++++++++++++++++ 8 files changed, 470 insertions(+), 127 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java create mode 100644 rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java diff --git a/rxjava-core/src/main/java/rx/Subscriber.java b/rxjava-core/src/main/java/rx/Subscriber.java index 3ad986813c..0009a6a5e6 100644 --- a/rxjava-core/src/main/java/rx/Subscriber.java +++ b/rxjava-core/src/main/java/rx/Subscriber.java @@ -15,6 +15,7 @@ */ package rx; +import rx.subscriptions.ChainedSubscription; import rx.subscriptions.CompositeSubscription; /** @@ -30,17 +31,23 @@ */ public abstract class Subscriber implements Observer, Subscription { - private final CompositeSubscription cs; + private final ChainedSubscription cs; - protected Subscriber(CompositeSubscription cs) { + protected Subscriber(ChainedSubscription cs) { if (cs == null) { throw new IllegalArgumentException("The CompositeSubscription can not be null"); } this.cs = cs; } + + @Deprecated + protected Subscriber(CompositeSubscription cs) { + this(new ChainedSubscription()); + add(cs); + } protected Subscriber() { - this(new CompositeSubscription()); + this(new ChainedSubscription()); } protected Subscriber(Subscriber op) { diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java index 8bdb0709e5..5a58d89802 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java @@ -26,7 +26,7 @@ import rx.functions.Action0; import rx.functions.Func1; import rx.observables.GroupedObservable; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.ChainedSubscription; import rx.subscriptions.Subscriptions; /** @@ -55,7 +55,7 @@ static final class GroupBySubscriber extends Subscriber { public GroupBySubscriber(Func1 keySelector, Subscriber> childObserver) { // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle // and will unsubscribe on this parent if they are all unsubscribed - super(new CompositeSubscription()); + super(new ChainedSubscription()); this.keySelector = keySelector; this.childObserver = childObserver; } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorPivot.java b/rxjava-core/src/main/java/rx/operators/OperatorPivot.java index c803bb12b1..9447848be5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorPivot.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorPivot.java @@ -28,7 +28,7 @@ import rx.Subscriber; import rx.functions.Action0; import rx.observables.GroupedObservable; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.ChainedSubscription; import rx.subscriptions.Subscriptions; public final class OperatorPivot implements Operator>, GroupedObservable>> { @@ -36,7 +36,7 @@ public final class OperatorPivot implements Operator>> call(final Subscriber>> child) { final AtomicReference state = new AtomicReference(State.create()); - final PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state); + final PivotSubscriber pivotSubscriber = new PivotSubscriber(new ChainedSubscription(), child, state); child.add(Subscriptions.create(new Action0() { @Override @@ -65,12 +65,12 @@ private static final class PivotSubscriber extends Subscriber>> child; private final AtomicReference state; private final GroupState groups; - private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber>> child, AtomicReference state) { + private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber>> child, AtomicReference state) { super(parentSubscription); this.parentSubscription = parentSubscription; this.child = child; @@ -158,7 +158,7 @@ public void onNext(T t) { private static final class GroupState { private final ConcurrentHashMap, Inner> innerSubjects = new ConcurrentHashMap, Inner>(); private final ConcurrentHashMap> outerSubjects = new ConcurrentHashMap>(); - private final CompositeSubscription parentSubscription; + private final ChainedSubscription parentSubscription; private final Subscriber>> child; /** Indicates a terminal state. */ volatile int completed; @@ -167,7 +167,7 @@ private static final class GroupState { static final AtomicIntegerFieldUpdater COMPLETED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed"); - public GroupState(CompositeSubscription parentSubscription, Subscriber>> child) { + public GroupState(ChainedSubscription parentSubscription, Subscriber>> child) { this.parentSubscription = parentSubscription; this.child = child; } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTake.java b/rxjava-core/src/main/java/rx/operators/OperatorTake.java index 62ab98b41c..414001fc10 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTake.java @@ -17,7 +17,7 @@ import rx.Observable.Operator; import rx.Subscriber; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.ChainedSubscription; /** * Returns an Observable that emits the first num items emitted by the source @@ -40,7 +40,7 @@ public OperatorTake(int limit) { @Override public Subscriber call(final Subscriber child) { - final CompositeSubscription parent = new CompositeSubscription(); + final ChainedSubscription parent = new ChainedSubscription(); if (limit == 0) { child.onCompleted(); parent.unsubscribe(); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java index 82018b65c6..096eaef928 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java @@ -19,7 +19,7 @@ import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.ChainedSubscription; import rx.subscriptions.Subscriptions; /** @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) { @Override public Subscriber call(final Subscriber subscriber) { - final CompositeSubscription parentSubscription = new CompositeSubscription(); + final ChainedSubscription parentSubscription = new ChainedSubscription(); subscriber.add(Subscriptions.create(new Action0() { @Override diff --git a/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java new file mode 100644 index 0000000000..ecce5e41cb --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java @@ -0,0 +1,109 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import rx.Subscription; +import rx.exceptions.CompositeException; + +/** + * Subscription that represents a group of Subscriptions that are unsubscribed together. + * + * @see Rx.Net equivalent CompositeDisposable + */ +public final class ChainedSubscription implements Subscription { + + private List subscriptions; + private boolean unsubscribed = false; + + public ChainedSubscription() { + } + + public ChainedSubscription(final Subscription... subscriptions) { + this.subscriptions = new LinkedList(Arrays.asList(subscriptions)); + } + + @Override + public synchronized boolean isUnsubscribed() { + return unsubscribed; + } + + public void add(final Subscription s) { + Subscription unsubscribe = null; + synchronized (this) { + if (unsubscribed) { + unsubscribe = s; + } else { + if (subscriptions == null) { + subscriptions = new LinkedList(); + } + subscriptions.add(s); + } + } + if (unsubscribe != null) { + // call after leaving the synchronized block so we're not holding a lock while executing this + unsubscribe.unsubscribe(); + } + } + + @Override + public void unsubscribe() { + synchronized (this) { + if (unsubscribed) { + return; + } + unsubscribed = true; + } + // we will only get here once + unsubscribeFromAll(subscriptions); + } + + private static void unsubscribeFromAll(Collection subscriptions) { + if (subscriptions == null) { + return; + } + List es = null; + for (Subscription s : subscriptions) { + try { + s.unsubscribe(); + } catch (Throwable e) { + if (es == null) { + es = new ArrayList(); + } + es.add(e); + } + } + if (es != null) { + if (es.size() == 1) { + Throwable t = es.get(0); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new CompositeException( + "Failed to unsubscribe to 1 or more subscriptions.", es); + } + } else { + throw new CompositeException( + "Failed to unsubscribe to 2 or more subscriptions.", es); + } + } + } +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 10c59a0ba9..c137948a01 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -16,8 +16,11 @@ package rx.subscriptions; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.Set; import rx.Subscription; import rx.exceptions.CompositeException; @@ -29,156 +32,94 @@ * @see Rx.Net equivalent CompositeDisposable */ public final class CompositeSubscription implements Subscription { - /** The atomic state updater. */ - static final AtomicReferenceFieldUpdater STATE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(CompositeSubscription.class, State.class, "state"); - /** The subscription state. */ - volatile State state; - /** Empty initial state. */ - private static final State CLEAR_STATE; - /** Unsubscribed empty state. */ - private static final State CLEAR_STATE_UNSUBSCRIBED; - static { - Subscription[] s0 = new Subscription[0]; - CLEAR_STATE = new State(false, s0); - CLEAR_STATE_UNSUBSCRIBED = new State(true, s0); - } - - private static final class State { - final boolean isUnsubscribed; - final Subscription[] subscriptions; - - State(boolean u, Subscription[] s) { - this.isUnsubscribed = u; - this.subscriptions = s; - } - - State unsubscribe() { - return CLEAR_STATE_UNSUBSCRIBED; - } - - State add(Subscription s) { - int idx = subscriptions.length; - Subscription[] newSubscriptions = new Subscription[idx + 1]; - System.arraycopy(subscriptions, 0, newSubscriptions, 0, idx); - newSubscriptions[idx] = s; - return new State(isUnsubscribed, newSubscriptions); - } - - State remove(Subscription s) { - if ((subscriptions.length == 1 && subscriptions[0].equals(s)) || subscriptions.length == 0) { - return clear(); - } - Subscription[] newSubscriptions = new Subscription[subscriptions.length - 1]; - int idx = 0; - for (Subscription _s : subscriptions) { - if (!_s.equals(s)) { - // was not in this composite - if (idx == newSubscriptions.length) { - return this; - } - newSubscriptions[idx] = _s; - idx++; - } - } - if (idx == 0) { - return clear(); - } - // subscription appeared more than once - if (idx < newSubscriptions.length) { - Subscription[] newSub2 = new Subscription[idx]; - System.arraycopy(newSubscriptions, 0, newSub2, 0, idx); - return new State(isUnsubscribed, newSub2); - } - return new State(isUnsubscribed, newSubscriptions); - } - - State clear() { - return isUnsubscribed ? CLEAR_STATE_UNSUBSCRIBED : CLEAR_STATE; - } - } + private Set subscriptions; + private boolean unsubscribed = false; public CompositeSubscription() { - // this creates only a store-store barrier which is generally faster when - // CompositeSubscriptions are created in a tight loop. - state = CLEAR_STATE; } public CompositeSubscription(final Subscription... subscriptions) { - state = new State(false, subscriptions); + this.subscriptions = new HashSet(Arrays.asList(subscriptions)); } @Override - public boolean isUnsubscribed() { - return state.isUnsubscribed; + public synchronized boolean isUnsubscribed() { + return unsubscribed; } public void add(final Subscription s) { - State oldState; - State newState; - do { - oldState = state; - if (oldState.isUnsubscribed) { - s.unsubscribe(); - return; + Subscription unsubscribe = null; + synchronized (this) { + if (unsubscribed) { + unsubscribe = s; } else { - newState = oldState.add(s); + if (subscriptions == null) { + subscriptions = new HashSet(4); + } + subscriptions.add(s); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } + if (unsubscribe != null) { + // call after leaving the synchronized block so we're not holding a lock while executing this + unsubscribe.unsubscribe(); + } } public void remove(final Subscription s) { - State oldState; - State newState; - do { - oldState = state; - if (oldState.isUnsubscribed) { + boolean unsubscribe = false; + synchronized (this) { + if (unsubscribed || subscriptions == null) { return; - } else { - newState = oldState.remove(s); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); - // if we removed successfully we then need to call unsubscribe on it - s.unsubscribe(); + unsubscribe = subscriptions.remove(s); + } + if (unsubscribe) { + // if we removed successfully we then need to call unsubscribe on it (outside of the lock) + s.unsubscribe(); + } } public void clear() { - State oldState; - State newState; - do { - oldState = state; - if (oldState.isUnsubscribed) { + List unsubscribe = null; + synchronized (this) { + if (unsubscribed || subscriptions == null) { return; } else { - newState = oldState.clear(); + unsubscribe = new ArrayList(subscriptions); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); - // if we cleared successfully we then need to call unsubscribe on all previous - unsubscribeFromAll(oldState.subscriptions); + } + unsubscribeFromAll(unsubscribe); } @Override public void unsubscribe() { - State oldState = state; - if (oldState.isUnsubscribed) { - return; + synchronized (this) { + if (unsubscribed) { + return; + } + unsubscribed = true; } - // intrinsics may make this a single instruction and may prevent concurrent add/remove faster - oldState = STATE_UPDATER.getAndSet(this, oldState.unsubscribe()); - unsubscribeFromAll(oldState.subscriptions); + // we will only get here once + unsubscribeFromAll(subscriptions); } - private static void unsubscribeFromAll(Subscription[] subscriptions) { - final List es = new ArrayList(); + private static void unsubscribeFromAll(Collection subscriptions) { + if (subscriptions == null) { + return; + } + List es = null; for (Subscription s : subscriptions) { try { s.unsubscribe(); } catch (Throwable e) { + if (es == null) { + es = new ArrayList(); + } es.add(e); } } - if (!es.isEmpty()) { + if (es != null) { if (es.size() == 1) { Throwable t = es.get(0); if (t instanceof RuntimeException) { diff --git a/rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java new file mode 100644 index 0000000000..1539bdc74f --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java @@ -0,0 +1,286 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Subscription; +import rx.exceptions.CompositeException; + +public class ChainedSubscriptionTest { + + @Test + public void testSuccess() { + final AtomicInteger counter = new AtomicInteger(); + ChainedSubscription s = new ChainedSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.unsubscribe(); + + assertEquals(2, counter.get()); + } + + @Test(timeout = 1000) + public void shouldUnsubscribeAll() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + final ChainedSubscription s = new ChainedSubscription(); + + final int count = 10; + final CountDownLatch start = new CountDownLatch(1); + for (int i = 0; i < count; i++) { + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + } + + final List threads = new ArrayList(); + for (int i = 0; i < count; i++) { + final Thread t = new Thread() { + @Override + public void run() { + try { + start.await(); + s.unsubscribe(); + } catch (final InterruptedException e) { + fail(e.getMessage()); + } + } + }; + t.start(); + threads.add(t); + } + + start.countDown(); + for (final Thread t : threads) { + t.join(); + } + + assertEquals(count, counter.get()); + } + + @Test + public void testException() { + final AtomicInteger counter = new AtomicInteger(); + ChainedSubscription s = new ChainedSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on first one"); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + try { + s.unsubscribe(); + fail("Expecting an exception"); + } catch (RuntimeException e) { + // we expect this + assertEquals(e.getMessage(), "failed on first one"); + } + + // we should still have unsubscribed to the second one + assertEquals(1, counter.get()); + } + + @Test + public void testCompositeException() { + final AtomicInteger counter = new AtomicInteger(); + ChainedSubscription s = new ChainedSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on first one"); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on second one too"); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + try { + s.unsubscribe(); + fail("Expecting an exception"); + } catch (CompositeException e) { + // we expect this + assertEquals(e.getExceptions().size(), 2); + } + + // we should still have unsubscribed to the second one + assertEquals(1, counter.get()); + } + + + @Test + public void testUnsubscribeIdempotence() { + final AtomicInteger counter = new AtomicInteger(); + ChainedSubscription s = new ChainedSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.unsubscribe(); + s.unsubscribe(); + s.unsubscribe(); + + // we should have only unsubscribed once + assertEquals(1, counter.get()); + } + + @Test(timeout = 1000) + public void testUnsubscribeIdempotenceConcurrently() + throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + final ChainedSubscription s = new ChainedSubscription(); + + final int count = 10; + final CountDownLatch start = new CountDownLatch(1); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + final List threads = new ArrayList(); + for (int i = 0; i < count; i++) { + final Thread t = new Thread() { + @Override + public void run() { + try { + start.await(); + s.unsubscribe(); + } catch (final InterruptedException e) { + fail(e.getMessage()); + } + } + }; + t.start(); + threads.add(t); + } + + start.countDown(); + for (final Thread t : threads) { + t.join(); + } + + // we should have only unsubscribed once + assertEquals(1, counter.get()); + } +}