Skip to content

Commit

Permalink
Add Single.fromCallable()
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-zinnatullin committed Oct 8, 2015
1 parent 29ce486 commit c749075
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
38 changes: 38 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package rx;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -604,6 +605,43 @@ public final static <T> Single<T> from(Future<? extends T> future, Scheduler sch
return new Single<T>(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
}

/**
* Returns a {@link Single} that invokes passed function and emits its result for each new Observer that subscribes.
* <p>
* Allows you to defer execution of passed function until Observer subscribes to the {@link Single}.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the {@link Single}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* function which execution should be deferred, it will be invoked when Observer will subscribe to the {@link Single}.
* @param <T>
* the type of the item emitted by the {@link Single}.
* @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given function.
*/
@Experimental
public static <T> Single<T> fromCallable(final Callable<? extends T> func) {
return create(new OnSubscribe<T>() {
@Override
public void call(SingleSubscriber<? super T> singleSubscriber) {
final T value;

try {
value = func.call();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
singleSubscriber.onError(t);
return;
}

singleSubscriber.onSuccess(value);
}
});
}

/**
* Returns a {@code Single} that emits a specified item.
* <p>
Expand Down
43 changes: 43 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -461,4 +466,42 @@ public void testToObservable() {
ts.assertValue("a");
ts.assertCompleted();
}

@Test
public void shouldEmitValueFromCallable() throws Exception {
Callable<String> callable = mock(Callable.class);

when(callable.call()).thenReturn("value");

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.fromCallable(callable)
.subscribe(testSubscriber);

testSubscriber.assertValue("value");
testSubscriber.assertNoErrors();

verify(callable).call();
}

@Test
public void shouldPassErrorFromCallable() throws Exception {
Callable<String> callable = mock(Callable.class);

Throwable error = new IllegalStateException();

when(callable.call()).thenThrow(error);

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.fromCallable(callable)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verify(callable).call();
}
}

0 comments on commit c749075

Please sign in to comment.