From bac5523ea0025988361a092c1f0e7f4eb90f6ad7 Mon Sep 17 00:00:00 2001 From: The 8472 Date: Sun, 9 Jan 2022 19:39:02 +0100 Subject: [PATCH 1/3] Use cgroup quotas for calculating `available_parallelism` Manually tested via ``` // spawn a new cgroup scope for the current user $ sudo systemd-run -p CPUQuota="300%" --uid=$(id -u) -tdS // quota.rs #![feature(available_parallelism)] fn main() { println!("{:?}", std::thread::available_parallelism()); // prints Ok(3) } ``` Caveats * cgroup v1 is ignored * funky mountpoints (containing spaces, newlines or control chars) for cgroupfs will not be handled correctly since that would require unescaping /proc/self/mountinfo The escaping behavior of procfs seems to be undocumented. systemd and docker default to `/sys/fs/cgroup` so it should be fine for most systems. * quota will be ignored when `sched_getaffinity` doesn't work * assumes procfs is mounted under `/proc` and cgroupfs mounted and readable somewhere in the directory tree --- library/std/src/sys/unix/thread.rs | 71 ++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/library/std/src/sys/unix/thread.rs b/library/std/src/sys/unix/thread.rs index cf8cf5ad49f73..933210e1ff02e 100644 --- a/library/std/src/sys/unix/thread.rs +++ b/library/std/src/sys/unix/thread.rs @@ -279,10 +279,15 @@ pub fn available_parallelism() -> io::Result<NonZeroUsize> { ))] { #[cfg(any(target_os = "android", target_os = "linux"))] { + let quota = cgroup2_quota().unwrap_or(usize::MAX).max(1); let mut set: libc::cpu_set_t = unsafe { mem::zeroed() }; - if unsafe { libc::sched_getaffinity(0, mem::size_of::<libc::cpu_set_t>(), &mut set) } == 0 { - let count = unsafe { libc::CPU_COUNT(&set) }; - return Ok(unsafe { NonZeroUsize::new_unchecked(count as usize) }); + unsafe { + if libc::sched_getaffinity(0, mem::size_of::<libc::cpu_set_t>(), &mut set) == 0 { + let count = libc::CPU_COUNT(&set) as usize; + let count = count.min(quota); + // SAFETY: affinity mask can't be empty and the quota gets clamped to a minimum 
of 1 + return Ok(NonZeroUsize::new_unchecked(count)); + } } } match unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) } { @@ -368,6 +373,66 @@ pub fn available_parallelism() -> io::Result<NonZeroUsize> { } } +#[cfg(any(target_os = "android", target_os = "linux"))] +fn cgroup2_quota() -> Option<usize> { + use crate::ffi::OsString; + use crate::fs::{read, read_to_string, File}; + use crate::io::{BufRead, BufReader}; + use crate::os::unix::ffi::OsStringExt; + use crate::path::PathBuf; + + // find cgroup2 fs + let cgroups_mount = BufReader::new(File::open("/proc/self/mountinfo").ok()?) + .split(b'\n') + .map_while(Result::ok) + .filter_map(|line| { + let fields: Vec<_> = line.split(|&c| c == b' ').collect(); + let suffix_at = fields.iter().position(|f| f == b"-")?; + let fs_type = fields[suffix_at + 1]; + if fs_type == b"cgroup2" { Some(fields[4].to_owned()) } else { None } + }) + .next()?; + + let cgroups_mount = PathBuf::from(OsString::from_vec(cgroups_mount)); + + // find our place in the hierarchy + let cgroup_path = read("/proc/self/cgroup") + .ok()? 
+ .split(|&c| c == b'\n') + .filter_map(|line| { + let mut fields = line.splitn(3, |&c| c == b':'); + // expect cgroupv2 which has an empty 2nd field + if fields.nth(1) != Some(b"") { + return None; + } + let path = fields.last()?; + // skip leading slash + Some(path[1..].to_owned()) + }) + .next()?; + let cgroup_path = PathBuf::from(OsString::from_vec(cgroup_path)); + + // walk hierarchy and take the minimum quota + cgroup_path + .ancestors() + .filter_map(|level| { + let cgroup_path = cgroups_mount.join(level); + let quota = match read_to_string(cgroup_path.join("cpu.max")) { + Ok(quota) => quota, + _ => return None, + }; + let quota = quota.lines().next()?; + let mut quota = quota.split(' '); + let limit = quota.next()?; + let period = quota.next()?; + match (limit.parse::<usize>(), period.parse::<usize>()) { + (Ok(limit), Ok(period)) => Some(limit / period), + _ => None, + } + }) + .min() +} + #[cfg(all( not(target_os = "linux"), not(target_os = "freebsd"), From af6d2ed24557694ff7d32bf2a29a6cd5aaade859 Mon Sep 17 00:00:00 2001 From: The 8472 Date: Thu, 3 Mar 2022 00:35:47 +0100 Subject: [PATCH 2/3] hardcode /sys/fs/cgroup instead of doing a lookup via mountinfo this avoids parsing mountinfo which can be huge on some systems and something might be emulating cgroup fs for sandboxing reasons which means it wouldn't show up as mountpoint additionally the new implementation operates on a single pathbuffer, reducing allocations --- library/std/src/sys/unix/thread.rs | 120 ++++++++++++++++------------- 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/library/std/src/sys/unix/thread.rs b/library/std/src/sys/unix/thread.rs index 933210e1ff02e..ff01ce2733329 100644 --- a/library/std/src/sys/unix/thread.rs +++ b/library/std/src/sys/unix/thread.rs @@ -279,7 +279,7 @@ pub fn available_parallelism() -> io::Result<NonZeroUsize> { ))] { #[cfg(any(target_os = "android", target_os = "linux"))] { - let quota = cgroup2_quota().unwrap_or(usize::MAX).max(1); + let quota = cgroup2_quota().max(1); 
let mut set: libc::cpu_set_t = unsafe { mem::zeroed() }; unsafe { if libc::sched_getaffinity(0, mem::size_of::<libc::cpu_set_t>(), &mut set) == 0 { @@ -373,64 +373,78 @@ pub fn available_parallelism() -> io::Result<NonZeroUsize> { } } +/// Returns cgroup CPU quota in core-equivalents, rounded down, or usize::MAX if the quota cannot +/// be determined or is not set. #[cfg(any(target_os = "android", target_os = "linux"))] -fn cgroup2_quota() -> Option<usize> { +fn cgroup2_quota() -> usize { use crate::ffi::OsString; - use crate::fs::{read, read_to_string, File}; - use crate::io::{BufRead, BufReader}; + use crate::fs::{try_exists, File}; + use crate::io::Read; use crate::os::unix::ffi::OsStringExt; use crate::path::PathBuf; - // find cgroup2 fs - let cgroups_mount = BufReader::new(File::open("/proc/self/mountinfo").ok()?) - .split(b'\n') - .map_while(Result::ok) - .filter_map(|line| { - let fields: Vec<_> = line.split(|&c| c == b' ').collect(); - let suffix_at = fields.iter().position(|f| f == b"-")?; - let fs_type = fields[suffix_at + 1]; - if fs_type == b"cgroup2" { Some(fields[4].to_owned()) } else { None } - }) - .next()?; - - let cgroups_mount = PathBuf::from(OsString::from_vec(cgroups_mount)); - - // find our place in the hierarchy - let cgroup_path = read("/proc/self/cgroup") - .ok()? 
- .split(|&c| c == b'\n') - .filter_map(|line| { - let mut fields = line.splitn(3, |&c| c == b':'); - // expect cgroupv2 which has an empty 2nd field - if fields.nth(1) != Some(b"") { - return None; - } - let path = fields.last()?; - // skip leading slash - Some(path[1..].to_owned()) - }) - .next()?; - let cgroup_path = PathBuf::from(OsString::from_vec(cgroup_path)); - - // walk hierarchy and take the minimum quota - cgroup_path - .ancestors() - .filter_map(|level| { - let cgroup_path = cgroups_mount.join(level); - let quota = match read_to_string(cgroup_path.join("cpu.max")) { - Ok(quota) => quota, - _ => return None, - }; - let quota = quota.lines().next()?; - let mut quota = quota.split(' '); - let limit = quota.next()?; - let period = quota.next()?; - match (limit.parse::<usize>(), period.parse::<usize>()) { - (Ok(limit), Ok(period)) => Some(limit / period), - _ => None, + let mut quota = usize::MAX; + + let _: Option<()> = try { + let mut buf = Vec::with_capacity(128); + // find our place in the cgroup hierarchy + File::open("/proc/self/cgroup").ok()?.read_to_end(&mut buf).ok()?; + let cgroup_path = buf + .split(|&c| c == b'\n') + .filter_map(|line| { + let mut fields = line.splitn(3, |&c| c == b':'); + // expect cgroupv2 which has an empty 2nd field + if fields.nth(1) != Some(b"") { + return None; + } + let path = fields.last()?; + // skip leading slash + Some(path[1..].to_owned()) + }) + .next()?; + let cgroup_path = PathBuf::from(OsString::from_vec(cgroup_path)); + + let mut path = PathBuf::with_capacity(128); + let mut read_buf = String::with_capacity(20); + + let cgroup_mount = "/sys/fs/cgroup"; + + path.push(cgroup_mount); + path.push(&cgroup_path); + + path.push("cgroup.controllers"); + + // skip if we're not looking at cgroup2 + if matches!(try_exists(&path), Err(_) | Ok(false)) { + return usize::MAX; + }; + + path.pop(); + + while path.starts_with(cgroup_mount) { + path.push("cpu.max"); + + read_buf.clear(); + + if File::open(&path).and_then(|mut f| 
f.read_to_string(&mut read_buf)).is_ok() { + let raw_quota = read_buf.lines().next()?; + let mut raw_quota = raw_quota.split(' '); + let limit = raw_quota.next()?; + let period = raw_quota.next()?; + match (limit.parse::<usize>(), period.parse::<usize>()) { + (Ok(limit), Ok(period)) => { + quota = quota.min(limit / period); + } + _ => {} + } } - }) - .min() + + path.pop(); // pop filename + path.pop(); // pop dir + } + }; + + quota } #[cfg(all( From e18abbf2ac544ef744c5aa95df434b2c81c066a2 Mon Sep 17 00:00:00 2001 From: The 8472 Date: Thu, 3 Mar 2022 00:36:23 +0100 Subject: [PATCH 3/3] update available_parallelism docs since cgroups and sched_getaffinity are now taken into account --- library/std/src/thread/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index beb606099341e..09d1e714ab6dd 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1524,7 +1524,10 @@ fn _assert_sync_and_send() { /// /// On Linux: /// - It may overcount the amount of parallelism available when limited by a -/// process-wide affinity mask, or when affected by cgroup limits. +/// process-wide affinity mask or cgroup quotas and cgroup2 fs or `sched_getaffinity()` can't be +/// queried, e.g. due to sandboxing. +/// - It may undercount the amount of parallelism if the current thread's affinity mask +/// does not reflect the process' cpuset, e.g. due to pinned threads. /// /// On all targets: /// - It may overcount the amount of parallelism available when running in a VM