Skip to content

Commit

Permalink
Add pseudo-streams. (bytecodealliance#29)
Browse files Browse the repository at this point in the history
* Add pseudo-streams.

This add a pseudo-stream type to the wasi-poll interface, and adds ways
to obtain streams from command invocation and from files. In the future,
it can support sockets too. With this, `command` takes streams for
stdin/stdout, rather than filesystem descriptors.

Streams support reading and writing, as well as skipping,
repeated-element writing, and splicing from one stream to another. And
there are `subscribe-*` functions to produce pseudo-futures from
pseudo-streams, allowing them to be polled.

This makes the polyfill somewhat more complex, but this is largely due
to the polyfill being tied to the preview1 API.

This replaces the `seek` and `tell` functions, and implemented `fd_seek`
and `fd_tell` in terms of the polyfill's own position.

Also, add a dedicated stderr API for writing to stderr in a way that
tolerates strings that aren't necessarily expected to be newlines. And
add a way to test whether stderr is a terminal.

* Implement the host side of `poll_oneoff`.

This implements pseudo-futures and subscription functions, and adds
polling for streams.

* Implement clock subscriptions.

wasi.wit:

 - Remove the "timers" API from wasi-clocks, as it's now redundant with
   pseudo-future clock subscriptions.

 - Remove `subscribe-wall-clock`. Wall-clock timeouts were implemented by
   converting them to monotonic-clock timeouts anyway, so just make that
   explicit in the WASI API, and teach the polyfill how to convert
   wall-clock timeouts into monotonic-clock timeouts.

 - Move `subscribe-monotonic-clock` out of wasi-clocks and into wasi-poll,
   as it's closely tied to the pseudo-futures mechanism and the `poll-oneoff`
   implementation.

 - While here, fix `stream-read` and related functions to return an
   end-of-stream/file indicator.

Code changes:

 - `default_wall_clock()` and `default_monotonic_clock()` now always
   create a new table entry, rather than holding a table index in the
   `WasiCtx` which could potentially dangle.

 - Add support for monotonic-clock poll subscriptions.

 - Say "wall clock" instead of "system clock" when we have a choice.

* Remove the `OFlags::APPEND` flag, which is no longer used.
  • Loading branch information
sunfishcode authored Dec 23, 2022
1 parent 0798e85 commit d351a66
Show file tree
Hide file tree
Showing 30 changed files with 1,466 additions and 671 deletions.
2 changes: 2 additions & 0 deletions host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ tracing = { workspace = true }
wasmtime = { git = "https://github.com/bytecodealliance/wasmtime", features = ["component-model"] }
wasi-common = { path = "../wasi-common" }
wasi-cap-std-sync = { path = "../wasi-common/cap-std-sync" }
is-terminal = "0.4.1"
terminal_size = "0.2.3"

[dev-dependencies]
test-programs-macros = { path = "../test-programs/macros" }
Expand Down
89 changes: 21 additions & 68 deletions host/src/clocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{wasi_clocks, wasi_default_clocks, WasiCtx};
use anyhow::Context;
use cap_std::time::SystemTime;
use wasi_common::clocks::{MonotonicClock, MonotonicTimer, WallClock};
use wasi_common::clocks::{TableMonotonicClockExt, TableWallClockExt};

impl TryFrom<SystemTime> for wasi_clocks::Datetime {
type Error = anyhow::Error;
Expand All @@ -19,102 +18,56 @@ impl TryFrom<SystemTime> for wasi_clocks::Datetime {

#[async_trait::async_trait]
impl wasi_default_clocks::WasiDefaultClocks for WasiCtx {
async fn default_monotonic_clock(&mut self) -> anyhow::Result<wasi_clocks::MonotonicClock> {
Ok(self.clocks.default_monotonic)
async fn default_wall_clock(&mut self) -> anyhow::Result<wasi_clocks::WallClock> {
// Create a new handle to the default wall clock.
let new = self.clocks.default_wall_clock.dup();
Ok(self.table_mut().push(Box::new(new))?)
}

async fn default_wall_clock(&mut self) -> anyhow::Result<wasi_clocks::WallClock> {
Ok(self.clocks.default_wall)
async fn default_monotonic_clock(&mut self) -> anyhow::Result<wasi_clocks::MonotonicClock> {
// Create a new handle to the default monotonic clock.
let new = self.clocks.default_monotonic_clock.dup();
Ok(self.table_mut().push(Box::new(new))?)
}
}

#[async_trait::async_trait]
impl wasi_clocks::WasiClocks for WasiCtx {
async fn subscribe_wall_clock(
&mut self,
when: wasi_clocks::Datetime,
absolute: bool,
) -> anyhow::Result<wasi_clocks::WasiFuture> {
drop((when, absolute));
todo!()
}

async fn subscribe_monotonic_clock(
&mut self,
when: wasi_clocks::Instant,
absolute: bool,
) -> anyhow::Result<wasi_clocks::WasiFuture> {
drop((when, absolute));
todo!()
}

async fn monotonic_clock_now(
&mut self,
fd: wasi_clocks::MonotonicClock,
) -> anyhow::Result<wasi_clocks::Instant> {
let clock = self.table.get::<MonotonicClock>(fd)?;
let now = clock.now(self.clocks.monotonic.as_ref());
Ok(now
.as_nanos()
.try_into()
.context("converting monotonic time to nanos u64")?)
Ok(self.table().get_monotonic_clock(fd)?.now())
}

async fn monotonic_clock_resolution(
&mut self,
fd: wasi_clocks::MonotonicClock,
) -> anyhow::Result<wasi_clocks::Instant> {
self.table.get::<MonotonicClock>(fd)?;
let res = self.clocks.monotonic.resolution();
Ok(res
.as_nanos()
.try_into()
.context("converting monotonic resolution to nanos u64")?)
}

async fn monotonic_clock_new_timer(
&mut self,
fd: wasi_clocks::MonotonicClock,
initial: wasi_clocks::Instant,
) -> anyhow::Result<wasi_clocks::MonotonicTimer> {
let clock = self.table.get::<MonotonicClock>(fd)?;
let timer = clock.new_timer(std::time::Duration::from_micros(initial));
drop(clock);
let timer_fd = self.table.push(Box::new(timer))?;
Ok(timer_fd)
Ok(self.table().get_monotonic_clock(fd)?.now())
}

async fn wall_clock_now(
&mut self,
fd: wasi_clocks::WallClock,
) -> anyhow::Result<wasi_clocks::Datetime> {
let clock = self.table.get::<WallClock>(fd)?;
Ok(clock.now(self.clocks.system.as_ref()).try_into()?)
let clock = self.table().get_wall_clock(fd)?;
let now = clock.now();
Ok(wasi_clocks::Datetime {
seconds: now.as_secs(),
nanoseconds: now.subsec_nanos(),
})
}

async fn wall_clock_resolution(
&mut self,
fd: wasi_clocks::WallClock,
) -> anyhow::Result<wasi_clocks::Datetime> {
self.table.get::<WallClock>(fd)?;
let nanos = self.clocks.system.resolution().as_nanos();
let clock = self.table().get_wall_clock(fd)?;
let res = clock.resolution();
Ok(wasi_clocks::Datetime {
seconds: (nanos / 1_000_000_000_u128)
.try_into()
.context("converting wall clock resolution to seconds u64")?,
nanoseconds: (nanos % 1_000_000_000_u128).try_into().unwrap(),
seconds: res.as_secs(),
nanoseconds: res.subsec_nanos(),
})
}

async fn monotonic_timer_current(
&mut self,
fd: wasi_clocks::MonotonicTimer,
) -> anyhow::Result<wasi_clocks::Instant> {
let timer = self.table.get::<MonotonicTimer>(fd)?;
Ok(timer
.current(self.clocks.monotonic.as_ref())
.as_nanos()
.try_into()
.context("converting monotonic timer to nanos u64")?)
}
}
120 changes: 79 additions & 41 deletions host/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#![allow(unused_variables)]

use crate::wasi_poll::WasiStream;
use crate::{wasi_filesystem, HostResult, WasiCtx};
use std::{
io::{IoSlice, IoSliceMut},
ops::BitAnd,
time::SystemTime,
};
use wasi_common::{dir::TableDirExt, file::TableFileExt, WasiDir, WasiFile};
use wasi_common::{
dir::TableDirExt,
file::{FileStream, TableFileExt},
WasiDir, WasiFile,
};

fn contains<T: BitAnd<Output = T> + Eq + Copy>(flags: T, flag: T) -> bool {
(flags & flag) == flag
Expand Down Expand Up @@ -118,9 +123,6 @@ impl Into<wasi_common::file::OFlags> for wasi_filesystem::OFlags {
impl Into<wasi_common::file::FdFlags> for wasi_filesystem::DescriptorFlags {
fn into(self) -> wasi_common::file::FdFlags {
let mut flags = wasi_common::file::FdFlags::empty();
if contains(self, wasi_filesystem::DescriptorFlags::APPEND) {
flags |= wasi_common::file::FdFlags::APPEND;
}
if contains(self, wasi_filesystem::DescriptorFlags::DSYNC) {
flags |= wasi_common::file::FdFlags::DSYNC;
}
Expand Down Expand Up @@ -242,19 +244,19 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
fd: wasi_filesystem::Descriptor,
len: wasi_filesystem::Size,
offset: wasi_filesystem::Filesize,
) -> HostResult<Vec<u8>, wasi_filesystem::Errno> {
let f = self.table().get_file_mut(u32::from(fd)).map_err(convert)?;
) -> HostResult<(Vec<u8>, bool), wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

let mut buffer = vec![0; len.try_into().unwrap()];

let bytes_read = f
let (bytes_read, end) = f
.read_vectored_at(&mut [IoSliceMut::new(&mut buffer)], offset)
.await
.map_err(convert)?;

buffer.truncate(bytes_read.try_into().unwrap());

Ok(buffer)
Ok((buffer, end))
}

async fn pwrite(
Expand All @@ -263,7 +265,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
buf: Vec<u8>,
offset: wasi_filesystem::Filesize,
) -> HostResult<wasi_filesystem::Size, wasi_filesystem::Errno> {
let f = self.table().get_file_mut(u32::from(fd)).map_err(convert)?;
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

let bytes_written = f
.write_vectored_at(&[IoSlice::new(&buf)], offset)
Expand Down Expand Up @@ -294,42 +296,13 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
todo!()
}

async fn seek(
&mut self,
fd: wasi_filesystem::Descriptor,
from: wasi_filesystem::SeekFrom,
) -> HostResult<wasi_filesystem::Filesize, wasi_filesystem::Errno> {
use std::io::SeekFrom;

let from = match from {
wasi_filesystem::SeekFrom::Cur(offset) => SeekFrom::Current(offset),
wasi_filesystem::SeekFrom::End(offset) => SeekFrom::End(offset.try_into().unwrap()),
wasi_filesystem::SeekFrom::Set(offset) => SeekFrom::Start(offset),
};

Ok(self
.table()
.get_file_mut(fd)
.map_err(convert)?
.seek(from)
.await
.map_err(convert)?)
}

async fn sync(
&mut self,
fd: wasi_filesystem::Descriptor,
) -> HostResult<(), wasi_filesystem::Errno> {
todo!()
}

async fn tell(
&mut self,
fd: wasi_filesystem::Descriptor,
) -> HostResult<wasi_filesystem::Filesize, wasi_filesystem::Errno> {
todo!()
}

async fn create_directory_at(
&mut self,
fd: wasi_filesystem::Descriptor,
Expand All @@ -345,7 +318,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
let table = self.table();
if table.is::<Box<dyn WasiFile>>(fd) {
Ok(table
.get_file_mut(fd)
.get_file(fd)
.map_err(convert)?
.get_filestat()
.await
Expand Down Expand Up @@ -405,7 +378,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
// TODO: How should this be used?
_mode: wasi_filesystem::Mode,
) -> HostResult<wasi_filesystem::Descriptor, wasi_filesystem::Errno> {
let table = self.table();
let table = self.table_mut();
if !table.is::<Box<dyn WasiDir>>(fd) {
return Err(wasi_filesystem::Errno::Notdir.into());
}
Expand Down Expand Up @@ -444,7 +417,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
}

async fn close(&mut self, fd: wasi_filesystem::Descriptor) -> anyhow::Result<()> {
let table = self.table();
let table = self.table_mut();
if table.is::<Box<dyn WasiFile>>(fd) {
let _ = table.delete(fd);
} else if table.is::<Box<dyn WasiDir>>(fd) {
Expand Down Expand Up @@ -554,4 +527,69 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
) -> HostResult<(), wasi_filesystem::Errno> {
todo!()
}

async fn read_via_stream(
&mut self,
fd: wasi_filesystem::Descriptor,
offset: u64,
) -> HostResult<WasiStream, wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = f.try_clone().await.map_err(convert)?;

// Create a stream view for it.
let reader = FileStream::new_reader(clone, offset);

// Box it up.
let boxed: Box<dyn wasi_common::WasiStream> = Box::new(reader);

// Insert the stream view into the table.
let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?;

Ok(index)
}

async fn write_via_stream(
&mut self,
fd: wasi_filesystem::Descriptor,
offset: u64,
) -> HostResult<WasiStream, wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = f.try_clone().await.map_err(convert)?;

// Create a stream view for it.
let writer = FileStream::new_writer(clone, offset);

// Box it up.
let boxed: Box<dyn wasi_common::WasiStream> = Box::new(writer);

// Insert the stream view into the table.
let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?;

Ok(index)
}

async fn append_via_stream(
&mut self,
fd: wasi_filesystem::Descriptor,
) -> HostResult<WasiStream, wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = f.try_clone().await.map_err(convert)?;

// Create a stream view for it.
let appender = FileStream::new_appender(clone);

// Box it up.
let boxed: Box<dyn wasi_common::WasiStream> = Box::new(appender);

// Insert the stream view into the table.
let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?;

Ok(index)
}
}
2 changes: 2 additions & 0 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod filesystem;
mod logging;
mod poll;
mod random;
mod stderr;
mod tcp;
pub use wasi_common::{table::Table, WasiCtx};

Expand All @@ -23,6 +24,7 @@ pub fn add_to_linker<T: Send>(
wasi_default_clocks::add_to_linker(l, f)?;
wasi_filesystem::add_to_linker(l, f)?;
wasi_logging::add_to_linker(l, f)?;
wasi_stderr::add_to_linker(l, f)?;
wasi_poll::add_to_linker(l, f)?;
wasi_random::add_to_linker(l, f)?;
wasi_tcp::add_to_linker(l, f)?;
Expand Down
Loading

0 comments on commit d351a66

Please sign in to comment.