From 51b5ff7c92389cf6b958f26e7712768aee481fc9 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 16 Apr 2021 12:16:08 +0300 Subject: [PATCH] Introduce Mono.singleOrNull --- .../kotlinx-coroutines-reactive/src/Await.kt | 9 +- .../api/kotlinx-coroutines-reactor.api | 7 + .../kotlinx-coroutines-reactor/src/Mono.kt | 131 +++++++++++++++++- .../src/ReactorContext.kt | 4 +- .../test/FlowAsFluxTest.kt | 4 +- .../test/MonoTest.kt | 59 +++++++- .../test/ReactorContextTest.kt | 12 +- 7 files changed, 202 insertions(+), 24 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index d338900248..20c0387c14 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -94,7 +94,8 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) * @throws IllegalArgumentException if the publisher emits more than one value */ @Deprecated( - message = "Deprecated without a replacement due to its name incorrectly conveying the behavior", + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "Please consider using awaitFirstOrDefault().", level = DeprecationLevel.WARNING ) public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) @@ -112,7 +113,8 @@ public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitO */ @Deprecated( message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + - "There is a specialized version for Reactor's Mono, please use that where applicable.", + "There is a specialized version for Reactor's Mono, please use that where applicable. " + + "Alternatively, please consider using awaitFirstOrNull().", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor") ) @@ -130,7 +132,8 @@ public suspend fun Publisher.awaitSingleOrNull(): T? = awaitOne(Mode.SING * @throws IllegalArgumentException if the publisher emits more than one value */ @Deprecated( - message = "Deprecated without a replacement due to its name incorrectly conveying the behavior", + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "Please consider using awaitFirstOrElse().", level = DeprecationLevel.WARNING ) public suspend fun Publisher.awaitSingleOrElse(defaultValue: () -> T): T = diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 3a1c8b7d31..0a10aa12a9 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -17,6 +17,13 @@ public final class kotlinx/coroutines/reactor/FluxKt { } public final class kotlinx/coroutines/reactor/MonoKt { + public static final fun awaitFirst (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrDefault (Lreactor/core/publisher/Mono;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrElse (Lreactor/core/publisher/Mono;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitLast (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono; diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 307ec2278c..3a210751bc 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -7,12 +7,10 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result. @@ -34,6 +32,50 @@ public fun mono( return monoInternal(GlobalScope, context, block) } +/** + * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if + * this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value, + * `null` is returned. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + */ +public suspend fun Mono.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : Subscriber { + private var seenValue = false + + override fun onSubscribe(s: Subscription) { + cont.invokeOnCancellation { s.cancel() } + s.request(Long.MAX_VALUE) + } + + override fun onComplete() { + if (!seenValue) + cont.resume(null) + } + + override fun onNext(t: T) { + seenValue = true + cont.resume(t) + } + + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, + * if this Mono has produced an error, throws the corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * @throws NoSuchElementException if the Mono does not emit any value + */ +public suspend fun Mono.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + private fun monoInternal( scope: CoroutineScope, // support for legacy mono in scope context: CoroutineContext, @@ -89,3 +131,88 @@ public fun CoroutineScope.mono( block: suspend CoroutineScope.() -> T? ): Mono = monoInternal(this, context, block) +/** + * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if + * the publisher has produced an error, throws the corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * @throws NoSuchElementException if the publisher does not emit any value + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingle() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingle()") +) +public suspend fun Mono.awaitFirst(): T = awaitSingle() + +/** + * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking + * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding + * exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) +public suspend fun Mono.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default + +/** + * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread, + * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) +public suspend fun Mono.awaitFirstOrNull(): T? = awaitSingleOrNull() + +/** + * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()") +) +public suspend fun Mono.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue() + +/** + * Awaits the last value from the given publisher without blocking the thread and + * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * @throws NoSuchElementException if the publisher does not emit any value + */ +@Deprecated( + message = "Mono produces at most one value, so the last element is the same as the first. " + + "Please use awaitSingle() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingle()") +) +public suspend fun Mono.awaitLast(): T = awaitSingle() \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index be4b2c7d45..333f056d97 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -16,8 +16,8 @@ import kotlinx.coroutines.reactive.* * * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux], * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. - * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the - * subscriber's [Context]. + * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]) + * also propagate the [ReactorContext] to the subscriber's [Context]. ** * ### Examples of Reactive context integration. * diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt index dbe97b17d8..d8807385f0 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -14,8 +14,8 @@ import kotlin.test.* class FlowAsFluxTest : TestBase() { @Test fun testFlowAsFluxContextPropagation() { - val flux = flow { - (1..4).forEach { i -> emit(createMono(i).awaitFirst()) } + val flux = flow { + (1..4).forEach { i -> emit(createMono(i).awaitSingle()) } } .asFlux() .contextWrite(Context.of(1, "1")) diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index a98c514f19..421295d115 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -13,7 +13,6 @@ import org.junit.Test import org.reactivestreams.* import reactor.core.publisher.* import reactor.util.context.* -import java.time.* import java.time.Duration.* import java.util.function.* import kotlin.test.* @@ -115,6 +114,52 @@ class MonoTest : TestBase() { @Test fun testMonoAwait() = runBlocking { assertEquals("OK", Mono.just("O").awaitSingle() + "K") + assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K") + assertFailsWith{ Mono.empty().awaitSingle() } + assertNull(Mono.empty().awaitSingleOrNull()) + } + + /** Tests that the versions of the await methods specialized for Mono for deprecation behave correctly and we don't + * break any code by introducing them. */ + @Test + @Suppress("DEPRECATION") + fun testDeprecatedAwaitMethods() = runBlocking { + val filledMono = mono { "OK" } + assertEquals("OK", filledMono.awaitFirst()) + assertEquals("OK", filledMono.awaitFirstOrDefault("!")) + assertEquals("OK", filledMono.awaitFirstOrNull()) + assertEquals("OK", filledMono.awaitFirstOrElse { "ELSE" }) + assertEquals("OK", filledMono.awaitLast()) + assertEquals("OK", filledMono.awaitSingleOrDefault("!")) + assertEquals("OK", filledMono.awaitSingleOrElse { "ELSE" }) + val emptyMono = mono { null } + assertFailsWith { emptyMono.awaitFirst() } + assertEquals("OK", emptyMono.awaitFirstOrDefault("OK")) + assertNull(emptyMono.awaitFirstOrNull()) + assertEquals("ELSE", emptyMono.awaitFirstOrElse { "ELSE" }) + assertFailsWith { emptyMono.awaitLast() } + assertEquals("OK", emptyMono.awaitSingleOrDefault("OK")) + assertEquals("ELSE", emptyMono.awaitSingleOrElse { "ELSE" }) + } + + /** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException] + * and unsubscribe from the publisher when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + mono.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(6) + throw e + } + } + expect(4) + job.cancelAndJoin() + finish(7) } @Test @@ -264,7 +309,7 @@ class MonoTest : TestBase() { .interval(ofMillis(1)) .switchMap { mono(coroutineContext) { - timeBomb().awaitFirst() + timeBomb().awaitSingle() } } .onErrorReturn({ @@ -275,14 +320,14 @@ class MonoTest : TestBase() { finish(2) } - private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } + private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } @Test fun testLeakedException() = runBlocking { // Test exception is not reported to global handler val flow = mono { throw TestException() }.toFlux().asFlow() repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect { } } @@ -373,13 +418,13 @@ class MonoTest : TestBase() { Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow") } - /** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then. + /** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then. * * Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to * ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */ - private suspend fun Publisher.awaitCancelAndJoin() = coroutineScope { + private suspend fun Mono.awaitCancelAndJoin() = coroutineScope { async(start = CoroutineStart.UNDISPATCHED) { - awaitFirstOrNull() + awaitSingleOrNull() }.cancelAndJoin() } } diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index aff29241c9..577238be1d 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -20,7 +20,7 @@ class ReactorContextTest : TestBase() { } } .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) .contextWrite { ctx -> ctx.put(6, "6") } - assertEquals(mono.awaitFirst(), "1234567") + assertEquals(mono.awaitSingle(), "1234567") } @Test @@ -43,22 +43,18 @@ class ReactorContextTest : TestBase() { (1..3).forEach { append(ctx.getOrDefault(it, "noValue")) } } } .contextWrite(Context.of(2, "2")) - .awaitFirst() + .awaitSingle() assertEquals(result, "123") } @Test fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) { - assertEquals(createMono().awaitFirst(), "7") - assertEquals(createMono().awaitFirstOrDefault("noValue"), "7") - assertEquals(createMono().awaitFirstOrNull(), "7") - assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7") - assertEquals(createMono().awaitLast(), "7") assertEquals(createMono().awaitSingle(), "7") + assertEquals(createMono().awaitSingleOrNull(), "7") } @Test - fun testFluxAwaitContextPropagation() = runBlocking( + fun testFluxAwaitContextPropagation() = runBlocking( Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext() ) { assertEquals(createFlux().awaitFirst(), "1")