Skip to content

Commit

Permalink
Merge pull request #2921 from davidmoten/observe-on-request-overflow
Browse files Browse the repository at this point in the history
OperatorObserveOn - handle request overflow correctly
  • Loading branch information
akarnokd committed Apr 29, 2015
2 parents aeee037 + fcfa4e8 commit 24dadf1
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 7 deletions.
28 changes: 21 additions & 7 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable.Operator;
import rx.*;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.util.*;
import rx.internal.util.unsafe.*;
import rx.schedulers.*;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.SynchronizedQueue;
import rx.internal.util.unsafe.SpscArrayQueue;
import rx.internal.util.unsafe.UnsafeAccess;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TrampolineScheduler;

/**
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
Expand Down Expand Up @@ -54,7 +61,9 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
// avoid overhead, execute directly
return child;
} else {
return new ObserveOnSubscriber<T>(scheduler, child);
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
parent.init();
return parent;
}
}

Expand Down Expand Up @@ -91,12 +100,17 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}

void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
child.add(scheduledUnsubscribe);
child.setProducer(new Producer() {

@Override
public void request(long n) {
REQUESTED.getAndAdd(ObserveOnSubscriber.this, n);
BackpressureUtils.getAndAddRequest(REQUESTED, ObserveOnSubscriber.this, n);
schedule();
}

Expand Down
41 changes: 41 additions & 0 deletions src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -724,4 +724,45 @@ public Long call(Long t1, Integer t2) {
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
}

@Test
public void testRequestOverflow() throws InterruptedException {

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
Observable.range(1, 100).observeOn(Schedulers.computation())
.subscribe(new Subscriber<Integer>() {

boolean first = true;

@Override
public void onStart() {
request(2);
}

@Override
public void onCompleted() {
latch.countDown();
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
count.incrementAndGet();
if (first) {
request(Long.MAX_VALUE - 1);
request(Long.MAX_VALUE - 1);
request(10);
first = false;
}
}
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(100, count.get());

}

}

0 comments on commit 24dadf1

Please sign in to comment.