From 6afa40c0493bd90f012a91f2c20ce4cc2695826e Mon Sep 17 00:00:00 2001 From: Nathan Kooij Date: Wed, 21 Jun 2017 14:42:39 +0200 Subject: [PATCH 1/4] Add breaking test for expected peek behavior in a time bound replay subject. --- .../reactivex/subjects/ReplaySubjectTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java index 6280788ad7..2945160e2d 100644 --- a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java @@ -931,6 +931,26 @@ public void peekStateTimeAndSizeValue() { assertNull(rp.getValues(new Integer[2])[0]); } + @Test + public void peekStateTimeAndSizeValueExpired() { + TestScheduler scheduler = new TestScheduler(); + ReplaySubject rp = ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, scheduler, 1); + + assertNull(rp.getValue()); + assertNull(rp.getValues(new Integer[2])[0]); + + rp.onNext(2); + + assertEquals((Integer)2, rp.getValue()); + assertEquals(2, rp.getValues()[0]); + + scheduler.advanceTimeBy(2, TimeUnit.DAYS); + + assertEquals(null, rp.getValue()); + assertEquals(0, rp.getValues().length); + assertNull(rp.getValues(new Integer[2])[0]); + } + @Test public void onNextNull() { final ReplaySubject s = ReplaySubject.create(); From bf1c1895f5fb66fdb38f1ad9d09212b3462a1ffd Mon Sep 17 00:00:00 2001 From: Nathan Kooij Date: Wed, 21 Jun 2017 14:43:48 +0200 Subject: [PATCH 2/4] Add possible fix for fixing peek behavior. --- src/main/java/io/reactivex/subjects/ReplaySubject.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index c56618d17a..4b684e31bf 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -1018,6 +1018,11 @@ public T getValue() { h = next; } + long limit = scheduler.now(unit) - maxAge; + if (h.time < limit) { + return null; + } + Object v = h.value; if (v == null) { return null; From 38d8f0942ff3e52fbc475cf840ef91980a462faf Mon Sep 17 00:00:00 2001 From: Nathan Kooij Date: Wed, 21 Jun 2017 15:42:12 +0200 Subject: [PATCH 3/4] Add test & fix for ReplayProcessor::getValue. --- .../reactivex/processors/ReplayProcessor.java | 5 ++ .../processors/ReplayProcessorTest.java | 46 +++++++++++++++---- .../reactivex/subjects/ReplaySubjectTest.java | 2 +- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 931d207312..c8650e3981 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -1052,6 +1052,11 @@ public T getValue() { h = next; } + long limit = scheduler.now(unit) - maxAge; + if (h.time < limit) { + return null; + } + Object v = h.value; if (v == null) { return null; diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 14fb6869a1..87ff9d3bd3 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -13,6 +13,20 @@ package io.reactivex.processors; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.notNull; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + import io.reactivex.Flowable; import io.reactivex.TestHelper; import io.reactivex.disposables.Disposable; @@ -23,20 +37,16 @@ import io.reactivex.schedulers.TestScheduler; import io.reactivex.subscribers.DefaultSubscriber; import io.reactivex.subscribers.TestSubscriber; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - public class ReplayProcessorTest extends FlowableProcessorTest { private final Throwable testException = new Throwable(); @@ -1030,6 +1040,26 @@ public void peekStateTimeAndSizeValue() { assertNull(rp.getValues(new Integer[2])[0]); } + @Test + public void peekStateTimeAndSizeValueExpired() { + TestScheduler scheduler = new TestScheduler(); + ReplayProcessor rp = ReplayProcessor.createWithTime(1, TimeUnit.DAYS, scheduler); + + assertNull(rp.getValue()); + assertNull(rp.getValues(new Integer[2])[0]); + + rp.onNext(2); + + assertEquals((Integer)2, rp.getValue()); + assertEquals(2, rp.getValues()[0]); + + scheduler.advanceTimeBy(2, TimeUnit.DAYS); + + assertEquals(null, rp.getValue()); + assertEquals(0, rp.getValues().length); + assertNull(rp.getValues(new Integer[2])[0]); + } + @Test public void capacityHint() { ReplayProcessor rp = ReplayProcessor.create(8); diff --git a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java index 2945160e2d..0f5e1032ab 100644 --- a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java @@ -934,7 +934,7 @@ public void peekStateTimeAndSizeValue() { @Test public void peekStateTimeAndSizeValueExpired() { TestScheduler scheduler = new TestScheduler(); - ReplaySubject rp = ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, scheduler, 1); + ReplaySubject rp = ReplaySubject.createWithTime(1, TimeUnit.DAYS, scheduler); assertNull(rp.getValue()); assertNull(rp.getValues(new Integer[2])[0]); From 3eec6293b3a27a77b406d22a967a9367d702064d Mon Sep 17 00:00:00 2001 From: Nathan Kooij Date: Wed, 21 Jun 2017 15:56:48 +0200 Subject: [PATCH 4/4] Roll-back auto-formatted imports. --- .../processors/ReplayProcessorTest.java | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 87ff9d3bd3..9d5c90f605 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -13,20 +13,6 @@ package io.reactivex.processors; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.notNull; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - import io.reactivex.Flowable; import io.reactivex.TestHelper; import io.reactivex.disposables.Disposable; @@ -37,16 +23,20 @@ import io.reactivex.schedulers.TestScheduler; import io.reactivex.subscribers.DefaultSubscriber; import io.reactivex.subscribers.TestSubscriber; -import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + public class ReplayProcessorTest extends FlowableProcessorTest { private final Throwable testException = new Throwable();