Skip to content

Commit

Permalink
Merge pull request #1003 from rickbw/callable-func0
Browse files Browse the repository at this point in the history
Func0 can transparently implement java.util.concurrent.Callable.
  • Loading branch information
benjchristensen committed Mar 31, 2014
2 parents 2180f39 + 59ded83 commit 8e242a4
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1596,9 +1596,13 @@ public static <R> Observable<R> fromAction(Action0 action, R result) {
* @see #start(rx.functions.Func0)
* @see #fromCallable(java.util.concurrent.Callable)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
*
* @deprecated Unnecessary now that Func0 extends Callable. Just call
* {@link #fromCallable(Callable)} instead.
*/
@Deprecated
public static <R> Observable<R> fromFunc0(Func0<? extends R> function) {
return fromFunc0(function, Schedulers.computation());
return fromCallable(function);
}

/**
Expand Down Expand Up @@ -1674,9 +1678,13 @@ public static <R> Observable<R> fromAction(Action0 action, R result, Scheduler s
* @see #start(rx.functions.Func0)
* @see #fromCallable(java.util.concurrent.Callable)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
*
* @deprecated Unnecessary now that Func0 extends Callable. Just call
* {@link #fromCallable(Callable, Scheduler)} instead.
*/
@Deprecated
public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler scheduler) {
return Observable.create(OperationFromFunctionals.fromFunc0(function)).subscribeOn(scheduler);
return fromCallable(function, scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,28 @@ public final class OperationFromFunctionals {
public static <R> OnSubscribeFunc<R> fromAction(Action0 action, R result) {
return new InvokeAsync<R>(Actions.toFunc(action, result));
}

/** Subscriber function that invokes a function and returns its value. */

/**
* Subscriber function that invokes a function and returns its value.
*
* @deprecated Unnecessary now that Func0 extends Callable. Just call
* {@link #fromCallable(Callable)} instead.
*/
@Deprecated
public static <R> OnSubscribeFunc<R> fromFunc0(Func0<? extends R> function) {
return new InvokeAsync<R>(function);
return fromCallable(function);
}

/**
* Subscriber function that invokes the callable and returns its value or
* propagates its checked exception.
*/
public static <R> OnSubscribeFunc<R> fromCallable(Callable<? extends R> callable) {
return new InvokeAsyncCallable<R>(callable);
return new InvokeAsync<R>(callable);
}
/** Subscriber function that invokes a runnable and returns the given result. */
public static <R> OnSubscribeFunc<R> fromRunnable(final Runnable run, final R result) {
return new InvokeAsync(new Func0<R>() {
return new InvokeAsync<R>(new Func0<R>() {
@Override
public R call() {
run.run();
Expand All @@ -62,38 +68,13 @@ public R call() {
});
}

/**
* Invokes a function when an observer subscribes.
* @param <R> the return type
*/
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
final Func0<? extends R> function;
public InvokeAsync(Func0<? extends R> function) {
if (function == null) {
throw new NullPointerException("function");
}
this.function = function;
}
@Override
public Subscription onSubscribe(Observer<? super R> t1) {
Subscription s = Subscriptions.empty();
try {
t1.onNext(function.call());
} catch (Throwable t) {
t1.onError(t);
return s;
}
t1.onCompleted();
return s;
}
}
/**
* Invokes a java.util.concurrent.Callable when an observer subscribes.
* @param <R> the return type
*/
static final class InvokeAsyncCallable<R> implements OnSubscribeFunc<R> {
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
final Callable<? extends R> callable;
public InvokeAsyncCallable(Callable<? extends R> callable) {
public InvokeAsync(Callable<? extends R> callable) {
if (callable == null) {
throw new NullPointerException("function");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ public void call() {

testRunShouldThrow(source, RuntimeException.class);
}

/**
* @deprecated {@link Func0} now extends {@link Callable}, so
* {@link Async#fromFunc0(Func0)} is unnecessary. Once it's
* removed, this test can be removed as well.
*/
@Deprecated
@Test
public void testFromFunc0() {
Func0<Integer> func = new Func0<Integer>() {
Expand Down Expand Up @@ -139,7 +146,14 @@ public Integer call() {
verify(observer, never()).onError(any(Throwable.class));
}
}


/**
* @deprecated {@link Func0} now extends {@link Callable}, so
* {@link Async#fromFunc0(Func0, rx.Scheduler)} is
* unnecessary. Once it's removed, this test can be removed
* as well.
*/
@Deprecated
@Test
public void testFromFunc0Throws() {
Func0<Integer> func = new Func0<Integer>() {
Expand Down
7 changes: 5 additions & 2 deletions rxjava-core/src/main/java/rx/functions/Func0.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package rx.functions;

public interface Func0<R> extends Function {
import java.util.concurrent.Callable;

public interface Func0<R> extends Function, Callable<R> {
@Override
public R call();
}
}

0 comments on commit 8e242a4

Please sign in to comment.