Skip to content

Commit

Permalink
Merge pull request #1309 from benjchristensen/subscriber-subscription
Browse files Browse the repository at this point in the history
Hide ChainedSubscription/SubscriptionList from Public API
  • Loading branch information
benjchristensen committed Jun 3, 2014
2 parents 6417f0b + 459da07 commit e6086d4
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 82 deletions.
17 changes: 5 additions & 12 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx;

import rx.subscriptions.ChainedSubscription;
import rx.internal.util.SubscriptionList;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -32,27 +32,20 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final ChainedSubscription cs;
private final SubscriptionList cs;

protected Subscriber(ChainedSubscription cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
this.cs = cs;
}

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new ChainedSubscription());
this.cs = new SubscriptionList();
add(cs);
}

protected Subscriber() {
this(new ChainedSubscription());
this.cs = new SubscriptionList();
}

protected Subscriber(Subscriber<?> op) {
this(op.cs);
this.cs = op.cs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand Down Expand Up @@ -55,7 +54,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super(new ChainedSubscription());
super();
this.keySelector = keySelector;
this.child = child;
}
Expand Down
26 changes: 9 additions & 17 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand All @@ -61,21 +61,14 @@ public void call() {
}

private static final class PivotSubscriber<K1, K2, T> extends Subscriber<GroupedObservable<K1, GroupedObservable<K2, T>>> {
/*
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final ChainedSubscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
private PivotSubscriber(Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
this.child = child;
this.state = state;
this.groups = new GroupState<K1, K2, T>(parentSubscription, child);
this.groups = new GroupState<K1, K2, T>(this, child);
}

@Override
Expand All @@ -102,7 +95,7 @@ public void onError(Throwable e) {
@Override
public void onNext(final GroupedObservable<K1, GroupedObservable<K2, T>> k1Group) {
groups.startK1Group(state, k1Group.getKey());
k1Group.unsafeSubscribe(new Subscriber<GroupedObservable<K2, T>>(parentSubscription) {
k1Group.unsafeSubscribe(new Subscriber<GroupedObservable<K2, T>>(this) {

@Override
public void onCompleted() {
Expand All @@ -124,7 +117,7 @@ public void onNext(final GroupedObservable<K2, T> k2Group) {
// we have been unsubscribed
return;
}
k2Group.unsafeSubscribe(new Subscriber<T>(parentSubscription) {
k2Group.unsafeSubscribe(new Subscriber<T>(this) {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -158,16 +151,15 @@ public void onNext(T t) {
private static final class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final ChainedSubscription parentSubscription;
private final Subscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
/** Indicates a terminal state. */
volatile int completed;
/** Field updater for completed. */
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");

public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(Subscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
34 changes: 17 additions & 17 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.ChainedSubscription;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,22 +39,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final ChainedSubscription parent = new ChainedSubscription();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}

/*
* We decouple the parent and child subscription so there can be multiple take() in a chain
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
*/
child.add(parent);
return new Subscriber<T>(parent) {
Subscriber<T> parent = new Subscriber<T>() {

int count = 0;
boolean completed = false;
Expand Down Expand Up @@ -87,6 +71,22 @@ public void onNext(T i) {
}

};

if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}

/*
* We decouple the parent and child subscription so there can be multiple take() in a chain
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
*/
child.add(parent);
return parent;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,25 +35,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final ChainedSubscription parentSubscription = new ChainedSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
final Scheduler.Worker inner = scheduler.createWorker();
inner.schedule(new Action0() {

@Override
public void call() {
parentSubscription.unsubscribe();
inner.unsubscribe();
}
});
}

}));

return new Subscriber<T>(parentSubscription) {
final Subscriber<T> parent = new Subscriber<T>() {

@Override
public void onCompleted() {
Expand All @@ -72,5 +53,26 @@ public void onNext(T t) {
}

};

subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
final Scheduler.Worker inner = scheduler.createWorker();
inner.schedule(new Action0() {

@Override
public void call() {
parent.unsubscribe();
inner.unsubscribe();
}
});
}

}));

return parent;


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;
package rx.internal.util;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -29,15 +29,15 @@
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public final class ChainedSubscription implements Subscription {
public final class SubscriptionList implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;

public ChainedSubscription() {
public SubscriptionList() {
}

public ChainedSubscription(final Subscription... subscriptions) {
public SubscriptionList(final Subscription... subscriptions) {
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
}

Expand All @@ -47,8 +47,8 @@ public synchronized boolean isUnsubscribed() {
}

/**
* Adds a new {@link Subscription} to this {@code ChainedSubscription} if the {@code ChainedSubscription} is
* not yet unsubscribed. If the {@code ChainedSubscription} <em>is</em> unsubscribed, {@code add} will
* Adds a new {@link Subscription} to this {@code SubscriptionList} if the {@code SubscriptionList} is
* not yet unsubscribed. If the {@code SubscriptionList} <em>is</em> unsubscribed, {@code add} will
* indicate this by explicitly unsubscribing the new {@code Subscription} as well.
*
* @param s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;
package rx.internal.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -29,13 +29,14 @@

import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.internal.util.SubscriptionList;

public class ChainedSubscriptionTest {
public class SubscriptionListTest {

@Test
public void testSuccess() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -70,7 +71,7 @@ public boolean isUnsubscribed() {
@Test(timeout = 1000)
public void shouldUnsubscribeAll() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
Expand Down Expand Up @@ -117,7 +118,7 @@ public void run() {
@Test
public void testException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -159,7 +160,7 @@ public boolean isUnsubscribed() {
@Test
public void testCompositeException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -215,7 +216,7 @@ public boolean isUnsubscribed() {
@Test
public void testUnsubscribeIdempotence() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand All @@ -241,7 +242,7 @@ public boolean isUnsubscribed() {
public void testUnsubscribeIdempotenceConcurrently()
throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
Expand Down

0 comments on commit e6086d4

Please sign in to comment.