kotlin Multi<T>.asFlow() blocking the thread #1103

Closed
j128919965 opened this issue Oct 28, 2022 · 6 comments · Fixed by #1104 or #1105
Labels: bug, Kotlin

Comments

j128919965 commented Oct 28, 2022

Description

Hi. I'm using Quarkus with Mutiny and Kotlin. I found that when the Multi contains a large number of elements, Multi<T>.asFlow() blocks the thread and never recovers.

The log:

2022-10-26 20:15:23,550 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-6,5,main] has been blocked for 323816 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked

My code:

@ExperimentalCoroutinesApi
suspend fun getByProj(projectId: Int): List<InputDataLoadResult> =
    pgPool.preparedQuery("select * from input_data where project_id=$1")
        .execute(Tuple.of(projectId))
        .onItem().transformToMulti { Multi.createFrom().iterable(it) }
        .filter { it != null }
        .map { InputDataLoadResult(it) }
        .asFlow()
        .toList()

I noticed that io.smallrye.mutiny.coroutines Multi.kt:53 uses runBlocking to send items to the channel. When I changed it to launch, the code no longer blocked.

I don't know whether this is a bug. Or is there some reason to use runBlocking rather than launch?

Env

Windows 11
GraalVM CE 17
Kotlin 1.7.20
Quarkus 2.13.3.Final

heubeck (Collaborator) commented Oct 28, 2022

Thanks for reporting; I need to take a detailed look (again ;)).
runBlocking should join the emitting thread, whereas launch would detach.
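
For readers less familiar with the two builders, here is a minimal sketch of the join/detach distinction using plain kotlinx.coroutines. It is not Mutiny's code; the `scope` value and the delays are assumptions chosen only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    // Illustrative scope; a real application would use its own scope.
    val scope = CoroutineScope(Dispatchers.Default)

    // launch hands the work off and returns a Job at once ("detaches"):
    // the calling thread continues while the body runs elsewhere.
    val job = scope.launch {
        delay(200)
        println("launch body done on ${Thread.currentThread().name}")
    }
    println("after launch: completed=${job.isCompleted}")

    // runBlocking parks the calling thread until its body completes ("joins").
    // On an event-loop thread, this is what a blocked-thread checker reports.
    runBlocking {
        delay(200)
        println("runBlocking body done on ${Thread.currentThread().name}")
    }
    println("after runBlocking")

    // Wait for the launched job so the program does not exit before it ends.
    runBlocking { job.join() }
}
```

The relative order of the two "body done" lines is not deterministic, since both bodies finish after roughly the same delay.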

j128919965 (Author) commented

Hmm, I'm a bit green and don't completely understand you. Do you mean runBlocking should be used here?
Anyway, I want to know how I can avoid blocking the event-loop thread when using asFlow. :)
Thanks a lot.

heubeck (Collaborator) commented Oct 28, 2022

I'm not saying runBlocking should be used, but only that I thought it behaves that way, at least according to the current unit tests.

Can you definitely confirm that your example without asFlow doesn't block?
Can you build a small reproducer that doesn't require some kind of infinite external data source?

j128919965 (Author) commented

> Can you definitely confirm that your example without asFlow doesn't block?

Yes. When I change .asFlow().toList() to .collect().asList().awaitSuspending(), it no longer blocks.

suspend fun getByProj(projectId: Int): List<InputDataLoadResult> =
    pgPool.preparedQuery("select * from input_data where project_id=$1")
        .execute(Tuple.of(projectId))
        .onItem().transformToMulti { Multi.createFrom().iterable(it) }
        .filter { it != null }
        .map { InputDataLoadResult(it) }
        .collect()
        .asList()
        .awaitSuspending()
//        .asFlow()
//        .toList()

> Can you build a small reproducer that doesn't require some kind of infinite external data source?

The following code reproduces the problem. The function getTestStrings runs normally in fun main() = runBlocking {...}, but it does not work correctly when called from a web resource (suspend fun test() = getTestStrings()).

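import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.coroutines.asFlow
import javax.ws.rs.GET
import javax.ws.rs.Path
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Web resource: here the suspend function runs on the Vert.x event loop.
@Path("/")
class TestResource {
    @GET
    @Path("/test")
    @ExperimentalCoroutinesApi
    suspend fun test() = getTestStrings()
}

@ExperimentalCoroutinesApi
suspend fun getTestStrings() : List<String> {
    val l = ArrayList<String>()
    for (i in 1..100) {
        l.add("this is item $i")
    }
    // Convert the Multi to a Flow and collect every item into a list.
    val collect = Multi.createFrom().iterable(l)
        .asFlow()
        .toList()
    println("get all items")
    return collect
}

// Standalone entry point: the same function completes normally here.
fun main() = runBlocking {
    getTestStrings()
    println("returns success")
}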

The log:

2022-10-29 10:04:57,762 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2696 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
	at [email protected]/jdk.internal.misc.Unsafe.park(Native Method)
	at [email protected]/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:88)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at io.smallrye.mutiny.coroutines.MultiKt$asFlow$2$subscriber$1.onItem(Multi.kt:53)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti$IteratorSubscription.fastPath(IterableBasedMulti.java:108)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti$IteratorSubscription.request(IterableBasedMulti.java:70)
	at io.smallrye.mutiny.coroutines.MultiKt$asFlow$2$subscriber$1.onSubscribe(Multi.kt:45)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti.subscribe(IterableBasedMulti.java:49)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti.subscribe(IterableBasedMulti.java:33)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:72)
	at io.smallrye.mutiny.coroutines.MultiKt$asFlow$2.invokeSuspend(Multi.kt:78)
	at io.smallrye.mutiny.coroutines.MultiKt$asFlow$2.invoke(Multi.kt)
	at io.smallrye.mutiny.coroutines.MultiKt$asFlow$2.invoke(Multi.kt)
	at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(Builders.kt:322)
	at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(Builders.kt)
	at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:60)
	at app//kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at org.jboss.resteasy.reactive.server.runtime.kotlin.VertxDispatcher.dispatch$lambda$0(ApplicationCoroutineScope.kt:43)
	at org.jboss.resteasy.reactive.server.runtime.kotlin.VertxDispatcher$$Lambda$1761/0x0000000801352bb0.handle(Unknown Source)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
	at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
	at io.vertx.core.impl.EventLoopContext$$Lambda$1717/0x00000008012dd608.run(Unknown Source)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at [email protected]/java.lang.Thread.run(Thread.java:833)

heubeck (Collaborator) commented Oct 30, 2022

OK, I could reproduce it properly using your example, thank you.
The blocking was caused by the buffer capacity of the channel that underlies the Flow.
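
One plausible reading of the stack trace, given this diagnosis: onItem calls runBlocking on the event-loop thread, and once the channel buffer is full the send has to wait for the collector, which needs that same thread to run. The sketch below shows one general way to bridge a push-style callback into a Flow without parking the caller. It is not the fix merged for this issue. `ItemSource` and `asNonBlockingFlow` are hypothetical names standing in for a Multi and its subscriber; only the kotlinx.coroutines calls are real API.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical push-based source that emits synchronously on the caller's thread.
fun interface ItemSource {
    fun subscribe(onItem: (String) -> Unit, onComplete: () -> Unit)
}

// Bridges the callbacks into a Flow without suspending or parking in onItem.
fun ItemSource.asNonBlockingFlow(): Flow<String> = callbackFlow {
    subscribe(
        onItem = { item -> trySend(item) },  // non-suspending offer to the channel
        onComplete = { close() }             // completes the Flow
    )
    awaitClose { }                           // nothing to clean up in this sketch
}.buffer(Channel.UNLIMITED)                  // unbounded, so trySend is never rejected for a full buffer

fun main() = runBlocking {
    val source = ItemSource { onItem, onComplete ->
        for (i in 1..100) onItem("this is item $i")
        onComplete()
    }
    val items = source.asNonBlockingFlow().toList()
    println(items.size)  // prints 100
}
```

The trade-off is that an unbounded buffer gives up backpressure: a fast source can queue items without limit. A production bridge would more likely request items from the upstream subscription in bounded batches.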

@heubeck heubeck added bug Something isn't working Kotlin Kotlin support labels Oct 30, 2022
@heubeck heubeck linked a pull request Oct 30, 2022 that will close this issue
@jponge jponge added this to the 2.0.0 milestone Oct 31, 2022
jponge added a commit that referenced this issue Oct 31, 2022
#1103 Refactor Multi.toFlow() to avoid blocking - Mutiny2
jponge added a commit that referenced this issue Nov 2, 2022
#1103 Refactor Multi.toFlow() to avoid blocking - Mutiny1
@jponge jponge modified the milestones: 2.0.0, 2.0.0-milestone3 Nov 9, 2022
heubeck (Collaborator) commented Nov 9, 2022

Hey @j128919965,

Mutiny 1.8 just got released, containing the fix for this issue.
Please let us know how it goes in your use-case.

Best regards - and keep reporting ;)
