From cbd1c84304ab3ab88e606312b5afc5e4adb2b3c7 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 5 May 2021 19:42:09 +0200 Subject: [PATCH 1/2] Improve tests of incremental file writes --- src/diskio/test.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/diskio/test.rs b/src/diskio/test.rs index 8bf5ca2e43..5939f24733 100644 --- a/src/diskio/test.rs +++ b/src/diskio/test.rs @@ -66,6 +66,11 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { } Ok(()) })?; + // We should be able to read back the file + assert_eq!( + std::fs::read_to_string(work_dir.path().join("scratch"))?, + "01234567890123456789".to_string() + ); Ok(()) } From 74f472786e398d4cb8e05a2ac282b30b8e1e7e39 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 5 May 2021 20:52:56 +0200 Subject: [PATCH 2/2] Reuse the same memory buffers during unpacking We think we have memory fragmentation causing failed extraction in Windows containers and smaller Unix devices. Writes of both full objects and streamed objects now re-use the Vec via a sharded-slab implementation. To facilitate the more complicated memory logic, buffer limit management is now integrated into the IO Executor: the immediate executor doesn't limit at all as no outstanding buffers occur, and the threaded executor tracks both the total allocated buffers as well as whether a reusable buffer is available. 
--- Cargo.lock | 31 +++++ Cargo.toml | 2 + src/cli/common.rs | 3 + src/diskio/immediate.rs | 23 +++- src/diskio/mod.rs | 101 ++++++++++++-- src/diskio/test.rs | 89 +++++++++++- src/diskio/threaded.rs | 247 +++++++++++++++++++++++++++++++--- src/dist/component/package.rs | 162 ++++++++-------------- src/lib.rs | 1 + src/utils/notifications.rs | 3 +- src/utils/notify.rs | 1 + tests/cli-exact.rs | 3 - tests/cli-rustup.rs | 5 - tests/cli-self-upd.rs | 1 - tests/cli-v2.rs | 1 - 15 files changed, 520 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04716feafd..022af194d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -552,6 +552,26 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "enum-map" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34c3f3c98ee954dbafd343b3e0d827bb167cdda151b3c88ae04b0e5fa7db1028" +dependencies = [ + "enum-map-derive", +] + +[[package]] +name = "enum-map-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dccd930f0c0a8968d873b10d3611f71bffc4dff84879dabf7b746b0686ea81" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "env_proxy" version = "0.4.1" @@ -1694,6 +1714,7 @@ dependencies = [ "clap", "download", "effective-limits", + "enum-map", "flate2", "git-testament", "home", @@ -1714,6 +1735,7 @@ dependencies = [ "semver", "serde", "sha2", + "sharded-slab", "strsim 0.10.0", "tar", "tempfile", @@ -1899,6 +1921,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sharded-slab" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79c719719ee05df97490f80a45acfc99e5a30ce98a1e4fb67aee422745ae14e3" +dependencies = [ + "lazy_static", +] + [[package]] name = "signature" version = "1.3.0" diff --git a/Cargo.toml b/Cargo.toml index 13e3b5012c..52d4c183b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ chrono = "0.4" clap = "2" 
download = {path = "download", default-features = false} effective-limits = "0.5.2" +enum-map = "1.1.0" flate2 = "1" git-testament = "0.1.4" home = {git = "https://github.com/rbtcollins/home", rev = "a243ee2fbee6022c57d56f5aa79aefe194eabe53"} @@ -52,6 +53,7 @@ scopeguard = "1" semver = "0.11" serde = {version = "1.0", features = ["derive"]} sha2 = "0.9" +sharded-slab = "0.1.1" strsim = "0.10" tar = "0.4.26" tempfile = "3.1" diff --git a/src/cli/common.rs b/src/cli/common.rs index 80dc48ec61..1b7645feb8 100644 --- a/src/cli/common.rs +++ b/src/cli/common.rs @@ -148,6 +148,9 @@ impl NotifyOnConsole { NotificationLevel::Error => { err!("{}", n); } + NotificationLevel::Debug => { + debug!("{}", n); + } } } } diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs index 676dee98d9..6cc096f795 100644 --- a/src/diskio/immediate.rs +++ b/src/diskio/immediate.rs @@ -11,7 +11,7 @@ use std::{ time::Instant, }; -use super::{CompletedIo, Executor, Item}; +use super::{CompletedIo, Executor, FileBuffer, Item}; #[derive(Debug)] pub struct _IncrementalFileState { @@ -70,7 +70,11 @@ impl Executor for ImmediateUnpacker { item.result = match &mut item.kind { super::Kind::Directory => super::create_dir(&item.full_path), super::Kind::File(ref contents) => { - super::write_file(&item.full_path, &contents, item.mode) + if let super::FileBuffer::Immediate(ref contents) = &contents { + super::write_file(&item.full_path, contents, item.mode) + } else { + unreachable!() + } } super::Kind::IncrementalFile(_incremental_file) => { return { @@ -124,6 +128,14 @@ impl Executor for ImmediateUnpacker { super::IncrementalFileState::Immediate(self.incremental_state.clone()) } } + + fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer { + super::FileBuffer::Immediate(Vec::with_capacity(capacity)) + } + + fn buffer_available(&self, _len: usize) -> bool { + true + } } /// The non-shared state for writing a file incrementally @@ -160,10 +172,15 @@ impl IncrementalFileWriter { }) } - pub fn 
chunk_submit(&mut self, chunk: Vec) -> bool { + pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool { if (self.state.lock().unwrap()).is_none() { return false; } + let chunk = if let FileBuffer::Immediate(v) = chunk { + v + } else { + unreachable!() + }; match self.write(chunk) { Ok(v) => v, Err(e) => { diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index 01a5de1963..ca6f59c825 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -57,6 +57,7 @@ mod test; pub mod threaded; use std::io::{self, Write}; +use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::sync::mpsc::Receiver; use std::time::{Duration, Instant}; @@ -66,12 +67,73 @@ use anyhow::{Context, Result}; use crate::process; use crate::utils::notifications::Notification; +use threaded::PoolReference; + +/// Carries the implementation specific data for complete file transfers into the executor. +#[derive(Debug)] +pub enum FileBuffer { + Immediate(Vec), + // A reference to the object in the pool, and a handle to write to it + Threaded(PoolReference), +} + +impl FileBuffer { + /// Allow the buffer's space to be re-used when the last reference to it is dropped. 
+ pub(crate) fn clear(&mut self) { + if let FileBuffer::Threaded(ref mut contents) = self { + contents.clear() + } + } + + pub(crate) fn len(&self) -> usize { + match self { + FileBuffer::Immediate(ref vec) => vec.len(), + FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned.len(), + FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable.len(), + } + } + + pub(crate) fn finished(self) -> Self { + match self { + FileBuffer::Threaded(PoolReference::Mut(mutable, pool)) => { + FileBuffer::Threaded(PoolReference::Owned(mutable.downgrade(), pool)) + } + _ => self, + } + } +} + +impl Deref for FileBuffer { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + match self { + FileBuffer::Immediate(ref vec) => &vec, + FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned, + FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable, + } + } +} + +impl DerefMut for FileBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + FileBuffer::Immediate(ref mut vec) => vec, + FileBuffer::Threaded(PoolReference::Owned(_, _)) => { + unimplemented!() + } + FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable, + } + } +} + +pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216; /// Carries the implementation specific channel data into the executor. 
#[derive(Debug)] pub enum IncrementalFile { ImmediateReceiver, - ThreadedReceiver(Receiver>), + ThreadedReceiver(Receiver), } // The basic idea is that in single threaded mode we get this pattern: @@ -116,7 +178,7 @@ pub enum IncrementalFile { #[derive(Debug)] pub enum Kind { Directory, - File(Vec), + File(FileBuffer), IncrementalFile(IncrementalFile), } @@ -160,7 +222,7 @@ impl Item { } } - pub fn write_file(full_path: PathBuf, content: Vec, mode: u32) -> Self { + pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self { let len = content.len(); Self { full_path, @@ -177,7 +239,7 @@ impl Item { full_path: PathBuf, mode: u32, state: IncrementalFileState, - ) -> Result<(Self, Box) -> bool + 'a>)> { + ) -> Result<(Self, Box bool + 'a>)> { let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?; let result = Self { full_path, @@ -210,19 +272,19 @@ impl IncrementalFileState { &self, path: &Path, mode: u32, - ) -> Result<(Box) -> bool>, IncrementalFile)> { + ) -> Result<(Box bool>, IncrementalFile)> { use std::sync::mpsc::channel; match *self { IncrementalFileState::Threaded => { - let (tx, rx) = channel::>(); + let (tx, rx) = channel::(); let content_callback = IncrementalFile::ThreadedReceiver(rx); - let chunk_submit = move |chunk: Vec| tx.send(chunk).is_ok(); + let chunk_submit = move |chunk: FileBuffer| tx.send(chunk).is_ok(); Ok((Box::new(chunk_submit), content_callback)) } IncrementalFileState::Immediate(ref state) => { let content_callback = IncrementalFile::ImmediateReceiver; let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?; - let chunk_submit = move |chunk: Vec| writer.chunk_submit(chunk); + let chunk_submit = move |chunk: FileBuffer| writer.chunk_submit(chunk); Ok((Box::new(chunk_submit), content_callback)) } } @@ -258,6 +320,14 @@ pub trait Executor { /// Get any state needed for incremental file processing fn incremental_file_state(&self) -> IncrementalFileState; + + /// 
Get a disk buffer. E.g. this gets the right sized pool object for + /// optimized situations, or just a malloc when optimisations are off etc + /// etc. + fn get_buffer(&mut self, len: usize) -> FileBuffer; + + /// Query the memory budget to see if a particular size buffer is available + fn buffer_available(&self, len: usize) -> bool; } /// Trivial single threaded IO to be used from executors. @@ -267,7 +337,17 @@ pub fn perform(item: &mut Item, chunk_complete_callback: F) { // Files, write them. item.result = match &mut item.kind { Kind::Directory => create_dir(&item.full_path), - Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode), + Kind::File(ref mut contents) => { + contents.clear(); + match contents { + FileBuffer::Immediate(ref contents) => { + write_file(&item.full_path, &contents, item.mode) + } + FileBuffer::Threaded(ref mut contents) => { + write_file(&item.full_path, &contents, item.mode) + } + } + } Kind::IncrementalFile(incremental_file) => write_file_incremental( &item.full_path, incremental_file, @@ -367,6 +447,7 @@ pub fn create_dir>(path: P) -> io::Result<()> { /// Get the executor for disk IO. pub fn get_executor<'a>( notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + ram_budget: usize, ) -> Result> { // If this gets lots of use, consider exposing via the config file. 
let thread_count = match process().var("RUSTUP_IO_THREADS") { @@ -377,6 +458,6 @@ pub fn get_executor<'a>( }; Ok(match thread_count { 0 | 1 => Box::new(immediate::ImmediateUnpacker::new()), - n => Box::new(threaded::Threaded::new(notify_handler, n)), + n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)), }) } diff --git a/src/diskio/test.rs b/src/diskio/test.rs index 5939f24733..f8f4570d66 100644 --- a/src/diskio/test.rs +++ b/src/diskio/test.rs @@ -18,7 +18,7 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { currentprocess::with(tp, || -> Result<()> { let mut written = 0; let mut file_finished = false; - let mut io_executor: Box = get_executor(None)?; + let mut io_executor: Box = get_executor(None, 32 * 1024 * 1024)?; let (item, mut sender) = Item::write_file_segmented( work_dir.path().join("scratch"), 0o666, @@ -28,10 +28,13 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { // The file should be open and incomplete, and no completed chunks unreachable!(); } - let mut chunk: Vec = vec![]; + let mut chunk = io_executor.get_buffer(super::IO_CHUNK_SIZE); chunk.extend(b"0123456789"); - // We should be able to submit more than one chunk - sender(chunk.clone()); + chunk = chunk.finished(); + sender(chunk); + let mut chunk = io_executor.get_buffer(super::IO_CHUNK_SIZE); + chunk.extend(b"0123456789"); + chunk = chunk.finished(); sender(chunk); loop { for work in io_executor.completed().collect::>() { @@ -45,7 +48,9 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { } } // sending a zero length chunk closes the file - sender(vec![]); + let mut chunk = io_executor.get_buffer(0); + chunk = chunk.finished(); + sender(chunk); loop { for work in io_executor.completed().collect::>() { match work { @@ -74,6 +79,70 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { Ok(()) } +fn test_complete_file(io_threads: &str) -> Result<()> { + let work_dir = test_dir()?; + let mut vars = HashMap::new(); + 
vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string()); + let tp = Box::new(currentprocess::TestProcess { + vars, + ..Default::default() + }); + currentprocess::with(tp, || -> Result<()> { + let mut io_executor: Box = get_executor(None, 32 * 1024 * 1024)?; + let mut chunk = io_executor.get_buffer(10); + chunk.extend(b"0123456789"); + assert_eq!(chunk.len(), 10); + chunk = chunk.finished(); + let item = Item::write_file(work_dir.path().join("scratch"), 0o666, chunk); + assert_eq!(item.size, Some(10)); + let mut items = 0; + let mut check_item = |item: Item| { + assert_eq!(item.size, Some(10)); + items += 1; + assert_eq!(1, items); + }; + let mut finished = false; + for work in io_executor.execute(item).collect::>() { + // The file might complete immediately + match work { + super::CompletedIo::Chunk(size) => unreachable!(format!("{:?}", size)), + super::CompletedIo::Item(item) => { + check_item(item); + finished = true; + } + } + } + if !finished { + loop { + for work in io_executor.completed().collect::>() { + match work { + super::CompletedIo::Chunk(size) => unreachable!(format!("{:?}", size)), + super::CompletedIo::Item(item) => { + check_item(item); + finished = true; + } + } + } + if finished { + break; + } + } + } + assert!(items > 0); + for _ in io_executor.join().collect::>() { + // no more work should be outstanding + unreachable!(); + } + Ok(()) + })?; + // We should be able to read back the file with correct content + assert_eq!( + std::fs::read_to_string(work_dir.path().join("scratch"))?, + "0123456789".to_string() + ); + Ok(()) +} + #[test] fn test_incremental_file_immediate() -> Result<()> { test_incremental_file("1") @@ -83,3 +152,13 @@ fn test_incremental_file_immediate() -> Result<()> { fn test_incremental_file_threaded() -> Result<()> { test_incremental_file("2") } + +#[test] +fn test_complete_file_immediate() -> Result<()> { + test_complete_file("1") +} + +#[test] +fn test_complete_file_threaded() -> Result<()> { + 
test_complete_file("2") +} diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index 80039473ef..efc4dc3076 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -4,15 +4,56 @@ /// than desired. In particular the docs workload with 20K files requires /// very low latency per file, which even a few ms per syscall per file /// will cause minutes of wall clock time. -use std::cell::Cell; +use std::cell::{Cell, RefCell}; +use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; +use enum_map::{enum_map, Enum, EnumMap}; +use sharded_slab::pool::{OwnedRef, OwnedRefMut}; + use super::{perform, CompletedIo, Executor, Item}; use crate::utils::notifications::Notification; use crate::utils::units::Unit; +#[derive(Copy, Clone, Debug, Enum)] +pub enum Bucket { + FourK, + EightK, + OneM, + EightM, + SixteenM, +} + +#[derive(Debug)] +pub enum PoolReference { + Owned(OwnedRef>, Arc>>), + Mut(OwnedRefMut>, Arc>>), +} + +impl PoolReference { + pub fn clear(&mut self) { + match self { + PoolReference::Mut(orm, pool) => { + pool.clear(orm.key()); + } + PoolReference::Owned(rm, pool) => { + pool.clear(rm.key()); + } + } + } +} + +impl AsRef<[u8]> for PoolReference { + fn as_ref(&self) -> &[u8] { + match self { + PoolReference::Owned(owned, _) => owned, + PoolReference::Mut(mutable, _) => mutable, + } + } +} + enum Task { Request(CompletedIo), // Used to synchronise in the join method. 
@@ -25,16 +66,53 @@ impl Default for Task { } } +struct Pool { + pool: Arc>>, + high_watermark: RefCell, + in_use: RefCell, + size: usize, +} + +impl Pool { + fn claim(&self) { + if self.in_use == self.high_watermark { + *self.high_watermark.borrow_mut() += self.size; + } + *self.in_use.borrow_mut() += self.size; + } + + fn reclaim(&self) { + *self.in_use.borrow_mut() -= self.size; + } +} + +impl fmt::Debug for Pool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Pool") + .field("size", &self.size) + .field("in_use", &self.in_use) + .field("high_watermark", &self.high_watermark) + .finish() + } +} + pub struct Threaded<'a> { n_files: Arc, pool: threadpool::ThreadPool, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, rx: Receiver, tx: Sender, + vec_pools: EnumMap, + ram_budget: usize, } impl<'a> Threaded<'a> { - pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>, thread_count: usize) -> Self { + /// Construct a new Threaded executor. + pub fn new( + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + thread_count: usize, + ram_budget: usize, + ) -> Self { // Defaults to hardware thread count threads; this is suitable for // our needs as IO bound operations tend to show up as write latencies // rather than close latencies, so we don't need to look at @@ -45,15 +123,81 @@ impl<'a> Threaded<'a> { .thread_stack_size(1_048_576) .build(); let (tx, rx) = channel(); + let vec_pools = enum_map! 
{ + Bucket::FourK => Pool{ + pool: Arc::new(sharded_slab::Pool::new()), + high_watermark: RefCell::new(4096), + in_use: RefCell::new(0), + size:4096 + }, + Bucket::EightK=> Pool{ + pool: Arc::new(sharded_slab::Pool::new()), + high_watermark: RefCell::new(8192), + in_use: RefCell::new(0), + size:8192 + }, + Bucket::OneM=> Pool{ + pool: Arc::new(sharded_slab::Pool::new()), + high_watermark: RefCell::new(1024*1024), + in_use: RefCell::new(0), + size:1024*1024 + }, + Bucket::EightM=> Pool{ + pool: Arc::new(sharded_slab::Pool::new()), + high_watermark: RefCell::new(8*1024*1024), + in_use: RefCell::new(0), + size:8*1024*1024 + }, + Bucket::SixteenM=> Pool{ + pool: Arc::new(sharded_slab::Pool::new()), + high_watermark: RefCell::new(16*1024*1024), + in_use: RefCell::new(0), + size: 16*1024*1024 + }, + }; + // Ensure there is at least one buffer of each size, so we can always make forward progress. + for (_, pool) in &vec_pools { + let key = pool + .pool + .create_with(|vec| vec.reserve_exact(pool.size - vec.len())) + .unwrap(); + pool.pool.clear(key); + } + // Since we've just *used* this memory, we had better have been allowed to! 
+ assert!(Threaded::ram_highwater(&vec_pools) < ram_budget); Self { n_files: Arc::new(AtomicUsize::new(0)), pool, notify_handler, rx, tx, + vec_pools, + ram_budget, } } + /// How much RAM is allocated across all the pools right now + fn ram_highwater(vec_pools: &EnumMap) -> usize { + vec_pools + .iter() + .map(|(_, pool)| *pool.high_watermark.borrow()) + .sum() + } + + fn reclaim(&self, op: &CompletedIo) { + let size = match &op { + CompletedIo::Item(op) => match &op.kind { + super::Kind::Directory => return, + super::Kind::File(content) => content.len(), + super::Kind::IncrementalFile(_) => return, + }, + CompletedIo::Chunk(_) => super::IO_CHUNK_SIZE, + }; + let bucket = self.find_bucket(size); + let pool = &self.vec_pools[bucket]; + pool.reclaim(); + } + fn submit(&self, mut item: Item) { let tx = self.tx.clone(); self.n_files.fetch_add(1, Ordering::Relaxed); @@ -69,6 +213,24 @@ impl<'a> Threaded<'a> { .expect("receiver should be listening"); }); } + + fn find_bucket(&self, capacity: usize) -> Bucket { + let mut bucket = Bucket::FourK; + for (next_bucket, pool) in &self.vec_pools { + bucket = next_bucket; + if pool.size >= capacity { + break; + } + } + let pool = &self.vec_pools[bucket]; + assert!( + capacity <= pool.size, + "capacity <= pool.size: {} > {}", + capacity, + pool.size + ); + bucket + } } impl<'a> Executor for Threaded<'a> { @@ -143,15 +305,22 @@ impl<'a> Executor for Threaded<'a> { self.tx .send(Task::Sentinel) .expect("must still be listening"); + if crate::currentprocess::process().var("RUSTUP_DEBUG").is_ok() { + // debug! is in the cli layer. erg. And notification stack is still terrible. 
+ debug!(""); + for (bucket, pool) in &self.vec_pools { + debug!("{:?}: {:?}", bucket, pool); + } + } Box::new(JoinIterator { - iter: self.rx.iter(), + executor: self, consume_sentinel: false, }) } fn completed(&self) -> Box + '_> { Box::new(JoinIterator { - iter: self.rx.try_iter(), + executor: self, consume_sentinel: true, }) } @@ -159,6 +328,28 @@ impl<'a> Executor for Threaded<'a> { fn incremental_file_state(&self) -> super::IncrementalFileState { super::IncrementalFileState::Threaded } + + fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer { + let bucket = self.find_bucket(capacity); + let pool = &mut self.vec_pools[bucket]; + let mut item = pool.pool.clone().create_owned().unwrap(); + item.reserve_exact(pool.size); + pool.claim(); + super::FileBuffer::Threaded(PoolReference::Mut(item, pool.pool.clone())) + } + + fn buffer_available(&self, len: usize) -> bool { + // if either: there is room in the budget to assign a new slab entry of + // this size, or there is an unused slab entry of this size. 
+ let bucket = self.find_bucket(len); + let pool = &self.vec_pools[bucket]; + if pool.in_use < pool.high_watermark { + return true; + } + let size = pool.size; + let total_used = Threaded::ram_highwater(&self.vec_pools); + total_used + size < self.ram_budget + } } impl<'a> Drop for Threaded<'a> { @@ -168,28 +359,43 @@ impl<'a> Drop for Threaded<'a> { } } -struct JoinIterator> { - iter: T, +struct JoinIterator<'a, 'b> { + executor: &'a Threaded<'b>, consume_sentinel: bool, } -impl> Iterator for JoinIterator { +impl<'a, 'b> JoinIterator<'a, 'b> { + fn inner>(&self, mut iter: T) -> Option { + loop { + let task_o = iter.next(); + match task_o { + None => break None, + Some(task) => match task { + Task::Sentinel => { + if self.consume_sentinel { + continue; + } else { + break None; + } + } + Task::Request(item) => { + self.executor.reclaim(&item); + break Some(item); + } + }, + } + } + } +} + +impl<'a, 'b> Iterator for JoinIterator<'a, 'b> { type Item = CompletedIo; fn next(&mut self) -> Option { - let task_o = self.iter.next(); - match task_o { - None => None, - Some(task) => match task { - Task::Sentinel => { - if self.consume_sentinel { - self.next() - } else { - None - } - } - Task::Request(item) => Some(item), - }, + if self.consume_sentinel { + self.inner(self.executor.rx.try_iter()) + } else { + self.inner(self.executor.rx.iter()) } } } @@ -218,6 +424,7 @@ impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> { } else { for task in self.executor.rx.iter() { if let Task::Request(item) = task { + self.executor.reclaim(&item); return Some(item); } if self.executor.pool.queued_count() < threshold { diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 6d3acae1d4..9d141f4d94 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -11,7 +11,7 @@ use std::path::{Path, PathBuf}; use anyhow::{anyhow, bail, Context, Result}; use tar::EntryType; -use crate::diskio::{get_executor, CompletedIo, Executor, Item, Kind}; +use 
crate::diskio::{get_executor, CompletedIo, Executor, FileBuffer, Item, Kind, IO_CHUNK_SIZE}; use crate::dist::component::components::*; use crate::dist::component::transaction::*; use crate::dist::temp; @@ -159,79 +159,44 @@ impl<'a> TarPackage<'a> { } } -struct MemoryBudget { - limit: usize, - used: usize, -} - // Probably this should live in diskio but ¯\_(ツ)_/¯ -impl MemoryBudget { - fn new( - io_chunk_size: usize, - effective_max_ram: Option, - notify_handler: Option<&dyn Fn(Notification<'_>)>, - ) -> Self { - const DEFAULT_UNPACK_RAM_MAX: usize = 500 * 1024 * 1024; - const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 100 * 1024 * 1024; - let default_max_unpack_ram = if let Some(effective_max_ram) = effective_max_ram { - let ram_for_unpacking = effective_max_ram - RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS; - std::cmp::min(DEFAULT_UNPACK_RAM_MAX, ram_for_unpacking) +fn unpack_ram( + io_chunk_size: usize, + effective_max_ram: Option, + notify_handler: Option<&dyn Fn(Notification<'_>)>, +) -> usize { + const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024; + let minimum_ram = io_chunk_size * 2; + let default_max_unpack_ram = if let Some(effective_max_ram) = effective_max_ram { + if effective_max_ram > minimum_ram + RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS { + effective_max_ram - RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS } else { - // Rustup does not know how much RAM the machine has: use the - // minimum known to work reliably. - DEFAULT_UNPACK_RAM_MAX - }; - let unpack_ram = match process() - .var("RUSTUP_UNPACK_RAM") - .ok() - .and_then(|budget_str| budget_str.parse::().ok()) - { - // Note: In future we may want to add a warning or even an override if a user - // supplied budget is larger than effective_max_ram. 
- Some(budget) => budget, - None => { - if let Some(h) = notify_handler { - h(Notification::SetDefaultBufferSize(default_max_unpack_ram)) - } - default_max_unpack_ram - } - }; - - if io_chunk_size > unpack_ram { - panic!("RUSTUP_UNPACK_RAM must be larger than {}", io_chunk_size); - } - Self { - limit: unpack_ram, - used: 0, + minimum_ram } - } - - fn reclaim(&mut self, op: &CompletedIo) { - match &op { - CompletedIo::Item(op) => match &op.kind { - Kind::Directory => {} - Kind::File(content) => self.used -= content.len(), - Kind::IncrementalFile(_) => {} - }, - CompletedIo::Chunk(size) => self.used -= size, + } else { + // Rustup does not know how much RAM the machine has: use the minimum + minimum_ram + }; + let unpack_ram = match process() + .var("RUSTUP_UNPACK_RAM") + .ok() + .and_then(|budget_str| budget_str.parse::().ok()) + { + // Note: In future we may want to add a warning or even an override if a user + // supplied budget is larger than effective_max_ram. + Some(budget) => budget, + None => { + if let Some(h) = notify_handler { + h(Notification::SetDefaultBufferSize(default_max_unpack_ram)) + } + default_max_unpack_ram } - } - - fn claim(&mut self, op: &Item) { - match &op.kind { - Kind::Directory => {} - Kind::File(content) => self.used += content.len(), - Kind::IncrementalFile(_) => {} - }; - } - - fn claim_chunk(&mut self, len: usize) { - self.used += len; - } + }; - fn available(&self) -> usize { - self.limit - self.used + if io_chunk_size > unpack_ram { + panic!("RUSTUP_UNPACK_RAM must be larger than {}", io_chunk_size); } + unpack_ram } /// Handle the async result of io operations @@ -269,7 +234,6 @@ fn filter_result(op: &mut CompletedIo) -> io::Result<()> { fn trigger_children( io_executor: &dyn Executor, directories: &mut HashMap, - budget: &mut MemoryBudget, op: CompletedIo, ) -> Result { let mut result = 0; @@ -290,9 +254,8 @@ fn trigger_children( for pending_item in pending.into_iter() { for mut item in 
io_executor.execute(pending_item).collect::>() { // TODO capture metrics - budget.reclaim(&item); filter_result(&mut item)?; - result += trigger_children(io_executor, directories, budget, item)?; + result += trigger_children(io_executor, directories, item)?; } } } @@ -311,9 +274,7 @@ fn unpack_without_first_dir<'a, R: Read>( path: &Path, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, ) -> Result<()> { - let mut io_executor: Box = get_executor(notify_handler)?; let entries = archive.entries()?; - const IO_CHUNK_SIZE: u64 = 16_777_216; let effective_max_ram = match effective_limits::memory_limit() { Ok(ram) => Some(ram as usize), Err(e) => { @@ -323,7 +284,8 @@ fn unpack_without_first_dir<'a, R: Read>( None } }; - let mut budget = MemoryBudget::new(IO_CHUNK_SIZE as usize, effective_max_ram, notify_handler); + let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, notify_handler); + let mut io_executor: Box = get_executor(notify_handler, unpack_ram)?; let mut directories: HashMap = HashMap::new(); // Path is presumed to exist. Call it a precondition. @@ -335,9 +297,8 @@ fn unpack_without_first_dir<'a, R: Read>( // our unpacked item is pending dequeue) for mut item in io_executor.completed().collect::>() { // TODO capture metrics - budget.reclaim(&item); filter_result(&mut item)?; - trigger_children(&*io_executor, &mut directories, &mut budget, item)?; + trigger_children(&*io_executor, &mut directories, item)?; } let mut entry = entry?; @@ -365,15 +326,14 @@ fn unpack_without_first_dir<'a, R: Read>( } struct SenderEntry<'a, 'b, R: std::io::Read> { - sender: Box) -> bool + 'a>, + sender: Box bool + 'a>, entry: tar::Entry<'b, R>, } /// true if either no sender_entry was provided, or the incremental file /// has been fully dispatched. 
fn flush_ios<'a, R: std::io::Read, P: AsRef>( - mut budget: &mut MemoryBudget, - io_executor: &dyn Executor, + io_executor: &mut dyn Executor, mut directories: &mut HashMap, mut sender_entry: Option<&mut SenderEntry<'a, '_, R>>, full_path: P, @@ -381,25 +341,23 @@ fn unpack_without_first_dir<'a, R: Read>( let mut result = sender_entry.is_none(); for mut op in io_executor.completed().collect::>() { // TODO capture metrics - budget.reclaim(&op); filter_result(&mut op)?; - trigger_children(&*io_executor, &mut directories, &mut budget, op)?; + trigger_children(&*io_executor, &mut directories, op)?; } // Maybe stream a file incrementally if let Some(sender) = sender_entry.as_mut() { - if budget.available() as u64 >= IO_CHUNK_SIZE { - let mut v = vec![0; IO_CHUNK_SIZE as usize]; + if io_executor.buffer_available(IO_CHUNK_SIZE) { + let mut buffer = io_executor.get_buffer(IO_CHUNK_SIZE); let len = sender .entry .by_ref() - .take(IO_CHUNK_SIZE) - .read_to_end(&mut v)?; + .take(IO_CHUNK_SIZE as u64) + .read_to_end(&mut buffer)?; + buffer = buffer.finished(); if len == 0 { result = true; } - v.resize(len, 0); - budget.claim_chunk(len); - if !(sender.sender)(v) { + if !(sender.sender)(buffer) { bail!(format!( "IO receiver for '{}' disconnected", full_path.as_ref().display() @@ -439,26 +397,25 @@ fn unpack_without_first_dir<'a, R: Read>( let mode = u_mode | g_mode | o_mode; let file_size = entry.header().size()?; - let size = std::cmp::min(IO_CHUNK_SIZE, file_size); + let size = std::cmp::min(IO_CHUNK_SIZE as u64, file_size); - while size > budget.available() as u64 { + while !io_executor.buffer_available(size as usize) { flush_ios::, _>( - &mut budget, - &*io_executor, + &mut *io_executor, &mut directories, None, &full_path, )?; } - let mut incremental_file_sender: Option) -> bool + '_>> = None; + let mut incremental_file_sender: Option bool + '_>> = None; let mut item = match kind { EntryType::Directory => { directories.insert(full_path.to_owned(), 
DirStatus::Pending(Vec::new())); Item::make_dir(full_path.clone(), mode) } EntryType::Regular => { - if file_size > IO_CHUNK_SIZE { + if file_size > IO_CHUNK_SIZE as u64 { let (item, sender) = Item::write_file_segmented( full_path.clone(), mode, @@ -467,14 +424,14 @@ fn unpack_without_first_dir<'a, R: Read>( incremental_file_sender = Some(sender); item } else { - let mut v = Vec::with_capacity(size as usize); - entry.read_to_end(&mut v)?; - Item::write_file(full_path.clone(), v, mode) + let mut content = io_executor.get_buffer(size as usize); + entry.read_to_end(&mut content)?; + content = content.finished(); + Item::write_file(full_path.clone(), mode, content) } } _ => bail!(format!("tar entry kind '{:?}' is not supported", kind)), }; - budget.claim(&item); let item = loop { // Create the full path to the entry if it does not exist already @@ -519,9 +476,8 @@ fn unpack_without_first_dir<'a, R: Read>( // Submit the new item for mut item in io_executor.execute(item).collect::>() { // TODO capture metrics - budget.reclaim(&item); filter_result(&mut item)?; - trigger_children(&*io_executor, &mut directories, &mut budget, item)?; + trigger_children(&*io_executor, &mut directories, item)?; } } @@ -533,8 +489,7 @@ fn unpack_without_first_dir<'a, R: Read>( // monitor io queue and feed in the content of the file (if needed) while !flush_ios( - &mut budget, - &*io_executor, + &mut *io_executor, &mut directories, incremental_file_sender.as_mut(), &full_path, @@ -546,9 +501,8 @@ fn unpack_without_first_dir<'a, R: Read>( for mut item in io_executor.join().collect::>() { // handle final IOs // TODO capture metrics - budget.reclaim(&item); filter_result(&mut item)?; - triggered += trigger_children(&*io_executor, &mut directories, &mut budget, item)?; + triggered += trigger_children(&*io_executor, &mut directories, item)?; } if triggered == 0 { // None of the IO submitted before the prior join triggered any new diff --git a/src/lib.rs b/src/lib.rs index 902d6bb41a..c6bd36d042 
100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ fn component_for_bin(binary: &str) -> Option<&'static str> { } } +#[macro_use] pub mod cli; pub mod command; mod config; diff --git a/src/utils/notifications.rs b/src/utils/notifications.rs index 5afaf6507c..2dd54fe148 100644 --- a/src/utils/notifications.rs +++ b/src/utils/notifications.rs @@ -46,6 +46,7 @@ impl<'a> Notification<'a> { pub fn level(&self) -> NotificationLevel { use self::Notification::*; match self { + SetDefaultBufferSize(_) => NotificationLevel::Debug, CreatingDirectory(_, _) | RemovingDirectory(_, _) | LinkingDirectory(_, _) @@ -59,7 +60,7 @@ impl<'a> Notification<'a> { | ResumingPartialDownload | UsingCurl | UsingReqwest => NotificationLevel::Verbose, - RenameInUse(_, _) | SetDefaultBufferSize(_) => NotificationLevel::Info, + RenameInUse(_, _) => NotificationLevel::Info, NoCanonicalPath(_) => NotificationLevel::Warn, Error(_) => NotificationLevel::Error, } diff --git a/src/utils/notify.rs b/src/utils/notify.rs index 17c5bc80e8..c4f3a4f528 100644 --- a/src/utils/notify.rs +++ b/src/utils/notify.rs @@ -4,4 +4,5 @@ pub enum NotificationLevel { Info, Warn, Error, + Debug, } diff --git a/tests/cli-exact.rs b/tests/cli-exact.rs index cea9a94270..e00ec13041 100644 --- a/tests/cli-exact.rs +++ b/tests/cli-exact.rs @@ -34,7 +34,6 @@ info: downloading component 'rust-docs' info: downloading component 'rust-std' info: downloading component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' @@ -241,7 +240,6 @@ info: downloading component 'rust-docs' info: downloading component 'rust-std' info: downloading component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' @@ -574,7 +572,6 
@@ fn cross_install_indicates_target() { &format!( r"info: downloading component 'rust-std' for '{0}' info: installing component 'rust-std' for '{0}' -info: using up to 500.0 MiB of RAM to unpack components ", clitools::CROSS_ARCH1 ), diff --git a/tests/cli-rustup.rs b/tests/cli-rustup.rs index 34ec14ede0..12e9193ebe 100644 --- a/tests/cli-rustup.rs +++ b/tests/cli-rustup.rs @@ -54,7 +54,6 @@ info: removing previous version of component 'rust-docs' info: removing previous version of component 'rust-std' info: removing previous version of component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' @@ -95,7 +94,6 @@ info: removing previous version of component 'rust-docs' info: removing previous version of component 'rust-std' info: removing previous version of component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' @@ -160,7 +158,6 @@ info: removing previous version of component 'rust-docs' info: removing previous version of component 'rust-std' info: removing previous version of component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' @@ -231,7 +228,6 @@ info: removing previous version of component 'rust-docs' info: removing previous version of component 'rust-std' info: removing previous version of component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' @@ -293,7 +289,6 @@ info: downloading component 'rust-docs' info: 
downloading component 'rust-std' info: downloading component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' diff --git a/tests/cli-self-upd.rs b/tests/cli-self-upd.rs index 916a37fc8e..9164ca5e32 100644 --- a/tests/cli-self-upd.rs +++ b/tests/cli-self-upd.rs @@ -77,7 +77,6 @@ info: downloading component 'rust-docs' info: downloading component 'rust-std' info: downloading component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rust-std' info: installing component 'rustc' diff --git a/tests/cli-v2.rs b/tests/cli-v2.rs index d2d9284156..f37c8201a4 100644 --- a/tests/cli-v2.rs +++ b/tests/cli-v2.rs @@ -1436,7 +1436,6 @@ info: downloading component 'cargo' info: downloading component 'rust-docs' info: downloading component 'rustc' info: installing component 'cargo' -info: using up to 500.0 MiB of RAM to unpack components info: installing component 'rust-docs' info: installing component 'rustc' "