From f82a19743155fe7d2006e239dde24a78712434c0 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Tue, 20 Jun 2017 10:02:05 +0200 Subject: [PATCH] 2.x: fix buffer(time, maxSize) duplicating buffers on time-size race (#5427) --- .../flowable/FlowableBufferTimed.java | 17 +++----- .../observable/ObservableBufferTimed.java | 18 +++----- .../flowable/FlowableBufferTest.java | 41 +++++++++++++++++++ .../observable/ObservableBufferTest.java | 41 +++++++++++++++++++ 4 files changed, 94 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index e5ecf84f49..f6320ced12 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -460,12 +460,12 @@ public void onNext(T t) { if (b.size() < maxSize) { return; } - } - if (restartTimerOnMaxSize) { buffer = null; producerIndex++; + } + if (restartTimerOnMaxSize) { timer.dispose(); } @@ -480,17 +480,12 @@ public void onNext(T t) { return; } + synchronized (this) { + buffer = b; + consumerIndex++; + } if (restartTimerOnMaxSize) { - synchronized (this) { - buffer = b; - consumerIndex++; - } - timer = w.schedulePeriodically(this, timespan, timespan, unit); - } else { - synchronized (this) { - buffer = b; - } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java index c93a8e7f57..d521ebae80 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java @@ -458,12 +458,11 @@ public void onNext(T t) { if (b.size() < maxSize) { return; } - } - - if (restartTimerOnMaxSize) { buffer = null; producerIndex++; + } + if (restartTimerOnMaxSize) { timer.dispose(); } @@ -478,17 +477,12 @@ public void onNext(T t) { return; } + synchronized (this) { + buffer = b; + consumerIndex++; + } if (restartTimerOnMaxSize) { - synchronized (this) { - buffer = b; - consumerIndex++; - } - timer = w.schedulePeriodically(this, timespan, timespan, unit); - } else { - synchronized (this) { - buffer = b; - } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index dcc14b5eba..d2ebe2ca11 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -1973,4 +1973,45 @@ public void skipBackpressure() { .test() .assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10)); } + + @Test + public void withTimeAndSizeCapacityRace() { + for (int i = 0; i < 1000; i++) { + final TestScheduler scheduler = new TestScheduler(); + + final PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.buffer(1, TimeUnit.SECONDS, scheduler, 5).test(); + + ps.onNext(1); + ps.onNext(2); + ps.onNext(3); + ps.onNext(4); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(5); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + ps.onComplete(); + + int items = 0; + for (List o : ts.values()) { + items += o.size(); + } + + assertEquals("Round: " + i, 5, items); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 6d88854344..d7b6046e42 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -1398,4 +1398,45 @@ public void bufferTimedExactBoundedError() { to .assertFailure(TestException.class); } + + @Test + public void withTimeAndSizeCapacityRace() { + for (int i = 0; i < 1000; i++) { + final TestScheduler scheduler = new TestScheduler(); + + final PublishSubject ps = PublishSubject.create(); + + TestObserver> ts = ps.buffer(1, TimeUnit.SECONDS, scheduler, 5).test(); + + ps.onNext(1); + ps.onNext(2); + ps.onNext(3); + ps.onNext(4); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(5); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + ps.onComplete(); + + int items = 0; + for (List o : ts.values()) { + items += o.size(); + } + + assertEquals("Round: " + i, 5, items); + } + } }