From bbd4c3782f9af25e03673a9f7ee0168f1cba4205 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 24 Mar 2015 16:38:28 +0100 Subject: [PATCH] Fix for issue 2844: wrong target of request on repeat --- .../internal/operators/OnSubscribeRedo.java | 2 ++ .../operators/OperatorRepeatTest.java | 27 ++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 65fcb3eb92..00553eebb9 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -213,12 +213,14 @@ public void call() { Subscriber terminalDelegatingSubscriber = new Subscriber() { @Override public void onCompleted() { + currentProducer.set(null); unsubscribe(); terminals.onNext(Notification.createOnCompleted()); } @Override public void onError(Throwable e) { + currentProducer.set(null); unsubscribe(); terminals.onNext(Notification.createOnError(e)); } diff --git a/src/test/java/rx/internal/operators/OperatorRepeatTest.java b/src/test/java/rx/internal/operators/OperatorRepeatTest.java index d8653a14e6..44371867c5 100644 --- a/src/test/java/rx/internal/operators/OperatorRepeatTest.java +++ b/src/test/java/rx/internal/operators/OperatorRepeatTest.java @@ -20,7 +20,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; -import java.util.Arrays; +import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -174,4 +175,28 @@ public void testRepeatAndDistinctUnbounded() { ts.assertTerminalEvent(); ts.assertReceivedOnNext(Arrays.asList(1, 2, 3)); } + /** Issue #2844: wrong target of request. */ + @Test(timeout = 3000) + public void testRepeatRetarget() { + final List concatBase = new ArrayList(); + TestSubscriber ts = new TestSubscriber(); + Observable.just(1, 2) + .repeat(5) + .concatMap(new Func1>() { + @Override + public Observable call(Integer x) { + System.out.println("testRepeatRetarget -> " + x); + concatBase.add(x); + return Observable.empty() + .delay(200, TimeUnit.MILLISECONDS); + } + }) + .subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + + assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase); + } }