From 678c62f5e29b597cda44c42372b6771c0da25abf Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 15 Oct 2017 15:24:24 +0200 Subject: [PATCH] 2.x: fix PublishProcessor cancel/emission overflow bug --- .../processors/PublishProcessor.java | 4 +-- .../processors/BehaviorProcessorTest.java | 33 +++++++++++++++++++ .../processors/PublishProcessorTest.java | 33 +++++++++++++++++++ 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index fb3b2ead83..b9341ce961 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -313,9 +313,7 @@ public void onNext(T t) { } if (r != 0L) { actual.onNext(t); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.producedCancel(this, 1); } else { cancel(); actual.onError(new MissingBackpressureException("Could not emit value due to lack of requests")); diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index 8a1766f724..9cb392adba 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -812,4 +812,37 @@ public void run() { ts.assertFailure(TestException.class); } } + + @Test(timeout = 10000) + public void subscriberCancelOfferRace() { + for (int i = 0; i < 1000; i++) { + final BehaviorProcessor pp = BehaviorProcessor.create(); + + final TestSubscriber ts = pp.test(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 2; i++) { + while (!pp.offer(i)) ; + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + if (ts.valueCount() > 0) { + ts.assertValuesOnly(0); + } else { + ts.assertEmpty(); + } + } + } } diff --git a/src/test/java/io/reactivex/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/processors/PublishProcessorTest.java index 3078250ac0..918650c8a1 100644 --- a/src/test/java/io/reactivex/processors/PublishProcessorTest.java +++ b/src/test/java/io/reactivex/processors/PublishProcessorTest.java @@ -677,4 +677,37 @@ public void run() { .awaitDone(5, TimeUnit.SECONDS) .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } + + @Test(timeout = 10000) + public void subscriberCancelOfferRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.test(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 2; i++) { + while (!pp.offer(i)) ; + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + if (ts.valueCount() > 0) { + ts.assertValuesOnly(0); + } else { + ts.assertEmpty(); + } + } + } }