Skip to content

Commit

Permalink
Implements BlockingSingle
Browse files Browse the repository at this point in the history
This commit adds BlockingSingle, the blocking version of rx.Single.

BlockingSingle has the following methods:

i `from(Single)` -- factory method for creating a `BlockingSingle` from a
`Single`
- `get()` -- returns the value emitted from the Single
- `get(Func1<T,Boolean> predicate)` -- returns the value if it matches
  the provided predicate
- `toFuture()` -- returns a `java.util.concurrent.Future`

Adds Single.toBlocking
  • Loading branch information
hyleung committed Oct 15, 2015
1 parent 563fc3f commit d80d70a
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 22 deletions.
16 changes: 16 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import rx.internal.operators.OperatorTimeout;
import rx.internal.operators.OperatorZip;
import rx.internal.producers.SingleDelayedProducer;
import rx.singles.BlockingSingle;
import rx.observers.SafeSubscriber;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -1801,6 +1802,21 @@ public final Single<T> timeout(long timeout, TimeUnit timeUnit, Single<? extends
return lift(new OperatorTimeout<T>(timeout, timeUnit, asObservable(other), scheduler));
}

/**
* Converts a Single into a {@link BlockingSingle} (a Single with blocking operators).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toBlocking} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@code BlockingSingle} version of this Single.
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
@Experimental
public final BlockingSingle<T> toBlocking() {
return BlockingSingle.from(this);
}

/**
* Returns a Single that emits the result of applying a specified function to the pair of items emitted by
* the source Single and another specified Single.
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/rx/internal/util/BlockingUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.util;

import rx.Subscription;

import java.util.concurrent.CountDownLatch;

/**
* Utility functions relating to blocking types.
* <p/>
* Not intended to be part of the public API.
*/
public final class BlockingUtils {

private BlockingUtils() { }

/**
* Blocks and waits for a {@link Subscription} to complete.
*
* @param latch a CountDownLatch
* @param subscription the Subscription to wait on.
*/
public static void awaitForComplete(CountDownLatch latch, Subscription subscription) {
if (latch.getCount() == 0) {
// Synchronous observable completes before awaiting for it.
// Skip await so InterruptedException will never be thrown.
return;
}
// block until the subscription completes and then return
try {
latch.await();
} catch (InterruptedException e) {
subscription.unsubscribe();
// set the interrupted flag again so callers can still get it
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
// using Runtime so it is not checked
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}
}
}
26 changes: 4 additions & 22 deletions src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.*;
import rx.internal.operators.*;
import rx.internal.util.BlockingUtils;
import rx.internal.util.UtilityFunctions;
import rx.subscriptions.Subscriptions;

Expand Down Expand Up @@ -123,7 +124,7 @@ public void onNext(T args) {
onNext.call(args);
}
});
awaitForComplete(latch, subscription);
BlockingUtils.awaitForComplete(latch, subscription);

if (exceptionFromOnError.get() != null) {
if (exceptionFromOnError.get() instanceof RuntimeException) {
Expand Down Expand Up @@ -446,7 +447,7 @@ public void onNext(final T item) {
returnItem.set(item);
}
});
awaitForComplete(latch, subscription);
BlockingUtils.awaitForComplete(latch, subscription);

if (returnException.get() != null) {
if (returnException.get() instanceof RuntimeException) {
Expand All @@ -458,25 +459,6 @@ public void onNext(final T item) {

return returnItem.get();
}

private void awaitForComplete(CountDownLatch latch, Subscription subscription) {
if (latch.getCount() == 0) {
// Synchronous observable completes before awaiting for it.
// Skip await so InterruptedException will never be thrown.
return;
}
// block until the subscription completes and then return
try {
latch.await();
} catch (InterruptedException e) {
subscription.unsubscribe();
// set the interrupted flag again so callers can still get it
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
// using Runtime so it is not checked
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}
}

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
Expand All @@ -502,7 +484,7 @@ public void onCompleted() {
}
});

awaitForComplete(cdl, s);
BlockingUtils.awaitForComplete(cdl, s);
Throwable e = error[0];
if (e != null) {
if (e instanceof RuntimeException) {
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/rx/singles/BlockingSingle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.singles;

import rx.Single;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.annotations.Experimental;
import rx.internal.operators.BlockingOperatorToFuture;
import rx.internal.util.BlockingUtils;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@code BlockingSingle} is a blocking "version" of {@link Single} that provides blocking
* operators.
* <p/>
* You construct a {@code BlockingSingle} from a {@code Single} with {@link #from(Single)}
* or {@link Single#toBlocking()}.
*/
@Experimental
public class BlockingSingle<T> {
private final Single<? extends T> single;

private BlockingSingle(Single<? extends T> single) {
this.single = single;
}

/**
* Converts a {@link Single} into a {@code BlockingSingle}.
*
* @param single the {@link Single} you want to convert
* @return a {@code BlockingSingle} version of {@code single}
*/
public static <T> BlockingSingle<T> from(Single<? extends T> single) {
return new BlockingSingle<T>(single);
}

/**
* Returns the item emitted by this {@code BlockingSingle}.
* <p/>
* If the underlying {@link Single} returns successfully, the value emitted
* by the {@link Single} is returned. If the {@link Single} emits an error,
* the throwable emitted ({@link SingleSubscriber#onError(Throwable)}) is
* thrown.
*
* @return the value emitted by this {@code BlockingSingle}
*/
public T value() {
final AtomicReference<T> returnItem = new AtomicReference<T>();
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
Subscription subscription = single.subscribe(new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
returnItem.set(value);
latch.countDown();
}

@Override
public void onError(Throwable error) {
returnException.set(error);
latch.countDown();
}
});

BlockingUtils.awaitForComplete(latch, subscription);
Throwable throwable = returnException.get();
if (throwable != null) {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
}
throw new RuntimeException(throwable);
}
return returnItem.get();
}

/**
* Returns a {@link Future} representing the value emitted by this {@code BlockingSingle}.
*
* @return a {@link Future} that returns the value
*/
public Future<T> toFuture() {
return BlockingOperatorToFuture.toFuture(single.toObservable());
}
}

11 changes: 11 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.singles.BlockingSingle;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

import static org.junit.Assert.*;

public class SingleTest {

@Test
Expand Down Expand Up @@ -258,6 +261,14 @@ public void call(SingleSubscriber<? super String> s) {
ts.assertValue("hello");
}

@Test
public void testToBlocking() {
Single<String> s = Single.just("one");
BlockingSingle<String> blocking = s.toBlocking();
assertNotNull(blocking);
assertEquals("one", blocking.value());
}

@Test
public void testUnsubscribe() throws InterruptedException {
TestSubscriber<String> ts = new TestSubscriber<String>();
Expand Down
105 changes: 105 additions & 0 deletions src/test/java/rx/internal/util/BlockingUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.util;

import static org.mockito.Mockito.*;
import static org.junit.Assert.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

/**
* Test suite for {@link BlockingUtils}.
*/
public class BlockingUtilsTest {
@Test
public void awaitCompleteShouldReturnIfCountIsZero() {
Subscription subscription = mock(Subscription.class);
CountDownLatch latch = new CountDownLatch(0);
BlockingUtils.awaitForComplete(latch, subscription);
verifyZeroInteractions(subscription);
}

@Test
public void awaitCompleteShouldReturnOnEmpty() {
final CountDownLatch latch = new CountDownLatch(1);
Subscriber<Object> subscription = createSubscription(latch);
Observable<Object> observable = Observable.empty().subscribeOn(Schedulers.newThread());
observable.subscribe(subscription);
BlockingUtils.awaitForComplete(latch, subscription);
}

@Test
public void awaitCompleteShouldReturnOnError() {
final CountDownLatch latch = new CountDownLatch(1);
Subscriber<Object> subscription = createSubscription(latch);
Observable<Object> observable = Observable.error(new RuntimeException()).subscribeOn(Schedulers.newThread());
observable.subscribe(subscription);
BlockingUtils.awaitForComplete(latch, subscription);
}

@Test
public void shouldThrowRuntimeExceptionOnThreadInterrupted() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final Subscription subscription = mock(Subscription.class);
final AtomicReference<Exception> caught = new AtomicReference<Exception>();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().interrupt();
try {
BlockingUtils.awaitForComplete(latch, subscription);
} catch (RuntimeException e) {
caught.set(e);
}
}
});
thread.run();
verify(subscription).unsubscribe();
Exception actual = caught.get();
assertNotNull(actual);
assertNotNull(actual.getCause());
assertTrue(actual.getCause() instanceof InterruptedException);
}


private static <T> Subscriber<T> createSubscription(final CountDownLatch latch) {
return new Subscriber<T>() {
@Override
public void onNext(T t) {
//no-oop
}

@Override
public void onError(Throwable e) {
latch.countDown();
}

@Override
public void onCompleted() {
latch.countDown();
}
};
}
}
Loading

0 comments on commit d80d70a

Please sign in to comment.