diff --git a/rxjava-contrib/rxjava-async-util/build.gradle b/rxjava-contrib/rxjava-async-util/build.gradle new file mode 100644 index 0000000000..09d9aae655 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/build.gradle @@ -0,0 +1,20 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + testCompile project(":rxjava-core").sourceSets.test.output + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +jar { + manifest { + name = 'rxjava-async-util' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java new file mode 100644 index 0000000000..dbbafbd50f --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java @@ -0,0 +1,1681 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.util.async; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import rx.Observable; +import rx.Scheduler; +import rx.schedulers.Schedulers; +import rx.subjects.AsyncSubject; +import rx.util.async.operators.Functionals; +import rx.util.async.operators.OperationDeferFuture; +import rx.util.async.operators.OperationForEachFuture; +import rx.util.async.operators.OperationFromFunctionals; +import rx.util.async.operators.OperationStartFuture; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Action3; +import rx.util.functions.Action4; +import rx.util.functions.Action5; +import rx.util.functions.Action6; +import rx.util.functions.Action7; +import rx.util.functions.Action8; +import rx.util.functions.Action9; +import rx.util.functions.ActionN; +import rx.util.functions.Actions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; +import rx.util.functions.FuncN; + +/** + * Utility methods to convert functions and actions into asynchronous operations + * through the Observable/Observer pattern. + */ +public final class Async { + + private Async() { + throw new IllegalStateException("No instances!"); + } + + /** + * Invokes the specified function asynchronously and returns an Observable + * that emits the result. + *

+ * Note: The function is called immediately and once, not whenever an + * observer subscribes to the resulting Observable. Multiple subscriptions + * to this Observable observe the same return value. + *

+ * + * + * @param the result value type + * @param func function to run asynchronously + * @return an Observable that emits the function's result value, or notifies + * observers of an exception + * @see RxJava + * Wiki: start() + * @see MSDN: + * Observable.Start + */ + public static Observable start(Func0 func) { + return Async.toAsync(func).call(); + } + + /** + * Invokes the specified function asynchronously on the specified scheduler + * and returns an Observable that emits the result. + *

+ * Note: The function is called immediately and once, not whenever an + * observer subscribes to the resulting Observable. Multiple subscriptions + * to this Observable observe the same return value. + *

+ * + * + * @param the result value type + * @param func function to run asynchronously + * @param scheduler scheduler to run the function on + * @return an Observable that emits the function's result value, or notifies + * observers of an exception + * @see RxJava + * Wiki: start() + * @see MSDN: + * Observable.Start + */ + public static Observable start(Func0 func, Scheduler scheduler) { + return Async.toAsync(func, scheduler).call(); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func0> toAsync(Action0 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the result value type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func0> toAsync(Func0 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param first parameter type of the action + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func1> toAsync(Action1 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param first parameter type of the action + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func1> toAsync(Func1 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func2> toAsync(Action2 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func2> toAsync(Func2 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func3> toAsync(Action3 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func3> toAsync(Func3 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func4> toAsync(Action4 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func4> toAsync(Func4 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func5> toAsync(Action5 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func5> toAsync(Func5 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func6> toAsync(Action6 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func6> toAsync(Func6 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func7> toAsync(Action7 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func7> toAsync(Func7 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func8> toAsync(Action8 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func8> toAsync(Func8 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param the ninth parameter type + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func9> toAsync(Action9 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param the ninth parameter type + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func9> toAsync(Func9 func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + */ + public static FuncN> toAsync(ActionN action) { + return toAsync(action, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + */ + public static FuncN> toAsync(FuncN func) { + return toAsync(func, Schedulers.threadPoolForComputation()); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func0> toAsync(final Action0 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func0> toAsync(final Func0 func, final Scheduler scheduler) { + return new Func0>() { + @Override + public Observable call() { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func1> toAsync(final Action1 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func1> toAsync(final Func1 func, final Scheduler scheduler) { + return new Func1>() { + @Override + public Observable call(final T1 t1) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func2> toAsync(final Action2 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func2> toAsync(final Func2 func, final Scheduler scheduler) { + return new Func2>() { + @Override + public Observable call(final T1 t1, final T2 t2) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func3> toAsync(final Action3 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func3> toAsync(final Func3 func, final Scheduler scheduler) { + return new Func3>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func4> toAsync(final Action4 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func4> toAsync(final Func4 func, final Scheduler scheduler) { + return new Func4>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func5> toAsync(final Action5 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func5> toAsync(final Func5 func, final Scheduler scheduler) { + return new Func5>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func6> toAsync(final Action6 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func6> toAsync(final Func6 func, final Scheduler scheduler) { + return new Func6>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func7> toAsync(final Action7 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func7> toAsync(final Func7 func, final Scheduler scheduler) { + return new Func7>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6, t7); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func8> toAsync(final Action8 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func8> toAsync(final Func8 func, final Scheduler scheduler) { + return new Func8>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6, t7, t8); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param the ninth parameter type + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func9> toAsync(final Action9 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the first parameter type + * @param the second parameter type + * @param the third parameter type + * @param the fourth parameter type + * @param the fifth parameter type + * @param the sixth parameter type + * @param the seventh parameter type + * @param the eighth parameter type + * @param the ninth parameter type + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + * @see MSDN: + * Observable.ToAsync + */ + public static Func9> toAsync(final Func9 func, final Scheduler scheduler) { + return new Func9>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + */ + public static FuncN> toAsync(final ActionN action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + * + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + */ + public static FuncN> toAsync(final FuncN func, final Scheduler scheduler) { + return new FuncN>() { + @Override + public Observable call(final Object... args) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(args); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + *

+ * Alias for toAsync(ActionN) intended for dynamic languages. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + */ + public static FuncN> asyncAction(final ActionN action) { + return toAsync(action); + } + + /** + * Convert a synchronous action call into an asynchronous function call + * through an Observable sequence. + *

+ * Alias for toAsync(ActionN, Scheduler) intended for dynamic languages. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which executes + * the {@code action} and emits {@code null}. + * + */ + public static FuncN> asyncAction(final ActionN action, final Scheduler scheduler) { + return toAsync(action, scheduler); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + *

+ * Alias for toAsync(FuncN) intended for dynamic languages. + * + * @param the result type + * @param func the function to convert + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + */ + public static FuncN> asyncFunc(final FuncN func) { + return toAsync(func); + } + + /** + * Convert a synchronous function call into an asynchronous function call + * through an Observable sequence. + *

+ * Alias for toAsync(FuncN, Scheduler) intended for dynamic languages. + * + * @param the result type + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which executes + * the {@code func} and emits its returned value. + * + */ + public static FuncN> asyncFunc(final FuncN func, final Scheduler scheduler) { + return toAsync(func, scheduler); + } + + /** + * Invokes the asynchronous function immediately, surfacing the result + * through an observable sequence. + *

+ * Important note subscribing to the resulting observable blocks + * until the future completes. + * + * @param the result type + * @param functionAsync the asynchronous function to run + * @return an observable which surfaces the result of the future. + * @see #startFuture(rx.util.functions.Func0, rx.Scheduler) + */ + public static Observable startFuture(Func0> functionAsync) { + return OperationStartFuture.startFuture(functionAsync); + } + + /** + * Invokes the asynchronous function immediately, surfacing the result + * through an observable sequence and waits on the specified scheduler. + * + * @param the result type + * @param functionAsync the asynchronous function to run + * @param scheduler the scheduler where the completion of the Future is + * awaited + * @return an observable which surfaces the result of the future. + */ + public static Observable startFuture(Func0> functionAsync, + Scheduler scheduler) { + return OperationStartFuture.startFuture(functionAsync, scheduler); + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + *

+ * Important note subscribing to the resulting observable blocks + * until the future completes. + * + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each + * observer + * @return the observable sequence containing values produced by the + * asynchronous observer produced by the factory + * @see #deferFuture(rx.util.functions.Func0, rx.Scheduler) + */ + public static Observable deferFuture(Func0>> observableFactoryAsync) { + return OperationDeferFuture.deferFuture(observableFactoryAsync); + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + * + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each + * observer + * @param scheduler the scheduler where the completion of the Future is + * awaited + * @return the observable sequence containing values produced by the + * asynchronous observer produced by the factory + */ + public static Observable deferFuture( + Func0>> observableFactoryAsync, + Scheduler scheduler) { + return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler); + } + + /** + * Subscribes to the given source and calls the callback for each + * emitted item, and surfaces the completion or error through a Future. + *

+ * Important note: The returned task blocks indefinitely unless + * the run() method is called or the task is scheduled on an Executor. + * @param the source value type + * @param source the source Observable sequence + * @param onNext the action to call with each emitted element + * @return the Future representing the entire for-each operation + * @see #forEachFuture(rx.util.functions.Action1, rx.Scheduler) + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext) { + return OperationForEachFuture.forEachFuture(source, onNext); + } + + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + *

+ * Important note: The returned task blocks indefinitely unless + * the run() method is called or the task is scheduled on an Executor. + * @param the source value type + * @param source the source Observable sequence + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @return the Future representing the entire for-each operation + * @see #forEachFuture(rx.util.functions.Action1, rx.util.functions.Action1, rx.Scheduler) + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError) { + return OperationForEachFuture.forEachFuture(source, onNext, onError); + } + + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + *

+ * Important note: The returned task blocks indefinitely unless + * the run() method is called or the task is scheduled on an Executor. + * @param the source value type + * @param source the source Observable sequence + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param onCompleted the action to call when the source completes + * @return the Future representing the entire for-each operation + * @see #forEachFuture(rx.util.functions.Action1, rx.util.functions.Action1, rx.util.functions.Action0, rx.Scheduler) + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError, + Action0 onCompleted) { + return OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted); + } + + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future, scheduled on the given scheduler. + * @param the source value type + * @param source the source Observable sequence + * @param onNext the action to call with each emitted element + * @param scheduler the scheduler where the task will await the termination of the for-each + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Scheduler scheduler) { + FutureTask < Void > task = OperationForEachFuture.forEachFuture(source, onNext); + scheduler.schedule(Functionals.fromRunnable(task)); + return task; + } + + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future, scheduled on the given scheduler. + * @param the source value type + * @param source the source Observable sequence + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param scheduler the scheduler where the task will await the termination of the for-each + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError, + Scheduler scheduler) { + FutureTask < Void > task = OperationForEachFuture.forEachFuture(source, onNext, onError); + scheduler.schedule(Functionals.fromRunnable(task)); + return task; + } + + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future, scheduled on the given scheduler. + * @param the source value type + * @param source the source Observable sequence + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param onCompleted the action to call when the source completes + * @param scheduler the scheduler where the task will await the termination of the for-each + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError, + Action0 onCompleted, + Scheduler scheduler) { + FutureTask task = OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted); + scheduler.schedule(Functionals.fromRunnable(task)); + return task; + } + + /** + * Return an Observable which calls the given action and emits the given + * result when an Observer subscribes. + *

+ * The action is run on the default thread pool for computation. + * @param the return type + * @param action the action to invoke on each subscription + * @param result the result to emit to observers + * @return an Observable which calls the given action and emits the given + * result when an Observer subscribes + */ + public static Observable fromAction(Action0 action, R result) { + return fromAction(action, result, Schedulers.threadPoolForComputation()); + } + + /** + * Return an Observable which calls the given function and emits its + * result when an Observer subscribes. + *

+ * The function is called on the default thread pool for computation. + * + * @param the return type + * @param function the function to call on each subscription + * @return an Observable which calls the given function and emits its + * result when an Observer subscribes + * @see #start(rx.util.functions.Func0) + * @see #fromCallable(java.util.concurrent.Callable) + */ + public static Observable fromFunc0(Func0 function) { + return fromFunc0(function, Schedulers.threadPoolForComputation()); + } + /** + * Return an Observable which calls the given Callable and emits its + * result or Exception when an Observer subscribes. + *

+ * The Callable is called on the default thread pool for computation. + * + * @param the return type + * @param callable the callable to call on each subscription + * @return an Observable which calls the given Callable and emits its + * result or Exception when an Observer subscribes + * @see #start(rx.util.functions.Func0) + * @see #fromFunc0(rx.util.functions.Func0) + */ + public static Observable fromCallable(Callable callable) { + return fromCallable(callable, Schedulers.threadPoolForComputation()); + } + + /** + * Return an Observable which calls the given Runnable and emits the given + * result when an Observer subscribes. + *

+ * The Runnable is called on the default thread pool for computation. + * + * @param the return type + * @param run the runnable to invoke on each subscription + * @param result the result to emit to observers + * @return an Observable which calls the given Runnable and emits the given + * result when an Observer subscribes + */ + public static Observable fromRunnable(final Runnable run, final R result) { + return fromRunnable(run, result, Schedulers.threadPoolForComputation()); + } + + /** + * Return an Observable which calls the given action and emits the given + * result when an Observer subscribes. + * + * @param the return type + * @param action the action to invoke on each subscription + * @param scheduler the scheduler where the function is called and the result is emitted + * @param result the result to emit to observers + * @return an Observable which calls the given action and emits the given + * result when an Observer subscribes + */ + public static Observable fromAction(Action0 action, R result, Scheduler scheduler) { + return Observable.create(OperationFromFunctionals.fromAction(action, result)).subscribeOn(scheduler); + } + + /** + * Return an Observable which calls the given function and emits its + * result when an Observer subscribes. + * + * @param the return type + * @param function the function to call on each subscription + * @param scheduler the scheduler where the function is called and the result is emitted + * @return an Observable which calls the given function and emits its + * result when an Observer subscribes + * @see #start(rx.util.functions.Func0) + * @see #fromCallable(java.util.concurrent.Callable) + */ + public static Observable fromFunc0(Func0 function, Scheduler scheduler) { + return Observable.create(OperationFromFunctionals.fromFunc0(function)).subscribeOn(scheduler); + } + /** + * Return an Observable which calls the given Callable and emits its + * result or Exception when an Observer subscribes. + * + * @param the return type + * @param callable the callable to call on each subscription + * @param scheduler the scheduler where the function is called and the result is emitted + * @return an Observable which calls the given Callable and emits its + * result or Exception when an Observer subscribes + * @see #start(rx.util.functions.Func0) + * @see #fromFunc0(rx.util.functions.Func0) + */ + public static Observable fromCallable(Callable callable, Scheduler scheduler) { + return Observable.create(OperationFromFunctionals.fromCallable(callable)).subscribeOn(scheduler); + } + + /** + * Return an Observable which calls the given Runnable and emits the given + * result when an Observer subscribes. + * + * @param the return type + * @param run the runnable to invoke on each subscription + * @param scheduler the scheduler where the function is called and the result is emitted + * @param result the result to emit to observers + * @return an Observable which calls the given Runnable and emits the given + * result when an Observer subscribes + */ + public static Observable fromRunnable(final Runnable run, final R result, Scheduler scheduler) { + return Observable.create(OperationFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler); + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java new file mode 100644 index 0000000000..1117fef941 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java @@ -0,0 +1,113 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.util.async.operators; + +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Utility methods convert between functional interfaces of actions and functions. + */ +public final class Functionals { + private Functionals() { + throw new IllegalStateException("No instances!"); + } + /** + * Return an action which takes a Throwable and does nothing. + *

(To avoid casting from the generic empty1().) + * @return the action + */ + public static Action1 emptyThrowable() { + return EMPTY_THROWABLE; + } + /** + * An action that takes a Throwable and does nothing. + */ + private static final Action1 EMPTY_THROWABLE = new EmptyThrowable(); + /** An empty throwable class. */ + private static final class EmptyThrowable implements Action1 { + @Override + public void call(Throwable t1) { + } + } + /** + * Return an Action0 instance which does nothing. + * @return an Action0 instance which does nothing + */ + public static Action0 empty() { + return EMPTY; + } + /** A single empty instance. */ + private static final Action0 EMPTY = new EmptyAction(); + /** An empty action class. */ + private static final class EmptyAction implements Action0 { + @Override + public void call() { + } + } + + /** + * Converts a runnable instance into an Action0 instance. + * @param run the Runnable to run when the Action0 is called + * @return the Action0 wrapping the Runnable + */ + public static Action0 fromRunnable(Runnable run) { + if (run == null) { + throw new NullPointerException("run"); + } + return new ActionWrappingRunnable(run); + } + /** An Action0 which wraps and calls a Runnable. */ + private static final class ActionWrappingRunnable implements Action0 { + final Runnable run; + + public ActionWrappingRunnable(Runnable run) { + this.run = run; + } + + @Override + public void call() { + run.run(); + } + + } + /** + * Converts an Action0 instance into a Runnable instance. + * @param action the Action0 to call when the Runnable is run + * @return the Runnable wrapping the Action0 + */ + public static Runnable toRunnable(Action0 action) { + if (action == null) { + throw new NullPointerException("action"); + } + return new RunnableAction(action); + } + /** An Action0 which wraps and calls a Runnable. */ + private static final class RunnableAction implements Runnable { + final Action0 action; + + public RunnableAction(Action0 action) { + this.action = action; + } + + @Override + public void run() { + action.call(); + } + + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/LatchedObserver.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/LatchedObserver.java new file mode 100644 index 0000000000..dc89ad2748 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/LatchedObserver.java @@ -0,0 +1,304 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; + +/** + * An observer implementation that calls a CountDownLatch in case + * a terminal state has been reached. + * @param the observed value type + */ +abstract class LatchedObserver implements Observer { + /** The CountDownLatch to count-down on a terminal state. */ + protected final CountDownLatch latch; + /** Contains the error. */ + protected volatile Throwable error; + /** + * Indicates the completion status. + */ + protected final AtomicBoolean done; + /** + * Consturcts a LatchedObserver instance. + * @param latch the CountDownLatch to use + */ + public LatchedObserver(CountDownLatch latch) { + this.latch = latch; + this.done = new AtomicBoolean(); + } + + /** + * Override this method to handle an onNext event. + * @param value + */ + protected abstract void onNextCore(T value); + /** + * Override this method to handle an onError event. + * @param e + */ + protected abstract void onErrorCore(Throwable e); + /** + * Override this to handle th onCompleted event. + */ + protected abstract void onCompletedCore(); + /** + * Try to move into an error state. + * @param e + * @return true if succeded, false if this observable has already terminated + */ + protected boolean fail(Throwable e) { + if (done.compareAndSet(false, true)) { + onErrorCore(e); + return true; + } + return false; + } + + @Override + public final void onNext(T args) { + if (!done.get()) { + onNextCore(args); + } + } + + @Override + public final void onError(Throwable e) { + fail(e); + } + + @Override + public final void onCompleted() { + if (done.compareAndSet(false, true)) { + onCompletedCore(); + } + } + + /** + * Block and await the latch. + * @throws InterruptedException if the wait is interrupted + */ + public void await() throws InterruptedException { + latch.await(); + } + /** + * Block and await the latch for a given amount of time. + * @see CountDownLatch#await(long, java.util.concurrent.TimeUnit) + */ + public boolean await(long time, TimeUnit unit) throws InterruptedException { + return latch.await(time, unit); + } + /** + * Returns the observed error or null if there was none. + *

+ * Should be generally called after the await() returns. + * @return the observed error + */ + public Throwable getThrowable() { + return error; + } + + /** + * Create a LatchedObserver with the given callback function(s). + */ + public static LatchedObserver create(Action1 onNext) { + return create(onNext, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given callback function(s). + */ + public static LatchedObserver create(Action1 onNext, Action1 onError) { + return create(onNext, onError, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given callback function(s). + */ + public static LatchedObserver create(Action1 onNext, Action1 onError, Action0 onCompleted) { + return create(onNext, onError, onCompleted, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given callback function(s) and a shared latch. + */ + public static LatchedObserver create(Action1 onNext, CountDownLatch latch) { + return new LatchedObserverImpl(onNext, Functionals.emptyThrowable(), Functionals.empty(), latch); + } + + /** + * Create a LatchedObserver with the given callback function(s) and a shared latch. + */ + public static LatchedObserver create(Action1 onNext, Action1 onError, CountDownLatch latch) { + return new LatchedObserverImpl(onNext, onError, Functionals.empty(), latch); + } + + /** + * Create a LatchedObserver with the given callback function(s) and a shared latch. + */ + public static LatchedObserver create(Action1 onNext, Action1 onError, Action0 onCompleted, CountDownLatch latch) { + return new LatchedObserverImpl(onNext, onError, onCompleted, latch); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s). + */ + public static LatchedObserver createIndexed(Action2 onNext) { + return createIndexed(onNext, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s). + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError) { + return createIndexed(onNext, onError, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s). + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError, Action0 onCompleted) { + return createIndexed(onNext, onError, onCompleted, new CountDownLatch(1)); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s) and a shared latch. + */ + public static LatchedObserver createIndexed(Action2 onNext, CountDownLatch latch) { + return new LatchedObserverIndexedImpl(onNext, Functionals.emptyThrowable(), Functionals.empty(), latch); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s) and a shared latch. + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError, CountDownLatch latch) { + return new LatchedObserverIndexedImpl(onNext, onError, Functionals.empty(), latch); + } + + /** + * Create a LatchedObserver with the given indexed callback function(s) and a shared latch. + */ + public static LatchedObserver createIndexed(Action2 onNext, Action1 onError, Action0 onCompleted, CountDownLatch latch) { + return new LatchedObserverIndexedImpl(onNext, onError, onCompleted, latch); + } + + /** + * A latched observer which calls an action for each observed value + * and checks if a cancellation token is not unsubscribed. + * @param the observed value type + */ + private static final class LatchedObserverImpl extends LatchedObserver { + final Action1 onNext; + final Action1 onError; + final Action0 onCompleted; + + public LatchedObserverImpl(Action1 onNext, + Action1 onError, + Action0 onCompleted, + CountDownLatch latch) { + super(latch); + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + protected void onNextCore(T args) { + try { + onNext.call(args); + } catch (Throwable t) { + fail(t); + } + } + + @Override + protected void onErrorCore(Throwable e) { + try { + error = e; + onError.call(e); + } finally { + latch.countDown(); + } + } + + @Override + protected void onCompletedCore() { + try { + onCompleted.call(); + } finally { + latch.countDown(); + } + } + } + /** + * A latched observer which calls an action for each observed value + * and checks if a cancellation token is not unsubscribed. + * @param the observed value type + */ + private static final class LatchedObserverIndexedImpl extends LatchedObserver { + final Action2 onNext; + final Action1 onError; + final Action0 onCompleted; + int index; + + public LatchedObserverIndexedImpl(Action2 onNext, + Action1 onError, + Action0 onCompleted, + CountDownLatch latch) { + super(latch); + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + protected void onNextCore(T args) { + if (index == Integer.MAX_VALUE) { + fail(new ArithmeticException("index overflow")); + return; + } + try { + onNext.call(args, index++); + } catch (Throwable t) { + fail(t); + } + } + + @Override + protected void onErrorCore(Throwable e) { + try { + error = e; + onError.call(e); + } finally { + latch.countDown(); + } + } + + @Override + protected void onCompletedCore() { + try { + onCompleted.call(); + } finally { + latch.countDown(); + } + } + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationDeferFuture.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationDeferFuture.java new file mode 100644 index 0000000000..fab5f2766f --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationDeferFuture.java @@ -0,0 +1,88 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.Future; +import rx.Observable; +import rx.Scheduler; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Defer the execution of a factory method which produces an observable sequence. + */ +public final class OperationDeferFuture { + /** Utility class. */ + private OperationDeferFuture() { throw new IllegalStateException("No instances!"); } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture(Func0>> observableFactoryAsync) { + return Observable.defer(new DeferFutureFunc0(observableFactoryAsync)); + } + /** The function called by the defer operator. */ + private static final class DeferFutureFunc0 implements Func0> { + final Func0>> observableFactoryAsync; + + public DeferFutureFunc0(Func0>> observableFactoryAsync) { + this.observableFactoryAsync = observableFactoryAsync; + } + + @Override + public Observable call() { + return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync)); + } + + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture( + Func0>> observableFactoryAsync, + Scheduler scheduler) { + return Observable.defer(new DeferFutureFunc0Scheduled(observableFactoryAsync, scheduler)); + } + /** The function called by the defer operator. */ + private static final class DeferFutureFunc0Scheduled implements Func0> { + final Func0>> observableFactoryAsync; + final Scheduler scheduler; + + public DeferFutureFunc0Scheduled(Func0>> observableFactoryAsync, + Scheduler scheduler) { + this.observableFactoryAsync = observableFactoryAsync; + this.scheduler = scheduler; + } + + @Override + public Observable call() { + return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler)); + } + + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationForEachFuture.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationForEachFuture.java new file mode 100644 index 0000000000..74a060b345 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationForEachFuture.java @@ -0,0 +1,132 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; +import rx.Observable; +import rx.Subscription; +import rx.util.Exceptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Convert the observation of a source observable to a big Future call. + *

+ * Remark: the cancellation token version's behavior is in doubt, so left out. + */ +public final class OperationForEachFuture { + /** Utility class. */ + private OperationForEachFuture() { throw new IllegalStateException("No instances!"); } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + * @param the element type of the Observable + * @param source the source Observable + * @param onNext the action to call with each emitted element + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext) { + return forEachFuture(source, onNext, Functionals.emptyThrowable(), Functionals.empty()); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + * @param the element type of the Observable + * @param source the source Observable + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError) { + return forEachFuture(source, onNext, onError, Functionals.empty()); + } + + /** + * Subscribes to the given source and calls the callback for each emitted item, + * and surfaces the completion or error through a Future. + * @param the element type of the Observable + * @param source the source Observable + * @param onNext the action to call with each emitted element + * @param onError the action to call when an exception is emitted + * @param onCompleted the action to call when the source completes + * @return the Future representing the entire for-each operation + */ + public static FutureTask forEachFuture( + Observable source, + Action1 onNext, + Action1 onError, + Action0 onCompleted) { + + LatchedObserver lo = LatchedObserver.create(onNext, onError, onCompleted); + + Subscription s = source.subscribe(lo); + + FutureTaskCancel task = new FutureTaskCancel(s, new RunAwait(lo)); + + return task; + } + /** + * A future task that unsubscribes the given subscription when cancelled. + * @param the return value type + */ + private static final class FutureTaskCancel extends FutureTask { + final Subscription cancel; + + public FutureTaskCancel(Subscription cancel, Callable callable) { + super(callable); + this.cancel = cancel; + } + + public FutureTaskCancel(Subscription cancel, Runnable runnable, T result) { + super(runnable, result); + this.cancel = cancel; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancel.unsubscribe(); + return super.cancel(mayInterruptIfRunning); + } + + } + + /** Await the completion of a latched observer and throw its exception if any. */ + private static final class RunAwait implements Callable { + final LatchedObserver observer; + + public RunAwait(LatchedObserver observer) { + this.observer = observer; + } + + @Override + public Void call() throws Exception { + observer.await(); + Throwable t = observer.getThrowable(); + if (t != null) { + throw Exceptions.propagate(t); + } + return null; + } + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationFromFunctionals.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationFromFunctionals.java new file mode 100644 index 0000000000..24af4e0939 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationFromFunctionals.java @@ -0,0 +1,114 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.Callable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Actions; +import rx.util.functions.Func0; + +/** + * Operators that invoke a function or action if + * an observer subscribes. + * Asynchrony can be achieved by using subscribeOn or observeOn. + */ +public final class OperationFromFunctionals { + /** Utility class. */ + private OperationFromFunctionals() { throw new IllegalStateException("No instances!"); } + + /** Subscriber function that invokes an action and returns the given result. */ + public static OnSubscribeFunc fromAction(Action0 action, R result) { + return new InvokeAsync(Actions.toFunc(action, result)); + } + + /** Subscriber function that invokes a function and returns its value. */ + public static OnSubscribeFunc fromFunc0(Func0 function) { + return new InvokeAsync(function); + } + + /** + * Subscriber function that invokes the callable and returns its value or + * propagates its checked exception. + */ + public static OnSubscribeFunc fromCallable(Callable callable) { + return new InvokeAsyncCallable(callable); + } + /** Subscriber function that invokes a runnable and returns the given result. */ + public static OnSubscribeFunc fromRunnable(final Runnable run, final R result) { + return new InvokeAsync(new Func0() { + @Override + public R call() { + run.run(); + return result; + } + }); + } + + /** + * Invokes a function when an observer subscribes. + * @param the return type + */ + static final class InvokeAsync implements OnSubscribeFunc { + final Func0 function; + public InvokeAsync(Func0 function) { + if (function == null) { + throw new NullPointerException("function"); + } + this.function = function; + } + @Override + public Subscription onSubscribe(Observer t1) { + Subscription s = Subscriptions.empty(); + try { + t1.onNext(function.call()); + } catch (Throwable t) { + t1.onError(t); + return s; + } + t1.onCompleted(); + return s; + } + } + /** + * Invokes a java.util.concurrent.Callable when an observer subscribes. + * @param the return type + */ + static final class InvokeAsyncCallable implements OnSubscribeFunc { + final Callable callable; + public InvokeAsyncCallable(Callable callable) { + if (callable == null) { + throw new NullPointerException("function"); + } + this.callable = callable; + } + @Override + public Subscription onSubscribe(Observer t1) { + Subscription s = Subscriptions.empty(); + try { + t1.onNext(callable.call()); + } catch (Throwable t) { + t1.onError(t); + return s; + } + t1.onCompleted(); + return s; + } + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationStartFuture.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationStartFuture.java new file mode 100644 index 0000000000..992023e7f5 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationStartFuture.java @@ -0,0 +1,68 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.Future; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.util.functions.Func0; + +/** + * Start an asynchronous Future immediately and observe its result through + * an observable. + */ +public final class OperationStartFuture { + /** Utility class. */ + private OperationStartFuture() { throw new IllegalStateException("No instances!"); } + /** + * Invokes the asynchronous function, surfacing the result through an observable sequence. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the asynchronous function to run + * @return the observable + */ + public static Observable startFuture(Func0> functionAsync) { + Future task; + try { + task = functionAsync.call(); + } catch (Throwable t) { + return Observable.error(t); + } + return Observable.from(task); + } + /** + * Invokes the asynchronous function, surfacing the result through an observable sequence + * running on the given scheduler. + * @param the result type + * @param functionAsync the asynchronous function to run + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable + */ + public static Observable startFuture(Func0> functionAsync, + Scheduler scheduler) { + Future task; + try { + task = functionAsync.call(); + } catch (Throwable t) { + return Observable.error(t); + } + return Observable.from(task, scheduler); + } +} diff --git a/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java similarity index 84% rename from rxjava-core/src/test/java/rx/util/functions/AsyncTest.java rename to rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java index 05a75c4f85..943e8ff898 100644 --- a/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java @@ -14,20 +14,29 @@ * limitations under the License. */ -package rx.util.functions; +package rx.util.async; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import junit.framework.Assert; + import org.junit.Before; import org.junit.Test; -import static org.mockito.Matchers.any; +import org.mockito.InOrder; import org.mockito.Mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import rx.Observable; import rx.Observer; import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2; @@ -54,10 +63,12 @@ public class AsyncTest { @Mock Observer observer; + @Before public void before() { MockitoAnnotations.initMocks(this); } + @Test public void testAction0() { final AtomicInteger value = new AtomicInteger(); @@ -67,17 +78,18 @@ public void call() { value.incrementAndGet(); } }; - + Async.toAsync(action, Schedulers.immediate()) .call() .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(1, value.get()); } + @Test public void testAction0Error() { Action0 action = new Action0() { @@ -86,15 +98,16 @@ public void call() { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call() .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction1() { final AtomicInteger value = new AtomicInteger(); @@ -104,17 +117,18 @@ public void call(Integer t1) { value.set(t1); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(1, value.get()); } + @Test public void testAction1Error() { Action1 action = new Action1() { @@ -123,15 +137,16 @@ public void call(Integer t1) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction2() { final AtomicInteger value = new AtomicInteger(); @@ -141,17 +156,18 @@ public void call(Integer t1, Integer t2) { value.set(t1 | t2); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(3, value.get()); } + @Test public void testAction2Error() { Action2 action = new Action2() { @@ -160,15 +176,16 @@ public void call(Integer t1, Integer t2) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction3() { final AtomicInteger value = new AtomicInteger(); @@ -178,17 +195,18 @@ public void call(Integer t1, Integer t2, Integer t3) { value.set(t1 | t2 | t3); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(7, value.get()); } + @Test public void testAction3Error() { Action3 action = new Action3() { @@ -197,15 +215,16 @@ public void call(Integer t1, Integer t2, Integer t3) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction4() { final AtomicInteger value = new AtomicInteger(); @@ -215,17 +234,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4) { value.set(t1 | t2 | t3 | t4); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(15, value.get()); } + @Test public void testAction4Error() { Action4 action = new Action4() { @@ -234,15 +254,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction5() { final AtomicInteger value = new AtomicInteger(); @@ -252,17 +273,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { value.set(t1 | t2 | t3 | t4 | t5); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(31, value.get()); } + @Test public void testAction5Error() { Action5 action = new Action5() { @@ -271,15 +293,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction6() { final AtomicInteger value = new AtomicInteger(); @@ -289,17 +312,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(63, value.get()); } + @Test public void testAction6Error() { Action6 action = new Action6() { @@ -308,15 +332,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction7() { final AtomicInteger value = new AtomicInteger(); @@ -326,17 +351,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(127, value.get()); } + @Test public void testAction7Error() { Action7 action = new Action7() { @@ -345,15 +371,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction8() { final AtomicInteger value = new AtomicInteger(); @@ -363,17 +390,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(255, value.get()); } + @Test public void testAction8Error() { Action8 action = new Action8() { @@ -382,15 +410,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction9() { final AtomicInteger value = new AtomicInteger(); @@ -400,17 +429,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8 | t9); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(511, value.get()); } + @Test public void testAction9Error() { Action9 action = new Action9() { @@ -419,15 +449,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testActionN() { final AtomicInteger value = new AtomicInteger(); @@ -436,22 +467,23 @@ public void testActionN() { public void call(Object... args) { int i = 0; for (Object o : args) { - i = i | (Integer)o; + i = i | (Integer) o; } value.set(i); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(1023, value.get()); } + @Test public void testActionNError() { ActionN action = new ActionN() { @@ -460,15 +492,16 @@ public void call(Object... args) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testFunc0() { Func0 func = new Func0() { @@ -480,12 +513,13 @@ public Integer call() { Async.toAsync(func, Schedulers.immediate()) .call() .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(0); verify(observer, times(1)).onCompleted(); - + } + @Test public void testFunc1() { Func1 func = new Func1() { @@ -497,11 +531,12 @@ public Integer call(Integer t1) { Async.toAsync(func, Schedulers.immediate()) .call(1) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc2() { Func2 func = new Func2() { @@ -513,11 +548,12 @@ public Integer call(Integer t1, Integer t2) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(3); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc3() { Func3 func = new Func3() { @@ -529,11 +565,12 @@ public Integer call(Integer t1, Integer t2, Integer t3) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(7); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc4() { Func4 func = new Func4() { @@ -545,11 +582,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(15); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc5() { Func5 func = new Func5() { @@ -561,11 +599,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(31); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc6() { Func6 func = new Func6() { @@ -577,11 +616,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(63); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc7() { Func7 func = new Func7() { @@ -593,11 +633,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(127); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc8() { Func8 func = new Func8() { @@ -609,11 +650,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(255); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc9() { Func9 func = new Func9() { @@ -625,11 +667,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(511); verify(observer, times(1)).onCompleted(); } + @Test public void testFuncN() { FuncN func = new FuncN() { @@ -637,7 +680,7 @@ public void testFuncN() { public Integer call(Object... args) { int i = 0; for (Object o : args) { - i = i | (Integer)o; + i = i | (Integer) o; } return i; } @@ -645,9 +688,133 @@ public Integer call(Object... args) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(1023); verify(observer, times(1)).onCompleted(); } + + @Test + public void testStartWithFunc() { + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + assertEquals("one", Async.start(func).toBlockingObservable().single()); + } + + @Test(expected = RuntimeException.class) + public void testStartWithFuncError() { + Func0 func = new Func0() { + @Override + public String call() { + throw new RuntimeException("Some error"); + } + }; + Async.start(func).toBlockingObservable().single(); + } + + @Test + public void testStartWhenSubscribeRunBeforeFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + + Observable observable = Async.start(func, scheduler); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verifyNoMoreInteractions(); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWhenSubscribeRunAfterFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + + Observable observable = Async.start(func, scheduler); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWithFuncAndMultipleObservers() { + TestScheduler scheduler = new TestScheduler(); + + @SuppressWarnings("unchecked") + Func0 func = (Func0) mock(Func0.class); + doAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return "one"; + } + }).when(func).call(); + + Observable observable = Async.start(func, scheduler); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer3 = mock(Observer.class); + + observable.subscribe(observer1); + observable.subscribe(observer2); + observable.subscribe(observer3); + + InOrder inOrder; + inOrder = inOrder(observer1); + inOrder.verify(observer1, times(1)).onNext("one"); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onNext("one"); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer3); + inOrder.verify(observer3, times(1)).onNext("one"); + inOrder.verify(observer3, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(func, times(1)).call(); + } + } diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java new file mode 100644 index 0000000000..f07d9b5e04 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java @@ -0,0 +1,105 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.fail; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import rx.Observable; +import rx.Observer; +import rx.schedulers.Schedulers; +import rx.util.async.Async; +import rx.util.functions.Func0; + +public class OperationDeferFutureTest { + @Test + @SuppressWarnings("unchecked") + public void testSimple() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + try { + final CountDownLatch ready = new CountDownLatch(1); + + Func0>> func = new Func0>>() { + @Override + public Future> call() { + return exec.submit(new Callable>() { + @Override + public Observable call() throws Exception { + if (!ready.await(1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Not started in time"); + } + return Observable.from(1); + } + }); + } + }; + + Observable result = Async.deferFuture(func, Schedulers.threadPoolForComputation()); + + final Observer observer = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new OperationStartFutureTest.MockHelper(observer, done)); + + ready.countDown(); + + if (!done.await(1000, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } finally { + exec.shutdown(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testSimpleFactoryThrows() { + Func0>> func = new Func0>>() { + + @Override + public Future> call() { + throw new OperationStartFutureTest.CustomException(); + } + }; + + Observable result = Async.deferFuture(func); + + final Observer observer = mock(Observer.class); + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer).onError(any(OperationStartFutureTest.CustomException.class)); + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java new file mode 100644 index 0000000000..e5915050ee --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java @@ -0,0 +1,170 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import org.junit.Test; +import rx.Observable; +import rx.schedulers.Schedulers; +import rx.util.async.Async; +import rx.util.functions.Action1; + +public class OperationForEachFutureTest { + @Test + public void testSimple() { + final ExecutorService exec = Executors.newCachedThreadPool(); + + try { + Observable source = Observable.from(1, 2, 3) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = Async.forEachFuture(source, add); + + exec.execute(task); + + try { + Void value = task.get(1000, TimeUnit.MILLISECONDS); + + assertEquals(null, value); + + assertEquals(6, sum.get()); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + fail("Exception: " + ex); + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + } finally { + exec.shutdown(); + } + } + private static final class CustomException extends RuntimeException { } + @Test + public void testSimpleThrowing() { + + final ExecutorService exec = Executors.newCachedThreadPool(); + + try { + Observable source = Observable.error(new CustomException()) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = Async.forEachFuture(source, add); + + exec.execute(task); + + try { + task.get(1000, TimeUnit.MILLISECONDS); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + if (!(ex.getCause() instanceof CustomException)) { + fail("Got different exception: " + ex.getCause()); + } + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + + assertEquals(0, sum.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void testSimpleScheduled() { + Observable source = Observable.from(1, 2, 3) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = Async.forEachFuture(source, add, Schedulers.newThread()); + + try { + Void value = task.get(1000, TimeUnit.MILLISECONDS); + + assertEquals(null, value); + + assertEquals(6, sum.get()); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + fail("Exception: " + ex); + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + } + @Test + public void testSimpleScheduledThrowing() { + + Observable source = Observable.error(new CustomException()) + .subscribeOn(Schedulers.threadPoolForComputation()); + + final AtomicInteger sum = new AtomicInteger(); + Action1 add = new Action1() { + @Override + public void call(Integer t1) { + sum.addAndGet(t1); + } + }; + + FutureTask task = Async.forEachFuture(source, add, Schedulers.newThread()); + + try { + task.get(1000, TimeUnit.MILLISECONDS); + } catch (TimeoutException ex) { + fail("Timed out: " + ex); + } catch (ExecutionException ex) { + if (!(ex.getCause() instanceof CustomException)) { + fail("Got different exception: " + ex.getCause()); + } + } catch (InterruptedException ex) { + fail("Exception: " + ex); + } + + assertEquals(0, sum.get()); + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationFromFunctionalsTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationFromFunctionalsTest.java new file mode 100644 index 0000000000..ebd741bc4f --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationFromFunctionalsTest.java @@ -0,0 +1,241 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.util.async.operators; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.schedulers.TestScheduler; +import rx.util.async.Async; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +public class OperationFromFunctionalsTest { + TestScheduler scheduler; + @Before + public void before() { + scheduler = new TestScheduler(); + } + private void testRunShouldThrow(Observable source, Class exception) { + for (int i = 0; i < 3; i++) { + + Observer observer = mock(Observer.class); + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onError(any(exception)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + } + @Test + public void testFromAction() { + final AtomicInteger value = new AtomicInteger(); + + Action0 action = new Action0() { + @Override + public void call() { + value.set(2); + } + }; + + Observable source = Async.fromAction(action, 1, scheduler); + + for (int i = 0; i < 3; i++) { + + value.set(0); + + Observer observer = mock(Observer.class); + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onNext(any()); + inOrder.verify(observer, never()).onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer, never()).onError(any(Throwable.class)); + + Assert.assertEquals(2, value.get()); + } + } + @Test + public void testFromActionThrows() { + Action0 action = new Action0() { + @Override + public void call() { + throw new RuntimeException("Forced failure!"); + } + }; + + Observable source = Async.fromAction(action, 1, scheduler); + + testRunShouldThrow(source, RuntimeException.class); + } + @Test + public void testFromFunc0() { + Func0 func = new Func0() { + @Override + public Integer call() { + return 1; + } + }; + + Observable source = Async.fromFunc0(func, scheduler); + + for (int i = 0; i < 3; i++) { + + Observer observer = mock(Observer.class); + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onNext(any()); + inOrder.verify(observer, never()).onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer, never()).onError(any(Throwable.class)); + } + } + + @Test + public void testFromFunc0Throws() { + Func0 func = new Func0() { + @Override + public Integer call() { + throw new RuntimeException("Forced failure!"); + } + }; + + Observable source = Async.fromFunc0(func, scheduler); + + testRunShouldThrow(source, RuntimeException.class); + } + @Test + public void testFromRunnable() { + final AtomicInteger value = new AtomicInteger(); + + Runnable action = new Runnable() { + @Override + public void run() { + value.set(2); + } + }; + + Observable source = Async.fromRunnable(action, 1, scheduler); + + for (int i = 0; i < 3; i++) { + + value.set(0); + + Observer observer = mock(Observer.class); + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onNext(any()); + inOrder.verify(observer, never()).onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer, never()).onError(any(Throwable.class)); + + Assert.assertEquals(2, value.get()); + } + } + @Test + public void testFromRunnableThrows() { + Runnable action = new Runnable() { + @Override + public void run() { + throw new RuntimeException("Forced failure!"); + } + }; + + Observable source = Async.fromRunnable(action, 1, scheduler); + + testRunShouldThrow(source, RuntimeException.class); + } + @Test + public void testFromCallable() { + Callable callable = new Callable() { + @Override + public Integer call() throws Exception { + return 1; + } + }; + + Observable source = Async.fromCallable(callable, scheduler); + + for (int i = 0; i < 3; i++) { + + Observer observer = mock(Observer.class); + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onNext(any()); + inOrder.verify(observer, never()).onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer, never()).onError(any(Throwable.class)); + } + } + + @Test + public void testFromCallableThrows() { + Callable callable = new Callable() { + @Override + public Integer call() throws Exception { + throw new IOException("Forced failure!"); + } + }; + + Observable source = Async.fromCallable(callable, scheduler); + + testRunShouldThrow(source, IOException.class); + } +} diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java new file mode 100644 index 0000000000..733fdb39b3 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java @@ -0,0 +1,147 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.async.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.fail; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.schedulers.Schedulers; +import rx.util.async.Async; +import rx.util.functions.Func0; + +public class OperationStartFutureTest { + /** Custom exception to distinguish from any other RuntimeException. */ + static class CustomException extends RuntimeException {} + /** + * Forwards the events to the underlying observer and counts down the latch + * on terminal conditions. + * @param + */ + static class MockHelper implements Observer { + final Observer observer; + final CountDownLatch latch; + + public MockHelper(Observer observer, CountDownLatch latch) { + this.observer = observer; + this.latch = latch; + } + + @Override + public void onNext(T args) { + try { + observer.onNext(args); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable e) { + try { + observer.onError(e); + } finally { + latch.countDown(); + } + } + + + @Override + public void onCompleted() { + try { + observer.onCompleted(); + } finally { + latch.countDown(); + } + } + + } + @Test + @SuppressWarnings("unchecked") + public void testSimple() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + try { + final CountDownLatch ready = new CountDownLatch(1); + + Func0> func = new Func0>() { + + @Override + public Future call() { + return exec.submit(new Callable() { + @Override + public Integer call() throws Exception { + if (!ready.await(1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Not started in time"); + } + return 1; + } + }); + } + }; + + Observable result = Async.startFuture(func, Schedulers.threadPoolForComputation()); + + final Observer observer = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new MockHelper(observer, done)); + + ready.countDown(); + + if (!done.await(1000, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } finally { + exec.shutdown(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testSimpleFactoryThrows() { + Func0> func = new Func0>() { + + @Override + public Future call() { + throw new CustomException(); + } + }; + + Observable result = Async.startFuture(func); + + final Observer observer = mock(Observer.class); + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer).onError(any(CustomException.class)); + } +} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ebf9362123..e311414af8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -125,7 +125,6 @@ import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2; -import rx.util.functions.Async; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -7711,45 +7710,4 @@ public Observable> gro return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } - /** - * Invokes the specified function asynchronously and returns an Observable - * that emits the result. - *

- * Note: The function is called immediately and once, not whenever an - * observer subscribes to the resulting Observable. Multiple subscriptions - * to this Observable observe the same return value. - *

- * - * - * @param func function to run asynchronously - * @return an Observable that emits the function's result value, or notifies - * observers of an exception - * @see RxJava Wiki: start() - * @see MSDN: Observable.Start - */ - public static Observable start(Func0 func) { - return Async.toAsync(func).call(); - } - - /** - * Invokes the specified function asynchronously on the specified scheduler - * and returns an Observable that emits the result. - *

- * Note: The function is called immediately and once, not whenever an - * observer subscribes to the resulting Observable. Multiple subscriptions - * to this Observable observe the same return value. - *

- * - * - * @param func function to run asynchronously - * @param scheduler scheduler to run the function on - * @return an Observable that emits the function's result value, or notifies - * observers of an exception - * @see RxJava Wiki: start() - * @see MSDN: Observable.Start - */ - public static Observable start(Func0 func, Scheduler scheduler) { - return Async.toAsync(func, scheduler).call(); - } - } diff --git a/rxjava-core/src/main/java/rx/util/functions/Async.java b/rxjava-core/src/main/java/rx/util/functions/Async.java deleted file mode 100644 index 51a1787715..0000000000 --- a/rxjava-core/src/main/java/rx/util/functions/Async.java +++ /dev/null @@ -1,957 +0,0 @@ - /** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.util.functions; - -import rx.Observable; -import rx.Scheduler; -import rx.schedulers.ExecutorScheduler; -import rx.schedulers.Schedulers; -import rx.subjects.AsyncSubject; - -/** - * Utility methods to convert functions and actions into asynchronous - * operations through the Observable/Observer pattern. - */ -public final class Async { - private Async() { throw new IllegalStateException("No instances!"); } - /** - * {@link Scheduler} intended for asynchronous conversions. - *

- * Defaults to {@link #threadPoolForComputation()}. - * - * @return {@link ExecutorScheduler} for asynchronous conversion work. - */ - public static Scheduler threadPoolForAsyncConversions() { - return Schedulers.threadPoolForComputation(); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func0> toAsync(Action0 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func0> toAsync(Func0 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func1> toAsync(Action1 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func1> toAsync(Func1 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func2> toAsync(Action2 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func2> toAsync(Func2 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func3> toAsync(Action3 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func3> toAsync(Func3 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func4> toAsync(Action4 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func4> toAsync(Func4 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func5> toAsync(Action5 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func5> toAsync(Func5 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func6> toAsync(Action6 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func6> toAsync(Func6 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func7> toAsync(Action7 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func7> toAsync(Func7 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func8> toAsync(Action8 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func8> toAsync(Func8 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func9> toAsync(Action9 action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func9> toAsync(Func9 func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - */ - public static FuncN> toAsync(ActionN action) { - return toAsync(action, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - */ - public static FuncN> toAsync(FuncN func) { - return toAsync(func, threadPoolForAsyncConversions()); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func0> toAsync(final Action0 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func0> toAsync(final Func0 func, final Scheduler scheduler) { - return new Func0>() { - @Override - public Observable call() { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func1> toAsync(final Action1 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func1> toAsync(final Func1 func, final Scheduler scheduler) { - return new Func1>() { - @Override - public Observable call(final T1 t1) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func2> toAsync(final Action2 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func2> toAsync(final Func2 func, final Scheduler scheduler) { - return new Func2>() { - @Override - public Observable call(final T1 t1, final T2 t2) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func3> toAsync(final Action3 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func3> toAsync(final Func3 func, final Scheduler scheduler) { - return new Func3>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func4> toAsync(final Action4 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func4> toAsync(final Func4 func, final Scheduler scheduler) { - return new Func4>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3, t4); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func5> toAsync(final Action5 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func5> toAsync(final Func5 func, final Scheduler scheduler) { - return new Func5>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3, t4, t5); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func6> toAsync(final Action6 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func6> toAsync(final Func6 func, final Scheduler scheduler) { - return new Func6>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3, t4, t5, t6); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func7> toAsync(final Action7 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func7> toAsync(final Func7 func, final Scheduler scheduler) { - return new Func7>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3, t4, t5, t6, t7); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func8> toAsync(final Action8 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func8> toAsync(final Func8 func, final Scheduler scheduler) { - return new Func8>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3, t4, t5, t6, t7, t8); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - * @see MSDN: Observable.ToAsync - */ - public static Func9> toAsync(final Action9 action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - * @see MSDN: Observable.ToAsync - */ - public static Func9> toAsync(final Func9 func, final Scheduler scheduler) { - return new Func9>() { - @Override - public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - */ - public static FuncN> toAsync(final ActionN action, final Scheduler scheduler) { - return toAsync(Actions.toFunc(action), scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - */ - public static FuncN> toAsync(final FuncN func, final Scheduler scheduler) { - return new FuncN>() { - @Override - public Observable call(final Object... args) { - final AsyncSubject subject = AsyncSubject.create(); - scheduler.schedule(new Action0() { - @Override - public void call() { - R result; - try { - result = func.call(args); - } catch (Throwable t) { - subject.onError(t); - return; - } - subject.onNext(result); - subject.onCompleted(); - } - }); - return subject; - } - }; - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - *

- * Alias for toAsync(ActionN) intended for dynamic languages. - * - * @param action the action to convert - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - */ - public static FuncN> asyncAction(final ActionN action) { - return toAsync(action); - } - /** - * Convert a synchronous action call into an asynchronous function - * call through an Observable sequence. - *

- * Alias for toAsync(ActionN, Scheduler) intended for dynamic languages. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * - * @return a function which returns an observable sequence which - * executes the {@code action} and emits {@code null}. - * - */ - public static FuncN> asyncAction(final ActionN action, final Scheduler scheduler) { - return toAsync(action, scheduler); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - *

- * Alias for toAsync(FuncN) intended for dynamic languages. - * - * @param func the function to convert - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - */ - public static FuncN> asyncFunc(final FuncN func) { - return toAsync(func); - } - /** - * Convert a synchronous function call into an asynchronous function - * call through an Observable sequence. - *

- * Alias for toAsync(FuncN, Scheduler) intended for dynamic languages. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * - * @return a function which returns an observable sequence which - * executes the {@code func} and emits its returned value. - * - */ - public static FuncN> asyncFunc(final FuncN func, final Scheduler scheduler) { - return toAsync(func, scheduler); - } -} diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 22e0f9cd60..fb3b95e7db 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -969,129 +969,6 @@ public void testRangeWithScheduler() { inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } - - @Test - public void testStartWithFunc() { - Func0 func = new Func0() { - @Override - public String call() { - return "one"; - } - }; - assertEquals("one", Observable.start(func).toBlockingObservable().single()); - } - - @Test(expected = RuntimeException.class) - public void testStartWithFuncError() { - Func0 func = new Func0() { - @Override - public String call() { - throw new RuntimeException("Some error"); - } - }; - Observable.start(func).toBlockingObservable().single(); - } - - @Test - public void testStartWhenSubscribeRunBeforeFunc() { - TestScheduler scheduler = new TestScheduler(); - - Func0 func = new Func0() { - @Override - public String call() { - return "one"; - } - }; - - Observable observable = Observable.start(func, scheduler); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verifyNoMoreInteractions(); - - // Run func - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - inOrder.verify(observer, times(1)).onNext("one"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testStartWhenSubscribeRunAfterFunc() { - TestScheduler scheduler = new TestScheduler(); - - Func0 func = new Func0() { - @Override - public String call() { - return "one"; - } - }; - - Observable observable = Observable.start(func, scheduler); - - // Run func - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext("one"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testStartWithFuncAndMultipleObservers() { - TestScheduler scheduler = new TestScheduler(); - - @SuppressWarnings("unchecked") - Func0 func = (Func0) mock(Func0.class); - doAnswer(new Answer() { - @Override - public String answer(InvocationOnMock invocation) throws Throwable { - return "one"; - } - }).when(func).call(); - - Observable observable = Observable.start(func, scheduler); - - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - @SuppressWarnings("unchecked") - Observer observer1 = mock(Observer.class); - @SuppressWarnings("unchecked") - Observer observer2 = mock(Observer.class); - @SuppressWarnings("unchecked") - Observer observer3 = mock(Observer.class); - - observable.subscribe(observer1); - observable.subscribe(observer2); - observable.subscribe(observer3); - - InOrder inOrder; - inOrder = inOrder(observer1); - inOrder.verify(observer1, times(1)).onNext("one"); - inOrder.verify(observer1, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - inOrder = inOrder(observer2); - inOrder.verify(observer2, times(1)).onNext("one"); - inOrder.verify(observer2, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - inOrder = inOrder(observer3); - inOrder.verify(observer3, times(1)).onNext("one"); - inOrder.verify(observer3, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - verify(func, times(1)).call(); - } @Test public void testCollectToList() { diff --git a/settings.gradle b/settings.gradle index 176e9150c5..c40f29adb6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,4 +8,5 @@ include 'rxjava-core', \ 'rxjava-contrib:rxjava-swing', \ 'rxjava-contrib:rxjava-android', \ 'rxjava-contrib:rxjava-apache-http', \ -'rxjava-contrib:rxjava-string' +'rxjava-contrib:rxjava-string', \ +'rxjava-contrib:rxjava-async-util'