Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Apr 21, 2021
1 parent 55f33ba commit 29538f4
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 60 deletions.
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-jdk9/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.Job
import kotlinx.coroutines.*
import java.util.concurrent.*
import org.reactivestreams.FlowAdapters
import kotlinx.coroutines.reactive.*
Expand Down
7 changes: 3 additions & 4 deletions reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import java.util.concurrent.*
import java.util.concurrent.Flow as JFlow

class AwaitTest: TestBase() {

Expand All @@ -16,8 +15,8 @@ class AwaitTest: TestBase() {
@Test
fun testAwaitCancellation() = runTest {
expect(1)
val publisher = Flow.Publisher<Int> { s ->
s.onSubscribe(object : Flow.Subscription {
val publisher = JFlow.Publisher<Int> { s ->
s.onSubscribe(object : JFlow.Subscription {
override fun request(n: Long) {
expect(3)
}
Expand Down
34 changes: 30 additions & 4 deletions reactive/kotlinx-coroutines-reactive/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.lang.IllegalStateException
import kotlin.NoSuchElementException
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -90,14 +89,23 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* ### Deprecation
*
* This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
* `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this would
* also mean that this method would return the default value if there are *too many* values. This could be confusing to
* those who expect this function to validate that there is a single element or none at all emitted, and cases where
* there are no elements are indistinguishable from those where there are too many, though these cases have different
* meaning.
*
* @throws NoSuchElementException if the publisher does not emit any value
* @throws IllegalArgumentException if the publisher emits more than one value
*/
@Deprecated(
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
"Please consider using awaitFirstOrDefault().",
level = DeprecationLevel.WARNING
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)

/**
Expand All @@ -109,6 +117,15 @@ public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitO
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* ### Deprecation
*
* This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
* `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would
* also mean that this method would return `null` if there are *too many* values. This could be confusing to
* those who expect this function to validate that there is a single element or none at all emitted, and cases where
* there are no elements are indistinguishable from those where there are too many, though these cases have different
* meaning.
*
* @throws IllegalArgumentException if the publisher emits more than one value
*/
@Deprecated(
Expand All @@ -117,7 +134,7 @@ public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitO
"Alternatively, please consider using awaitFirstOrNull().",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT)

/**
Expand All @@ -129,13 +146,22 @@ public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SING
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* ### Deprecation
*
* This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
* `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this would
* also mean that this method would return the calculated value if there are *too many* values. This could be confusing
* to those who expect this function to validate that there is a single element or none at all emitted, and cases where
* there are no elements are indistinguishable from those where there are too many, though these cases have different
* meaning.
*
* @throws IllegalArgumentException if the publisher emits more than one value
*/
@Deprecated(
message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
"Please consider using awaitFirstOrElse().",
level = DeprecationLevel.WARNING
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =
awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()

Expand Down
64 changes: 21 additions & 43 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCorou
}

override fun onComplete() {
if (!seenValue)
cont.resume(null)
if (!seenValue) cont.resume(null)
}

override fun onNext(t: T) {
Expand Down Expand Up @@ -136,87 +135,66 @@ public fun <T> CoroutineScope.mono(
): Mono<T> = monoInternal(this, context, block)

/**
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
* the publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if the publisher does not emit any value
* This function is deprecated in favor of [Mono.awaitSingle].
* Both functions await the first value, or throw [NoSuchElementException] if there is none, but the name
* [Mono.awaitSingle] better reflects the semantics of [Mono].
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingle() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingle()")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle()

/**
* Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
* exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
* This function is deprecated in favor of [Mono.awaitSingleOrNull].
* Both functions await the first value or return some special value if there is none, but the name
* [Mono.awaitSingleOrNull] better reflects the semantics of [Mono] than an operation with a "first" in its name.
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingleOrNull() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default

/**
* Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
* This function is deprecated in favor of [Mono.awaitSingleOrNull].
* Both functions await the first value or return some special value if there is none, but the name
* [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingleOrNull() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull()

/**
* Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
* corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
* This function is deprecated in favor of [Mono.awaitSingleOrNull].
* Both functions await the first value or return some special value if there is none, but the name
* [Mono.awaitSingleOrNull] better reflects the semantics of [Mono] than an operation with a "first" in its name.
*/
@Deprecated(
message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
"Please use awaitSingleOrNull() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue()

/**
* Awaits the last value from the given publisher without blocking the thread and
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if the publisher does not emit any value
* This function is deprecated in favor of [Mono.awaitSingle].
* Both functions await the only value or return some special value if there is none, but the name
* "awaitLast" strongly suggests that there is more than one value, which is not the case.
*/
@Deprecated(
message = "Mono produces at most one value, so the last element is the same as the first. " +
"Please use awaitSingle() instead.",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingle()")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle()
14 changes: 12 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?:
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* ### Deprecation
*
* Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
* a value, as opposed to throwing in such case.
*/
@Deprecated(
message = "Deprecated in favor of awaitSingleOrNull()",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()

/**
Expand All @@ -87,12 +92,17 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* ### Deprecation
*
* Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
* details).
*/
@Deprecated(
message = "Deprecated in favor of awaitSingleOrNull()",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default

// ------------------------ SingleSource ------------------------
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import io.reactivex.disposables.*
import io.reactivex.exceptions.*
import io.reactivex.internal.functions.Functions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException as jCancellationException
import kotlin.test.*

class MaybeTest : TestBase() {
Expand Down Expand Up @@ -235,7 +235,7 @@ class MaybeTest : TestBase() {
expect(4)
try {
delay(Long.MAX_VALUE)
} catch (e: jCancellationException) {
} catch (e: CancellationException) {
expect(6)
}
42
Expand Down
14 changes: 12 additions & 2 deletions reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?:
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* ### Deprecation
*
* Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
* a value, as opposed to throwing in such case.
*/
@Deprecated(
message = "Deprecated in favor of awaitSingleOrNull()",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()

/**
Expand All @@ -87,12 +92,17 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* ### Deprecation
*
* Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
* details).
*/
@Deprecated(
message = "Deprecated in favor of awaitSingleOrNull()",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
)
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default

// ------------------------ SingleSource ------------------------
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import io.reactivex.rxjava3.internal.functions.Functions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException as jCancellationException
import kotlin.test.*

class MaybeTest : TestBase() {
Expand Down Expand Up @@ -235,7 +235,7 @@ class MaybeTest : TestBase() {
expect(4)
try {
delay(Long.MAX_VALUE)
} catch (e: jCancellationException) {
} catch (e: CancellationException) {
expect(6)
}
42
Expand Down

0 comments on commit 29538f4

Please sign in to comment.