From c1e7a32a4d75ddea047772d9877b79afdb691bf1 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 20 Apr 2021 11:37:43 +0300 Subject: [PATCH] Fixes --- reactive/kotlinx-coroutines-jdk9/src/Await.kt | 2 +- .../kotlinx-coroutines-jdk9/test/AwaitTest.kt | 7 +- .../kotlinx-coroutines-reactive/src/Await.kt | 34 ++++++++-- .../kotlinx-coroutines-reactor/src/Mono.kt | 64 ++++++------------- .../kotlinx-coroutines-rx2/src/RxAwait.kt | 14 +++- .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 4 +- .../kotlinx-coroutines-rx3/src/RxAwait.kt | 14 +++- .../kotlinx-coroutines-rx3/test/MaybeTest.kt | 4 +- 8 files changed, 83 insertions(+), 60 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/src/Await.kt b/reactive/kotlinx-coroutines-jdk9/src/Await.kt index b42abb2b5a..dfe6ec52f2 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/Await.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/Await.kt @@ -4,7 +4,7 @@ package kotlinx.coroutines.jdk9 -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import java.util.concurrent.* import org.reactivestreams.FlowAdapters import kotlinx.coroutines.reactive.* diff --git a/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt b/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt index 80ca0faa3d..5a95d098fd 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt @@ -5,9 +5,8 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* -import kotlinx.coroutines.CancellationException import org.junit.* -import java.util.concurrent.* +import java.util.concurrent.Flow as JFlow class AwaitTest: TestBase() { @@ -16,8 +15,8 @@ class AwaitTest: TestBase() { @Test fun testAwaitCancellation() = runTest { expect(1) - val publisher = Flow.Publisher { s -> - s.onSubscribe(object : Flow.Subscription { + val publisher = JFlow.Publisher { s -> + s.onSubscribe(object : JFlow.Subscription { override fun request(n: Long) { expect(3) } diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index 20c0387c14..067f5e8031 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -9,7 +9,6 @@ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import java.lang.IllegalStateException -import kotlin.NoSuchElementException import kotlin.coroutines.* /** @@ -90,6 +89,15 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately cancels its [Subscription] and resumes with [CancellationException]. * + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this would + * also mean that this method would return the default value if there are *too many* values. This could be confusing to + * those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * * @throws NoSuchElementException if the publisher does not emit any value * @throws IllegalArgumentException if the publisher emits more than one value */ @@ -97,7 +105,7 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + "Please consider using awaitFirstOrDefault().", level = DeprecationLevel.WARNING -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) /** @@ -109,6 +117,15 @@ public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitO * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately cancels its [Subscription] and resumes with [CancellationException]. * + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would + * also mean that this method would return `null` if there are *too many* values. This could be confusing to + * those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * * @throws IllegalArgumentException if the publisher emits more than one value */ @Deprecated( @@ -117,7 +134,7 @@ public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitO "Alternatively, please consider using awaitFirstOrNull().", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Publisher.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT) /** @@ -129,13 +146,22 @@ public suspend fun Publisher.awaitSingleOrNull(): T? = awaitOne(Mode.SING * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately cancels its [Subscription] and resumes with [CancellationException]. * + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this would + * also mean that this method would return the calculated value if there are *too many* values. This could be confusing + * to those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * * @throws IllegalArgumentException if the publisher emits more than one value */ @Deprecated( message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + "Please consider using awaitFirstOrElse().", level = DeprecationLevel.WARNING -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Publisher.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue() diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index af8aec2eb4..fa8239f8bf 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -52,8 +52,7 @@ public suspend fun Mono.awaitSingleOrNull(): T? = suspendCancellableCorou } override fun onComplete() { - if (!seenValue) - cont.resume(null) + if (!seenValue) cont.resume(null) } override fun onNext(t: T) { @@ -133,87 +132,66 @@ public fun CoroutineScope.mono( ): Mono = monoInternal(this, context, block) /** - * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if - * the publisher has produced an error, throws the corresponding exception. - * - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this - * function immediately cancels its [Subscription] and resumes with [CancellationException]. - * - * @throws NoSuchElementException if the publisher does not emit any value + * This function is deprecated in favor of [Mono.awaitSingle]. + * Both functions await the first value, or throw [NoSuchElementException] if there is none, but the name + * [Mono.awaitSingle] better reflects the semantics of [Mono]. */ @Deprecated( message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + "Please use awaitSingle() instead.", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingle()") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Mono.awaitFirst(): T = awaitSingle() /** - * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking - * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding - * exception. - * - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this - * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * This function is deprecated in favor of [Mono.awaitSingleOrNull]. + * Both functions await the first value or return some special value if there is none, but the name + * [Mono.awaitSingleOrNull] better reflects the semantics of [Mono] than an operation with a "first" in its name. */ @Deprecated( message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + "Please use awaitSingleOrNull() instead.", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Mono.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default /** - * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread, - * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. - * - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this - * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * This function is deprecated in favor of [Mono.awaitSingleOrNull]. + * Both functions await the first value or return some special value if there is none, but the name + * [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. */ @Deprecated( message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + "Please use awaitSingleOrNull() instead.", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull()") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Mono.awaitFirstOrNull(): T? = awaitSingleOrNull() /** - * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without - * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the - * corresponding exception. - * - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this - * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * This function is deprecated in favor of [Mono.awaitSingleOrNull]. + * Both functions await the first value or return some special value if there is none, but the name + * [Mono.awaitSingleOrNull] better reflects the semantics of [Mono] than an operation with a "first" in its name. */ @Deprecated( message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + "Please use awaitSingleOrNull() instead.", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Mono.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue() /** - * Awaits the last value from the given publisher without blocking the thread and - * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. - * - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this - * function immediately cancels its [Subscription] and resumes with [CancellationException]. - * - * @throws NoSuchElementException if the publisher does not emit any value + * This function is deprecated in favor of [Mono.awaitSingle]. + * Both functions await the only value or return some special value if there is none, but the name + * "awaitLast" strongly suggests that there is more than one value, which is not the case. */ @Deprecated( message = "Mono produces at most one value, so the last element is the same as the first. " + "Please use awaitSingle() instead.", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingle()") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Mono.awaitLast(): T = awaitSingle() \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index c4d54dbfc9..d7b8ee26f6 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -71,12 +71,17 @@ public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. */ @Deprecated( message = "Deprecated in favor of awaitSingleOrNull()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull()") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() /** @@ -87,12 +92,17 @@ public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). */ @Deprecated( message = "Deprecated in favor of awaitSingleOrNull()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index b15eb39e48..292f6187fa 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -9,10 +9,10 @@ import io.reactivex.disposables.* import io.reactivex.exceptions.* import io.reactivex.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException as jCancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -235,7 +235,7 @@ class MaybeTest : TestBase() { expect(4) try { delay(Long.MAX_VALUE) - } catch (e: jCancellationException) { + } catch (e: CancellationException) { expect(6) } 42 diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 557abe0abb..a4435b88e2 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -71,12 +71,17 @@ public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. */ @Deprecated( message = "Deprecated in favor of awaitSingleOrNull()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull()") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() /** @@ -87,12 +92,17 @@ public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). */ @Deprecated( message = "Deprecated in favor of awaitSingleOrNull()", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") -) +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt index 186829bf24..bdb5481d80 100644 --- a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt @@ -9,10 +9,10 @@ import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* import io.reactivex.rxjava3.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException as jCancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -235,7 +235,7 @@ class MaybeTest : TestBase() { expect(4) try { delay(Long.MAX_VALUE) - } catch (e: jCancellationException) { + } catch (e: CancellationException) { expect(6) } 42