Parallelism limitation of 100 VirtualThread in Flux #3857

Closed
expe-elenigen opened this issue Jul 29, 2024 · 2 comments
Assignees: chemicL
Labels: type/bug (A general bug)
Milestone: 3.6.9

@expe-elenigen

Expected Behavior

With Java 21 virtual threads, I was expecting to be able to run a huge number of threads in parallel, since they are much lighter weight to handle. I ran a Flux on a scheduler obtained from Schedulers.boundedElastic(), with virtual threads enabled via -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true.

The test is simple: in a loop, each task does a Thread.sleep of 1 sec. With 100 tasks it works as expected; I can see virtual threads loomBoundedElastic-1 through loomBoundedElastic-100 executing, and the total elapsed time is ~1 sec. If I double the count to 200, I would still expect a total of ~1 sec.

Actual Behavior

With 200 tasks, I see a different pattern: the first 100 virtual threads complete in the same ~1 sec, but the remaining 100 take ~100 sec (roughly one per second), for a total of 101542 ms.

Steps to Reproduce

Here's the code:

package com.my.test

import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

class Main

private val log = KotlinLogging.logger {}

const val ONE_SEC = 1_000L
const val TOTAL = 200

// Run with -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
// so that Schedulers.boundedElastic() is backed by virtual threads (loomBoundedElastic-*).
fun main(args: Array<String>) {
    val steps = (1..TOTAL).toList()
    // With the flag above set, this returns the virtual-thread-backed scheduler
    val scheduler = Schedulers.boundedElastic()
    val startTime = System.currentTimeMillis()
    log.info { "=== START ===" }
    Flux
        .fromIterable(steps)
        .flatMap { step ->
            Mono
                .fromCallable {
                    Thread.sleep(ONE_SEC)
                }.log()
                .subscribeOn(scheduler)
        }.collectList()
        .block()
    log.info { "=== END ===" }
    log.info { "Time taken: ${System.currentTimeMillis() - startTime} ms" }
}

Logs with TOTAL = 200:
10:47:10.905 [main] INFO com.my.test.Main -- === START ===
10:47:10.957 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-4] INFO reactor.Mono.Callable.4 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-9] INFO reactor.Mono.Callable.9 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | request(32)
10:47:10.957 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | request(32)
10:47:10.960 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | request(32)
10:47:10.957 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:47:11.969 [loomBoundedElastic-73] INFO reactor.Mono.Callable.73 -- | onComplete()
10:47:12.975 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onNext(kotlin.Unit)
10:47:12.976 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onComplete()
10:47:12.976 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:12.977 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | request(32)
10:47:13.978 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onNext(kotlin.Unit)
10:47:13.979 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onComplete()
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | request(32)
10:47:14.985 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onNext(kotlin.Unit)
10:47:14.986 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onComplete()
10:47:14.986 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:14.987 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | request(32)
10:47:15.991 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onNext(kotlin.Unit)
10:47:15.992 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onComplete()
10:47:15.992 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:15.993 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | request(32)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onNext(kotlin.Unit)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onComplete()
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | request(32)
10:47:17.998 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onNext(kotlin.Unit)
...
10:48:51.440 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | request(32)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onNext(kotlin.Unit)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onComplete()
10:48:52.445 [main] INFO com.my.test.Main -- === END ===
10:48:52.448 [main] INFO com.my.test.Main -- Time taken: 101542 ms

Logs with TOTAL = 100:
10:59:05.849 [main] INFO com.my.test.Main -- === START ===
10:59:05.905 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-5] INFO reactor.Mono.Callable.5 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:59:06.918 [loomBoundedElastic-30] INFO reactor.Mono.Callable.30 -- | onComplete()
10:59:06.919 [loomBoundedElastic-63] INFO reactor.Mono.Callable.63 -- | onComplete()
10:59:06.919 [loomBoundedElastic-65] INFO reactor.Mono.Callable.65 -- | onComplete()
10:59:06.919 [loomBoundedElastic-72] INFO reactor.Mono.Callable.72 -- | onComplete()
10:59:06.919 [loomBoundedElastic-70] INFO reactor.Mono.Callable.70 -- | onComplete()
10:59:06.919 [loomBoundedElastic-86] INFO reactor.Mono.Callable.86 -- | onComplete()
10:59:06.920 [main] INFO com.my.test.Main -- === END ===
10:59:06.922 [main] INFO com.my.test.Main -- Time taken: 1073 ms

Possible Solution

I didn't see a limitation of 100 virtual threads documented anywhere. I'm also curious whether the threads are supposed to be reused or are simply disposable. And even if there is a cap of 100 threads, once the first batch is processed the second batch of 100 tasks should complete within 1 sec, for a total of ~2 sec.

Your Environment

MacBook Pro M1

  • Reactor version(s) used: 3.6.0
  • Other relevant libraries versions (eg. netty, ...): N/A
  • JVM version (java -version): OpenJDK Runtime Environment Temurin-21.0.2+13 (build 21.0.2+13-LTS)
  • OS and version (eg uname -a): 23.5.0 Darwin Kernel Version 23.5.0
@chemicL added the ❓need-triage and status/need-investigation labels and removed the ❓need-triage label on Jul 29, 2024
@chemicL self-assigned this on Aug 1, 2024
@chemicL (Member) commented Aug 1, 2024

Hey @expe-elenigen! Thank you for the report.

The VT-based Schedulers.boundedElastic() has the same default cap of 10 * availableCPUs threads as the regular one.

You can change that default with the reactor.schedulers.defaultBoundedElasticSize System property.
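For illustration, a minimal sketch (editorial, not from the original comment; the value 400 is arbitrary and only needs to cover the desired parallelism). The property is read once when Reactor's Schedulers defaults are initialized, so passing it as a JVM flag is the most reliable; the programmatic form must run before anything touches Schedulers:

// Illustrative only: raise the boundedElastic thread cap, e.g. to 400.
// JVM flag equivalent: -Dreactor.schedulers.defaultBoundedElasticSize=400
// (combine with -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true)
System.setProperty("reactor.schedulers.defaultBoundedElasticSize", "400")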

I will investigate the other effects you are seeing, but please consider the above configuration in the meantime.

@chemicL added the type/bug label and removed the status/need-investigation label on Aug 1, 2024
@chemicL added this to the 3.6.9 milestone on Aug 1, 2024
@chemicL (Member) commented Aug 1, 2024

> I didn't see a limitation of 100 virtual threads documented anywhere.

This aspect is explained in my above comment.

> I'm also curious whether the threads are supposed to be reused or are simply disposable.

The Virtual Threads are not reused; the scheduler uses a thread-per-task model with in-order processing within a Worker. A particular operator/chain (like Mono#subscribeOn) allocates a Worker, and because newly scheduled tasks must preserve order, the Virtual Threads are created and their completion is awaited sequentially within that Worker.
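To make the Worker guarantee above concrete, here is a minimal editorial sketch (not from the thread) using the plain Scheduler.Worker API; the timings in the comments assume an otherwise idle scheduler:

import java.util.concurrent.CountDownLatch
import reactor.core.scheduler.Schedulers

fun main() {
    val scheduler = Schedulers.boundedElastic()

    // Tasks submitted to the SAME Worker run in order, one at a time
    // (this holds for the VT-backed and the regular boundedElastic alike),
    // so these two 1-second sleeps finish after roughly 2 seconds.
    val sameWorkerDone = CountDownLatch(2)
    val worker = scheduler.createWorker()
    worker.schedule { Thread.sleep(1_000); sameWorkerDone.countDown() }
    worker.schedule { Thread.sleep(1_000); sameWorkerDone.countDown() }
    sameWorkerDone.await()
    worker.dispose()

    // Tasks submitted to DIFFERENT Workers may run concurrently,
    // so the same two sleeps finish after roughly 1 second.
    val twoWorkersDone = CountDownLatch(2)
    val w1 = scheduler.createWorker()
    val w2 = scheduler.createWorker()
    w1.schedule { Thread.sleep(1_000); twoWorkersDone.countDown() }
    w2.schedule { Thread.sleep(1_000); twoWorkersDone.countDown() }
    twoWorkersDone.await()
    w1.dispose()
    w2.dispose()
}

This is why reusing one Worker for every Mono subscription collapses the extra tasks into sequential one-second steps, exactly as the 200-task log above shows.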

> And even if there is a cap of 100 threads, once the first batch is processed the second batch of 100 tasks should complete within 1 sec, for a total of ~2 sec.

Absolutely correct. flatMap does not impose any ordering on the completion of the individual Monos you create. You have encountered a bug that schedules all Mono subscriptions to the same Worker; combined with the in-order guarantee explained above, all the Callables after the first batch end up executing sequentially, which is not correct.

I will push a fix soon.
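(Editorial note, not part of the original comment.) Until the fix is released, one possible workaround, sketched below and untested against this exact scenario, is to bypass the capped default scheduler and wrap a virtual-thread-per-task executor, so the 10 * availableCPUs cap does not apply:

import java.util.concurrent.Executors
import reactor.core.scheduler.Schedulers

// Assumption: an editorial sketch, not a recommendation from the thread.
// Every task submitted to this scheduler gets its own virtual thread from the
// JDK executor, so the boundedElastic thread cap is not involved at all.
val vtScheduler = Schedulers.fromExecutorService(Executors.newVirtualThreadPerTaskExecutor())

// In the repro above, use .subscribeOn(vtScheduler) instead of .subscribeOn(scheduler).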

chemicL added a commit that referenced this issue Aug 1, 2024
This change prevents the same `BoundedElasticThreadPerTaskScheduler` from being
picked when the maximum number of Virtual Threads is already executing in
parallel. The consequence of the improper busyness accounting was that tasks
were executed sequentially instead of in parallel, because operators kept
picking the same `Worker`.

Resolves #3857
@chemicL closed this as completed in bcab229 on Aug 1, 2024