diff --git a/src/python/pants/engine/console.py b/src/python/pants/engine/console.py index 7100140b3f71..0298c684411a 100644 --- a/src/python/pants/engine/console.py +++ b/src/python/pants/engine/console.py @@ -38,20 +38,20 @@ def __init__( @property def stdout(self): + if self._session: + self._session.teardown_dynamic_ui() return self._stdout @property def stderr(self): + if self._session: + self._session.teardown_dynamic_ui() return self._stderr def write_stdout(self, payload: str) -> None: - if self._session: - self._session.teardown_dynamic_ui() self.stdout.write(payload) def write_stderr(self, payload: str) -> None: - if self._session: - self._session.teardown_dynamic_ui() self.stderr.write(payload) def print_stdout(self, payload: str, end: str = "\n") -> None: diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index ca00278b8da0..d0e4dbc870c1 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -319,12 +319,18 @@ def register_bootstrap_options(cls, register): ) register( - "-l", "--level", type=LogLevel, default=LogLevel.INFO, help="Set the logging level." + "-l", + "--level", + type=LogLevel, + default=LogLevel.INFO, + daemon=True, + help="Set the logging level.", ) register( "--show-log-target", type=bool, default=False, + daemon=True, advanced=True, help="Display the target where a log message originates in that log message's output. " "This can be helpful when paired with --log-levels-by-target.", @@ -334,6 +340,7 @@ def register_bootstrap_options(cls, register): "--log-levels-by-target", type=dict, default={}, + daemon=True, advanced=True, help="Set a more specific logging level for one or more logging targets. The names of " "logging targets are specified in log strings when the --show-log-target option is set. " @@ -347,6 +354,7 @@ def register_bootstrap_options(cls, register): "--log-show-rust-3rdparty", type=bool, default=False, + daemon=True, advanced=True, help="Whether to show/hide logging done by 3rdparty Rust crates used by the Pants " "engine.", @@ -368,6 +376,7 @@ def register_bootstrap_options(cls, register): type=list, member_type=str, default=[], + daemon=True, advanced=True, help="Regexps matching warning strings to ignore, e.g. " '["DEPRECATED: the option `--my-opt` will be removed"]. The regex patterns will be ' diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 04795d32b1d8..b9d281996fc8 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -351,6 +351,7 @@ dependencies = [ [[package]] name = "console" version = "0.14.0" +source = "git+https://github.com/stuhood/console?branch=stuhood/term-target-for-arbitrary-handles#c0163003c94550b3d7308e462493fa7e772cd572" dependencies = [ "encode_unicode", "lazy_static", diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index cf3b71ee3ef8..96d03173e208 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -153,5 +153,5 @@ env_logger = "0.5.4" prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } prost-build = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } prost-types = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } - -console = { path = "/Users/stuhood/src/console" } +# TODO: Stabilize before landing. +console = { git = "https://github.com/stuhood/console", branch = "stuhood/term-target-for-arbitrary-handles" } diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 0aa6ed6d6e5b..e2027c81eef4 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -133,13 +133,13 @@ py_module_initializer!(native_engine, |py, m| { m.add( py, "stdio_write_stdout", - py_fn!(py, stdio_write_stdout(b: String)), + py_fn!(py, stdio_write_stdout(b: &[u8])), )?; m.add( py, "stdio_write_stderr", - py_fn!(py, stdio_write_stderr(b: String)), + py_fn!(py, stdio_write_stderr(b: &[u8])), )?; m.add(py, "flush_log", py_fn!(py, flush_log()))?; @@ -1899,22 +1899,20 @@ fn stdio_thread_console_set( } fn stdio_thread_console_clear(_: Python) -> PyUnitResult { - stdio::Destination::console_clear(&stdio::get_destination()); + stdio::get_destination().console_clear(); Ok(None) } -fn stdio_write_stdout(py: Python, msg: String) -> PyUnitResult { +fn stdio_write_stdout(py: Python, payload: &[u8]) -> PyUnitResult { py.allow_threads(|| { - let destination = stdio::get_destination(); - stdio::Destination::write_stdout(&destination, &msg); + stdio::get_destination().write_stdout(payload); Ok(None) }) } -fn stdio_write_stderr(py: Python, msg: String) -> PyUnitResult { +fn stdio_write_stderr(py: Python, payload: &[u8]) -> PyUnitResult { py.allow_threads(|| { - let destination = stdio::get_destination(); - stdio::Destination::write_stderr(&destination, &msg); + stdio::get_destination().write_stderr(payload); Ok(None) }) } diff --git a/src/rust/engine/src/externs/stdio.rs b/src/rust/engine/src/externs/stdio.rs index 066d8888411b..72796601551f 100644 --- a/src/rust/engine/src/externs/stdio.rs +++ b/src/rust/engine/src/externs/stdio.rs @@ -60,9 +60,9 @@ py_class!(pub class PyStdioWrite |py| { py.allow_threads(|| { let destination = stdio::get_destination(); if is_stdout { - stdio::Destination::write_stdout(&destination, payload); + destination.write_stdout(payload.as_bytes()); } else { - stdio::Destination::write_stderr(&destination, payload); + destination.write_stderr(payload.as_bytes()); } }); Ok(Python::None(py)) @@ -83,7 +83,7 @@ py_class!(pub class PyStdioWrite |py| { } else { stdio::Destination::stderr_as_raw_fd(&destination) }; - fd.ok_or_else(|| PyErr::new::(py, ("No associated fileno.".to_owned(),))) + fd.map_err(|e| PyErr::new::(py, (e,))) } def flush(&self) -> PyResult { diff --git a/src/rust/engine/stdio/src/lib.rs b/src/rust/engine/stdio/src/lib.rs index 46d5f92dfe2d..e0987950e374 100644 --- a/src/rust/engine/stdio/src/lib.rs +++ b/src/rust/engine/stdio/src/lib.rs @@ -27,8 +27,12 @@ // Arc can be more clear than needing to grok Orderings: #![allow(clippy::mutex_atomic)] +mod term; + +pub use term::{TermReadDestination, TermWriteDestination}; + use std::cell::RefCell; -use std::collections::HashMap; +use std::fmt; use std::fs::File; use std::future::Future; use std::io::{Read, Write}; @@ -37,20 +41,19 @@ use std::sync::Arc; use parking_lot::Mutex; use tokio::task_local; -use uuid::Uuid; /// /// A Console wraps some "borrowed" file handles: when it is dropped, we forget about the file /// handles rather than closing them. The file handles are optional only so that they may be /// "taken" during Drop. /// +#[derive(Debug)] struct Console { // TODO: Consume from `sys.stdin` replacement. #[allow(dead_code)] stdin_handle: Option, stdout_handle: Option, stderr_handle: Option, - stderr_handlers: HashMap, } impl Console { @@ -66,7 +69,6 @@ impl Console { stdin_handle: Some(stdin), stdout_handle: Some(stdout), stderr_handle: Some(stderr), - stderr_handlers: HashMap::new(), } } @@ -81,36 +83,21 @@ impl Console { } fn write_stderr(&mut self, content: &[u8]) -> Result<(), std::io::Error> { - // We first try to output to all registered handlers. - let logged_to_handlers = if self.stderr_handlers.is_empty() { - false - } else { - // TODO: The UI should do a full replacement here when it opens the Read/Write instances. - self - .stderr_handlers - .values() - .all(|callback| callback(&String::from_utf8_lossy(content)).is_ok()) - }; - - // If we successfully logged to (all) handlers, we're done. - if logged_to_handlers { - return Ok(()); - } - - // Otherwise, write to the underlying stderr handle. let mut stderr = self.stderr_handle.as_ref().unwrap(); stderr.write_all(content)?; stderr.flush() } - pub fn register_stderr_handler(&mut self, callback: StdioHandler) -> Uuid { - let unique_id = Uuid::new_v4(); - self.stderr_handlers.insert(unique_id, callback); - unique_id + fn stdin_as_raw_fd(&self) -> RawFd { + self.stdin_handle.as_ref().unwrap().as_raw_fd() + } + + fn stdout_as_raw_fd(&self) -> RawFd { + self.stdout_handle.as_ref().unwrap().as_raw_fd() } - pub fn deregister_stderr_handler(&mut self, unique_id: Uuid) { - self.stderr_handlers.remove(&unique_id); + fn stderr_as_raw_fd(&self) -> RawFd { + self.stderr_handle.as_ref().unwrap().as_raw_fd() } } @@ -138,9 +125,23 @@ impl Drop for Console { enum InnerDestination { Logging, Console(Console), - UI, + UI { stderr_handler: StdioHandler }, } +impl fmt::Debug for InnerDestination { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Logging => f.debug_struct("Logging").finish(), + Self::Console(c) => f.debug_struct("Console").field("console", c).finish(), + Self::UI { .. } => f + .debug_struct("UI") + .field("stderr_handler", &"") + .finish(), + } + } +} + +#[derive(Debug)] pub struct Destination(Mutex); impl Destination { @@ -152,12 +153,37 @@ impl Destination { } /// - /// If the UI has not already been cleared, clears it and restores the Console. + /// Starts a UI iff the Destination is currently a Console, and returns a TermDestination for + /// interacting with it. Dropping the TermDestination will restore direct Console access. + /// + pub fn ui_create( + self: &Arc, + stderr_handler: StdioHandler, + ) -> Result<(TermReadDestination, TermWriteDestination), String> { + let mut destination = self.0.lock(); + if !matches!(*destination, InnerDestination::Console(..)) { + return Err(format!( + "Cannot start a UI on Destination {:?}", + destination + )); + } + let console = std::mem::replace(&mut *destination, InnerDestination::UI { stderr_handler }); + match console { + InnerDestination::Console(console) => Ok(term::TermDestination::new(console, self.clone())), + _ => unreachable!(), + } + } + + /// + /// Clears the UI and restores the Console. /// fn ui_clear(&self, console: Console) { let mut destination = self.0.lock(); - if matches!(*destination, InnerDestination::UI) { + if matches!(*destination, InnerDestination::UI { .. }) { *destination = InnerDestination::Console(console); + } else { + // The UI was torn down independently: drop the Console. + *destination = InnerDestination::Logging; } } @@ -168,6 +194,10 @@ impl Destination { let mut destination = self.0.lock(); match *destination { InnerDestination::Console(ref mut console) => console.read_stdin(buf), + InnerDestination::UI { .. } => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "stdin is currently owned by the UI.", + )), InnerDestination::Logging => Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "No stdin attached.", @@ -180,34 +210,39 @@ impl Destination { /// available. /// pub fn write_stdout(&self, content: &[u8]) { - { - let mut destination = self.0.lock(); - match *destination { - InnerDestination::Console(ref mut console) => { - // Write to the underlying Console. - if let Ok(()) = console.write_stdout(content) { - return; - } - // If writing to the stdout handle fails, fall through to mutate self to drop it. - } - InnerDestination::Logging => { - // Release the lock on the Destination before logging. - std::mem::drop(destination); - log::info!("stdout: {:?}", content); + let mut destination = self.0.lock(); + let error_res = match *destination { + InnerDestination::Console(ref mut console) => { + // Write to the underlying Console. + let res = console.write_stdout(content); + if res.is_ok() { return; } + // If writing to the stdout handle fails, fall through to mutate self to drop it. + res.map_err(|e| e.to_string()) } - } + InnerDestination::Logging | InnerDestination::UI { .. } => { + // Release the lock on the Destination before logging. + std::mem::drop(destination); + log::info!("stdout: {:?}", String::from_utf8_lossy(content)); + return; + } + }; - // If we fall through to here, we failed to write to a Console destination, and should switch - // to Logging and recurse. + // Release the lock, clear the Console, log the error and retry. + let error_str = format!( + "Failed to write stdout to {:?}, falling back to Logging: {:?}", + destination, error_res + ); + std::mem::drop(destination); self.console_clear(); - self.write_stdout(content) + log::warn!("{}", error_str); + self.write_stdout(content); } /// - /// Write the given content to the current stdout destination, returning an error if none is - /// available. + /// Write the given content to the current stdout Destination, without falling back to Logging. + /// Returns an error if only Logging is available. /// /// NB: This method is used from the logging crate, where attempting to fall back to logging for /// written stdio might result in infinite recursion. @@ -218,6 +253,10 @@ impl Destination { InnerDestination::Console(ref mut console) => { console.write_stderr(content).map_err(|e| e.to_string()) } + InnerDestination::UI { ref stderr_handler } => { + stderr_handler(&String::from_utf8_lossy(content)) + .map_err(|()| "UI handler failed.".to_owned()) + } InnerDestination::Logging => { Err("There is no 'real' stdio destination available.".to_owned()) } @@ -229,29 +268,43 @@ impl Destination { /// available. /// pub fn write_stderr(&self, content: &[u8]) { - { - let mut destination = self.0.lock(); - match *destination { - InnerDestination::Console(ref mut console) => { - // Write to the underlying Console. - if let Ok(()) = console.write_stderr(content) { - return; - } - // If writing to the stderr handle fails, fall through to mutate self to drop it. + let mut destination = self.0.lock(); + let error_res = match *destination { + InnerDestination::Console(ref mut console) => { + // Write to the underlying Console. + let res = console.write_stderr(content); + if res.is_ok() { + return; } - InnerDestination::Logging => { - // Release the lock on the Destination before logging. - std::mem::drop(destination); - log::info!("stderr: {:?}", content); + // If writing to the stdout handle fails, fall through to mutate self to drop it. + res.map_err(|e| e.to_string()) + } + InnerDestination::UI { ref stderr_handler } => { + // Write to the UI handler. + let res = stderr_handler(&String::from_utf8_lossy(content)); + if res.is_ok() { return; } + // If writing to the stderr handler fails, fall through to clear it and try again. + res.map_err(|()| "Failed to write stderr to UI".to_owned()) } - } + InnerDestination::Logging => { + // Release the lock on the Destination before logging. + std::mem::drop(destination); + log::info!("stderr: {:?}", String::from_utf8_lossy(content)); + return; + } + }; - // If we fall through to here, we failed to write to a Console destination, and should switch - // to Logging and recurse. + // Release the lock, clear the Console, log the error and retry. + let error_str = format!( + "Failed to write stderr to {:?}, falling back to Logging: {:?}", + destination, error_res + ); + std::mem::drop(destination); self.console_clear(); - self.write_stderr(content) + log::warn!("{}", error_str); + self.write_stderr(content); } /// @@ -259,12 +312,15 @@ impl Destination { /// but this method is additionally unsafe because the real file might have been closed by the /// time the caller interacts with it. /// - pub fn stdout_as_raw_fd(&self) -> Option { + pub fn stdout_as_raw_fd(&self) -> Result { match &*self.0.lock() { - InnerDestination::Console(console) => { - Some(console.stdout_handle.as_ref().unwrap().as_raw_fd()) + InnerDestination::Console(console) => Ok(console.stdout_as_raw_fd()), + InnerDestination::Logging => { + Err("No associated file descriptor for the Logging destination".to_owned()) + } + InnerDestination::UI { .. } => { + Err("A UI is running, and must be stopped before stdio is directly accessible.".to_owned()) } - InnerDestination::Logging => None, } } @@ -273,70 +329,19 @@ impl Destination { /// but this method is additionally unsafe because the real file might have been closed by the /// time the caller interacts with it. /// - pub fn stderr_as_raw_fd(&self) -> Option { + pub fn stderr_as_raw_fd(&self) -> Result { match &*self.0.lock() { - InnerDestination::Console(console) => { - Some(console.stderr_handle.as_ref().unwrap().as_raw_fd()) + InnerDestination::Console(console) => Ok(console.stderr_as_raw_fd()), + InnerDestination::Logging => { + Err("No associated file descriptor for the Logging destination".to_owned()) } - InnerDestination::Logging => None, - } - } - - pub fn register_stderr_handler(&self, callback: StdioHandler) -> Result { - let mut destination = self.0.lock(); - match *destination { - InnerDestination::Console(ref mut console) => Ok(console.register_stderr_handler(callback)), - InnerDestination::Logging => Err("stdio does not have an associated Console.".to_owned()), - } - } - - pub fn deregister_stderr_handler(&self, unique_id: Uuid) { - let mut destination = self.0.lock(); - match *destination { - InnerDestination::Console(ref mut console) => { - console.deregister_stderr_handler(unique_id); + InnerDestination::UI { .. } => { + Err("A UI is running, and must be stopped before stdio is directly accessible.".to_owned()) } - InnerDestination::Logging => {} } } } -/// -/// An implementation of Read and Write that reads from stdin and writes to stderr. -/// -/// Used to implement `console::Term` for use with the `indicatif` library. -/// -pub struct TermDestination { - // Optional so that it can be restored to the destination on Drop. - console: Option, - // The destination that the Console was taken from, and will be restored to. - destination: Arc, -} - -impl Read for TermDestination { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - Destination::read_stdin(&get_destination(), buf) - } -} - -impl Write for TermDestination { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - Destination::write_stderr_raw(&get_destination(), buf) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -impl Drop for TermDestination { - fn drop(&mut self) { - self.destination.ui_clear(self.console.take().unwrap()) - } -} - thread_local! { /// /// See set_thread_destination. diff --git a/src/rust/engine/stdio/src/term.rs b/src/rust/engine/stdio/src/term.rs new file mode 100644 index 000000000000..00605b09f56e --- /dev/null +++ b/src/rust/engine/stdio/src/term.rs @@ -0,0 +1,79 @@ +use std::io::{Read, Write}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::Arc; + +use parking_lot::Mutex; + +use crate::{Console, Destination}; + +/// +/// An implementation of Read and Write that reads from stdin and writes to stderr. +/// +/// Used to implement `console::Term` for use with the `indicatif` library. +/// +#[derive(Debug)] +pub(crate) struct TermDestination { + // Optional so that it can be restored to the destination on Drop. + console: Mutex>, + // The destination that the Console was taken from, and will be restored to. + destination: Arc, +} + +impl TermDestination { + pub(crate) fn new( + console: Console, + destination: Arc, + ) -> (TermReadDestination, TermWriteDestination) { + let term_destination = Arc::new(TermDestination { + console: Mutex::new(Some(console)), + destination, + }); + ( + TermReadDestination(term_destination.clone()), + TermWriteDestination(term_destination), + ) + } +} + +#[derive(Debug)] +pub struct TermReadDestination(Arc); + +#[derive(Debug)] +pub struct TermWriteDestination(Arc); + +impl Read for TermReadDestination { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.console.lock().as_mut().unwrap().read_stdin(buf) + } +} + +impl AsRawFd for TermReadDestination { + fn as_raw_fd(&self) -> RawFd { + self.0.console.lock().as_ref().unwrap().stdin_as_raw_fd() + } +} + +impl Write for TermWriteDestination { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.console.lock().as_mut().unwrap().write_stderr(buf)?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl AsRawFd for TermWriteDestination { + fn as_raw_fd(&self) -> RawFd { + self.0.console.lock().as_ref().unwrap().stderr_as_raw_fd() + } +} + +impl Drop for TermDestination { + fn drop(&mut self) { + self + .destination + .ui_clear(self.console.lock().take().unwrap()) + } +} diff --git a/src/rust/engine/ui/src/console_ui.rs b/src/rust/engine/ui/src/console_ui.rs index c71fbf1a1a3c..8ff32615ab57 100644 --- a/src/rust/engine/ui/src/console_ui.rs +++ b/src/rust/engine/ui/src/console_ui.rs @@ -35,7 +35,6 @@ use std::time::Duration; use futures::future::{self, FutureExt, TryFutureExt}; use indexmap::IndexMap; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; -use uuid::Uuid; use task_executor::Executor; use workunit_store::{format_workunit_duration, WorkunitStore}; @@ -101,8 +100,13 @@ impl ConsoleUI { } } - fn setup_bars(num_swimlanes: usize) -> (MultiProgress, Vec) { - let term = console::Term::read_write_pair(unimplemented!(), unimplemented!()); + fn setup_bars( + stderr_handler: stdio::StdioHandler, + num_swimlanes: usize, + ) -> Result<(MultiProgress, Vec), String> { + let (term_read, term_write) = stdio::get_destination().ui_create(stderr_handler)?; + + let term = console::Term::read_write_pair(term_read, term_write); // NB: We render more frequently than we receive new data in order to minimize aliasing where a // render might barely miss a data refresh. let draw_target = ProgressDrawTarget::to_term(term, Self::render_rate_hz() * 2); @@ -115,7 +119,7 @@ impl ConsoleUI { }) .collect(); - (multi_progress_bars, bars) + Ok((multi_progress_bars, bars)) } fn get_label_from_heavy_hitters<'a>( @@ -185,8 +189,9 @@ impl ConsoleUI { return Err("A ConsoleUI cannot render multiple UIs concurrently.".to_string()); } - // Setup bars, and then spawning rendering of the bars into a background task. - let (multi_progress, bars) = Self::setup_bars(self.local_parallelism); + // Setup bars (which will take ownership of the current Console), and then spawn rendering + // of the bars into a background task. + let (multi_progress, bars) = Self::setup_bars(stderr_handler, self.local_parallelism)?; let multi_progress_task = { executor .spawn_blocking(move || multi_progress.join()) @@ -196,10 +201,6 @@ impl ConsoleUI { self.instance = Some(Instance { tasks_to_display: IndexMap::new(), multi_progress_task, - logger_handle: stdio::Destination::register_stderr_handler( - &stdio::get_destination(), - stderr_handler, - )?, bars, }); Ok(()) @@ -212,10 +213,8 @@ impl ConsoleUI { if let Some(instance) = self.instance.take() { let sender = self.teardown_mpsc.0.clone(); self.teardown_in_progress = true; - stdio::Destination::deregister_stderr_handler( - &stdio::get_destination(), - instance.logger_handle, - ); + // When the MultiProgress completes, the Term(Destination) is dropped, which will restore + // direct access to the Console. instance .multi_progress_task .map_err(|e| format!("Failed to render UI: {}", e)) @@ -235,5 +234,4 @@ struct Instance { tasks_to_display: IndexMap>, multi_progress_task: Pin> + Send>>, bars: Vec, - logger_handle: Uuid, }