Skip to content

Commit

Permalink
Upgrade to Coroutines 1.4.0-M1 and use awaitSingle()
Browse files Browse the repository at this point in the history
This commit raises the minimum Coroutines version supported
to 1.4.0-M1 and above, and changes usages of awaitFirst() or
awaitFirstOrNull() to awaitSingle() or awaitSingleOrNull()
to fix gh-25007.

Closes gh-25914
Closes gh-25007
  • Loading branch information
sdeleuze committed Oct 13, 2020
1 parent cd835b3 commit 3ed8813
Show file tree
Hide file tree
Showing 14 changed files with 38 additions and 38 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ configure(allprojects) { project ->
mavenBom "io.rsocket:rsocket-bom:1.1.0-RC1"
mavenBom "org.eclipse.jetty:jetty-bom:9.4.32.v20200930"
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.4.10"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.9"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.4.0-M1"
mavenBom "org.junit:junit-bom:5.7.0"
}
dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactor.asFlux

import kotlinx.coroutines.reactor.mono
Expand All @@ -49,7 +49,7 @@ internal fun <T: Any> deferredToMono(source: Deferred<T>) =
* @since 5.2
*/
internal fun <T: Any> monoToDeferred(source: Mono<T>) =
GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() }
GlobalScope.async(Dispatchers.Unconfined) { source.awaitSingleOrNull() }

/**
* Invoke a suspending function and converts it to [Mono] or [reactor.core.publisher.Flux].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.springframework.messaging.rsocket
import io.rsocket.transport.ClientTransport
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
Expand Down Expand Up @@ -103,7 +103,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flo
* @since 5.2
*/
suspend fun RSocketRequester.RetrieveSpec.sendAndAwait() {
send().awaitFirstOrNull()
send().awaitSingleOrNull()
}

/**
Expand All @@ -122,7 +122,7 @@ suspend inline fun <reified T : Any> RSocketRequester.RetrieveSpec.retrieveAndAw
* @since 5.2.1
*/
suspend inline fun <reified T : Any> RSocketRequester.RetrieveSpec.retrieveAndAwaitOrNull(): T? =
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitFirstOrNull()
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitSingleOrNull()

/**
* Coroutines variant of [RSocketRequester.RetrieveSpec.retrieveFlux].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

package org.springframework.r2dbc.core

import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull

/**
* Coroutines variant of [DatabaseClient.GenericExecuteSpec.then].
*
* @author Sebastien Deleuze
*/
suspend fun DatabaseClient.GenericExecuteSpec.await() {
then().awaitFirstOrNull()
then().awaitSingleOrNull()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package org.springframework.r2dbc.core

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import org.springframework.dao.EmptyResultDataAccessException

/**
Expand All @@ -26,7 +26,7 @@ import org.springframework.dao.EmptyResultDataAccessException
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitOne(): T {
return one().awaitFirstOrNull() ?: throw EmptyResultDataAccessException(1)
return one().awaitSingleOrNull() ?: throw EmptyResultDataAccessException(1)
}

/**
Expand All @@ -35,24 +35,24 @@ suspend fun <T> RowsFetchSpec<T>.awaitOne(): T {
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitOneOrNull(): T? =
one().awaitFirstOrNull()
one().awaitSingleOrNull()

/**
* Non-nullable Coroutines variant of [RowsFetchSpec.first].
*
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitFirst(): T {
return first().awaitFirstOrNull() ?: throw EmptyResultDataAccessException(1)
suspend fun <T> RowsFetchSpec<T>.awaitSingle(): T {
return first().awaitSingleOrNull() ?: throw EmptyResultDataAccessException(1)
}

/**
* Nullable Coroutines variant of [RowsFetchSpec.first].
*
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitFirstOrNull(): T? =
first().awaitFirstOrNull()
suspend fun <T> RowsFetchSpec<T>.awaitSingleOrNull(): T? =
first().awaitSingleOrNull()

/**
* Coroutines [Flow] variant of [RowsFetchSpec.all].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RowsFetchSpecExtensionsTests {
every { spec.first() } returns Mono.just("foo")

runBlocking {
assertThat(spec.awaitFirst()).isEqualTo("foo")
assertThat(spec.awaitSingle()).isEqualTo("foo")
}

verify {
Expand All @@ -112,7 +112,7 @@ class RowsFetchSpecExtensionsTests {
every { spec.first() } returns Mono.empty()

assertThatExceptionOfType(EmptyResultDataAccessException::class.java).isThrownBy {
runBlocking { spec.awaitFirst() }
runBlocking { spec.awaitSingle() }
}

verify {
Expand All @@ -121,12 +121,12 @@ class RowsFetchSpecExtensionsTests {
}

@Test
fun awaitFirstOrNullWithValue() {
fun awaitSingleOrNullWithValue() {
val spec = mockk<RowsFetchSpec<String>>()
every { spec.first() } returns Mono.just("foo")

runBlocking {
assertThat(spec.awaitFirstOrNull()).isEqualTo("foo")
assertThat(spec.awaitSingleOrNull()).isEqualTo("foo")
}

verify {
Expand All @@ -135,12 +135,12 @@ class RowsFetchSpecExtensionsTests {
}

@Test
fun awaitFirstOrNullWithNull() {
fun awaitSingleOrNullWithNull() {
val spec = mockk<RowsFetchSpec<String>>()
every { spec.first() } returns Mono.empty()

runBlocking {
assertThat(spec.awaitFirstOrNull()).isNull()
assertThat(spec.awaitSingleOrNull()).isNull()
}

verify {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.springframework.web.reactive.function.client

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.ParameterizedTypeReference
Expand Down Expand Up @@ -115,7 +115,7 @@ suspend fun <T : Any> ClientResponse.awaitBody(clazz: KClass<T>): T =
* @since 5.2
*/
suspend inline fun <reified T : Any> ClientResponse.awaitBodyOrNull(): T? =
bodyToMono<T>().awaitFirstOrNull()
bodyToMono<T>().awaitSingleOrNull()

/**
* `KClass` nullable coroutines variant of [ClientResponse.bodyToMono].
Expand All @@ -125,7 +125,7 @@ suspend inline fun <reified T : Any> ClientResponse.awaitBodyOrNull(): T? =
* @since 5.3
*/
suspend fun <T : Any> ClientResponse.awaitBodyOrNull(clazz: KClass<T>): T? =
bodyToMono(clazz.java).awaitFirstOrNull()
bodyToMono(clazz.java).awaitSingleOrNull()

/**
* Coroutines variant of [ClientResponse.toEntity].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.springframework.web.reactive.function.client
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
Expand Down Expand Up @@ -87,7 +87,7 @@ suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): Clien
* @since 5.3
*/
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T =
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitFirst()
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitSingle()

/**
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToFlux].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.springframework.web.reactive.function.server

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
import org.springframework.core.io.Resource
import org.springframework.http.HttpMethod
Expand Down Expand Up @@ -532,7 +532,7 @@ class CoRouterFunctionDsl internal constructor (private val init: (CoRouterFunct
builder.filter { serverRequest, handlerFunction ->
mono(Dispatchers.Unconfined) {
filterFunction(serverRequest) {
handlerFunction.handle(serverRequest).awaitFirst()
handlerFunction.handle(serverRequest).awaitSingle()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.springframework.web.reactive.function.server

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.ParameterizedTypeReference
Expand Down Expand Up @@ -99,7 +99,7 @@ suspend fun <T : Any> ServerRequest.awaitBody(clazz: KClass<T>): T =
* @since 5.2
*/
suspend inline fun <reified T : Any> ServerRequest.awaitBodyOrNull(): T? =
bodyToMono<T>().awaitFirstOrNull()
bodyToMono<T>().awaitSingleOrNull()

/**
* `KClass` nullable Coroutines variant of [ServerRequest.bodyToMono].
Expand All @@ -109,7 +109,7 @@ suspend inline fun <reified T : Any> ServerRequest.awaitBodyOrNull(): T? =
* @since 5.3
*/
suspend fun <T : Any> ServerRequest.awaitBodyOrNull(clazz: KClass<T>): T? =
bodyToMono(clazz.java).awaitFirstOrNull()
bodyToMono(clazz.java).awaitSingleOrNull()

/**
* Coroutines variant of [ServerRequest.formData].
Expand All @@ -136,7 +136,7 @@ suspend fun ServerRequest.awaitMultipartData(): MultiValueMap<String, Part> =
* @since 5.2
*/
suspend fun ServerRequest.awaitPrincipal(): Principal? =
principal().awaitFirstOrNull()
principal().awaitSingleOrNull()

/**
* Coroutines variant of [ServerRequest.session].
Expand Down
6 changes: 3 additions & 3 deletions src/docs/asciidoc/data-access.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6749,7 +6749,7 @@ The following query gets the `id` and `name` columns from a table:
.Kotlin
----
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitFirst()
.fetch().awaitSingle()
----

The following query uses a bind variable:
Expand All @@ -6766,7 +6766,7 @@ The following query uses a bind variable:
----
val first = client.sql("SELECT id, name FROM person WHERE WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitFirst()
.fetch().awaitSingle()
----

You might have noticed the use of `fetch()` in the example above. `fetch()` is a
Expand All @@ -6776,7 +6776,7 @@ Calling `first()` returns the first row from the result and discards remaining r
You can consume data with the following operators:

* `first()` return the first row of the entire result. Its Kotlin Coroutine variant
is named `awaitFirst()` for non-nullable return values and `awaitFirstOrNull()`
is named `awaitSingle()` for non-nullable return values and `awaitSingleOrNull()`
if the value is optional.
* `one()` returns exactly one result and fails if the result contains more rows.
Using Kotlin Coroutines, `awaitOne()` for exactly one value or `awaitOneOrNull()`
Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/languages/kotlin.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ dependencies {
}
----

Version `1.3.9` and above are supported.
Version `1.4.0-M1` and above are supported.

=== How Reactive translates to Coroutines?

Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/rsocket.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ Then start an RSocket server through the Java RSocket API and plug the
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitFirst()
.awaitSingle()
----

`RSocketMessageHandler` supports
Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/web/webflux-functional.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class));
[source,kotlin,role="secondary"]
.Kotlin
----
val string = request.body(BodyExtractors.toMono(String::class.java)).awaitFirst()
val string = request.body(BodyExtractors.toMono(String::class.java)).awaitSingle()
val people = request.body(BodyExtractors.toFlux(Person::class.java)).asFlow()
----

Expand Down

0 comments on commit 3ed8813

Please sign in to comment.