From 98778aa32e6b4ec0ce597445e96d65c4b978ec28 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 28 Jul 2014 21:29:11 -0700 Subject: [PATCH 1/2] Failing unit test for reduce, showing it does not implement backpressure correctly --- .../rx/internal/operators/OperatorReduceTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java index f5fcc2b2b1..283fffd55a 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java @@ -16,6 +16,7 @@ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -114,4 +115,18 @@ public Integer call(Integer t1) { verify(observer, times(1)).onError(any(TestException.class)); } + @Test(timeout = 13000) + public void testBackpressure() throws InterruptedException { + Observable source = Observable.from(1, 2, 3, 4, 5, 6); + Observable reduced = source.reduce(new Func2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + Integer r = reduced.toBlocking().first(); + assertEquals(21, r.intValue()); + } + } From 46290b81452a202235142b80b53432e9aaf03306 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 28 Jul 2014 21:56:10 -0700 Subject: [PATCH 2/2] Added another unit test to OperatorReduce/backpressure --- .../operators/OperatorReduceTest.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java index 283fffd55a..fa8e9648f2 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorReduceTest.java @@ -115,18 +115,24 @@ public Integer call(Integer t1) { verify(observer, times(1)).onError(any(TestException.class)); } - @Test(timeout = 13000) - public void testBackpressure() throws InterruptedException { + @Test + public void testBackpressureWithNoInitialValue() throws InterruptedException { Observable source = Observable.from(1, 2, 3, 4, 5, 6); - Observable reduced = source.reduce(new Func2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + Observable reduced = source.reduce(sum); + + Integer r = reduced.toBlocking().first(); + assertEquals(21, r.intValue()); + } + + @Test + public void testBackpressureWithInitialValue() throws InterruptedException { + Observable source = Observable.from(1, 2, 3, 4, 5, 6); + Observable reduced = source.reduce(0, sum); Integer r = reduced.toBlocking().first(); assertEquals(21, r.intValue()); } + + }