From 63b67d5de211d27a76071e50a7ad6c89cb97b90e Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Thu, 11 Jun 2015 11:16:16 +1000 Subject: [PATCH] fix awaitTerminalEventAndUnsubscribeOnTimeout --- .../java/rx/observers/TestSubscriber.java | 10 ++++-- .../java/rx/observers/TestSubscriberTest.java | 34 ++++++++++++++++++- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/main/java/rx/observers/TestSubscriber.java b/src/main/java/rx/observers/TestSubscriber.java index 284002d452..e963b14cb4 100644 --- a/src/main/java/rx/observers/TestSubscriber.java +++ b/src/main/java/rx/observers/TestSubscriber.java @@ -281,7 +281,7 @@ public void awaitTerminalEvent(long timeout, TimeUnit unit) { * Blocks until this {@link Subscriber} receives a notification that the {@code Observable} is complete * (either an {@code onCompleted} or {@code onError} notification), or until a timeout expires; if the * Subscriber is interrupted before either of these events take place, this method unsubscribes the - * Subscriber from the Observable). + * Subscriber from the Observable). If timeout expires then the Subscriber is unsubscribed from the Observable. * * @param timeout * the duration of the timeout @@ -290,8 +290,12 @@ public void awaitTerminalEvent(long timeout, TimeUnit unit) { */ public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit unit) { try { - awaitTerminalEvent(timeout, unit); - } catch (RuntimeException e) { + boolean result = latch.await(timeout, unit); + if (!result) { + // timeout occurred + unsubscribe(); + } + } catch (InterruptedException e) { unsubscribe(); } } diff --git a/src/test/java/rx/observers/TestSubscriberTest.java b/src/test/java/rx/observers/TestSubscriberTest.java index c07c261f77..75d59fc1f8 100644 --- a/src/test/java/rx/observers/TestSubscriberTest.java +++ b/src/test/java/rx/observers/TestSubscriberTest.java @@ -16,12 +16,16 @@ package rx.observers; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -29,6 +33,7 @@ import rx.Observable; import rx.Observer; +import rx.functions.Action0; import rx.subjects.PublishSubject; public class TestSubscriberTest { @@ -124,8 +129,35 @@ public void testWrappingMockWhenUnsubscribeInvolved() { @Test public void testAssertError() { RuntimeException e = new RuntimeException("Oops"); - TestSubscriber subscriber = new TestSubscriber(); + TestSubscriber subscriber = new TestSubscriber(); Observable.error(e).subscribe(subscriber); subscriber.assertError(e); } + + @Test + public void testAwaitTerminalEventWithDuration() { + TestSubscriber ts = new TestSubscriber(); + Observable.just(1).subscribe(ts); + ts.awaitTerminalEvent(1, TimeUnit.SECONDS); + ts.assertTerminalEvent(); + } + + @Test + public void testAwaitTerminalEventWithDurationAndUnsubscribeOnTimeout() { + TestSubscriber ts = new TestSubscriber(); + final AtomicBoolean unsub = new AtomicBoolean(false); + Observable.just(1) + // + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + unsub.set(true); + } + }) + // + .delay(1000, TimeUnit.MILLISECONDS).subscribe(ts); + ts.awaitTerminalEventAndUnsubscribeOnTimeout(100, TimeUnit.MILLISECONDS); + assertTrue(unsub.get()); + } + }