Skip to content

Commit

Permalink
Add stream module to futures crate
Browse files Browse the repository at this point in the history
The stream module exposes the JsStream type used to convert
JS objects implementing the AsyncIterator interface to be used
as Rust streams.
  • Loading branch information
olanod committed Jan 5, 2021
1 parent 9f725e7 commit 52b5b10
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 2 deletions.
2 changes: 2 additions & 0 deletions crates/futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ edition = "2018"
cfg-if = "1.0.0"
js-sys = { path = "../js-sys", version = '0.3.46' }
wasm-bindgen = { path = "../..", version = '0.2.69' }
futures-core = { version = '0.3.8', default-features = false }

[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
path = "../web-sys"
Expand All @@ -26,3 +27,4 @@ features = [
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = { path = '../test', version = '0.3.19' }
futures-channel-preview = { version = "0.3.0-alpha.18" }
futures-lite = { version = "1.11.3", default-features = false }
1 change: 1 addition & 0 deletions crates/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::task::{Context, Poll, Waker};
use wasm_bindgen::prelude::*;

mod queue;
pub mod stream;

mod task {
use cfg_if::cfg_if;
Expand Down
81 changes: 81 additions & 0 deletions crates/futures/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s.
//!
//! Analogous to the promise to future convertion, this module allows the
//! turing objects implementing the async iterator protocol into `Stream`s
//! that produce values that can be awaited from.
//!

use crate::JsFuture;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::stream::Stream;
use js_sys::{AsyncIterator, IteratorNext};
use wasm_bindgen::{prelude::*, JsCast};

/// A `Stream` that yields values from an underlying `AsyncIterator`.
pub struct JsStream {
iter: AsyncIterator,
next: Option<JsFuture>,
done: bool,
}

impl JsStream {
fn next_future(&self) -> Result<JsFuture, JsValue> {
self.iter.next().map(JsFuture::from)
}
}

impl From<AsyncIterator> for JsStream {
fn from(iter: AsyncIterator) -> Self {
JsStream {
iter,
next: None,
done: false,
}
}
}

impl Stream for JsStream {
type Item = Result<JsValue, JsValue>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}

let future = match self.next.as_mut() {
Some(val) => val,
None => match self.next_future() {
Ok(val) => {
self.next = Some(val);
self.next.as_mut().unwrap()
}
Err(e) => {
self.done = true;
return Poll::Ready(Some(Err(e)));
}
},
};

match Pin::new(future).poll(cx) {
Poll::Ready(res) => match res {
Ok(iter_next) => {
let next = iter_next.unchecked_into::<IteratorNext>();
if next.done() {
self.done = true;
Poll::Ready(None)
} else {
self.next.take();
Poll::Ready(Some(Ok(next.value())))
}
}
Err(e) => {
self.done = true;
Poll::Ready(Some(Err(e)))
}
},
Poll::Pending => Poll::Pending,
}
}
}
23 changes: 21 additions & 2 deletions crates/futures/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

use futures_channel::oneshot;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::{future_to_promise, spawn_local, JsFuture};
use wasm_bindgen::{prelude::*, JsCast};
use wasm_bindgen_futures::{future_to_promise, spawn_local, stream::JsStream, JsFuture};
use wasm_bindgen_test::*;

#[wasm_bindgen_test]
Expand Down Expand Up @@ -88,3 +88,22 @@ async fn can_create_multiple_futures_from_same_promise() {
a.await.unwrap();
b.await.unwrap();
}

#[wasm_bindgen_test]
async fn can_use_an_async_iterable_as_stream() {
use futures_lite::stream::StreamExt;
let async_iter = js_sys::Function::new_no_args(
"return async function*() {
yield 42;
yield 24;
}()",
)
.call0(&JsValue::undefined())
.unwrap()
.unchecked_into::<js_sys::AsyncIterator>();

let mut stream = JsStream::from(async_iter);
assert_eq!(stream.next().await, Some(Ok(JsValue::from(42))));
assert_eq!(stream.next().await, Some(Ok(JsValue::from(24))));
assert_eq!(stream.next().await, None);
}

0 comments on commit 52b5b10

Please sign in to comment.