From 05d74ea3fbabc0adcb0098d4896a623cc5a34ed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Sch=C3=BCtt?= Date: Mon, 17 Oct 2022 13:51:10 +0200 Subject: [PATCH] Fix WASI pipe to properly store read bytes in temp_buffer --- lib/c-api/src/wasm_c_api/wasi/mod.rs | 93 +++++++++++++++++++--------- lib/wasi/src/state/pipe.rs | 26 +++++--- 2 files changed, 80 insertions(+), 39 deletions(-) diff --git a/lib/c-api/src/wasm_c_api/wasi/mod.rs b/lib/c-api/src/wasm_c_api/wasi/mod.rs index 5e7c34c75e1..b83dc3fd243 100644 --- a/lib/c-api/src/wasm_c_api/wasi/mod.rs +++ b/lib/c-api/src/wasm_c_api/wasi/mod.rs @@ -58,6 +58,7 @@ pub struct wasi_pipe_t { data: Option>>>, } +#[derive(Debug)] struct WasiPipeDataWithDestructor { data: Vec, // Buffer of already-read data that is being read into, @@ -71,12 +72,17 @@ impl WasiPipeDataWithDestructor { &mut self, read_cb: WasiConsoleIoReadCallback, max_read: Option, - ) -> io::Result> { + ) -> io::Result { const BLOCK_SIZE: usize = 1024; let mut final_buf = Vec::new(); - let max_read = max_read.unwrap_or(usize::MAX); + let max_to_read = max_read.unwrap_or(usize::MAX); + let max_read = max_to_read.saturating_sub(self.temp_buffer.len()); + if max_read == 0 { + // there are n bytes being available to read in the temp_buffer + return Ok(max_to_read); + } let mut cur_read = 0; // Read bytes until either EOF is reached or max_read bytes are reached @@ -107,15 +113,28 @@ impl WasiPipeDataWithDestructor { )); } - if result == 0 { + let result = result as usize; + if result == 0 || result > temp_buffer.len() { break; // EOF } - cur_read += temp_buffer.len(); - final_buf.append(&mut temp_buffer); + cur_read += result; + final_buf.extend_from_slice(&temp_buffer[..result]); } - Ok(final_buf) + let final_buf_len = final_buf.len(); + + // store the bytes in temp_buffer + self.temp_buffer.extend_from_slice(&final_buf); + + // temp_buffer.len() can be smaller than max_read in case we + // encounter EOF earlier than expected + assert!(self.temp_buffer.len() <= max_read); + + // return how many bytes were just read + // + // caller has to clear temp_buffer to advance actual reading + Ok(final_buf_len) } } @@ -161,26 +180,11 @@ impl io::Read for wasi_pipe_t { fn read(&mut self, buf: &mut [u8]) -> io::Result { let self_read = self.read; let mut data = self.get_data_mut("read")?; - - // fill up buf by draining temp_buffer first, then read more bytes - let bytes_to_read = data.temp_buffer.len().min(buf.len()); - let mut temp_buffer_drained: Vec<_> = data.temp_buffer.drain(..bytes_to_read).collect(); - assert!(temp_buffer_drained.len() <= buf.len()); - - // If temp_buffer is exhausted, try reading the remaining bytes from the pipe - let mut bytes_read = bytes_to_read; - if buf.len() >= temp_buffer_drained.len() { - let secondary_bytes_to_read = data.temp_buffer.len().min(buf.len()); - data.read_buffer(self_read, Some(secondary_bytes_to_read))?; - temp_buffer_drained - .append(&mut data.temp_buffer.drain(..secondary_bytes_to_read).collect()); - bytes_read += secondary_bytes_to_read; - } - - assert_eq!(buf.len(), temp_buffer_drained.len()); - buf.clone_from_slice(&temp_buffer_drained); - - Ok(bytes_read) + let _ = data.read_buffer(self_read, Some(buf.len()))?; + let bytes_to_read = buf.len().min(data.temp_buffer.len()); + let bytes_read = data.temp_buffer.drain(..bytes_to_read).collect::>(); + buf[..bytes_read.len()].clone_from_slice(&bytes_read); + Ok(bytes_to_read) } } @@ -278,7 +282,7 @@ impl VirtualFile for wasi_pipe_t { fn bytes_available_read(&self) -> Result, FsError> { let self_read = self.read; let mut data = self.get_data_mut("bytes_available_read")?; - data.read_buffer(self_read, None)?; + let _ = data.read_buffer(self_read, None)?; Ok(Some(data.temp_buffer.len())) } fn bytes_available_write(&self) -> Result, FsError> { @@ -507,6 +511,36 @@ pub unsafe extern "C" fn wasi_pipe_flush(ptr: *mut wasi_pipe_t) -> i64 { } } +#[test] +fn test_wasi_pipe_with_destructor() { + let mut wasi_pipe_t_ptr = std::ptr::null_mut(); + let second_wasi_pipe_t_ptr = unsafe { wasi_pipe_new(&mut wasi_pipe_t_ptr) }; + let wasi_pipe_t_ptr = unsafe { &mut *wasi_pipe_t_ptr }; + let second_wasi_pipe_t_ptr = unsafe { &mut *second_wasi_pipe_t_ptr }; + + let data = b"hello".into_iter().map(|v| *v as i8).collect::>(); + let result = unsafe { wasi_pipe_write_bytes(wasi_pipe_t_ptr, data.as_ptr(), data.len()) }; + assert_eq!(result, 5); + + let bytes_avail = wasi_pipe_t_ptr.bytes_available_read(); + assert_eq!(bytes_avail, Ok(Some(0))); + + let bytes_avail2 = second_wasi_pipe_t_ptr.bytes_available_read(); + assert_eq!(bytes_avail2, Ok(Some(5))); + + let mut read_str_ptr = std::ptr::null_mut(); + let result = unsafe { wasi_pipe_read_str(second_wasi_pipe_t_ptr, &mut read_str_ptr) }; + assert_eq!(result, 6); // hello\0 + let buf_slice = unsafe { std::slice::from_raw_parts_mut(read_str_ptr, result as usize) }; + assert_eq!(buf_slice[..5], data); + + unsafe { + wasi_pipe_delete_str(read_str_ptr); + } + unsafe { wasi_pipe_delete(wasi_pipe_t_ptr) }; + unsafe { wasi_pipe_delete(second_wasi_pipe_t_ptr) }; +} + #[no_mangle] pub unsafe extern "C" fn wasi_pipe_read_bytes( ptr: *const wasi_pipe_t, @@ -553,12 +587,13 @@ unsafe fn wasi_pipe_read_bytes_internal(ptr: *const wasi_pipe_t, buf: &mut Vec i64 { +pub unsafe extern "C" fn wasi_pipe_read_str(ptr: *const wasi_pipe_t, buf: &mut *mut c_char) -> i64 { use std::ffi::CString; let mut target = Vec::new(); diff --git a/lib/wasi/src/state/pipe.rs b/lib/wasi/src/state/pipe.rs index 70e41b249bd..60754a0cb0f 100644 --- a/lib/wasi/src/state/pipe.rs +++ b/lib/wasi/src/state/pipe.rs @@ -319,8 +319,6 @@ impl Read for WasiPipe { inner_buf.advance(read); return Ok(read); } - } else if !self.block { - return Ok(0); } } let rx = self.rx.lock().unwrap(); @@ -339,18 +337,26 @@ impl Read for WasiPipe { s } Err(_) => { - // could not immediately receive bytes, so we need to block - match rx.recv() { - Ok(o) => o, - // Errors can happen if the sender has been dropped already - // In this case, just return 0 to indicate that we can't read any - // bytes anymore - Err(_) => { - return Ok(0); + if !self.block { + // If self.block is explicitly set to false, never block + Vec::new() + } else { + // could not immediately receive bytes, so we need to block + match rx.recv() { + Ok(o) => o, + // Errors can happen if the sender has been dropped already + // In this case, just return 0 to indicate that we can't read any + // bytes anymore + Err(_) => { + return Ok(0); + } } } } }; + if data.is_empty() && self.read_buffer.as_ref().map(|s| s.len()).unwrap_or(0) == 0 { + return Ok(0); + } self.read_buffer.replace(Bytes::from(data)); } }