Skip to content

Commit

Permalink
feat: let spawn_concurrency_level configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang committed Sep 21, 2023
1 parent 833377c commit 70c5591
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 15 deletions.
55 changes: 55 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ pub struct Builder {
/// Only used when not using the current-thread executor.
worker_threads: Option<usize>,

/// Configures the global OwnedTasks's concurrency level
///
/// Only used when not using the current-thread executor.
pub(super) spawn_concurrency_level: Option<usize>,

/// Cap on thread usage.
max_blocking_threads: usize,

Expand Down Expand Up @@ -278,6 +283,9 @@ impl Builder {
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,

// Default to lazy auto-detection (twice the number of worker threads)
spawn_concurrency_level: None,

max_blocking_threads: 512,

// Default thread name
Expand Down Expand Up @@ -401,6 +409,49 @@ impl Builder {
self
}

/// Sets the spawn concurrency level the `Runtime` will use.
///
/// This can be any number greater than 0 and less than or equal to 65536,
/// if the parameter is larger than this value, concurrency level will actually select 65536 internally.
///
/// This should be set according to the expected scale of multi-thread concurrency of `tokio::spawn`,
/// This requires a trade-off between concurrency scale and CPU's cache.
///
/// # Default
///
/// The default value is twice the number of worker threads.
///
/// When using the `current_thread` runtime this method has no effect.
///
/// # Examples
///
/// ## Multi threaded runtime with spawn_concurrency_level 8
///
/// ```
/// use tokio::runtime;
///
/// // This will spawn a work-stealing runtime with 4 worker threads.
/// let rt = runtime::Builder::new_multi_thread()
/// .spawn_concurrency_level(8)
/// .build()
/// .unwrap();
///
/// rt.spawn(async move {});
/// ```
///
/// # Panics
///
/// This will panic if `val` is not larger than `0`.
#[track_caller]
pub fn spawn_concurrency_level(&mut self, mut val: usize) -> &mut Self {
assert!(val > 0, "spawn concurrency level cannot be set to 0");
if val > 1 << 16 {
val = 1 << 16;
}
self.spawn_concurrency_level = Some(val);
self
}

/// Specifies the limit for additional threads spawned by the Runtime.
///
/// These threads are used for blocking operations like tasks spawned
Expand Down Expand Up @@ -1231,6 +1282,7 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let spawn_concurrency_level = self.spawn_concurrency_level.unwrap_or_else(|| core_threads * 2);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

Expand All @@ -1249,6 +1301,7 @@ cfg_rt_multi_thread! {
driver_handle,
blocking_spawner,
seed_generator_2,
spawn_concurrency_level,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand Down Expand Up @@ -1279,6 +1332,7 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let spawn_concurrency_level = self.spawn_concurrency_level.unwrap_or_else(|| core_threads * 2);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

Expand Down Expand Up @@ -1321,6 +1375,7 @@ impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("worker_threads", &self.worker_threads)
.field("spawn_concurrency_level", &self.spawn_concurrency_level)
.field("max_blocking_threads", &self.max_blocking_threads)
.field(
"thread_name",
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl MultiThread {
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (MultiThread, Arc<Handle>, Launch) {
let parker = Parker::new(driver);
Expand All @@ -69,6 +70,7 @@ impl MultiThread {
driver_handle,
blocking_spawner,
seed_generator,
spawn_concurrency_level,
config,
);

Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ pub(super) fn create(
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
Expand Down Expand Up @@ -287,7 +288,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(16),
owned: OwnedTasks::new(spawn_concurrency_level as u32),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl MultiThread {
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (MultiThread, runtime::Handle) {
let handle = worker::create(
Expand All @@ -57,6 +58,7 @@ impl MultiThread {
driver_handle,
blocking_spawner,
seed_generator,
spawn_concurrency_level,
config,
);

Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ pub(super) fn create(
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> runtime::Handle {
let mut num_workers = num_cores;
Expand Down Expand Up @@ -307,7 +308,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(16),
owned: OwnedTasks::new(spawn_concurrency_level as u32),
synced: Mutex::new(Synced {
assigned_cores: (0..num_workers).map(|_| None).collect(),
shutdown_cores: Vec::with_capacity(num_cores),
Expand Down
14 changes: 1 addition & 13 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,7 @@ struct OwnedTasksInner<S: 'static> {
}

impl<S: 'static> OwnedTasks<S> {
/// The concurrency_level should be set according to the expected scale of multi-thread concurrency.
/// A large concurrency_level should not affect performance, but might affect the CPU's cache.
/// The concurrency_level is at least one, otherwise it will panic.
/// The maximum concurrency_level is 65536,
/// if the parameter is larger than this value, OwnedTasks will actually select 65536 internally.
pub(crate) fn new(mut concurrency_level: u32) -> Self {
if concurrency_level > 1 << 16 {
concurrency_level = 1 << 16;
}
assert!(
concurrency_level > 0,
"concurrency_level must be at least one"
);
pub(crate) fn new(concurrency_level: u32) -> Self {
// Find power-of-two sizes best matching arguments
let mut segment_size = 1;
while segment_size < concurrency_level {
Expand Down

0 comments on commit 70c5591

Please sign in to comment.