Skip to content

Commit

Permalink
complete connection
Browse files Browse the repository at this point in the history
  • Loading branch information
DrewRidley committed Jun 23, 2024
1 parent 0c74214 commit 995aa5c
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 406 deletions.
2 changes: 2 additions & 0 deletions crates/nevy_wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ wasm-bindgen-futures = "0.4"
js-sys = "0.3.69"
futures-lite = "1.12"
bytes = "1.6.0"
slotmap = "1.0.7"
log = "0.4.21"

[dev-dependencies]
wasm-bindgen-test = "0.3"
323 changes: 147 additions & 176 deletions crates/nevy_wasm/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,209 +1,180 @@
use std::collections::HashMap;

use bytes::Bytes;
use futures_lite::{
future::{block_on, poll_once, race, BoxedLocal},
FutureExt,
};
use js_sys::{Reflect, Uint8Array};
use transport_interface::{ConnectionMut, ConnectionRef};
use wasm_bindgen::JsCast;
use futures_lite::future::{block_on, poll_once, Boxed, BoxedLocal};
use futures_lite::FutureExt;
use log::{debug, info, trace, warn};
use slotmap::{new_key_type, SlotMap};
use std::collections::VecDeque;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;
use web_sys::{
ReadableStream, ReadableStreamDefaultReader, WebTransport, WebTransportBidirectionalStream,
WebTransportCloseInfo, WebTransportReceiveStream, WebTransportSendStream,
};
use web_sys::WebTransport;

use crate::{
error::WebError,
stream::reader::Reader,
stream::writer::Writer,
stream::{recv::RecvStream, send::SendStream},
stream::{WasmRecvStream, WasmSendStream, WasmStreamId},
};
use crate::error::WebError;
use crate::stream::{RecvStream, SendStream};

pub struct WasmConnectionId(u64);

pub struct WasmConnection {
/// If exists, we are connecting.
inner: web_sys::WebTransport,
ready_future: Option<BoxedLocal<Result<JsValue, JsValue>>>,
accept_stream_future: Option<BoxedLocal<Result<JsValue, JsValue>>>,
/// contains futures for streams that haven't been opened yet
pub(crate) unopened_streams: Vec<(
WasmStreamId,
BoxedLocal<Result<(SendStream, Option<RecvStream>), WebError>>,
)>,

/// Contains all of the completed streams ready to receive data on.
recv_streams: HashMap<WasmStreamId, WasmRecvStream>,
/// Contaisn all of the completed streams ready to write to.
send_streams: HashMap<WasmStreamId, WasmSendStream>,
new_key_type! {
/// A ephemeral identifier used to track a particular local stream.
///
/// A single [StreamKey] may be associated with both a [SendStream] and [RecvStream]
/// if the stream is of the bidirectional type.
pub struct StreamKey;
}

impl WasmConnection {
pub(crate) fn new(url: &str) -> Self {
let transport = web_sys::WebTransport::new(url).unwrap();
let ready_future = Some(JsFuture::from(transport.ready()).boxed_local());

let incoming_bidir = transport.incoming_bidirectional_streams();
let incoming_unidir = transport.incoming_unidirectional_streams();
/// The type of stream.
///
/// Methods to get a send or recv stream may return [None] if a stream is not of the requested direction.
pub(crate) enum StreamType {
Send(SendStream),
Recv(RecvStream),
Bidirectional(SendStream, RecvStream),
}

let accept_stream_future = async move {
let bidir: ReadableStreamDefaultReader = incoming_bidir.get_reader().unchecked_into();
let unidir: ReadableStreamDefaultReader = incoming_unidir.get_reader().unchecked_into();
race(JsFuture::from(bidir.read()), JsFuture::from(unidir.read())).await
}
.boxed_local();
/// The current state of the stream.
///
/// A stream may be currently Opening, in which case the runtime must poll the future.
/// Otherwise, the stream is ready and it's writer/reader can be used.
pub(crate) enum StreamState {
Opening(Boxed<Result<StreamType, WebError>>),
Ready(StreamType),
Closed,
}

Self {
inner: transport,
ready_future,
accept_stream_future: Some(accept_stream_future),
unopened_streams: vec![],
recv_streams: HashMap::new(),
send_streams: HashMap::new(),
}
}
/// A wrapper containing the state of the stream.
pub struct Stream {
state: StreamState,
}

async fn accept_uni(transport: &WebTransport) -> Result<RecvStream, WebError> {
let mut reader = Reader::new(&transport.incoming_unidirectional_streams())?;
let stream: WebTransportReceiveStream = reader.read().await?.expect("closed without error");
let recv = RecvStream::new(stream)?;
Ok(recv)
}
pub struct WasmConnection {
transport: WebTransport,
ready_future: Option<BoxedLocal<Result<JsValue, JsValue>>>,

async fn accept_bi(transport: &WebTransport) -> Result<(SendStream, RecvStream), WebError> {
let mut reader = Reader::new(&transport.incoming_bidirectional_streams())?;
let stream: WebTransportBidirectionalStream =
reader.read().await?.expect("closed without error");
streams: SlotMap<StreamKey, Stream>,
pending_streams: Vec<StreamKey>,
pending_events: VecDeque<StreamEvent>,
}

let send = SendStream::new(stream.writable())?;
let recv = RecvStream::new(stream.readable())?;
/// A event associated with a particular [StreamKey].
///
pub enum StreamEvent {
Opened(StreamKey),
Closed(StreamKey),
Error(StreamKey, WebError),
}

Ok((send, recv))
}
impl WasmConnection {
pub fn new(transport: WebTransport) -> Self {
let promise = JsFuture::from(transport.ready());
let future = Some(async move { promise.await }.boxed_local());

pub async fn send_datagram(
transport: &mut WebTransport,
payload: Bytes,
) -> Result<(), WebError> {
let mut writer = Writer::new(&transport.datagrams().writable())?;
writer.write(&Uint8Array::from(payload.as_ref())).await?;
Ok(())
Self {
transport,
ready_future: future,
streams: SlotMap::with_key(),
pending_events: VecDeque::new(),
pending_streams: vec![],
}
}

pub async fn recv_datagram(transport: &mut WebTransport) -> Result<Bytes, WebError> {
let mut reader = Reader::new(&transport.datagrams().readable())?;
let data: Uint8Array = reader.read().await?.unwrap_or_default();
Ok(data.to_vec().into())
/// Opens a stream.
///
/// The opened stream is facilitated through manual polling of the inserted future.
/// The returned [StreamKey] may be unavailable for reading/writing for a few ticks.
/// User applications should not expect immediate usage of the stream.
pub fn open_stream(&mut self, is_bidirectional: bool) -> StreamKey {
let future = self.create_stream_future(is_bidirectional);
let stream = Stream {
state: StreamState::Opening(future),
};
self.streams.insert(stream)
}

pub fn close(self, code: u32, reason: &str) {
let mut info = WebTransportCloseInfo::new();
info.close_code(code);
info.reason(reason);
self.inner.close_with_close_info(&info);
fn create_stream_future(&self, is_bidirectional: bool) -> Boxed<Result<StreamType, WebError>> {
async move { Err(WebError::from(JsValue::null())) }.boxed()
}

fn update(&mut self) {
if let Some(r_future) = self.ready_future.as_mut() {
match block_on(poll_once(r_future)) {
Some(Ok(_)) => {
self.ready_future = None;
}
Some(Err(_)) => {
self.ready_future = None;
// Handle error if needed
}
None => return,
}
/// Internally polls for the latest stream events.
pub(crate) fn poll_events(&mut self) -> Option<StreamEvent> {
if let Some(event) = self.pending_events.pop_front() {
return Some(event);
}
None
}

if let Some(a_future) = self.accept_stream_future.as_mut() {
match block_on(poll_once(a_future)) {
Some(Ok(_)) => {
let incoming_bidir = self.inner.incoming_bidirectional_streams();
let incoming_unidir = self.inner.incoming_unidirectional_streams();

let new_accept_stream_future = async move {
let bidir: ReadableStreamDefaultReader =
incoming_bidir.get_reader().unchecked_into();
let unidir: ReadableStreamDefaultReader =
incoming_unidir.get_reader().unchecked_into();
race(JsFuture::from(bidir.read()), JsFuture::from(unidir.read())).await
}
.boxed_local();

self.accept_stream_future = Some(new_accept_stream_future);
}
Some(Err(_)) => {
self.accept_stream_future = None;
// Handle error if needed
}
None => {}
}
/// Tries to progress the internal futures used to open or accept streams.
///
/// Uses the internal futures directly to accomplish this.
pub(crate) fn update(&mut self) {
// If we are not ready, let's try to progress the ready state and then return.
if let Some(not_ready) = &mut self.ready_future {
trace!("Progressing the 'ready' promise.");
if let Some(result) = block_on(poll_once(not_ready)) {
debug!("'ready()' promise was resolved to: {:?}", result);
self.ready_future = None;
};
return;
}

self.unopened_streams.retain_mut(|(stream_id, future)| {
match block_on(poll_once(future)) {
Some(Ok((send, recv))) => {
self.send_streams
.insert(*stream_id, WasmSendStream::new(send));
if let Some(recv) = recv {
self.recv_streams
.insert(*stream_id, WasmRecvStream::new(recv));
}
false
}
Some(Err(_)) => {
// Handle error if needed
false
}
None => true,
//Progress stream opening.
self.pending_streams.retain(|key| {
if let Some(stream) = self.streams.get_mut(*key) {

//If we are currently opening, lets progress the future.
//Otherwise, we can just remove the entry from the list.
let StreamState::Opening(open) = &mut stream.state else {
trace!("A pending stream was already open! Removing it from 'pending_streams'.");
return false;
};

let Some(poll_result) = block_on(poll_once(open)) else {
//We need to retain the future since its not finished progressing.
trace!("Stream future is not resolved. Continuing.");
return true;
};

debug!("Stream was ready. Progressing state enum and removing entry.");
stream.state = StreamState::Ready(poll_result.unwrap());
false
} else {
// Stream doesn't exist, remove it from pending_streams
debug!("A stream was marked as pending but did not exist in the streams map. Removing from pending streams.");
false
}
});
}

fn get_stats(&self) -> () {
()
}

fn disconnect(&mut self) {
self.inner.close();
// Clean up resources
self.ready_future = None;
self.accept_stream_future = None;
self.unopened_streams.clear();
self.recv_streams.clear();
self.send_streams.clear();
}
}

impl<'c> ConnectionRef<'c> for &'c WasmConnection {
type ConnectionStats = ();

fn get_stats(&self) -> Self::ConnectionStats {
// Implement this method to return connection statistics
()
/// Fetches a mutable [SendStream] identified by the specified [StreamKey].
///
/// Will return [None] if the stream does not exist, or is a unidirectional receive stream.
pub fn send_stream(&mut self, key: StreamKey) -> Option<&mut SendStream> {
if let Some(stream) = self.streams.get_mut(key) {
match &mut stream.state {
StreamState::Ready(StreamType::Send(send)) => Some(send),
StreamState::Ready(StreamType::Bidirectional(send, _)) => Some(send),
_ => None,
}
} else {
None
}
}
}

impl<'c> ConnectionMut<'c> for &'c mut WasmConnection {
type NonMut<'b> = &'b WasmConnection where 'c: 'b;
type StreamType = WasmStreamId;

fn as_ref<'b>(&'b self) -> Self::NonMut<'b> {
self
/// Fetches a mutable [RecvStream] identified by the specified [StreamKey].
///
/// Will return [None] if the stream does not exist, or is a unidirectional send stream.
pub fn recv_stream(&mut self, key: StreamKey) -> Option<&mut RecvStream> {
if let Some(stream) = self.streams.get_mut(key) {
match &mut stream.state {
StreamState::Ready(StreamType::Recv(recv)) => Some(recv),
StreamState::Ready(StreamType::Bidirectional(_, recv)) => Some(recv),
_ => None,
}
} else {
None
}
}

fn disconnect(&mut self) {
self.inner.close();
self.ready_future = None;
self.accept_stream_future = None;
self.unopened_streams.clear();
self.recv_streams.clear();
self.send_streams.clear();
/// Close the stream associated with the specified key.
pub fn close_stream(&mut self, key: StreamKey) {
if let Some(stream) = self.streams.get_mut(key) {
stream.state = StreamState::Closed;
self.pending_events.push_back(StreamEvent::Closed(key));
}
}
}
Loading

0 comments on commit 995aa5c

Please sign in to comment.