diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/pom.xml b/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/pom.xml
index beddca0f79740..c73c4e7b6290b 100644
--- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/pom.xml
+++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/pom.xml
@@ -27,8 +27,8 @@
kotlinx-coroutines-jdk8
- org.jetbrains.kotlinx
- kotlinx-coroutines-reactive
+ io.smallrye.reactive
+ mutiny-kotlin
diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/FlowToPublisherHandler.kt b/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/FlowToPublisherHandler.kt
index 237ad9b2fa90b..7ae72f6c13a6e 100644
--- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/FlowToPublisherHandler.kt
+++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/FlowToPublisherHandler.kt
@@ -1,17 +1,34 @@
package org.jboss.resteasy.reactive.server.runtime.kotlin
+import io.smallrye.mutiny.coroutines.asMulti
+import io.vertx.core.Vertx
+import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.reactive.asPublisher
+import kotlinx.coroutines.launch
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler
+import javax.enterprise.inject.spi.CDI
class FlowToPublisherHandler : ServerRestHandler {
+ private val originalTCCL: ClassLoader = Thread.currentThread().contextClassLoader
+
override fun handle(requestContext: ResteasyReactiveRequestContext?) {
val result = requestContext!!.result
if (result is Flow<*>) {
- requestContext.result = (result as Flow) // cast needed for extension function
- .asPublisher()
+
+ val requestScope = requestContext.captureCDIRequestScope()
+ val dispatcher: CoroutineDispatcher = Vertx.currentContext()?.let {VertxDispatcher(it,requestScope)}
+ ?: throw IllegalStateException("No Vertx context found")
+
+ val coroutineScope = CDI.current().select(ApplicationCoroutineScope::class.java)
+ requestContext.suspend()
+ coroutineScope.get().launch(context = dispatcher) {
+ // ensure the proper CL is not lost in dev-mode
+ Thread.currentThread().contextClassLoader = originalTCCL
+ requestContext.result = result.asMulti()
+ requestContext.resume()
+ }
}
}
}
diff --git a/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt b/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt
index 1e7788b0f73f6..339caf171586c 100644
--- a/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt
+++ b/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt
@@ -9,16 +9,16 @@ import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
@Path("flow")
-class FlowResource {
+class FlowResource(private val uppercaseService: UppercaseService) {
@GET
@Path("str")
@Produces(MediaType.SERVER_SENT_EVENTS)
fun sseStrings() = flow {
- emit("Hello")
- emit("From")
- emit("Kotlin")
- emit("Flow")
+ emit(uppercaseService.convert("Hello"))
+ emit(uppercaseService.convert("From"))
+ emit(uppercaseService.convert("Kotlin"))
+ emit(uppercaseService.convert("Flow"))
}
@GET
diff --git a/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/UppercaseService.kt b/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/UppercaseService.kt
new file mode 100644
index 0000000000000..131167520db44
--- /dev/null
+++ b/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/UppercaseService.kt
@@ -0,0 +1,10 @@
+package io.quarkus.it.resteasy.reactive.kotlin
+
+import java.util.Locale
+import javax.enterprise.context.RequestScoped
+
+@RequestScoped
+class UppercaseService {
+
+ fun convert(input: String) = input.uppercase(Locale.ROOT)
+}
diff --git a/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt b/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt
index 6340e285597b1..fd1adfefa40af 100644
--- a/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt
+++ b/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt
@@ -20,7 +20,7 @@ class FlowResourceTest {
@Test
fun testSeeStrings() {
testSse("str", 5) {
- assertThat(it).containsExactly("Hello", "From", "Kotlin", "Flow")
+ assertThat(it).containsExactly("HELLO", "FROM", "KOTLIN", "FLOW")
}
}