Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the 'Start' operator #594

Merged
merged 3 commits into from
Dec 11, 2013
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Remove the Action0 overloads
zsxwing committed Dec 11, 2013
commit c799e525680e95744a129b427df16791bfad02a1
37 changes: 0 additions & 37 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
@@ -6271,43 +6271,6 @@ public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> gro
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
}

/**
* Invokes the action asynchronously, surfacing the result through an observable sequence.
* <p>
* Note: The action is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* action's outcome.
*
* @param action
* Action to run asynchronously.
* @return An observable sequence exposing a null value upon completion of the action,
* or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229265(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static Observable<Void> start(Action0 action) {
return Async.toAsync(action).call();
}

/**
* Invokes the action asynchronously on the specified scheduler, surfacing the
* result through an observable sequence.
* <p>
* Note: The action is called immediately, not during the subscription of the resulting
* sequence. Multiple subscriptions to the resulting sequence can observe the
* action's outcome.
*
* @param action
* Action to run asynchronously.
* @param scheduler
* Scheduler to run the function on.
* @return An observable sequence exposing a null value upon completion of the action,
* or an exception.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211971(v=vs.103).aspx">MSDN: Observable.Start</a>
*/
public static Observable<Void> start(Action0 action, Scheduler scheduler) {
return Async.toAsync(action, scheduler).call();
}

/**
* Invokes the specified function asynchronously, surfacing the result through an observable sequence.
* <p>
101 changes: 0 additions & 101 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
@@ -951,107 +951,6 @@ public void testRangeWithScheduler() {
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWithAction() {
Action0 action = mock(Action0.class);
assertEquals(null, Observable.start(action).toBlockingObservable().single());
}

@Test(expected = RuntimeException.class)
public void testStartWithActionError() {
Action0 action = new Action0() {
@Override
public void call() {
throw new RuntimeException("Some error");
}
};
Observable.start(action).toBlockingObservable().single();
}

@Test
public void testStartWhenSubscribeRunBeforeAction() {
TestScheduler scheduler = new TestScheduler();

Action0 action = mock(Action0.class);

Observable<Void> observable = Observable.start(action, scheduler);

@SuppressWarnings("unchecked")
Observer<Void> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verifyNoMoreInteractions();

// Run action
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

inOrder.verify(observer, times(1)).onNext(null);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWhenSubscribeRunAfterAction() {
TestScheduler scheduler = new TestScheduler();

Action0 action = mock(Action0.class);

Observable<Void> observable = Observable.start(action, scheduler);

// Run action
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<Void> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(null);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWithActionAndMultipleObservers() {
TestScheduler scheduler = new TestScheduler();

Action0 action = mock(Action0.class);

Observable<Void> observable = Observable.start(action, scheduler);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

@SuppressWarnings("unchecked")
Observer<Void> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Void> observer2 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Void> observer3 = mock(Observer.class);

observable.subscribe(observer1);
observable.subscribe(observer2);
observable.subscribe(observer3);

InOrder inOrder;
inOrder = inOrder(observer1);
inOrder.verify(observer1, times(1)).onNext(null);
inOrder.verify(observer1, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onNext(null);
inOrder.verify(observer2, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

inOrder = inOrder(observer3);
inOrder.verify(observer3, times(1)).onNext(null);
inOrder.verify(observer3, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

verify(action, times(1)).call();
}

@Test
public void testStartWithFunc() {
Func0<String> func = new Func0<String>() {