Skip to content

Commit

Permalink
Introduce Mono<T>.singleOrNull
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Apr 16, 2021
1 parent a01d0bd commit 51b5ff7
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 24 deletions.
9 changes: 6 additions & 3 deletions reactive/kotlinx-coroutines-reactive/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
* @throws IllegalArgumentException if the publisher emits more than one value
*/
@Deprecated(
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior",
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
"Please consider using awaitFirstOrDefault().",
level = DeprecationLevel.WARNING
)
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
Expand All @@ -112,7 +113,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitO
*/
@Deprecated(
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
"There is a specialized version for Reactor's Mono, please use that where applicable.",
"There is a specialized version for Reactor's Mono, please use that where applicable. " +
"Alternatively, please consider using awaitFirstOrNull().",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
)
Expand All @@ -130,7 +132,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SING
* @throws IllegalArgumentException if the publisher emits more than one value
*/
@Deprecated(
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior",
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
"Please consider using awaitFirstOrElse().",
level = DeprecationLevel.WARNING
)
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ public final class kotlinx/coroutines/reactor/FluxKt {
}

public final class kotlinx/coroutines/reactor/MonoKt {
public static final fun awaitFirst (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrDefault (Lreactor/core/publisher/Mono;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrElse (Lreactor/core/publisher/Mono;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingleOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
Expand Down
131 changes: 129 additions & 2 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.*

/**
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
Expand All @@ -34,6 +32,50 @@ public fun <T> mono(
return monoInternal(GlobalScope, context, block)
}

/**
* Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if
* this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value,
* `null` is returned.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : Subscriber<T> {
private var seenValue = false

override fun onSubscribe(s: Subscription) {
cont.invokeOnCancellation { s.cancel() }
s.request(Long.MAX_VALUE)
}

override fun onComplete() {
if (!seenValue)
cont.resume(null)
}

override fun onNext(t: T) {
seenValue = true
cont.resume(t)
}

override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
}

/**
* Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or,
* if this Mono has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if the Mono does not emit any value
*/
public suspend fun <T> Mono<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()

private fun <T> monoInternal(
scope: CoroutineScope, // support for legacy mono in scope
context: CoroutineContext,
Expand Down Expand Up @@ -89,3 +131,88 @@ public fun <T> CoroutineScope.mono(
block: suspend CoroutineScope.() -> T?
): Mono<T> = monoInternal(this, context, block)

/**
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
* the publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if the publisher does not emit any value
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingle() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingle()")
)
public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle()

/**
* Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
* exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingleOrNull() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
)
public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default

/**
* Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingleOrNull() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
)
public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull()

/**
* Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
* corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingleOrNull() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()")
)
public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue()

/**
* Awaits the last value from the given publisher without blocking the thread and
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if the publisher does not emit any value
*/
@Deprecated(
message = "Mono produces at most one value, so the last element is the same as the first. " +
"Please use awaitSingle() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingle()")
)
public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle()
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import kotlinx.coroutines.reactive.*
*
* This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
* [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the
* subscriber's [Context].
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst])
* also propagate the [ReactorContext] to the subscriber's [Context].
**
* ### Examples of Reactive context integration.
*
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import kotlin.test.*
class FlowAsFluxTest : TestBase() {
@Test
fun testFlowAsFluxContextPropagation() {
val flux = flow<String> {
(1..4).forEach { i -> emit(createMono(i).awaitFirst()) }
val flux = flow {
(1..4).forEach { i -> emit(createMono(i).awaitSingle()) }
}
.asFlux()
.contextWrite(Context.of(1, "1"))
Expand Down
59 changes: 52 additions & 7 deletions reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import org.junit.Test
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import java.time.*
import java.time.Duration.*
import java.util.function.*
import kotlin.test.*
Expand Down Expand Up @@ -115,6 +114,52 @@ class MonoTest : TestBase() {
@Test
fun testMonoAwait() = runBlocking {
assertEquals("OK", Mono.just("O").awaitSingle() + "K")
assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K")
assertFailsWith<NoSuchElementException>{ Mono.empty<String>().awaitSingle() }
assertNull(Mono.empty<Int>().awaitSingleOrNull())
}

/** Tests that the versions of the await methods specialized for Mono for deprecation behave correctly and we don't
* break any code by introducing them. */
@Test
@Suppress("DEPRECATION")
fun testDeprecatedAwaitMethods() = runBlocking {
val filledMono = mono<String> { "OK" }
assertEquals("OK", filledMono.awaitFirst())
assertEquals("OK", filledMono.awaitFirstOrDefault("!"))
assertEquals("OK", filledMono.awaitFirstOrNull())
assertEquals("OK", filledMono.awaitFirstOrElse { "ELSE" })
assertEquals("OK", filledMono.awaitLast())
assertEquals("OK", filledMono.awaitSingleOrDefault("!"))
assertEquals("OK", filledMono.awaitSingleOrElse { "ELSE" })
val emptyMono = mono<String> { null }
assertFailsWith<NoSuchElementException> { emptyMono.awaitFirst() }
assertEquals("OK", emptyMono.awaitFirstOrDefault("OK"))
assertNull(emptyMono.awaitFirstOrNull())
assertEquals("ELSE", emptyMono.awaitFirstOrElse { "ELSE" })
assertFailsWith<NoSuchElementException> { emptyMono.awaitLast() }
assertEquals("OK", emptyMono.awaitSingleOrDefault("OK"))
assertEquals("ELSE", emptyMono.awaitSingleOrElse { "ELSE" })
}

/** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException]
* and unsubscribe from the publisher when their [Job] is cancelled. */
@Test
fun testAwaitCancellation() = runTest {
expect(1)
val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) }
val job = launch(start = CoroutineStart.UNDISPATCHED) {
try {
expect(2)
mono.awaitSingleOrNull()
} catch (e: CancellationException) {
expect(6)
throw e
}
}
expect(4)
job.cancelAndJoin()
finish(7)
}

@Test
Expand Down Expand Up @@ -264,7 +309,7 @@ class MonoTest : TestBase() {
.interval(ofMillis(1))
.switchMap {
mono(coroutineContext) {
timeBomb().awaitFirst()
timeBomb().awaitSingle()
}
}
.onErrorReturn({
Expand All @@ -275,14 +320,14 @@ class MonoTest : TestBase() {
finish(2)
}

private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }

@Test
fun testLeakedException() = runBlocking {
// Test exception is not reported to global handler
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
combine(flow, flow) { _, _ -> }
.catch {}
.collect { }
}
Expand Down Expand Up @@ -373,13 +418,13 @@ class MonoTest : TestBase() {
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
}

/** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then.
/** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then.
*
* Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
* ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
private suspend fun <T> Publisher<T>.awaitCancelAndJoin() = coroutineScope {
private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
async(start = CoroutineStart.UNDISPATCHED) {
awaitFirstOrNull()
awaitSingleOrNull()
}.cancelAndJoin()
}
}
12 changes: 4 additions & 8 deletions reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ReactorContextTest : TestBase() {
}
} .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.contextWrite { ctx -> ctx.put(6, "6") }
assertEquals(mono.awaitFirst(), "1234567")
assertEquals(mono.awaitSingle(), "1234567")
}

@Test
Expand All @@ -43,22 +43,18 @@ class ReactorContextTest : TestBase() {
(1..3).forEach { append(ctx.getOrDefault(it, "noValue")) }
}
} .contextWrite(Context.of(2, "2"))
.awaitFirst()
.awaitSingle()
assertEquals(result, "123")
}

@Test
fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
assertEquals(createMono().awaitFirst(), "7")
assertEquals(createMono().awaitFirstOrDefault("noValue"), "7")
assertEquals(createMono().awaitFirstOrNull(), "7")
assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7")
assertEquals(createMono().awaitLast(), "7")
assertEquals(createMono().awaitSingle(), "7")
assertEquals(createMono().awaitSingleOrNull(), "7")
}

@Test
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
fun testFluxAwaitContextPropagation() = runBlocking(
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
) {
assertEquals(createFlux().awaitFirst(), "1")
Expand Down

0 comments on commit 51b5ff7

Please sign in to comment.