From f6ca9ee01c835a5be384ca5dfecf97dc740e2ad6 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 29 Apr 2015 13:33:58 +1000 Subject: [PATCH 1/2] prevent request overflow in OperatorObserveOn and add unit test that fails on original codebase but passes with change --- .../internal/operators/OperatorObserveOn.java | 19 ++++++--- .../operators/OperatorObserveOnTest.java | 41 +++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index af08f2a1b9..ab0f5ca501 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -16,15 +16,22 @@ package rx.internal.operators; import java.util.Queue; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable.Operator; -import rx.*; +import rx.Producer; +import rx.Scheduler; +import rx.Subscriber; +import rx.Subscription; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; -import rx.internal.util.*; -import rx.internal.util.unsafe.*; -import rx.schedulers.*; +import rx.internal.util.RxRingBuffer; +import rx.internal.util.SynchronizedQueue; +import rx.internal.util.unsafe.SpscArrayQueue; +import rx.internal.util.unsafe.UnsafeAccess; +import rx.schedulers.ImmediateScheduler; +import rx.schedulers.TrampolineScheduler; /** * Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer. @@ -96,7 +103,7 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber child) { @Override public void request(long n) { - REQUESTED.getAndAdd(ObserveOnSubscriber.this, n); + BackpressureUtils.getAndAddRequest(REQUESTED, ObserveOnSubscriber.this, n); schedule(); } diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index de204a82ec..b0c8a5bcfd 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -724,4 +724,45 @@ public Long call(Long t1, Integer t2) { assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass()); } + @Test + public void testRequestOverflow() throws InterruptedException { + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + Observable.range(1, 100).observeOn(Schedulers.computation()) + .subscribe(new Subscriber() { + + boolean first = true; + + @Override + public void onStart() { + request(2); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + count.incrementAndGet(); + if (first) { + request(Long.MAX_VALUE - 1); + request(Long.MAX_VALUE - 1); + request(10); + first = false; + } + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(100, count.get()); + + } + } From fcfa4e83845ca21a5a622a726d346b46c487dce0 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 29 Apr 2015 17:34:54 +1000 Subject: [PATCH 2/2] ensure this does not escape from ObserveOnSubscriber constructor by moving code to an init() method and calling after construction --- .../java/rx/internal/operators/OperatorObserveOn.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index ab0f5ca501..13a78ca14c 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -61,7 +61,9 @@ public Subscriber call(Subscriber child) { // avoid overhead, execute directly return child; } else { - return new ObserveOnSubscriber(scheduler, child); + ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child); + parent.init(); + return parent; } } @@ -98,6 +100,11 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber child) { queue = new SynchronizedQueue(RxRingBuffer.SIZE); } this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler); + } + + void init() { + // don't want this code in the constructor because `this` can escape through the + // setProducer call child.add(scheduledUnsubscribe); child.setProducer(new Producer() {