From 20c9c91daaa2d66aae260010cb5cf38371ca2e07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Tue, 10 May 2016 22:46:27 +0200 Subject: [PATCH] 1.x: fix using() resource cleanup when factory throws or being non-eager --- .../internal/operators/OnSubscribeUsing.java | 55 +++++++---- .../operators/OnSubscribeUsingTest.java | 98 +++++++++++++++---- 2 files changed, 116 insertions(+), 37 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeUsing.java b/src/main/java/rx/internal/operators/OnSubscribeUsing.java index 4dd483b4cc..352c699056 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeUsing.java +++ b/src/main/java/rx/internal/operators/OnSubscribeUsing.java @@ -15,7 +15,6 @@ */ package rx.internal.operators; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import rx.*; @@ -26,6 +25,9 @@ /** * Constructs an observable sequence that depends on a resource object. + * + * @param the output value type + * @param the resource type */ public final class OnSubscribeUsing implements OnSubscribe { @@ -56,26 +58,46 @@ public void call(final Subscriber subscriber) { // dispose on unsubscription subscriber.add(disposeOnceOnly); // create the observable - final Observable source = observableFactory - // create the observable - .call(resource); + final Observable source; + + try { + source = observableFactory + // create the observable + .call(resource); + } catch (Throwable e) { + Throwable disposeError = dispose(disposeOnceOnly); + Exceptions.throwIfFatal(e); + Exceptions.throwIfFatal(disposeError); + if (disposeError != null) { + subscriber.onError(new CompositeException(e, disposeError)); + } else { + // propagate error + subscriber.onError(e); + } + return; + } + final Observable observable; // supplement with on termination disposal if requested - if (disposeEagerly) + if (disposeEagerly) { observable = source // dispose on completion or error .doOnTerminate(disposeOnceOnly); - else - observable = source; + } else { + observable = source + // dispose after the terminal signals were sent out + .doAfterTerminate(disposeOnceOnly); + } + try { // start observable.unsafeSubscribe(Subscribers.wrap(subscriber)); } catch (Throwable e) { - Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly); + Throwable disposeError = dispose(disposeOnceOnly); Exceptions.throwIfFatal(e); Exceptions.throwIfFatal(disposeError); if (disposeError != null) - subscriber.onError(new CompositeException(Arrays.asList(e, disposeError))); + subscriber.onError(new CompositeException(e, disposeError)); else // propagate error subscriber.onError(e); @@ -86,16 +108,13 @@ public void call(final Subscriber subscriber) { } } - private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) { - if (disposeEagerly) - try { - disposeOnceOnly.call(); - return null; - } catch (Throwable e) { - return e; - } - else + private Throwable dispose(final Action0 disposeOnceOnly) { + try { + disposeOnceOnly.call(); return null; + } catch (Throwable e) { + return e; + } } private static final class DisposeAction extends AtomicBoolean implements Action0, diff --git a/src/test/java/rx/internal/operators/OnSubscribeUsingTest.java b/src/test/java/rx/internal/operators/OnSubscribeUsingTest.java index a68605dd8c..47b2338bbc 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeUsingTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeUsingTest.java @@ -15,31 +15,22 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.*; import org.mockito.InOrder; +import rx.*; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; -import rx.Subscriber; -import rx.Subscription; import rx.exceptions.TestException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; +import rx.functions.*; +import rx.observers.TestSubscriber; import rx.subscriptions.Subscriptions; public class OnSubscribeUsingTest { @@ -432,4 +423,73 @@ public void call() { }; } + @Test + public void factoryThrows() { + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger count = new AtomicInteger(); + + Observable.using( + new Func0() { + @Override + public Integer call() { + return 1; + } + }, + new Func1>() { + @Override + public Observable call(Integer v) { + throw new TestException("forced failure"); + } + }, + new Action1() { + @Override + public void call(Integer c) { + count.incrementAndGet(); + } + } + ) + .unsafeSubscribe(ts); + + ts.assertError(TestException.class); + + Assert.assertEquals(1, count.get()); + } + + @Test + public void nonEagerTermination() { + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger count = new AtomicInteger(); + + Observable.using( + new Func0() { + @Override + public Integer call() { + return 1; + } + }, + new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.just(v); + } + }, + new Action1() { + @Override + public void call(Integer c) { + count.incrementAndGet(); + } + }, false + ) + .unsafeSubscribe(ts); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + + Assert.assertEquals(1, count.get()); + } }