diff --git a/src/main/java/rx/internal/operators/BackpressureUtils.java b/src/main/java/rx/internal/operators/BackpressureUtils.java index 7e7ce2ae55c..88d3c59ad60 100644 --- a/src/main/java/rx/internal/operators/BackpressureUtils.java +++ b/src/main/java/rx/internal/operators/BackpressureUtils.java @@ -22,8 +22,11 @@ * Utility functions for use with backpressure. * */ -final class BackpressureUtils { - +public final class BackpressureUtils { + /** Utility class, no instances. */ + private BackpressureUtils() { + throw new IllegalStateException("No instances!"); + } /** * Adds {@code n} to {@code requested} field and returns the value prior to * addition once the addition is successful (uses CAS semantics). If @@ -37,7 +40,7 @@ final class BackpressureUtils { * the number of requests to add to the requested count * @return requested value just prior to successful addition */ - static long getAndAddRequest(AtomicLongFieldUpdater requested, T object, long n) { + public static long getAndAddRequest(AtomicLongFieldUpdater requested, T object, long n) { // add n to field but check for overflow while (true) { long current = requested.get(object); @@ -63,7 +66,7 @@ static long getAndAddRequest(AtomicLongFieldUpdater requested, T object, * the number of requests to add to the requested count * @return requested value just prior to successful addition */ - static long getAndAddRequest(AtomicLong requested, long n) { + public static long getAndAddRequest(AtomicLong requested, long n) { // add n to field but check for overflow while (true) { long current = requested.get(); diff --git a/src/main/java/rx/observables/AbstractOnSubscribe.java b/src/main/java/rx/observables/AbstractOnSubscribe.java index ea78c56c75b..1a1526766eb 100644 --- a/src/main/java/rx/observables/AbstractOnSubscribe.java +++ b/src/main/java/rx/observables/AbstractOnSubscribe.java @@ -24,6 +24,7 @@ import rx.annotations.Experimental; import rx.exceptions.CompositeException; import rx.functions.*; +import rx.internal.operators.BackpressureUtils; /** * Abstract base class for the {@link OnSubscribe} interface that helps you build Observable sources one @@ -332,14 +333,15 @@ private SubscriptionProducer(SubscriptionState state) { } @Override public void request(long n) { - if (n == Long.MAX_VALUE) { - for (; !state.subscriber.isUnsubscribed(); ) { - if (!doNext()) { - break; + if (n > 0 && BackpressureUtils.getAndAddRequest(state.requestCount, n) == 0) { + if (n == Long.MAX_VALUE) { + // fast-path + for (; !state.subscriber.isUnsubscribed(); ) { + if (!doNext()) { + break; + } } - } - } else - if (n > 0 && state.requestCount.getAndAdd(n) == 0) { + } else if (!state.subscriber.isUnsubscribed()) { do { if (!doNext()) { diff --git a/src/test/java/rx/observables/AbstractOnSubscribeTest.java b/src/test/java/rx/observables/AbstractOnSubscribeTest.java index e408a166f03..95e3eac0117 100644 --- a/src/test/java/rx/observables/AbstractOnSubscribeTest.java +++ b/src/test/java/rx/observables/AbstractOnSubscribeTest.java @@ -16,12 +16,13 @@ package rx.observables; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; @@ -503,4 +504,37 @@ public void testMissingEmission() { verify(o, never()).onNext(any(Object.class)); verify(o).onError(any(IllegalStateException.class)); } + + @Test + public void testCanRequestInOnNext() { + AbstractOnSubscribe aos = new AbstractOnSubscribe() { + @Override + protected void next(SubscriptionState state) { + state.onNext(1); + state.onCompleted(); + } + }; + final AtomicReference exception = new AtomicReference(); + aos.toObservable().subscribe(new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + exception.set(e); + } + + @Override + public void onNext(Integer t) { + request(1); + } + }); + if (exception.get()!=null) { + exception.get().printStackTrace(); + } + assertNull(exception.get()); + } }