Skip to content

Commit

Permalink
Use deferred requesting
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacSweers committed Nov 18, 2017
1 parent daad41b commit 500f337
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -32,6 +34,8 @@ final class AutoDisposingSubscriberImpl<T> extends AtomicInteger
private final AtomicReference<Subscription> mainSubscription = new AtomicReference<>();
private final AtomicReference<Disposable> lifecycleDisposable = new AtomicReference<>();
private final AtomicThrowable error = new AtomicThrowable();
private final AtomicReference<Subscription> ref = new AtomicReference<>();
private final AtomicLong requested = new AtomicLong();
private final Maybe<?> lifecycle;
private final Subscriber<? super T> delegate;

Expand Down Expand Up @@ -69,7 +73,7 @@ final class AutoDisposingSubscriberImpl<T> extends AtomicInteger
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
lifecycle.subscribe(o);
if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) {
delegate.onSubscribe(this);
SubscriptionHelper.deferredSetOnce(ref, requested, s);
}
}
}
Expand All @@ -84,8 +88,7 @@ final class AutoDisposingSubscriberImpl<T> extends AtomicInteger
* @param n the request amount, positive
*/
@SuppressWarnings("NullAway") @Override public void request(long n) {
mainSubscription.get()
.request(n);
SubscriptionHelper.deferredRequest(ref, requested, n);
}

/**
Expand Down

0 comments on commit 500f337

Please sign in to comment.