Skip to content

Commit

Permalink
Fixed request accounting, increased visibility of BackpressureUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 7, 2015
1 parent 9f2fc67 commit 31339a2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 16 deletions.
23 changes: 15 additions & 8 deletions src/main/java/rx/internal/operators/BackpressureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
* Utility functions for use with backpressure.
*
*/
final class BackpressureUtils {

public final class BackpressureUtils {
/** Utility class, no instances. */
private BackpressureUtils() {
throw new IllegalStateException("No instances!");
}
/**
* Adds {@code n} to {@code requested} field and returns the value prior to
* addition once the addition is successful (uses CAS semantics). If
Expand All @@ -37,16 +40,18 @@ final class BackpressureUtils {
* the number of requests to add to the requested count
* @return requested value just prior to successful addition
*/
static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
// add n to field but check for overflow
while (true) {
long current = requested.get(object);
long next = current + n;
// check for overflow
if (next < 0)
if (next < 0) {
next = Long.MAX_VALUE;
if (requested.compareAndSet(object, current, next))
}
if (requested.compareAndSet(object, current, next)) {
return current;
}
}
}

Expand All @@ -63,16 +68,18 @@ static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object,
* the number of requests to add to the requested count
* @return requested value just prior to successful addition
*/
static <T> long getAndAddRequest(AtomicLong requested, long n) {
public static long getAndAddRequest(AtomicLong requested, long n) {
// add n to field but check for overflow
while (true) {
long current = requested.get();
long next = current + n;
// check for overflow
if (next < 0)
if (next < 0) {
next = Long.MAX_VALUE;
if (requested.compareAndSet(current, next))
}
if (requested.compareAndSet(current, next)) {
return current;
}
}
}
}
16 changes: 9 additions & 7 deletions src/main/java/rx/observables/AbstractOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.annotations.Experimental;
import rx.exceptions.CompositeException;
import rx.functions.*;
import rx.internal.operators.BackpressureUtils;

/**
* Abstract base class for the {@link OnSubscribe} interface that helps you build Observable sources one
Expand Down Expand Up @@ -332,14 +333,15 @@ private SubscriptionProducer(SubscriptionState<T, S> state) {
}
@Override
public void request(long n) {
if (n == Long.MAX_VALUE) {
for (; !state.subscriber.isUnsubscribed(); ) {
if (!doNext()) {
break;
if (n > 0 && BackpressureUtils.getAndAddRequest(state.requestCount, n) == 0) {
if (n == Long.MAX_VALUE) {
// fast-path
for (; !state.subscriber.isUnsubscribed(); ) {
if (!doNext()) {
break;
}
}
}
} else
if (n > 0 && state.requestCount.getAndAdd(n) == 0) {
} else
if (!state.subscriber.isUnsubscribed()) {
do {
if (!doNext()) {
Expand Down
36 changes: 35 additions & 1 deletion src/test/java/rx/observables/AbstractOnSubscribeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package rx.observables;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;
Expand Down Expand Up @@ -503,4 +504,37 @@ public void testMissingEmission() {
verify(o, never()).onNext(any(Object.class));
verify(o).onError(any(IllegalStateException.class));
}

@Test
public void testCanRequestInOnNext() {
AbstractOnSubscribe<Integer, Void> aos = new AbstractOnSubscribe<Integer, Void>() {
@Override
protected void next(SubscriptionState<Integer, Void> state) {
state.onNext(1);
state.onCompleted();
}
};
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
aos.toObservable().subscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
exception.set(e);
}

@Override
public void onNext(Integer t) {
request(1);
}
});
if (exception.get()!=null) {
exception.get().printStackTrace();
}
assertNull(exception.get());
}
}

0 comments on commit 31339a2

Please sign in to comment.