From 995aa5c927377f4617fbf7f561e033af7256853c Mon Sep 17 00:00:00 2001 From: Drew Ridley Date: Sun, 23 Jun 2024 16:23:58 -0400 Subject: [PATCH] complete connection --- crates/nevy_wasm/Cargo.toml | 2 + crates/nevy_wasm/src/connection.rs | 323 ++++++++++++-------------- crates/nevy_wasm/src/stream/mod.rs | 148 +----------- crates/nevy_wasm/src/stream/reader.rs | 40 ---- crates/nevy_wasm/src/stream/recv.rs | 49 +++- crates/nevy_wasm/src/stream/send.rs | 33 ++- crates/nevy_wasm/src/stream/writer.rs | 34 --- 7 files changed, 223 insertions(+), 406 deletions(-) delete mode 100644 crates/nevy_wasm/src/stream/reader.rs delete mode 100644 crates/nevy_wasm/src/stream/writer.rs diff --git a/crates/nevy_wasm/Cargo.toml b/crates/nevy_wasm/Cargo.toml index 1aefdb5..a91c796 100644 --- a/crates/nevy_wasm/Cargo.toml +++ b/crates/nevy_wasm/Cargo.toml @@ -23,6 +23,8 @@ wasm-bindgen-futures = "0.4" js-sys = "0.3.69" futures-lite = "1.12" bytes = "1.6.0" +slotmap = "1.0.7" +log = "0.4.21" [dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/crates/nevy_wasm/src/connection.rs b/crates/nevy_wasm/src/connection.rs index 0ca4bc3..4dbfbd8 100644 --- a/crates/nevy_wasm/src/connection.rs +++ b/crates/nevy_wasm/src/connection.rs @@ -1,209 +1,180 @@ -use std::collections::HashMap; - -use bytes::Bytes; -use futures_lite::{ - future::{block_on, poll_once, race, BoxedLocal}, - FutureExt, -}; -use js_sys::{Reflect, Uint8Array}; -use transport_interface::{ConnectionMut, ConnectionRef}; -use wasm_bindgen::JsCast; +use futures_lite::future::{block_on, poll_once, Boxed, BoxedLocal}; +use futures_lite::FutureExt; +use log::{debug, info, trace, warn}; +use slotmap::{new_key_type, SlotMap}; +use std::collections::VecDeque; use wasm_bindgen::JsValue; use wasm_bindgen_futures::JsFuture; -use web_sys::{ - ReadableStream, ReadableStreamDefaultReader, WebTransport, WebTransportBidirectionalStream, - WebTransportCloseInfo, WebTransportReceiveStream, WebTransportSendStream, -}; +use web_sys::WebTransport; -use crate::{ - error::WebError, - stream::reader::Reader, - stream::writer::Writer, - stream::{recv::RecvStream, send::SendStream}, - stream::{WasmRecvStream, WasmSendStream, WasmStreamId}, -}; +use crate::error::WebError; +use crate::stream::{RecvStream, SendStream}; -pub struct WasmConnectionId(u64); - -pub struct WasmConnection { - /// If exists, we are connecting. - inner: web_sys::WebTransport, - ready_future: Option>>, - accept_stream_future: Option>>, - /// contains futures for streams that haven't been opened yet - pub(crate) unopened_streams: Vec<( - WasmStreamId, - BoxedLocal), WebError>>, - )>, - - /// Contains all of the completed streams ready to receive data on. - recv_streams: HashMap, - /// Contaisn all of the completed streams ready to write to. - send_streams: HashMap, +new_key_type! { + /// A ephemeral identifier used to track a particular local stream. + /// + /// A single [StreamKey] may be associated with both a [SendStream] and [RecvStream] + /// if the stream is of the bidirectional type. + pub struct StreamKey; } -impl WasmConnection { - pub(crate) fn new(url: &str) -> Self { - let transport = web_sys::WebTransport::new(url).unwrap(); - let ready_future = Some(JsFuture::from(transport.ready()).boxed_local()); - - let incoming_bidir = transport.incoming_bidirectional_streams(); - let incoming_unidir = transport.incoming_unidirectional_streams(); +/// The type of stream. +/// +/// Methods to get a send or recv stream may return [None] if a stream is not of the requested direction. +pub(crate) enum StreamType { + Send(SendStream), + Recv(RecvStream), + Bidirectional(SendStream, RecvStream), +} - let accept_stream_future = async move { - let bidir: ReadableStreamDefaultReader = incoming_bidir.get_reader().unchecked_into(); - let unidir: ReadableStreamDefaultReader = incoming_unidir.get_reader().unchecked_into(); - race(JsFuture::from(bidir.read()), JsFuture::from(unidir.read())).await - } - .boxed_local(); +/// The current state of the stream. +/// +/// A stream may be currently Opening, in which case the runtime must poll the future. +/// Otherwise, the stream is ready and it's writer/reader can be used. +pub(crate) enum StreamState { + Opening(Boxed>), + Ready(StreamType), + Closed, +} - Self { - inner: transport, - ready_future, - accept_stream_future: Some(accept_stream_future), - unopened_streams: vec![], - recv_streams: HashMap::new(), - send_streams: HashMap::new(), - } - } +/// A wrapper containing the state of the stream. +pub struct Stream { + state: StreamState, +} - async fn accept_uni(transport: &WebTransport) -> Result { - let mut reader = Reader::new(&transport.incoming_unidirectional_streams())?; - let stream: WebTransportReceiveStream = reader.read().await?.expect("closed without error"); - let recv = RecvStream::new(stream)?; - Ok(recv) - } +pub struct WasmConnection { + transport: WebTransport, + ready_future: Option>>, - async fn accept_bi(transport: &WebTransport) -> Result<(SendStream, RecvStream), WebError> { - let mut reader = Reader::new(&transport.incoming_bidirectional_streams())?; - let stream: WebTransportBidirectionalStream = - reader.read().await?.expect("closed without error"); + streams: SlotMap, + pending_streams: Vec, + pending_events: VecDeque, +} - let send = SendStream::new(stream.writable())?; - let recv = RecvStream::new(stream.readable())?; +/// A event associated with a particular [StreamKey]. +/// +pub enum StreamEvent { + Opened(StreamKey), + Closed(StreamKey), + Error(StreamKey, WebError), +} - Ok((send, recv)) - } +impl WasmConnection { + pub fn new(transport: WebTransport) -> Self { + let promise = JsFuture::from(transport.ready()); + let future = Some(async move { promise.await }.boxed_local()); - pub async fn send_datagram( - transport: &mut WebTransport, - payload: Bytes, - ) -> Result<(), WebError> { - let mut writer = Writer::new(&transport.datagrams().writable())?; - writer.write(&Uint8Array::from(payload.as_ref())).await?; - Ok(()) + Self { + transport, + ready_future: future, + streams: SlotMap::with_key(), + pending_events: VecDeque::new(), + pending_streams: vec![], + } } - pub async fn recv_datagram(transport: &mut WebTransport) -> Result { - let mut reader = Reader::new(&transport.datagrams().readable())?; - let data: Uint8Array = reader.read().await?.unwrap_or_default(); - Ok(data.to_vec().into()) + /// Opens a stream. + /// + /// The opened stream is facilitated through manual polling of the inserted future. + /// The returned [StreamKey] may be unavailable for reading/writing for a few ticks. + /// User applications should not expect immediate usage of the stream. + pub fn open_stream(&mut self, is_bidirectional: bool) -> StreamKey { + let future = self.create_stream_future(is_bidirectional); + let stream = Stream { + state: StreamState::Opening(future), + }; + self.streams.insert(stream) } - pub fn close(self, code: u32, reason: &str) { - let mut info = WebTransportCloseInfo::new(); - info.close_code(code); - info.reason(reason); - self.inner.close_with_close_info(&info); + fn create_stream_future(&self, is_bidirectional: bool) -> Boxed> { + async move { Err(WebError::from(JsValue::null())) }.boxed() } - fn update(&mut self) { - if let Some(r_future) = self.ready_future.as_mut() { - match block_on(poll_once(r_future)) { - Some(Ok(_)) => { - self.ready_future = None; - } - Some(Err(_)) => { - self.ready_future = None; - // Handle error if needed - } - None => return, - } + /// Internally polls for the latest stream events. + pub(crate) fn poll_events(&mut self) -> Option { + if let Some(event) = self.pending_events.pop_front() { + return Some(event); } + None + } - if let Some(a_future) = self.accept_stream_future.as_mut() { - match block_on(poll_once(a_future)) { - Some(Ok(_)) => { - let incoming_bidir = self.inner.incoming_bidirectional_streams(); - let incoming_unidir = self.inner.incoming_unidirectional_streams(); - - let new_accept_stream_future = async move { - let bidir: ReadableStreamDefaultReader = - incoming_bidir.get_reader().unchecked_into(); - let unidir: ReadableStreamDefaultReader = - incoming_unidir.get_reader().unchecked_into(); - race(JsFuture::from(bidir.read()), JsFuture::from(unidir.read())).await - } - .boxed_local(); - - self.accept_stream_future = Some(new_accept_stream_future); - } - Some(Err(_)) => { - self.accept_stream_future = None; - // Handle error if needed - } - None => {} - } + /// Tries to progress the internal futures used to open or accept streams. + /// + /// Uses the internal futures directly to accomplish this. + pub(crate) fn update(&mut self) { + // If we are not ready, let's try to progress the ready state and then return. + if let Some(not_ready) = &mut self.ready_future { + trace!("Progressing the 'ready' promise."); + if let Some(result) = block_on(poll_once(not_ready)) { + debug!("'ready()' promise was resolved to: {:?}", result); + self.ready_future = None; + }; + return; } - self.unopened_streams.retain_mut(|(stream_id, future)| { - match block_on(poll_once(future)) { - Some(Ok((send, recv))) => { - self.send_streams - .insert(*stream_id, WasmSendStream::new(send)); - if let Some(recv) = recv { - self.recv_streams - .insert(*stream_id, WasmRecvStream::new(recv)); - } - false - } - Some(Err(_)) => { - // Handle error if needed - false - } - None => true, + //Progress stream opening. + self.pending_streams.retain(|key| { + if let Some(stream) = self.streams.get_mut(*key) { + + //If we are currently opening, lets progress the future. + //Otherwise, we can just remove the entry from the list. + let StreamState::Opening(open) = &mut stream.state else { + trace!("A pending stream was already open! Removing it from 'pending_streams'."); + return false; + }; + + let Some(poll_result) = block_on(poll_once(open)) else { + //We need to retain the future since its not finished progressing. + trace!("Stream future is not resolved. Continuing."); + return true; + }; + + debug!("Stream was ready. Progressing state enum and removing entry."); + stream.state = StreamState::Ready(poll_result.unwrap()); + false + } else { + // Stream doesn't exist, remove it from pending_streams + debug!("A stream was marked as pending but did not exist in the streams map. Removing from pending streams."); + false } }); } - fn get_stats(&self) -> () { - () - } - - fn disconnect(&mut self) { - self.inner.close(); - // Clean up resources - self.ready_future = None; - self.accept_stream_future = None; - self.unopened_streams.clear(); - self.recv_streams.clear(); - self.send_streams.clear(); - } -} - -impl<'c> ConnectionRef<'c> for &'c WasmConnection { - type ConnectionStats = (); - - fn get_stats(&self) -> Self::ConnectionStats { - // Implement this method to return connection statistics - () + /// Fetches a mutable [SendStream] identified by the specified [StreamKey]. + /// + /// Will return [None] if the stream does not exist, or is a unidirectional receive stream. + pub fn send_stream(&mut self, key: StreamKey) -> Option<&mut SendStream> { + if let Some(stream) = self.streams.get_mut(key) { + match &mut stream.state { + StreamState::Ready(StreamType::Send(send)) => Some(send), + StreamState::Ready(StreamType::Bidirectional(send, _)) => Some(send), + _ => None, + } + } else { + None + } } -} -impl<'c> ConnectionMut<'c> for &'c mut WasmConnection { - type NonMut<'b> = &'b WasmConnection where 'c: 'b; - type StreamType = WasmStreamId; - - fn as_ref<'b>(&'b self) -> Self::NonMut<'b> { - self + /// Fetches a mutable [RecvStream] identified by the specified [StreamKey]. + /// + /// Will return [None] if the stream does not exist, or is a unidirectional send stream. + pub fn recv_stream(&mut self, key: StreamKey) -> Option<&mut RecvStream> { + if let Some(stream) = self.streams.get_mut(key) { + match &mut stream.state { + StreamState::Ready(StreamType::Recv(recv)) => Some(recv), + StreamState::Ready(StreamType::Bidirectional(_, recv)) => Some(recv), + _ => None, + } + } else { + None + } } - fn disconnect(&mut self) { - self.inner.close(); - self.ready_future = None; - self.accept_stream_future = None; - self.unopened_streams.clear(); - self.recv_streams.clear(); - self.send_streams.clear(); + /// Close the stream associated with the specified key. + pub fn close_stream(&mut self, key: StreamKey) { + if let Some(stream) = self.streams.get_mut(key) { + stream.state = StreamState::Closed; + self.pending_events.push_back(StreamEvent::Closed(key)); + } } } diff --git a/crates/nevy_wasm/src/stream/mod.rs b/crates/nevy_wasm/src/stream/mod.rs index 452a99f..3aaf9f4 100644 --- a/crates/nevy_wasm/src/stream/mod.rs +++ b/crates/nevy_wasm/src/stream/mod.rs @@ -1,150 +1,4 @@ -use crate::{connection::WasmConnection, error::WebError}; -use transport_interface::{ConnectionMut, ErrorFatality, RecvStreamMut, SendStreamMut, StreamId}; -use wasm_bindgen::{JsCast, JsValue}; -use wasm_bindgen_futures::JsFuture; -use web_sys::{WebTransport, WebTransportBidirectionalStream, WebTransportSendStream}; - -pub(crate) mod reader; pub(crate) mod recv; pub(crate) mod send; -pub(crate) mod writer; - -use {recv::RecvStream, send::SendStream}; - -#[derive(Clone, Copy, Eq, PartialEq, Hash)] -pub struct WasmStreamId(u64); - -impl WasmStreamId { - pub async fn open_bi( - transport: &mut WebTransport, - ) -> Result<(SendStream, Option), WebError> { - let stream: WebTransportBidirectionalStream = - JsFuture::from(transport.create_bidirectional_stream()) - .await? - .dyn_into()?; - - let send = SendStream::new(stream.writable())?; - let recv = RecvStream::new(stream.readable())?; - - Ok((send, Some(recv))) - } - - pub async fn open_uni( - transport: &mut WebTransport, - ) -> Result<(SendStream, Option), WebError> { - let stream: WebTransportSendStream = - JsFuture::from(transport.create_unidirectional_stream()) - .await? - .dyn_into()?; - - let send = SendStream::new(stream)?; - Ok((send, None)) - } -} - -impl StreamId for WasmStreamId { - type Connection<'c> = &'c mut WasmConnection; - type SendMut<'s> = &'s mut WasmSendStream; - type RecvMut<'s> = &'s mut WasmRecvStream; - - type OpenDescription = (); - fn open<'c>( - connection: &mut Self::Connection<'c>, - description: Self::OpenDescription, - ) -> Option - where - Self: Sized, - { - //Here we allocate an ID and box a future with that ID. - //We then return the same ID to the client for use. - } - - fn get_send<'c, 's>( - self, - connection: &'s mut Self::Connection<'c>, - ) -> Option> { - connection.send_stream(self) - } - - fn get_recv<'c, 's>( - self, - connection: &'s mut Self::Connection<'c>, - ) -> Option> { - connection.recv_stream(self) - } - - fn poll_events<'c>( - connection: &mut Self::Connection<'c>, - ) -> Option> - where - Self: Sized, - { - None - } -} - -pub struct WasmSendStream { - send: SendStream, -} - -impl WasmSendStream { - pub(crate) fn new(send: SendStream) -> Self { - Self { send } - } -} - -#[derive(Debug)] -pub struct SendBufferFull; - -/// A full send buffer is 'fatal' because it will not be spontaneously resolved if invoked again. -/// The only way to continue writing is to flush the buffer back to the socket again, which happens once per [Update]. -impl ErrorFatality for SendBufferFull { - fn is_fatal(&self) -> bool { - true - } -} - -impl<'s> SendStreamMut<'s> for &'s mut WasmSendStream { - type SendError = SendBufferFull; - type CloseDescription = String; - - fn send(&mut self, data: &[u8]) -> Result { - todo!() - } - - fn close(&mut self, description: Self::CloseDescription) -> Result<(), ()> { - todo!() - } - - fn is_open(&self) -> bool { - todo!() - } -} - -pub struct WasmRecvStream { - recv: RecvStream, -} - -impl WasmRecvStream { - pub(crate) fn new(recv: RecvStream) -> Self { - Self { recv } - } -} - -impl<'s> RecvStreamMut<'s> for &'s mut WasmRecvStream { - type ReadError = (); - - type CloseDescription = (); - - fn recv(&mut self, limit: usize) -> Result, Self::ReadError> { - todo!() - } - - fn close(&mut self, description: Self::CloseDescription) -> Result<(), ()> { - todo!() - } - fn is_open(&self) -> bool { - todo!() - } -} +pub use {recv::RecvStream, send::SendStream}; diff --git a/crates/nevy_wasm/src/stream/reader.rs b/crates/nevy_wasm/src/stream/reader.rs deleted file mode 100644 index 0889d53..0000000 --- a/crates/nevy_wasm/src/stream/reader.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::error::WebError; -use js_sys::Reflect; -use wasm_bindgen::{JsCast, JsValue}; -use wasm_bindgen_futures::JsFuture; -use web_sys::{ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult}; - -// Wrapper around ReadableStream -pub struct Reader { - inner: ReadableStreamDefaultReader, -} - -impl Reader { - pub fn new(stream: &ReadableStream) -> Result { - let inner = stream.get_reader().unchecked_into(); - Ok(Self { inner }) - } - - pub async fn read(&mut self) -> Result, WebError> { - let result: ReadableStreamReadResult = JsFuture::from(self.inner.read()).await?.into(); - - if Reflect::get(&result, &"done".into())?.is_truthy() { - return Ok(None); - } - - let res = Reflect::get(&result, &"value".into())?.dyn_into()?; - Ok(Some(res)) - } - - pub fn close(self, reason: &str) { - let str = JsValue::from_str(reason); - let _ = self.inner.cancel_with_reason(&str); // ignore the promise - } -} - -impl Drop for Reader { - fn drop(&mut self) { - let _ = self.inner.cancel(); // ignore the promise - self.inner.release_lock(); - } -} diff --git a/crates/nevy_wasm/src/stream/recv.rs b/crates/nevy_wasm/src/stream/recv.rs index fe5837f..77cddd1 100644 --- a/crates/nevy_wasm/src/stream/recv.rs +++ b/crates/nevy_wasm/src/stream/recv.rs @@ -1,16 +1,23 @@ -use std::cmp; - -use super::reader::Reader; use crate::error::WebError; use bytes::{BufMut, Bytes, BytesMut}; -use js_sys::Uint8Array; -use web_sys::WebTransportReceiveStream; +use js_sys::{Reflect, Uint8Array}; +use std::cmp; +use wasm_bindgen::{JsCast, JsValue}; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult, + WebTransportReceiveStream, +}; pub struct RecvStream { reader: Reader, buffer: BytesMut, } +struct Reader { + inner: ReadableStreamDefaultReader, +} + impl RecvStream { pub fn new(stream: WebTransportReceiveStream) -> Result { if stream.locked() { @@ -51,13 +58,11 @@ impl RecvStream { } let mut data: Bytes = match self.reader.read::().await? { - // TODO can we avoid making a copy here? Some(data) => data.to_vec().into(), None => return Ok(None), }; if data.len() > max { - // The chunk is too big; add the tail to the buffer for next read. self.buffer.extend_from_slice(&data.split_off(max)); } @@ -68,3 +73,33 @@ impl RecvStream { self.reader.close(reason); } } + +impl Reader { + fn new(stream: &ReadableStream) -> Result { + let inner = stream.get_reader().unchecked_into(); + Ok(Self { inner }) + } + + async fn read(&mut self) -> Result, WebError> { + let result: ReadableStreamReadResult = JsFuture::from(self.inner.read()).await?.into(); + + if Reflect::get(&result, &"done".into())?.is_truthy() { + return Ok(None); + } + + let res = Reflect::get(&result, &"value".into())?.dyn_into()?; + Ok(Some(res)) + } + + fn close(self, reason: &str) { + let str = JsValue::from_str(reason); + let _ = self.inner.cancel_with_reason(&str); + } +} + +impl Drop for Reader { + fn drop(&mut self) { + let _ = self.inner.cancel(); + self.inner.release_lock(); + } +} diff --git a/crates/nevy_wasm/src/stream/send.rs b/crates/nevy_wasm/src/stream/send.rs index 5877b22..dffc288 100644 --- a/crates/nevy_wasm/src/stream/send.rs +++ b/crates/nevy_wasm/src/stream/send.rs @@ -1,15 +1,20 @@ use bytes::{Buf, Bytes}; use js_sys::{Reflect, Uint8Array}; -use web_sys::WebTransportSendStream; +use wasm_bindgen::{JsCast, JsValue}; +use wasm_bindgen_futures::JsFuture; +use web_sys::{WebTransportSendStream, WritableStream, WritableStreamDefaultWriter}; use crate::error::WebError; -use crate::stream::writer::Writer; pub struct SendStream { stream: WebTransportSendStream, writer: Writer, } +struct Writer { + inner: WritableStreamDefaultWriter, +} + impl SendStream { pub fn new(stream: WebTransportSendStream) -> Result { let writer = Writer::new(&stream)?; @@ -40,3 +45,27 @@ impl SendStream { .expect("failed to set priority"); } } + +impl Writer { + fn new(stream: &WritableStream) -> Result { + let inner = stream.get_writer()?.unchecked_into(); + Ok(Self { inner }) + } + + async fn write(&mut self, v: &JsValue) -> Result<(), WebError> { + JsFuture::from(self.inner.write_with_chunk(v)).await?; + Ok(()) + } + + fn close(self, reason: &str) { + let str = JsValue::from_str(reason); + let _ = self.inner.abort_with_reason(&str); + } +} + +impl Drop for Writer { + fn drop(&mut self) { + let _ = self.inner.close(); + self.inner.release_lock(); + } +} diff --git a/crates/nevy_wasm/src/stream/writer.rs b/crates/nevy_wasm/src/stream/writer.rs deleted file mode 100644 index 7e00c65..0000000 --- a/crates/nevy_wasm/src/stream/writer.rs +++ /dev/null @@ -1,34 +0,0 @@ -use wasm_bindgen::{JsCast, JsValue}; -use wasm_bindgen_futures::JsFuture; -use web_sys::{WritableStream, WritableStreamDefaultWriter}; - -use crate::error::WebError; - -// Wrapper around WritableStream -pub struct Writer { - inner: WritableStreamDefaultWriter, -} - -impl Writer { - pub fn new(stream: &WritableStream) -> Result { - let inner = stream.get_writer()?.unchecked_into(); - Ok(Self { inner }) - } - - pub async fn write(&mut self, v: &JsValue) -> Result<(), WebError> { - JsFuture::from(self.inner.write_with_chunk(v)).await?; - Ok(()) - } - - pub fn close(self, reason: &str) { - let str = JsValue::from_str(reason); - let _ = self.inner.abort_with_reason(&str); // ignore the promise - } -} - -impl Drop for Writer { - fn drop(&mut self) { - let _ = self.inner.close(); // ignore the promise - self.inner.release_lock(); - } -}