Skip to content

Commit

Permalink
Implement bytes_stream() for wasm. (#1713)
Browse files Browse the repository at this point in the history
Co-authored-by: muji <[email protected]>
  • Loading branch information
dmeijboom and tmpfs authored Jan 5, 2023
1 parent c2a1870 commit 4d96adf
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ multipart = ["mime_guess"]

trust-dns = ["trust-dns-resolver"]

stream = ["tokio/fs", "tokio-util"]
stream = ["tokio/fs", "tokio-util", "wasm-streams"]

socks = ["tokio-socks"]

Expand All @@ -83,6 +83,8 @@ bytes = "1.0"
serde = "1.0"
serde_urlencoded = "0.7.1"
tower-service = "0.3"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }

# Optional deps...

Expand All @@ -93,8 +95,6 @@ mime_guess = { version = "2.0", default-features = false, optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
encoding_rs = "0.8"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
http-body = "0.4.0"
hyper = { version = "0.14.18", default-features = false, features = ["tcp", "http1", "http2", "client", "runtime"] }
h2 = "0.3.10"
Expand Down Expand Up @@ -154,6 +154,7 @@ js-sys = "0.3.45"
serde_json = "1.0"
wasm-bindgen = "0.2.68"
wasm-bindgen-futures = "0.4.18"
wasm-streams = { version = "0.2", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]
version = "0.3.25"
Expand All @@ -169,7 +170,8 @@ features = [
"BlobPropertyBag",
"ServiceWorkerGlobalScope",
"RequestCredentials",
"File"
"File",
"ReadableStream"
]

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
Expand Down
26 changes: 26 additions & 0 deletions src/wasm/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ use http::{HeaderMap, StatusCode};
use js_sys::Uint8Array;
use url::Url;

#[cfg(feature = "stream")]
use wasm_bindgen::JsCast;

#[cfg(feature = "stream")]
use futures_util::stream::StreamExt;

#[cfg(feature = "json")]
use serde::de::DeserializeOwned;

Expand Down Expand Up @@ -118,6 +124,26 @@ impl Response {
Ok(bytes.into())
}

/// Convert the response into a `Stream` of `Bytes` from the body.
#[cfg(feature = "stream")]
pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
let web_response = self.http.into_body();
let body = web_response
.body()
.expect("could not create wasm byte stream");
let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
Box::pin(body.into_stream().map(|buf_js| {
let buffer = Uint8Array::new(
&buf_js
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?,
);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
}))
}

// util methods

/// Turn a response into an error if the server returned an error.
Expand Down

0 comments on commit 4d96adf

Please sign in to comment.