Skip to content

Commit

Permalink
Fix for AbstractOnSubscribe request accounting.
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 3, 2015
1 parent 9f2fc67 commit 622b0a9
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
11 changes: 7 additions & 4 deletions src/main/java/rx/internal/operators/BackpressureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
* Utility functions for use with backpressure.
*
*/
final class BackpressureUtils {

public final class BackpressureUtils {
/** Utility class, no instances. */
private BackpressureUtils() {
throw new IllegalStateException("No instances!");
}
/**
* Adds {@code n} to {@code requested} field and returns the value prior to
* addition once the addition is successful (uses CAS semantics). If
Expand All @@ -37,7 +40,7 @@ final class BackpressureUtils {
* the number of requests to add to the requested count
* @return requested value just prior to successful addition
*/
static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
// add n to field but check for overflow
while (true) {
long current = requested.get(object);
Expand All @@ -63,7 +66,7 @@ static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object,
* the number of requests to add to the requested count
* @return requested value just prior to successful addition
*/
static <T> long getAndAddRequest(AtomicLong requested, long n) {
public static <T> long getAndAddRequest(AtomicLong requested, long n) {
// add n to field but check for overflow
while (true) {
long current = requested.get();
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/rx/observables/AbstractOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.annotations.Experimental;
import rx.exceptions.CompositeException;
import rx.functions.*;
import rx.internal.operators.BackpressureUtils;

/**
* Abstract base class for the {@link OnSubscribe} interface that helps you build Observable sources one
Expand Down Expand Up @@ -332,20 +333,22 @@ private SubscriptionProducer(SubscriptionState<T, S> state) {
}
@Override
public void request(long n) {
if (n == Long.MAX_VALUE) {
for (; !state.subscriber.isUnsubscribed(); ) {
if (!doNext()) {
break;
}
}
} else
if (n > 0 && state.requestCount.getAndAdd(n) == 0) {
if (!state.subscriber.isUnsubscribed()) {
do {
if (n > 0 && BackpressureUtils.getAndAddRequest(state.requestCount, n) == 0) {
// fast-path
if (n == Long.MAX_VALUE) {
for (; !state.subscriber.isUnsubscribed(); ) {
if (!doNext()) {
break;
}
} while (state.requestCount.decrementAndGet() > 0 && !state.subscriber.isUnsubscribed());
}
} else {
if (!state.subscriber.isUnsubscribed()) {
do {
if (!doNext()) {
break;
}
} while (state.requestCount.decrementAndGet() > 0 && !state.subscriber.isUnsubscribed());
}
}
}
}
Expand Down
36 changes: 35 additions & 1 deletion src/test/java/rx/observables/AbstractOnSubscribeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package rx.observables;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;
Expand Down Expand Up @@ -503,4 +504,37 @@ public void testMissingEmission() {
verify(o, never()).onNext(any(Object.class));
verify(o).onError(any(IllegalStateException.class));
}

@Test
public void testCanRequestInOnNext() {
AbstractOnSubscribe<Integer, Void> aos = new AbstractOnSubscribe<Integer, Void>() {
@Override
protected void next(SubscriptionState<Integer, Void> state) {
state.onNext(1);
state.onCompleted();
}
};
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
aos.toObservable().subscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
exception.set(e);
}

@Override
public void onNext(Integer t) {
request(1);
}
});
if (exception.get()!=null) {
exception.get().printStackTrace();
}
assertNull(exception.get());
}
}

0 comments on commit 622b0a9

Please sign in to comment.