Skip to content

Commit

Permalink
Stream output while subprocess is running (#43)
Browse files Browse the repository at this point in the history
* Add a simple proof-of-concept of the output streaming pattern suggested in https://stackoverflow.com/a/72862682/113632

* Implement streaming support for subprocess output.

As suggested in https://stackoverflow.com/q/66060139 the child process'
streams are processed in background threads responsible for both
persisting and streaming the output to the caller's stdout/stderr.

Scoped threads are used to support streaming to references such as
`&mut Vec<u8>` and to loosen the type requirements of the stream (namely
to not require that they are 'static).

CLI:
* Subprocess out/err streams are duplicated to bkt's out/err while the
  subprocess runs, instead of caching all output first and then writing
  the cached output after the subprocess completes.
* Streaming is enabled by default and is not configurable.

Library:
* Added `Bkt::retrieve_streaming` and `Bkt::refresh_streaming` methods
  which accept out/err sinks. The subprocess' output is written to these
  streams in addition to being cached and available in the returned
  `Invocation`.
* These methods are marked "experimental" and are subject to change or
  even removal.
  • Loading branch information
dimo414 authored Aug 27, 2023
1 parent c611f84 commit 7d2e522
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 39 deletions.
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Two flags, `--ttl` and `--stale`, configure how long cached data is preserved.
By default `bkt` uses a TTL (Time to Live) of 60 seconds, meaning cached
data older than sixty seconds will be discarded and the backing command re-run.
Passing a different value, such as `--ttl=1d`, will change how long the cached
data is considered valid. The default TTL can be overriden by defining a
data is considered valid. The default TTL can be overridden by defining a
`BKT_TTL` environment variable.

When the data expires `bkt` has to re-execute the command synchronously, which
Expand Down Expand Up @@ -246,11 +246,6 @@ See [this discussion](https://github.com/dimo414/bkt/discussions/29) for a more
complete example of using `bkt` with `fzf`, including warming the commands before
the user starts navigating the selector.

Note: one downside to using `bkt` is, currently, `bkt` doesn't
[stream](https://github.com/junegunn/fzf/pull/2215) the backing process' output.
This means when `bkt` has a cache miss the preview will be absent until the
process completes, even if partial output could be displayed sooner.

### Using `bkt` only if installed

You may want to distribute shell scripts that utilize `bkt` without requiring
Expand Down
61 changes: 61 additions & 0 deletions examples/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Demo of streaming the out/err of a subprocess as it executes.
// See also https://stackoverflow.com/a/72862682/113632 and #31

use std::io::{Read, Write};
use std::process::{Command, Stdio};

static BASH_CMD: &str =
"echo START; date >&2
sleep 1
printf MID
sleep 1
echo DLE; date >&2
sleep 3
echo DONE; date >&2";

fn stream(
mut source: impl Read,
mut sink: impl Write,
) -> std::io::Result<()> {
// This initialization can be avoided (safely) once
// https://github.com/rust-lang/rust/issues/78485 is stable.
let mut buf = [0u8; 1024];
loop {
let num_read = source.read(&mut buf)?;
if num_read == 0 {
break;
}

let buf = &buf[..num_read];
sink.write_all(buf)?;
// flush is needed to print partial lines, otherwise output is buffered until a newline
sink.flush()?;
}

Ok(())
}

fn main() {
let mut child = Command::new("bash").args(&["-c", BASH_CMD])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("failed to execute child");

let child_out = child.stdout.take().expect("cannot attach to child stdout");
let child_err = child.stderr.take().expect("cannot attach to child stderr");

let thread_out = std::thread::spawn(move || {
stream(child_out, std::io::stdout())
.expect("error communicating with child stdout")
});
let thread_err = std::thread::spawn(move || {
stream(child_err, std::io::stderr()).expect("error communicating with child stderr")
});

thread_out.join().expect("child stdout thread failed to join");
thread_err.join().expect("child stderr thread failed to join");

let status = child.wait().expect("Subprocess wait failed");
assert!(status.success(), "Subprocess failed");
}
221 changes: 208 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use std::convert::{TryFrom, TryInto};
use std::ffi::{OsString, OsStr};
use std::fs::{File, OpenOptions};
use std::hash::{Hash, Hasher};
use std::io::{self, BufReader, ErrorKind, BufWriter, Write};
use std::io::{self, BufReader, ErrorKind, BufWriter, Write, Read};
use std::path::{PathBuf, Path};
use std::time::{Duration, Instant, SystemTime};

use anyhow::{Context, Error, Result};
use anyhow::{anyhow, Context, Error, Result};
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;

Expand Down Expand Up @@ -1058,19 +1058,70 @@ impl Bkt {
Ok(())
}

fn execute_subprocess(cmd: impl Into<std::process::Command>) -> Result<Invocation> {
// Executes the given command, capturing its output and exit code in the returned Invocation.
// If output_streams is present the output of the command is _also_ written to these streams
// concurrently, in order to support displaying a command's output while simultaneously caching
// it (instead of waiting for the command to complete before outputting anything).
fn execute_subprocess(
cmd: impl Into<std::process::Command>,
output_streams: Option<(impl Write+Send, impl Write+Send)>
) -> Result<Invocation> {
fn maybe_tee(mut source: impl Read, mut sink: Option<impl Write>) -> std::io::Result<Vec<u8>> {
let mut ret = Vec::new();

// This initialization can be avoided (safely) once
// https://github.com/rust-lang/rust/issues/78485 is stable.
let mut buf = [0u8; 1024 * 10];
loop {
let num_read = source.read(&mut buf)?;
if num_read == 0 {
break;
}

let buf = &buf[..num_read];
if let Some(ref mut sink) = sink {
sink.write_all(buf)?;
sink.flush()?;
}
ret.extend(buf);
}
Ok(ret)
}

let (out_sink, err_sink) = match output_streams {
Some((out, err)) => (Some(out), Some(err)),
None => (None, None),
};

let mut command: std::process::Command = cmd.into();
use std::process::Stdio;
let command = command.stdout(Stdio::piped()).stderr(Stdio::piped());

let start = Instant::now();
// TODO write to stdout/stderr while running, rather than after the process completes?
// See https://stackoverflow.com/q/66060139
let result = command.output()
.with_context(|| format!("Failed to run command: {:?}", command))?;
let mut child = command.spawn().with_context(|| format!("Failed to run command: {:?}", command))?;

let child_out = child.stdout.take().ok_or(anyhow!("cannot attach to child stdout"))?;
let child_err = child.stderr.take().ok_or(anyhow!("cannot attach to child stderr"))?;

// Using scoped threads means we can take a Write+Send instead of a W+S+'static, allowing
// callers to pass mutable references (such as `&mut Vec<u8>`). See also
// https://stackoverflow.com/q/32750829/113632
let (stdout, stderr) = std::thread::scope(|s| {
let thread_out = s.spawn(|| maybe_tee(child_out, out_sink));
let thread_err = s.spawn(|| maybe_tee(child_err, err_sink));
let stdout = thread_out.join().expect("child stdout thread failed to join").context("stdout pipe failed")?;
let stderr = thread_err.join().expect("child stderr thread failed to join").context("stderr pipe failed")?;
anyhow::Ok((stdout, stderr))
})?;

let status = child.wait()?;
let runtime = start.elapsed();

Ok(Invocation {
stdout: result.stdout,
stderr: result.stderr,
stdout,
stderr,
// TODO handle signals, see https://stackoverflow.com/q/66272686
exit_code: result.status.code().unwrap_or(126),
exit_code: status.code().unwrap_or(126),
runtime,
})
}
Expand All @@ -1089,15 +1140,60 @@ impl Bkt {
pub fn retrieve<T>(&self, command: T, ttl: Duration) -> Result<(Invocation, CacheStatus)> where
T: TryInto<CommandState>,
anyhow::Error: From<T::Error>, // https://stackoverflow.com/a/72627328
{
self.retrieve_impl(command, ttl, None::<(std::io::Stdout, std::io::Stderr)>)
}

/// **Experimental** This method is subject to change.
///
/// Looks up the given command in Bkt's cache. If found (and newer than the given TTL) returns
/// the cached invocation. If stale or not found the command is executed and the result is
/// cached and then returned. Additionally, the invocation's stdout and stderr are written to
/// the given streams in real time.
///
/// The second element in the returned tuple reports whether or not the invocation was cached
/// and includes information such as the cached data's age or the executed subprocess' runtime.
///
/// # Errors
///
/// If looking up, deserializing, executing, or serializing the command fails. This generally
/// reflects a user error such as an invalid command.
pub fn retrieve_streaming<T>(
&self,
command: T,
ttl: Duration,
stdout_sink: impl Write+Send,
stderr_sink: impl Write+Send,
) -> Result<(Invocation, CacheStatus)> where
T: TryInto<CommandState>,
anyhow::Error: From<T::Error>, // https://stackoverflow.com/a/72627328
{
self.retrieve_impl(command, ttl, Some((stdout_sink, stderr_sink)))
}

fn retrieve_impl<T>(
&self, command: T,
ttl: Duration,
output_streams: Option<(impl Write+Send, impl Write+Send)>
) -> Result<(Invocation, CacheStatus)> where
T: TryInto<CommandState>,
anyhow::Error: From<T::Error>, // https://stackoverflow.com/a/72627328
{
let command = command.try_into()?;
let cached = self.cache.lookup(&command, ttl).context("Cache lookup failed")?;
let result = match cached {
Some((cached, mtime)) => (cached, CacheStatus::Hit(Instant::now() - mtime.elapsed()?)),
Some((inv, mtime)) => {
let inv: Invocation = inv; //The if-let confuses type inference for some reason, if that's commented out this line isn't needed
if let Some((mut stdout, mut stderr)) = output_streams {
stdout.write_all(inv.stdout())?;
stderr.write_all(inv.stderr())?;
}
(inv, CacheStatus::Hit(Instant::now() - mtime.elapsed()?))
},
None => {
let cleanup_hook = self.maybe_cleanup_once();
let start = Instant::now();
let result = Bkt::execute_subprocess(&command).context("Subprocess execution failed")?;
let result = Bkt::execute_subprocess(&command, output_streams).context("Subprocess execution failed")?;
let runtime = Instant::now() - start;
if command.persist_failures || result.exit_code == 0 {
self.cache.store(&command, &result, ttl).context("Cache write failed")?;
Expand All @@ -1121,11 +1217,47 @@ impl Bkt {
pub fn refresh<T>(&self, command: T, ttl: Duration) -> Result<(Invocation, Duration)> where
T: TryInto<CommandState>,
anyhow::Error: From<T::Error>, // https://stackoverflow.com/a/72627328
{
self.refresh_impl(command, ttl, None::<(std::io::Stdout, std::io::Stderr)>)
}

/// Unconditionally executes the given command and caches the invocation for the given TTL.
/// This can be used to "warm" the cache so that subsequent calls to `execute` are fast.
/// The invocation's stdout and stderr are written to the given streams in real time in addition
/// to being cached.
///
/// The second element in the returned tuple is the subprocess' execution time.
///
/// # Errors
///
/// If executing or serializing the command fails. This generally reflects a user error such as
/// an invalid command.
pub fn refresh_streaming<T>(
&self,
command: T,
ttl: Duration,
stdout_sink: impl Write+Send,
stderr_sink: impl Write+Send,
) -> Result<(Invocation, Duration)> where
T: TryInto<CommandState>,
anyhow::Error: From<T::Error>, // https://stackoverflow.com/a/72627328
{
self.refresh_impl(command, ttl, Some((stdout_sink, stderr_sink)))
}

fn refresh_impl<T>(
&self,
command: T,
ttl: Duration,
output_streams: Option<(impl Write+Send, impl Write+Send)>
) -> Result<(Invocation, Duration)> where
T: TryInto<CommandState>,
anyhow::Error: From<T::Error>, // https://stackoverflow.com/a/72627328
{
let command = command.try_into()?;
let cleanup_hook = self.maybe_cleanup_once();
let start = Instant::now();
let result = Bkt::execute_subprocess(&command).context("Subprocess execution failed")?;
let result = Bkt::execute_subprocess(&command, output_streams).context("Subprocess execution failed")?;
let elapsed = Instant::now() - start;
if command.persist_failures || result.exit_code == 0 {
self.cache.store(&command, &result, ttl).context("Cache write failed")?;
Expand Down Expand Up @@ -1256,6 +1388,69 @@ mod bkt_tests {
assert!(cached_status.is_hit());
}

#[test]
fn streaming_same_output() {
let dir = TestDir::temp();

let cmd = CommandDesc::new(["bash", "-c", r#"echo StdOut; echo StdErr >&2"#]);
let bkt = Bkt::create(dir.path("cache")).unwrap();

let mut stdout = Vec::new();
let mut stderr = Vec::new();
let (res, stat) = bkt.retrieve_streaming(
&cmd, Duration::from_secs(10), &mut stdout, &mut stderr).unwrap();
assert!(stat.is_miss());
assert_eq!(&stdout, &res.stdout);
assert_eq!(&stderr, &res.stderr);
assert_eq!(res.stdout_utf8(), "StdOut\n");
assert_eq!(res.stderr_utf8(), "StdErr\n");

let mut stdout = Vec::new();
let mut stderr = Vec::new();
let (res, stat) = bkt.retrieve_streaming(
&cmd, Duration::from_secs(10), &mut stdout, &mut stderr).unwrap();
assert!(stat.is_hit());
assert_eq!(&stdout, &res.stdout);
assert_eq!(&stderr, &res.stderr);
assert_eq!(res.stdout_utf8(), "StdOut\n");
assert_eq!(res.stderr_utf8(), "StdErr\n");
}

#[test]
fn streaming_refresh() {
let dir = TestDir::temp();

let cmd = CommandDesc::new(["bash", "-c", r#"echo StdOut; echo StdErr >&2"#]);
let bkt = Bkt::create(dir.path("cache")).unwrap();

let mut stdout = Vec::new();
let mut stderr = Vec::new();
let (res, _) = bkt.refresh_streaming(
&cmd, Duration::from_secs(10), &mut stdout, &mut stderr).unwrap();

assert_eq!(&stdout, &res.stdout);
assert_eq!(&stderr, &res.stderr);
assert_eq!(res.stdout_utf8(), "StdOut\n");
assert_eq!(res.stderr_utf8(), "StdErr\n");
}

// Just a proof-of-concept that streaming to files works as well.
#[test]
fn streaming_to_file() {
let dir = TestDir::temp();

let cmd = CommandDesc::new(["bash", "-c", r#"echo StdOut; echo StdErr >&2"#]);
let bkt = Bkt::create(dir.path("cache")).unwrap();

let out = File::create(dir.path("out")).unwrap();
let err = File::create(dir.path("err")).unwrap();
let _ = bkt.retrieve_streaming(
&cmd, Duration::from_secs(10), out, err).unwrap();

assert_eq!(std::fs::read_to_string(dir.path("out")).unwrap(), "StdOut\n");
assert_eq!(std::fs::read_to_string(dir.path("err")).unwrap(), "StdErr\n");
}

#[test]
fn with_working_dir() {
let dir = TestDir::temp().create("wd", FileType::Dir);
Expand Down
Loading

0 comments on commit 7d2e522

Please sign in to comment.