Skip to content

Commit

Permalink
Merge pull request #15 from MattiasBuelens/fix-release-lock-tests
Browse files Browse the repository at this point in the history
Fix releaseLock() documentation and tests
  • Loading branch information
MattiasBuelens authored Oct 15, 2022
2 parents 3960e8b + c4a1e60 commit 6dc7ee5
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 31 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Unreleased

* Updated documentation of `ReadableStream(Default|BYOB)Reader::release_lock()` around the expected behavior when there are pending read requests.
See the corresponding [Streams specification change](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5) for details.

## v0.2.3 (2022-05-18)

* Replaced `futures` dependency with `futures-util` to improve compilation times ([#11](https://github.com/MattiasBuelens/wasm-streams/pull/11), [#12](https://github.com/MattiasBuelens/wasm-streams/pull/12))
Expand Down
23 changes: 15 additions & 8 deletions src/readable/byob_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ use super::{sys, IntoAsyncRead, ReadableStream};
/// This is returned by the [`get_byob_reader`](ReadableStream::get_byob_reader) method.
///
/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
/// If the reader still has a pending read request at this point (i.e. if a future returned
/// by [`read`](Self::read) is not yet ready), then this will **panic**. You must either `await`
/// all `read` futures, or [`cancel`](Self::cancel) the stream to discard any pending `read` futures.
#[derive(Debug)]
pub struct ReadableStreamBYOBReader<'stream> {
raw: sys::ReadableStreamBYOBReader,
Expand Down Expand Up @@ -146,9 +143,14 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
/// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
/// corresponding stream.
///
/// **Panics** if the reader still has a pending read request, i.e. if a future returned
/// by [`read`](Self::read) is not yet ready. For a non-panicking variant,
/// use [`try_release_lock`](Self::try_release_lock).
/// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
/// the Streams standard allows the lock to be released even when there are still pending read
/// requests. Such requests will automatically become rejected, and this function will always
/// succeed.
///
/// However, if the Streams implementation is not yet up-to-date with this change, then
/// releasing the lock while there are pending read requests will **panic**. For a non-panicking
/// variant, use [`try_release_lock`](Self::try_release_lock).
#[inline]
pub fn release_lock(mut self) {
self.release_lock_mut()
Expand All @@ -163,8 +165,13 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
/// corresponding stream.
///
/// The lock cannot be released while the reader still has a pending read request, i.e.
/// if a future returned by [`read`](Self::read) is not yet ready. Attempting to do so will
/// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
/// the Streams standard allows the lock to be released even when there are still pending read
/// requests. Such requests will automatically become rejected, and this function will always
/// return `Ok(())`.
///
/// However, if the Streams implementation is not yet up-to-date with this change, then
/// the lock cannot be released while there are pending read requests. Attempting to do so will
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
Expand Down
23 changes: 15 additions & 8 deletions src/readable/default_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ use super::{sys, IntoStream, ReadableStream};
/// This is returned by the [`get_reader`](ReadableStream::get_reader) method.
///
/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
/// If the reader still has a pending read request at this point (i.e. if a future returned
/// by [`read`](Self::read) is not yet ready), then this will **panic**. You must either `await`
/// all `read` futures, or [`cancel`](Self::cancel) the stream to discard any pending `read` futures.
#[derive(Debug)]
pub struct ReadableStreamDefaultReader<'stream> {
raw: sys::ReadableStreamDefaultReader,
Expand Down Expand Up @@ -80,9 +77,14 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
/// corresponding stream.
///
/// **Panics** if the reader still has a pending read request, i.e. if a future returned
/// by [`read`](Self::read) is not yet ready. For a non-panicking variant,
/// use [`try_release_lock`](Self::try_release_lock).
/// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
/// the Streams standard allows the lock to be released even when there are still pending read
/// requests. Such requests will automatically become rejected, and this function will always
/// succeed.
///
/// However, if the Streams implementation is not yet up-to-date with this change, then
/// releasing the lock while there are pending read requests will **panic**. For a non-panicking
/// variant, use [`try_release_lock`](Self::try_release_lock).
#[inline]
pub fn release_lock(mut self) {
self.release_lock_mut()
Expand All @@ -97,8 +99,13 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
/// corresponding stream.
///
/// The lock cannot be released while the reader still has a pending read request, i.e.
/// if a future returned by [`read`](Self::read) is not yet ready. Attempting to do so will
/// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
/// the Streams standard allows the lock to be released even when there are still pending read
/// requests. Such requests will automatically become rejected, and this function will always
/// return `Ok(())`.
///
/// However, if the Streams implementation is not yet up-to-date with this change, then
/// the lock cannot be released while there are pending read requests. Attempting to do so will
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
Expand Down
7 changes: 0 additions & 7 deletions src/readable/into_async_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@ use super::ReadableStreamBYOBReader;
/// When this `AsyncRead` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// When used through [`ReadableStream::into_async_read`](super::ReadableStream::into_async_read),
/// the stream is automatically cancelled before dropping the reader, discarding any pending read requests.
/// When used through [`ReadableStreamBYOBReader::into_async_read`](super::ReadableStreamBYOBReader::into_async_read),
/// it is up to the user to either manually [cancel](Self::cancel) the stream,
/// or to ensure that there are no pending read requests when dropped.
/// See the documentation on [`ReadableStreamBYOBReader`] for more details on the drop behavior.
///
/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
#[must_use = "readers do nothing unless polled"]
#[derive(Debug)]
Expand Down
7 changes: 0 additions & 7 deletions src/readable/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ use super::ReadableStreamDefaultReader;
/// When this `Stream` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// When used through [`ReadableStream::into_stream`](super::ReadableStream::into_stream),
/// the stream is automatically cancelled before dropping the reader, discarding any pending read requests.
/// When used through [`ReadableStreamDefaultReader::into_stream`](super::ReadableStreamDefaultReader::into_stream),
/// it is up to the user to either manually [cancel](Self::cancel) the stream,
/// or to ensure that there are no pending read requests when dropped.
/// See the documentation on [`ReadableStreamDefaultReader`] for more details on the drop behavior.
///
/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
Expand Down
18 changes: 17 additions & 1 deletion tests/js/readable_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,20 @@ export function new_readable_byte_stream_from_array(chunks) {
}
}
});
}
}

/**
* Tests whether `reader.releaseLock()` is allowed while there are pending read requests.
*
* See: https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5
*/
export function supports_release_lock_with_pending_read() {
try {
const reader = new ReadableStream().getReader();
reader.read().then(() => {}, () => {});
reader.releaseLock();
return true;
} catch {
return false;
}
}
1 change: 1 addition & 0 deletions tests/js/readable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ extern "C" {
pub fn new_noop_readable_byte_stream() -> sys::ReadableStream;
pub fn new_readable_stream_from_array(chunks: Box<[JsValue]>) -> sys::ReadableStream;
pub fn new_readable_byte_stream_from_array(chunks: Box<[JsValue]>) -> sys::ReadableStream;
pub fn supports_release_lock_with_pending_read() -> bool;
}
27 changes: 27 additions & 0 deletions tests/tests/readable_byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,33 @@ async fn test_readable_byte_stream_multiple_byob_readers() {
}

async fn test_readable_byte_stream_abort_read(mut readable: ReadableStream) {
if supports_release_lock_with_pending_read() {
test_readable_byte_stream_abort_read_new(readable).await;
} else {
test_readable_byte_stream_abort_read_old(readable).await;
}
}

async fn test_readable_byte_stream_abort_read_new(mut readable: ReadableStream) {
let mut reader = readable.get_byob_reader();

// Start reading
// Since the stream will never produce a chunk, this read will remain pending forever
let mut dst = [0u8; 3];
let mut fut = reader.read(&mut dst).boxed_local();
// We need to poll the future at least once to start the read
let poll_result = poll!(&mut fut);
assert!(matches!(poll_result, Poll::Pending));
// Drop the future, to regain control over the reader
drop(fut);

// Releasing the lock should work even while there are pending reads
reader
.try_release_lock()
.expect("releasing the reader should work even while there are pending reads");
}

async fn test_readable_byte_stream_abort_read_old(mut readable: ReadableStream) {
let mut reader = readable.get_byob_reader();

// Start reading
Expand Down
29 changes: 29 additions & 0 deletions tests/tests/readable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,35 @@ async fn test_readable_stream_multiple_readers() {

#[wasm_bindgen_test]
async fn test_readable_stream_abort_read() {
if supports_release_lock_with_pending_read() {
test_readable_stream_abort_read_new().await;
} else {
test_readable_stream_abort_read_old().await;
}
}

async fn test_readable_stream_abort_read_new() {
let stream = pending();
let (stream, observer) = observe_drop(stream);
let mut readable = ReadableStream::from_stream(stream);
let mut reader = readable.get_reader();

// Start reading
// Since the stream will never produce a chunk, this read will remain pending forever
let mut fut = reader.read().boxed_local();
// We need to poll the future at least once to start the read
let poll_result = poll!(&mut fut);
assert!(matches!(poll_result, Poll::Pending));
// Drop the future, to regain control over the reader
drop(fut);

// Releasing the lock should work even while there are pending reads
reader
.try_release_lock()
.expect("releasing the reader should work even while there are pending reads");
}

async fn test_readable_stream_abort_read_old() {
let stream = pending();
let (stream, observer) = observe_drop(stream);
let mut readable = ReadableStream::from_stream(stream);
Expand Down

0 comments on commit 6dc7ee5

Please sign in to comment.