From 84f2013ef6a1456966c889eb5339011c28436f25 Mon Sep 17 00:00:00 2001 From: Drew Ridley Date: Sat, 22 Jun 2024 11:37:46 -0400 Subject: [PATCH] wasm scaffolding --- crates/nevy_wasm/src/connection.rs | 162 +++++++++++--------- crates/nevy_wasm/src/endpoint.rs | 103 ++++++++++--- crates/nevy_wasm/src/lib.rs | 1 - crates/nevy_wasm/src/reader.rs | 68 --------- crates/nevy_wasm/src/stream.rs | 229 ++++++++++++++++++++--------- 5 files changed, 333 insertions(+), 230 deletions(-) delete mode 100644 crates/nevy_wasm/src/reader.rs diff --git a/crates/nevy_wasm/src/connection.rs b/crates/nevy_wasm/src/connection.rs index 1f73ea7..e401ee6 100644 --- a/crates/nevy_wasm/src/connection.rs +++ b/crates/nevy_wasm/src/connection.rs @@ -1,100 +1,120 @@ -use std::{ - collections::{HashSet, VecDeque}, - default, - future::Future, - pin::Pin, -}; - -use futures_lite::stream::race; -use slotmap::new_key_type; -use transport_interface::{ConnectionMut, ConnectionRef, StreamEvent}; -use web_sys::{ - wasm_bindgen::{closure::Closure, JsCast, JsValue}, - WebTransportBidirectionalStream, WebTransportReceiveStream, WebTransportSendStream, -}; -use web_transport_wasm::Session; - -use crate::{ - reader::{Reader, WebError}, - stream::WasmStreamId, -}; - -new_key_type! { - pub struct WasmConnectionId; -} +use std::collections::{HashSet, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering}; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; +use web_sys::WebTransport; -struct ConnectedWasmSession { - /// The underlying wasm session used to establish new streams, read, or write data. - session: web_sys::WebTransport, - /// A future that accepts either unidirectional or bidirectional streams from the peer. - /// This future will always exist in the [WasmSession::Connected] state and must be polled accordingly. - accept_future: Box>, - - /// A future used to populate the internal recv buffers from the async methods. - recv_future: Box>, - /// A future used to progress all outstanding writes. - send_future: Box>, -} +use transport_interface::*; + +use crate::stream::WebTransportStreamId; -/// The wasm session state. -enum WasmSession { - /// The session is disconnected and awaiting a new connection attempt. - Disconnected, - /// The session is currently connecting with the specified future that must be polled to progress the connection. - Connecting(Box>>), - /// The session is currently connected. - Connected(ConnectedWasmSession), +pub struct WebTransportConnectionId(u64); + +impl WebTransportConnectionId { + pub(crate) fn new() -> Self { + static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1); + WebTransportConnectionId(CONNECTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst)) + } } -pub struct WasmConnection { - /// The session. May or may not contain a valid session depending on the state. - session: WasmSession, - /// A collection of events associated with this connection that can be read by the manager process. - /// This is disconnected from the session state because stream events (such as disconnected) can still be read - /// even if the connection is no longer valid. - stream_events: VecDeque>, +pub struct WebTransportConnection { + pub(crate) connection: WebTransport, + pub(crate) connection_id: WebTransportConnectionId, + pub(crate) stream_events: VecDeque>, + pub(crate) open_send_streams: HashSet, + pub(crate) open_recv_streams: HashSet, } -impl WasmConnection { - pub(crate) fn new() -> Self { - WasmConnection { - session: WasmSession::Disconnected, +impl WebTransportConnection { + pub(crate) fn new(connection: WebTransport, connection_id: WebTransportConnectionId) -> Self { + WebTransportConnection { + connection, + connection_id, stream_events: VecDeque::new(), + open_send_streams: HashSet::new(), + open_recv_streams: HashSet::new(), } } - async fn accept_uni(&mut self) {} + pub async fn poll_timeouts(&self) { + // No explicit timeout handling required for WebTransport + } - async fn accept_bi(&mut self) -> Result { - if let WasmSession::Connected(session) = self.session { - let transport = session.session; + pub async fn poll_events(&self, handler: &mut impl EndpointEventHandler) { + // Handle WebTransport events like readiness, closures, etc. - let mut reader = Reader::new(&transport.incoming_bidirectional_streams())?; - let stream: WebTransportBidirectionalStream = - reader.read().await?.expect("Closed without error"); + // Example for handling readiness: + let ready_promise = self.connection.ready(); + if let Ok(_) = JsFuture::from(ready_promise).await { + handler.connected(self.connection_id); + } - Ok(stream) + // Example for handling closed: + let closed_promise = self.connection.closed(); + if let Ok(_) = JsFuture::from(closed_promise).await { + handler.disconnected(self.connection_id); } } + + pub async fn accept_streams(&self) { + //.incoming_bidirectional_streams returns a ReadableStream + let incoming_bidi_streams = self.connection.incoming_bidirectional_streams(); + let incoming_uni_streams = self.connection.incoming_unidirectional_streams(); + + let reader = incoming_bidi_streams + .get_reader() + .dyn_into::() + .unwrap(); + while let Ok(stream) = JsFuture::from(reader.read()).await { + let new_stream = stream.dyn_into::().unwrap(); + let stream_id = WebTransportStreamId::new(); + self.open_recv_streams.insert(stream_id); + self.stream_events.push_back(StreamEvent { + stream_id, + peer_generated: true, + event_type: StreamEventType::NewRecvStream, + }); + } + + let reader_uni = incoming_uni_streams + .get_reader() + .dyn_into::() + .unwrap(); + while let Ok(stream) = JsFuture::from(reader_uni.read()).await { + let new_stream = stream.dyn_into::().unwrap(); + let stream_id = WebTransportStreamId::new(); + self.open_recv_streams.insert(stream_id); + self.stream_events.push_back(StreamEvent { + stream_id, + peer_generated: true, + event_type: StreamEventType::NewRecvStream, + }); + } + } + + pub fn side(&self) -> web_sys::WebTransport { + self.connection.clone() + } } -impl<'c> ConnectionMut<'c> for &'c mut WasmConnection { - type NonMut<'b> = &'b WasmConnection where Self: 'b; - type StreamType = WasmStreamId; +impl<'c> ConnectionMut<'c> for &'c mut WebTransportConnection { + type NonMut<'b> = &'b WebTransportConnection where Self: 'b; + + type StreamType = WebTransportStreamId; fn as_ref<'b>(&'b self) -> Self::NonMut<'b> { self } fn disconnect(&mut self) { - todo!(); + self.connection.close(); } } -impl<'c> ConnectionRef<'c> for &'c WasmConnection { - type ConnectionStats = (); +impl<'c> ConnectionRef<'c> for &'c WebTransportConnection { + type ConnectionStats = WebTransportConnectionId; - fn get_stats(&self) -> Self::ConnectionStats { - todo!() + fn get_stats(&self) -> WebTransportConnectionId { + self.connection_id } } diff --git a/crates/nevy_wasm/src/endpoint.rs b/crates/nevy_wasm/src/endpoint.rs index 1cdfbd2..04ed582 100644 --- a/crates/nevy_wasm/src/endpoint.rs +++ b/crates/nevy_wasm/src/endpoint.rs @@ -1,40 +1,105 @@ -use slotmap::SlotMap; -use transport_interface::Endpoint; +use crate::connection::*; +use std::collections::HashMap; +use std::net::SocketAddr; +use transport_interface::*; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; +use web_sys::WebTransport; -use crate::connection::{WasmConnection, WasmConnectionId}; - -pub struct WasmEndpoint { - connections: SlotMap, +/// A transport endpoint facilitated using WebTransport. +/// +/// Uses async methods and should be polled manually within update. +pub struct WebTransportEndpoint { + endpoint: WebTransport, + local_addr: SocketAddr, + connections: HashMap, } -impl Endpoint for WasmEndpoint { - type Connection<'c> = &'c mut WasmConnection; +impl WebTransportEndpoint { + /// Creates a new endpoint, facilitated through WebTransport. + /// + /// Requires a bind_addr (consider '0.0.0.0:0' for clients). + pub fn new(bind_addr: SocketAddr, url: &str) -> Result { + let endpoint = WebTransport::new(url)?; - type ConnectionId = WasmConnectionId; + Ok(Self { + endpoint, + connections: HashMap::new(), + local_addr: bind_addr, + }) + } - type ConnectDescription = String; + async fn handle_ready(&self) { + let ready_promise = self.endpoint.ready(); + JsFuture::from(ready_promise).await.unwrap(); + } + + async fn handle_closed(&self) { + let closed_promise = self.endpoint.closed(); + JsFuture::from(closed_promise).await.unwrap(); + } + + async fn process_event(&mut self, handler: &mut impl EndpointEventHandler) { + // Example of handling ready and closed events + self.handle_ready().await; + self.handle_closed().await; + } - type IncomingConnectionInfo<'i> = (); + async fn update_connections(&mut self, handler: &mut impl EndpointEventHandler) { + for connection in self.connections.values_mut() { + connection.poll_timeouts().await; + connection.poll_events(handler).await; + connection.accept_streams().await; + } + } +} + +impl Endpoint for WebTransportEndpoint { + type Connection<'a> = &'a mut WebTransportConnection; + type ConnectionId = WebTransportConnectionId; + type ConnectDescription = String; + type IncomingConnectionInfo<'a> = String; - fn update(&mut self, handler: &mut impl transport_interface::EndpointEventHandler) { - todo!() + fn update(&mut self, handler: &mut impl EndpointEventHandler) { + wasm_bindgen_futures::spawn_local(async move { + self.process_event(handler).await; + self.update_connections(handler).await; + }); } fn connection<'c>( &'c self, id: Self::ConnectionId, - ) -> Option< as transport_interface::ConnectionMut>::NonMut<'c>> { - todo!() + ) -> Option< as ConnectionMut>::NonMut<'c>> { + self.connections.get(&id) } - fn connection_mut<'a>(&'a mut self, id: Self::ConnectionId) -> Option> { - todo!() + fn connection_mut<'c>(&'c mut self, id: Self::ConnectionId) -> Option> { + self.connections.get_mut(&id) } fn connect<'c>( &'c mut self, - info: Self::ConnectDescription, + description: Self::ConnectDescription, ) -> Option<(Self::ConnectionId, Self::Connection<'c>)> { - todo!() + let connection = WebTransport::new(&description).ok()?; + let connection_id = WebTransportConnectionId::new(); + + let connection = WebTransportConnection::new(connection, connection_id); + assert!( + self.connections.insert(connection_id, connection).is_none(), + "Connection handle should not be a duplicate" + ); + + Some((connection_id, &mut self.connections[&connection_id])) + } + + fn disconnect(&mut self, id: Self::ConnectionId) -> Result<(), ()> { + if let Some(mut connection) = self.connection_mut(id) { + connection.disconnect(); + Ok(()) + } else { + Err(()) + } } } diff --git a/crates/nevy_wasm/src/lib.rs b/crates/nevy_wasm/src/lib.rs index b44a880..b9f15af 100644 --- a/crates/nevy_wasm/src/lib.rs +++ b/crates/nevy_wasm/src/lib.rs @@ -1,4 +1,3 @@ mod connection; mod endpoint; -mod reader; mod stream; diff --git a/crates/nevy_wasm/src/reader.rs b/crates/nevy_wasm/src/reader.rs deleted file mode 100644 index 9d23438..0000000 --- a/crates/nevy_wasm/src/reader.rs +++ /dev/null @@ -1,68 +0,0 @@ -use js_sys::Reflect; -use wasm_bindgen::{JsCast, JsValue}; -use wasm_bindgen_futures::JsFuture; -use web_sys::{ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult}; - -#[derive(Debug)] -pub struct WebError { - value: JsValue, -} - -impl From for WebError { - fn from(value: JsValue) -> Self { - Self { value } - } -} - -impl std::fmt::Display for WebError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // Print out the JsValue as a string - match self.value.as_string() { - Some(s) => write!(f, "{}", s), - None => write!(f, "{:?}", self.value), - } - } -} - -impl From<&str> for WebError { - fn from(value: &str) -> Self { - Self { - value: value.into(), - } - } -} - -// Wrapper around ReadableStream -pub struct Reader { - inner: ReadableStreamDefaultReader, -} - -impl Reader { - pub fn new(stream: &ReadableStream) -> Result { - let inner = stream.get_reader().unchecked_into(); - Ok(Self { inner }) - } - - pub async fn read(&mut self) -> Result, WebError> { - let result: ReadableStreamReadResult = JsFuture::from(self.inner.read()).await?.into(); - - if Reflect::get(&result, &"done".into())?.is_truthy() { - return Ok(None); - } - - let res = Reflect::get(&result, &"value".into())?.dyn_into()?; - Ok(Some(res)) - } - - pub fn close(self, reason: &str) { - let str = JsValue::from_str(reason); - let _ = self.inner.cancel_with_reason(&str); // ignore the promise - } -} - -impl Drop for Reader { - fn drop(&mut self) { - let _ = self.inner.cancel(); // ignore the promise - self.inner.release_lock(); - } -} diff --git a/crates/nevy_wasm/src/stream.rs b/crates/nevy_wasm/src/stream.rs index 554e161..e630eb5 100644 --- a/crates/nevy_wasm/src/stream.rs +++ b/crates/nevy_wasm/src/stream.rs @@ -1,112 +1,199 @@ -use transport_interface::{ErrorFatality, RecvStreamMut, SendStreamMut, StreamId}; +use std::collections::{HashSet, VecDeque}; +use transport_interface::*; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + ReadableStream, ReadableStreamDefaultReader, WebTransportSendStream, + WritableStreamDefaultWriter, +}; -use crate::connection::WasmConnection; +use crate::connection::WebTransportConnection; -/// stream id for a quinn stream #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub struct WasmStreamId(pub(crate) u64); +pub struct WebTransportStreamId(u64); -impl StreamId for WasmStreamId { - type Connection<'c> = &'c mut WasmConnection; - type SendMut<'s> = WasmSendStreamMut; - type RecvMut<'s> = WasmRecvStream; - - // Since WASM can only initiate, theres only a single direction here. - type OpenDescription = (); - - fn open<'c>( - connection: &mut Self::Connection<'c>, - description: Self::OpenDescription, - ) -> Option - where - Self: Sized, - { - todo!(); - } - - fn get_send<'c, 's>( - self, - connection: &'s mut Self::Connection<'c>, - ) -> Option> { - todo!() +impl WebTransportStreamId { + pub(crate) fn new() -> Self { + static STREAM_ID_COUNTER: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(1); + WebTransportStreamId(STREAM_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst)) } +} - fn get_recv<'c, 's>( - self, - connection: &'s mut Self::Connection<'c>, - ) -> Option> { - todo!() - } +pub struct WebTransportSendStreamMut<'s> { + stream: WebTransportSendStream, + stream_id: WebTransportStreamId, + open_streams: &'s mut HashSet, +} - fn poll_events<'c>( - connection: &mut Self::Connection<'c>, - ) -> Option> - where - Self: Sized, - { - todo!() - } +pub struct WebTransportRecvStreamMut<'s> { + events: &'s mut VecDeque>, + stream_id: WebTransportStreamId, + stream: ReadableStream, + open_streams: &'s mut HashSet, } -pub struct WasmSendStreamMut(web_sys::WebTransportSendStream); +#[derive(Debug)] +pub enum WebTransportSendError { + Blocked, + NoStream, +} -#[derive(PartialEq, Eq)] -pub enum WasmSendError { +#[derive(Debug)] +pub enum WebTransportReadError { Blocked, + Finished, NoStream, } -impl ErrorFatality for WasmSendError { +impl ErrorFatality for WebTransportSendError { fn is_fatal(&self) -> bool { - *self == WasmSendError::Blocked + match self { + WebTransportSendError::Blocked => false, + WebTransportSendError::NoStream => true, + } } } -impl<'s> SendStreamMut<'s> for WasmSendStreamMut { - type SendError = WasmSendError; - type CloseDescription = String; +impl ErrorFatality for WebTransportReadError { + fn is_fatal(&self) -> bool { + match self { + WebTransportReadError::Blocked => false, + WebTransportReadError::Finished => false, + WebTransportReadError::NoStream => true, + } + } +} + +impl<'s> SendStreamMut<'s> for WebTransportSendStreamMut<'s> { + type SendError = WebTransportSendError; + + type CloseDescription = JsValue; fn send(&mut self, data: &[u8]) -> Result { - todo!() + let writer = WritableStreamDefaultWriter::new(&self.stream).unwrap(); + let chunk = JsValue::from_serde(data).unwrap(); + wasm_bindgen_futures::spawn_local(async move { + JsFuture::from(writer.write_with_chunk(&chunk)) + .await + .expect("Error writing to stream"); + }); + + Ok(data.len()) } fn close(&mut self, description: Self::CloseDescription) -> Result<(), ()> { - todo!() + wasm_bindgen_futures::spawn_local(async move { + JsFuture::from(self.stream.close_with_reason(description)) + .await + .expect("Error closing stream"); + }); + + self.open_streams.remove(&self.stream_id); + Ok(()) } fn is_open(&self) -> bool { - todo!() + self.open_streams.contains(&self.stream_id) } } -pub struct WasmRecvStream(web_sys::WebTransportReceiveStream); +impl<'s> RecvStreamMut<'s> for WebTransportRecvStreamMut<'s> { + type ReadError = WebTransportReadError; -#[derive(PartialEq, Eq)] -pub enum WasmReceiveError { - Blocked, - Finished, - NoStream, -} + type CloseDescription = JsValue; -impl ErrorFatality for WasmReceiveError { - fn is_fatal(&self) -> bool { - *self == WasmReceiveError::NoStream + fn recv(&mut self, limit: usize) -> Result, Self::ReadError> { + let reader = ReadableStreamDefaultReader::new(&self.stream).unwrap(); + wasm_bindgen_futures::spawn_local(async move { + match JsFuture::from(reader.read()).await { + Ok(chunk) => { + let array = js_sys::Uint8Array::new(&chunk); + if array.length() == 0 { + self.open_streams.remove(&self.stream_id); + self.events.push_back(StreamEvent { + stream_id: self.stream_id, + peer_generated: true, + event_type: StreamEventType::ClosedRecvStream, + }); + Err(WebTransportReadError::Finished) + } else { + Ok(array.to_vec().into_boxed_slice()) + } + } + Err(_) => Err(WebTransportReadError::Blocked), + } + }); + + Err(WebTransportReadError::Blocked) + } + + fn close(&mut self, description: Self::CloseDescription) -> Result<(), ()> { + wasm_bindgen_futures::spawn_local(async move { + JsFuture::from(self.stream.close_with_reason(description)) + .await + .expect("Error closing stream"); + }); + + self.open_streams.remove(&self.stream_id); + Ok(()) + } + + fn is_open(&self) -> bool { + self.open_streams.contains(&self.stream_id) } } -impl<'s> RecvStreamMut<'s> for WasmRecvStream { - type ReadError = WasmReceiveError; - type CloseDescription = String; +impl StreamId for WebTransportStreamId { + type Connection<'c> = &'c mut WebTransportConnection; - fn recv(&mut self, limit: usize) -> Result, Self::ReadError> { - todo!() + type SendMut<'s> = WebTransportSendStreamMut<'s>; + + type RecvMut<'s> = WebTransportRecvStreamMut<'s>; + + type OpenDescription = (); + + fn open<'c>(connection: &mut Self::Connection<'c>, _: Self::OpenDescription) -> Option { + let send_stream = connection.connection.create_bidirectional_stream().unwrap(); + let stream_id = WebTransportStreamId::new(); + connection.open_send_streams.insert(stream_id); + connection.open_recv_streams.insert(stream_id); + + Some(stream_id) } - fn close(&mut self, description: Self::CloseDescription) -> Result<(), ()> { - todo!() + fn get_send<'c, 's>( + self, + connection: &'s mut Self::Connection<'c>, + ) -> Option> { + if !connection.open_send_streams.contains(&self) { + return None; + } + let send_stream = connection.connection.create_bidirectional_stream().unwrap(); + Some(WebTransportSendStreamMut { + stream: send_stream, + stream_id: self, + open_streams: &mut connection.open_send_streams, + }) } - fn is_open(&self) -> bool { - todo!() + fn get_recv<'c, 's>( + self, + connection: &'s mut Self::Connection<'c>, + ) -> Option> { + if !connection.open_recv_streams.contains(&self) { + return None; + } + let recv_stream = connection.connection.create_bidirectional_stream().unwrap(); + Some(WebTransportRecvStreamMut { + events: &mut connection.stream_events, + stream_id: self, + stream: recv_stream, + open_streams: &mut connection.open_recv_streams, + }) + } + + fn poll_events<'c>(connection: &mut Self::Connection<'c>) -> Option> { + connection.stream_events.pop_front() } }