From 459da07f18d8ce34b373c5a4f2c1d580c3f6d174 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 2 Jun 2014 22:30:42 -0700 Subject: [PATCH] Hide ChainedSubscription --- rxjava-core/src/main/java/rx/Subscriber.java | 17 +++----- .../internal/operators/OperatorGroupBy.java | 3 +- .../rx/internal/operators/OperatorPivot.java | 26 ++++-------- .../rx/internal/operators/OperatorTake.java | 34 +++++++-------- .../operators/OperatorUnsubscribeOn.java | 42 ++++++++++--------- .../util/SubscriptionList.java} | 12 +++--- .../util/SubscriptionListTest.java} | 17 ++++---- 7 files changed, 69 insertions(+), 82 deletions(-) rename rxjava-core/src/main/java/rx/{subscriptions/ChainedSubscription.java => internal/util/SubscriptionList.java} (89%) rename rxjava-core/src/test/java/rx/{subscriptions/ChainedSubscriptionTest.java => internal/util/SubscriptionListTest.java} (94%) diff --git a/rxjava-core/src/main/java/rx/Subscriber.java b/rxjava-core/src/main/java/rx/Subscriber.java index 8a5da798bb..c6f21d5daf 100644 --- a/rxjava-core/src/main/java/rx/Subscriber.java +++ b/rxjava-core/src/main/java/rx/Subscriber.java @@ -15,7 +15,7 @@ */ package rx; -import rx.subscriptions.ChainedSubscription; +import rx.internal.util.SubscriptionList; import rx.subscriptions.CompositeSubscription; /** @@ -32,27 +32,20 @@ */ public abstract class Subscriber implements Observer, Subscription { - private final ChainedSubscription cs; + private final SubscriptionList cs; - protected Subscriber(ChainedSubscription cs) { - if (cs == null) { - throw new IllegalArgumentException("The CompositeSubscription can not be null"); - } - this.cs = cs; - } - @Deprecated protected Subscriber(CompositeSubscription cs) { - this(new ChainedSubscription()); + this.cs = new SubscriptionList(); add(cs); } protected Subscriber() { - this(new ChainedSubscription()); + this.cs = new SubscriptionList(); } protected Subscriber(Subscriber op) { - this(op.cs); + this.cs = op.cs; } /** diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorGroupBy.java index 9944824b74..9fb71e8ced 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -26,7 +26,6 @@ import rx.functions.Action0; import rx.functions.Func1; import rx.observables.GroupedObservable; -import rx.subscriptions.ChainedSubscription; import rx.subscriptions.Subscriptions; /** @@ -55,7 +54,7 @@ static final class GroupBySubscriber extends Subscriber { public GroupBySubscriber(Func1 keySelector, Subscriber> child) { // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle // and will unsubscribe on this parent if they are all unsubscribed - super(new ChainedSubscription()); + super(); this.keySelector = keySelector; this.child = child; } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java index cefb7d7430..d1b3b0b3b4 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java @@ -26,9 +26,9 @@ import rx.Observable.OnSubscribe; import rx.Observable.Operator; import rx.Subscriber; +import rx.Subscription; import rx.functions.Action0; import rx.observables.GroupedObservable; -import rx.subscriptions.ChainedSubscription; import rx.subscriptions.Subscriptions; public final class OperatorPivot implements Operator>, GroupedObservable>> { @@ -36,7 +36,7 @@ public final class OperatorPivot implements Operator>> call(final Subscriber>> child) { final AtomicReference state = new AtomicReference(State.create()); - final PivotSubscriber pivotSubscriber = new PivotSubscriber(new ChainedSubscription(), child, state); + final PivotSubscriber pivotSubscriber = new PivotSubscriber(child, state); child.add(Subscriptions.create(new Action0() { @Override @@ -61,21 +61,14 @@ public void call() { } private static final class PivotSubscriber extends Subscriber>> { - /* - * needs to decouple the subscription as the inner subscriptions need a separate lifecycle - * and will unsubscribe on this parent if they are all unsubscribed - */ - private final ChainedSubscription parentSubscription; private final Subscriber>> child; private final AtomicReference state; private final GroupState groups; - private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber>> child, AtomicReference state) { - super(parentSubscription); - this.parentSubscription = parentSubscription; + private PivotSubscriber(Subscriber>> child, AtomicReference state) { this.child = child; this.state = state; - this.groups = new GroupState(parentSubscription, child); + this.groups = new GroupState(this, child); } @Override @@ -102,7 +95,7 @@ public void onError(Throwable e) { @Override public void onNext(final GroupedObservable> k1Group) { groups.startK1Group(state, k1Group.getKey()); - k1Group.unsafeSubscribe(new Subscriber>(parentSubscription) { + k1Group.unsafeSubscribe(new Subscriber>(this) { @Override public void onCompleted() { @@ -124,7 +117,7 @@ public void onNext(final GroupedObservable k2Group) { // we have been unsubscribed return; } - k2Group.unsafeSubscribe(new Subscriber(parentSubscription) { + k2Group.unsafeSubscribe(new Subscriber(this) { @Override public void onCompleted() { @@ -158,16 +151,15 @@ public void onNext(T t) { private static final class GroupState { private final ConcurrentHashMap, Inner> innerSubjects = new ConcurrentHashMap, Inner>(); private final ConcurrentHashMap> outerSubjects = new ConcurrentHashMap>(); - private final ChainedSubscription parentSubscription; + private final Subscription parentSubscription; private final Subscriber>> child; /** Indicates a terminal state. */ volatile int completed; /** Field updater for completed. */ @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater COMPLETED_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed"); + static final AtomicIntegerFieldUpdater COMPLETED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed"); - public GroupState(ChainedSubscription parentSubscription, Subscriber>> child) { + public GroupState(Subscription parentSubscription, Subscriber>> child) { this.parentSubscription = parentSubscription; this.child = child; } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java index 94757fc384..4799ee1fe9 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java @@ -17,7 +17,6 @@ import rx.Observable.Operator; import rx.Subscriber; -import rx.subscriptions.ChainedSubscription; /** * Returns an Observable that emits the first num items emitted by the source @@ -40,22 +39,7 @@ public OperatorTake(int limit) { @Override public Subscriber call(final Subscriber child) { - final ChainedSubscription parent = new ChainedSubscription(); - if (limit == 0) { - child.onCompleted(); - parent.unsubscribe(); - } - - /* - * We decouple the parent and child subscription so there can be multiple take() in a chain - * such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children. - * - * Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM. - * - * However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child' - */ - child.add(parent); - return new Subscriber(parent) { + Subscriber parent = new Subscriber() { int count = 0; boolean completed = false; @@ -87,6 +71,22 @@ public void onNext(T i) { } }; + + if (limit == 0) { + child.onCompleted(); + parent.unsubscribe(); + } + + /* + * We decouple the parent and child subscription so there can be multiple take() in a chain + * such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children. + * + * Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM. + * + * However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child' + */ + child.add(parent); + return parent; } } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorUnsubscribeOn.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorUnsubscribeOn.java index b33e62b493..20957e29f4 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorUnsubscribeOn.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorUnsubscribeOn.java @@ -19,7 +19,6 @@ import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; -import rx.subscriptions.ChainedSubscription; import rx.subscriptions.Subscriptions; /** @@ -36,25 +35,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) { @Override public Subscriber call(final Subscriber subscriber) { - final ChainedSubscription parentSubscription = new ChainedSubscription(); - subscriber.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - final Scheduler.Worker inner = scheduler.createWorker(); - inner.schedule(new Action0() { - - @Override - public void call() { - parentSubscription.unsubscribe(); - inner.unsubscribe(); - } - }); - } - - })); - - return new Subscriber(parentSubscription) { + final Subscriber parent = new Subscriber() { @Override public void onCompleted() { @@ -72,5 +53,26 @@ public void onNext(T t) { } }; + + subscriber.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + final Scheduler.Worker inner = scheduler.createWorker(); + inner.schedule(new Action0() { + + @Override + public void call() { + parent.unsubscribe(); + inner.unsubscribe(); + } + }); + } + + })); + + return parent; + + } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java b/rxjava-core/src/main/java/rx/internal/util/SubscriptionList.java similarity index 89% rename from rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java rename to rxjava-core/src/main/java/rx/internal/util/SubscriptionList.java index df9416fa8f..baff4f57f4 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java +++ b/rxjava-core/src/main/java/rx/internal/util/SubscriptionList.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.subscriptions; +package rx.internal.util; import java.util.ArrayList; import java.util.Arrays; @@ -29,15 +29,15 @@ * * @see Rx.Net equivalent CompositeDisposable */ -public final class ChainedSubscription implements Subscription { +public final class SubscriptionList implements Subscription { private List subscriptions; private boolean unsubscribed = false; - public ChainedSubscription() { + public SubscriptionList() { } - public ChainedSubscription(final Subscription... subscriptions) { + public SubscriptionList(final Subscription... subscriptions) { this.subscriptions = new LinkedList(Arrays.asList(subscriptions)); } @@ -47,8 +47,8 @@ public synchronized boolean isUnsubscribed() { } /** - * Adds a new {@link Subscription} to this {@code ChainedSubscription} if the {@code ChainedSubscription} is - * not yet unsubscribed. If the {@code ChainedSubscription} is unsubscribed, {@code add} will + * Adds a new {@link Subscription} to this {@code SubscriptionList} if the {@code SubscriptionList} is + * not yet unsubscribed. If the {@code SubscriptionList} is unsubscribed, {@code add} will * indicate this by explicitly unsubscribing the new {@code Subscription} as well. * * @param s diff --git a/rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java b/rxjava-core/src/test/java/rx/internal/util/SubscriptionListTest.java similarity index 94% rename from rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java rename to rxjava-core/src/test/java/rx/internal/util/SubscriptionListTest.java index 1539bdc74f..f346724e95 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/ChainedSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/internal/util/SubscriptionListTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.subscriptions; +package rx.internal.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -29,13 +29,14 @@ import rx.Subscription; import rx.exceptions.CompositeException; +import rx.internal.util.SubscriptionList; -public class ChainedSubscriptionTest { +public class SubscriptionListTest { @Test public void testSuccess() { final AtomicInteger counter = new AtomicInteger(); - ChainedSubscription s = new ChainedSubscription(); + SubscriptionList s = new SubscriptionList(); s.add(new Subscription() { @Override @@ -70,7 +71,7 @@ public boolean isUnsubscribed() { @Test(timeout = 1000) public void shouldUnsubscribeAll() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(); - final ChainedSubscription s = new ChainedSubscription(); + final SubscriptionList s = new SubscriptionList(); final int count = 10; final CountDownLatch start = new CountDownLatch(1); @@ -117,7 +118,7 @@ public void run() { @Test public void testException() { final AtomicInteger counter = new AtomicInteger(); - ChainedSubscription s = new ChainedSubscription(); + SubscriptionList s = new SubscriptionList(); s.add(new Subscription() { @Override @@ -159,7 +160,7 @@ public boolean isUnsubscribed() { @Test public void testCompositeException() { final AtomicInteger counter = new AtomicInteger(); - ChainedSubscription s = new ChainedSubscription(); + SubscriptionList s = new SubscriptionList(); s.add(new Subscription() { @Override @@ -215,7 +216,7 @@ public boolean isUnsubscribed() { @Test public void testUnsubscribeIdempotence() { final AtomicInteger counter = new AtomicInteger(); - ChainedSubscription s = new ChainedSubscription(); + SubscriptionList s = new SubscriptionList(); s.add(new Subscription() { @Override @@ -241,7 +242,7 @@ public boolean isUnsubscribed() { public void testUnsubscribeIdempotenceConcurrently() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(); - final ChainedSubscription s = new ChainedSubscription(); + final SubscriptionList s = new SubscriptionList(); final int count = 10; final CountDownLatch start = new CountDownLatch(1);