Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChainedSubscription -> SubscriptionList #1308

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
*/
package rx;

import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.CompositeSubscription;

/**
@@ -32,9 +32,9 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final ChainedSubscription cs;
private final SubscriptionList cs;

protected Subscriber(ChainedSubscription cs) {
protected Subscriber(SubscriptionList cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
@@ -43,12 +43,12 @@ protected Subscriber(ChainedSubscription cs) {

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new ChainedSubscription());
this(new SubscriptionList());
add(cs);
}

protected Subscriber() {
this(new ChainedSubscription());
this(new SubscriptionList());
}

protected Subscriber(Subscriber<?> op) {
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

/**
@@ -55,7 +55,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super(new ChainedSubscription());
super(new SubscriptionList());
this.keySelector = keySelector;
this.child = child;
}
Original file line number Diff line number Diff line change
@@ -28,15 +28,15 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new SubscriptionList(), child, state);
child.add(Subscriptions.create(new Action0() {

@Override
@@ -65,12 +65,12 @@ private static final class PivotSubscriber<K1, K2, T> extends Subscriber<Grouped
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final ChainedSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
private PivotSubscriber(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
this.child = child;
@@ -158,7 +158,7 @@ public void onNext(T t) {
private static final class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final ChainedSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
/** Indicates a terminal state. */
volatile int completed;
@@ -167,7 +167,7 @@ private static final class GroupState<K1, K2, T> {
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");

public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
@@ -40,7 +40,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final ChainedSubscription parent = new ChainedSubscription();
final SubscriptionList parent = new SubscriptionList();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

/**
@@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final ChainedSubscription parentSubscription = new ChainedSubscription();
final SubscriptionList parentSubscription = new SubscriptionList();
subscriber.add(Subscriptions.create(new Action0() {

@Override
Original file line number Diff line number Diff line change
@@ -29,15 +29,15 @@
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public final class ChainedSubscription implements Subscription {
public final class SubscriptionList implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;

public ChainedSubscription() {
public SubscriptionList() {
}

public ChainedSubscription(final Subscription... subscriptions) {
public SubscriptionList(final Subscription... subscriptions) {
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
}

@@ -47,8 +47,8 @@ public synchronized boolean isUnsubscribed() {
}

/**
* Adds a new {@link Subscription} to this {@code ChainedSubscription} if the {@code ChainedSubscription} is
* not yet unsubscribed. If the {@code ChainedSubscription} <em>is</em> unsubscribed, {@code add} will
* Adds a new {@link Subscription} to this {@code SubscriptionList} if the {@code SubscriptionList} is
* not yet unsubscribed. If the {@code SubscriptionList} <em>is</em> unsubscribed, {@code add} will
* indicate this by explicitly unsubscribing the new {@code Subscription} as well.
*
* @param s
Original file line number Diff line number Diff line change
@@ -30,12 +30,12 @@
import rx.Subscription;
import rx.exceptions.CompositeException;

public class ChainedSubscriptionTest {
public class SubscriptionListTest {

@Test
public void testSuccess() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
@@ -70,7 +70,7 @@ public boolean isUnsubscribed() {
@Test(timeout = 1000)
public void shouldUnsubscribeAll() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
@@ -117,7 +117,7 @@ public void run() {
@Test
public void testException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
@@ -159,7 +159,7 @@ public boolean isUnsubscribed() {
@Test
public void testCompositeException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
@@ -215,7 +215,7 @@ public boolean isUnsubscribed() {
@Test
public void testUnsubscribeIdempotence() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
@@ -241,7 +241,7 @@ public boolean isUnsubscribed() {
public void testUnsubscribeIdempotenceConcurrently()
throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);