diff --git a/src/main/java/rx/internal/operators/OperatorAll.java b/src/main/java/rx/internal/operators/OperatorAll.java index 77145c71cc..24619e3b20 100644 --- a/src/main/java/rx/internal/operators/OperatorAll.java +++ b/src/main/java/rx/internal/operators/OperatorAll.java @@ -34,7 +34,7 @@ public OperatorAll(Func1 predicate) { @Override public Subscriber call(final Subscriber child) { - return new Subscriber(child) { + Subscriber s = new Subscriber() { boolean done; @Override @@ -65,5 +65,7 @@ public void onCompleted() { } } }; + child.add(s); + return s; } } diff --git a/src/test/java/rx/internal/operators/OperatorAllTest.java b/src/test/java/rx/internal/operators/OperatorAllTest.java index e5f4f32a7d..92a2dfa056 100644 --- a/src/test/java/rx/internal/operators/OperatorAllTest.java +++ b/src/test/java/rx/internal/operators/OperatorAllTest.java @@ -15,19 +15,17 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.junit.Test; -import rx.Observable; -import rx.Observer; +import rx.*; import rx.functions.Func1; -import java.util.Arrays; - public class OperatorAllTest { @Test @@ -113,4 +111,21 @@ public Boolean call(Integer i) { }); assertFalse(allOdd.toBlocking().first()); } + @Test(timeout = 5000) + public void testIssue1935NoUnsubscribeDownstream() { + Observable source = Observable.just(1) + .all(new Func1() { + @Override + public Boolean call(Object t1) { + return false; + } + }) + .flatMap(new Func1>() { + @Override + public Observable call(Boolean t1) { + return Observable.just(2).delay(500, TimeUnit.MILLISECONDS); + } + }); + assertEquals((Object)2, source.toBlocking().first()); + } }