Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Apr 16, 2021
1 parent 51b5ff7 commit 388fb24
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
3 changes: 2 additions & 1 deletion reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
Expand Down Expand Up @@ -42,7 +43,7 @@ public fun <T> mono(
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : Subscriber<T> {
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private var seenValue = false

override fun onSubscribe(s: Subscription) {
Expand Down
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() {
@Test
fun testAwaitSingleOrNullException() {
val flux = flux {
send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K")
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
}

checkSingleValue(flux) {
assertEquals("OK", it)
checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt {
public static final fun awaitFirstOrNull (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitOrDefault (Lio/reactivex/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingleOrNull (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/rx2/RxChannelKt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx3/RxAwaitKt {
public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingleOrNull (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/rx3/RxChannelKt {
Expand Down

0 comments on commit 388fb24

Please sign in to comment.