Skip to content

Commit

Permalink
1.x: fix using() resource cleanup when factory throws or being non-eager
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed May 10, 2016
1 parent 1512c10 commit 20c9c91
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 37 deletions.
55 changes: 37 additions & 18 deletions src/main/java/rx/internal/operators/OnSubscribeUsing.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package rx.internal.operators;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.*;
Expand All @@ -26,6 +25,9 @@

/**
* Constructs an observable sequence that depends on a resource object.
*
* @param <T> the output value type
* @param <Resource> the resource type
*/
public final class OnSubscribeUsing<T, Resource> implements OnSubscribe<T> {

Expand Down Expand Up @@ -56,26 +58,46 @@ public void call(final Subscriber<? super T> subscriber) {
// dispose on unsubscription
subscriber.add(disposeOnceOnly);
// create the observable
final Observable<? extends T> source = observableFactory
// create the observable
.call(resource);
final Observable<? extends T> source;

try {
source = observableFactory
// create the observable
.call(resource);
} catch (Throwable e) {
Throwable disposeError = dispose(disposeOnceOnly);
Exceptions.throwIfFatal(e);
Exceptions.throwIfFatal(disposeError);
if (disposeError != null) {
subscriber.onError(new CompositeException(e, disposeError));
} else {
// propagate error
subscriber.onError(e);
}
return;
}

final Observable<? extends T> observable;
// supplement with on termination disposal if requested
if (disposeEagerly)
if (disposeEagerly) {
observable = source
// dispose on completion or error
.doOnTerminate(disposeOnceOnly);
else
observable = source;
} else {
observable = source
// dispose after the terminal signals were sent out
.doAfterTerminate(disposeOnceOnly);
}

try {
// start
observable.unsafeSubscribe(Subscribers.wrap(subscriber));
} catch (Throwable e) {
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
Throwable disposeError = dispose(disposeOnceOnly);
Exceptions.throwIfFatal(e);
Exceptions.throwIfFatal(disposeError);
if (disposeError != null)
subscriber.onError(new CompositeException(Arrays.asList(e, disposeError)));
subscriber.onError(new CompositeException(e, disposeError));
else
// propagate error
subscriber.onError(e);
Expand All @@ -86,16 +108,13 @@ public void call(final Subscriber<? super T> subscriber) {
}
}

private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) {
if (disposeEagerly)
try {
disposeOnceOnly.call();
return null;
} catch (Throwable e) {
return e;
}
else
private Throwable dispose(final Action0 disposeOnceOnly) {
try {
disposeOnceOnly.call();
return null;
} catch (Throwable e) {
return e;
}
}

private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0,
Expand Down
98 changes: 79 additions & 19 deletions src/test/java/rx/internal/operators/OnSubscribeUsingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,22 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.*;
import org.mockito.InOrder;

import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.subscriptions.Subscriptions;

public class OnSubscribeUsingTest {
Expand Down Expand Up @@ -432,4 +423,73 @@ public void call() {
};
}

@Test
public void factoryThrows() {

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger count = new AtomicInteger();

Observable.<Integer, Integer>using(
new Func0<Integer>() {
@Override
public Integer call() {
return 1;
}
},
new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
throw new TestException("forced failure");
}
},
new Action1<Integer>() {
@Override
public void call(Integer c) {
count.incrementAndGet();
}
}
)
.unsafeSubscribe(ts);

ts.assertError(TestException.class);

Assert.assertEquals(1, count.get());
}

@Test
public void nonEagerTermination() {

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger count = new AtomicInteger();

Observable.<Integer, Integer>using(
new Func0<Integer>() {
@Override
public Integer call() {
return 1;
}
},
new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return Observable.just(v);
}
},
new Action1<Integer>() {
@Override
public void call(Integer c) {
count.incrementAndGet();
}
}, false
)
.unsafeSubscribe(ts);

ts.assertValue(1);
ts.assertNoErrors();
ts.assertCompleted();

Assert.assertEquals(1, count.get());
}
}

0 comments on commit 20c9c91

Please sign in to comment.