Skip to content

Commit

Permalink
wasm scaffolding
Browse files Browse the repository at this point in the history
  • Loading branch information
DrewRidley committed Jun 22, 2024
1 parent fa54262 commit 84f2013
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 230 deletions.
162 changes: 91 additions & 71 deletions crates/nevy_wasm/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,120 @@
use std::{
collections::{HashSet, VecDeque},
default,
future::Future,
pin::Pin,
};

use futures_lite::stream::race;
use slotmap::new_key_type;
use transport_interface::{ConnectionMut, ConnectionRef, StreamEvent};
use web_sys::{
wasm_bindgen::{closure::Closure, JsCast, JsValue},
WebTransportBidirectionalStream, WebTransportReceiveStream, WebTransportSendStream,
};
use web_transport_wasm::Session;

use crate::{
reader::{Reader, WebError},
stream::WasmStreamId,
};

new_key_type! {
pub struct WasmConnectionId;
}
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;
use web_sys::WebTransport;

struct ConnectedWasmSession {
/// The underlying wasm session used to establish new streams, read, or write data.
session: web_sys::WebTransport,
/// A future that accepts either unidirectional or bidirectional streams from the peer.
/// This future will always exist in the [WasmSession::Connected] state and must be polled accordingly.
accept_future: Box<dyn Future<Output = ()>>,

/// A future used to populate the internal recv buffers from the async methods.
recv_future: Box<dyn Future<Output = ()>>,
/// A future used to progress all outstanding writes.
send_future: Box<dyn Future<Output = ()>>,
}
use transport_interface::*;

use crate::stream::WebTransportStreamId;

/// The wasm session state.
enum WasmSession {
/// The session is disconnected and awaiting a new connection attempt.
Disconnected,
/// The session is currently connecting with the specified future that must be polled to progress the connection.
Connecting(Box<dyn Future<Output = Result<Session, WebError>>>),
/// The session is currently connected.
Connected(ConnectedWasmSession),
pub struct WebTransportConnectionId(u64);

impl WebTransportConnectionId {
pub(crate) fn new() -> Self {
static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
WebTransportConnectionId(CONNECTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst))
}
}

pub struct WasmConnection {
/// The session. May or may not contain a valid session depending on the state.
session: WasmSession,
/// A collection of events associated with this connection that can be read by the manager process.
/// This is disconnected from the session state because stream events (such as disconnected) can still be read
/// even if the connection is no longer valid.
stream_events: VecDeque<StreamEvent<WasmConnectionId>>,
pub struct WebTransportConnection {
pub(crate) connection: WebTransport,
pub(crate) connection_id: WebTransportConnectionId,
pub(crate) stream_events: VecDeque<StreamEvent<WebTransportStreamId>>,
pub(crate) open_send_streams: HashSet<WebTransportStreamId>,
pub(crate) open_recv_streams: HashSet<WebTransportStreamId>,
}

impl WasmConnection {
pub(crate) fn new() -> Self {
WasmConnection {
session: WasmSession::Disconnected,
impl WebTransportConnection {
pub(crate) fn new(connection: WebTransport, connection_id: WebTransportConnectionId) -> Self {
WebTransportConnection {
connection,
connection_id,
stream_events: VecDeque::new(),
open_send_streams: HashSet::new(),
open_recv_streams: HashSet::new(),
}
}

async fn accept_uni(&mut self) {}
pub async fn poll_timeouts(&self) {
// No explicit timeout handling required for WebTransport
}

async fn accept_bi(&mut self) -> Result<WebTransportBidirectionalStream, WebError> {
if let WasmSession::Connected(session) = self.session {
let transport = session.session;
pub async fn poll_events(&self, handler: &mut impl EndpointEventHandler<WebTransportEndpoint>) {
// Handle WebTransport events like readiness, closures, etc.

let mut reader = Reader::new(&transport.incoming_bidirectional_streams())?;
let stream: WebTransportBidirectionalStream =
reader.read().await?.expect("Closed without error");
// Example for handling readiness:
let ready_promise = self.connection.ready();
if let Ok(_) = JsFuture::from(ready_promise).await {
handler.connected(self.connection_id);
}

Ok(stream)
// Example for handling closed:
let closed_promise = self.connection.closed();
if let Ok(_) = JsFuture::from(closed_promise).await {
handler.disconnected(self.connection_id);
}
}

pub async fn accept_streams(&self) {
//.incoming_bidirectional_streams returns a ReadableStream
let incoming_bidi_streams = self.connection.incoming_bidirectional_streams();
let incoming_uni_streams = self.connection.incoming_unidirectional_streams();

let reader = incoming_bidi_streams
.get_reader()
.dyn_into::<web_sys::ReadableStreamDefaultReader>()
.unwrap();
while let Ok(stream) = JsFuture::from(reader.read()).await {
let new_stream = stream.dyn_into::<web_sys::ReadableStream>().unwrap();
let stream_id = WebTransportStreamId::new();
self.open_recv_streams.insert(stream_id);
self.stream_events.push_back(StreamEvent {
stream_id,
peer_generated: true,
event_type: StreamEventType::NewRecvStream,
});
}

let reader_uni = incoming_uni_streams
.get_reader()
.dyn_into::<web_sys::ReadableStreamDefaultReader>()
.unwrap();
while let Ok(stream) = JsFuture::from(reader_uni.read()).await {
let new_stream = stream.dyn_into::<web_sys::ReadableStream>().unwrap();
let stream_id = WebTransportStreamId::new();
self.open_recv_streams.insert(stream_id);
self.stream_events.push_back(StreamEvent {
stream_id,
peer_generated: true,
event_type: StreamEventType::NewRecvStream,
});
}
}

pub fn side(&self) -> web_sys::WebTransport {
self.connection.clone()
}
}

impl<'c> ConnectionMut<'c> for &'c mut WasmConnection {
type NonMut<'b> = &'b WasmConnection where Self: 'b;
type StreamType = WasmStreamId;
impl<'c> ConnectionMut<'c> for &'c mut WebTransportConnection {
type NonMut<'b> = &'b WebTransportConnection where Self: 'b;

type StreamType = WebTransportStreamId;

fn as_ref<'b>(&'b self) -> Self::NonMut<'b> {
self
}

fn disconnect(&mut self) {
todo!();
self.connection.close();
}
}

impl<'c> ConnectionRef<'c> for &'c WasmConnection {
type ConnectionStats = ();
impl<'c> ConnectionRef<'c> for &'c WebTransportConnection {
type ConnectionStats = WebTransportConnectionId;

fn get_stats(&self) -> Self::ConnectionStats {
todo!()
fn get_stats(&self) -> WebTransportConnectionId {
self.connection_id
}
}
103 changes: 84 additions & 19 deletions crates/nevy_wasm/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,105 @@
use slotmap::SlotMap;
use transport_interface::Endpoint;
use crate::connection::*;
use std::collections::HashMap;
use std::net::SocketAddr;
use transport_interface::*;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;
use web_sys::WebTransport;

use crate::connection::{WasmConnection, WasmConnectionId};

pub struct WasmEndpoint {
connections: SlotMap<WasmConnectionId, WasmConnection>,
/// A transport endpoint facilitated using WebTransport.
///
/// Uses async methods and should be polled manually within update.
pub struct WebTransportEndpoint {
endpoint: WebTransport,
local_addr: SocketAddr,
connections: HashMap<WebTransportConnectionId, WebTransportConnection>,
}

impl Endpoint for WasmEndpoint {
type Connection<'c> = &'c mut WasmConnection;
impl WebTransportEndpoint {
/// Creates a new endpoint, facilitated through WebTransport.
///
/// Requires a bind_addr (consider '0.0.0.0:0' for clients).
pub fn new(bind_addr: SocketAddr, url: &str) -> Result<Self, JsValue> {
let endpoint = WebTransport::new(url)?;

type ConnectionId = WasmConnectionId;
Ok(Self {
endpoint,
connections: HashMap::new(),
local_addr: bind_addr,
})
}

type ConnectDescription = String;
async fn handle_ready(&self) {
let ready_promise = self.endpoint.ready();
JsFuture::from(ready_promise).await.unwrap();
}

async fn handle_closed(&self) {
let closed_promise = self.endpoint.closed();
JsFuture::from(closed_promise).await.unwrap();
}

async fn process_event(&mut self, handler: &mut impl EndpointEventHandler<Self>) {
// Example of handling ready and closed events
self.handle_ready().await;
self.handle_closed().await;
}

type IncomingConnectionInfo<'i> = ();
async fn update_connections(&mut self, handler: &mut impl EndpointEventHandler<Self>) {
for connection in self.connections.values_mut() {
connection.poll_timeouts().await;
connection.poll_events(handler).await;
connection.accept_streams().await;
}
}
}

impl Endpoint for WebTransportEndpoint {
type Connection<'a> = &'a mut WebTransportConnection;
type ConnectionId = WebTransportConnectionId;
type ConnectDescription = String;
type IncomingConnectionInfo<'a> = String;

fn update(&mut self, handler: &mut impl transport_interface::EndpointEventHandler<Self>) {
todo!()
fn update(&mut self, handler: &mut impl EndpointEventHandler<Self>) {
wasm_bindgen_futures::spawn_local(async move {
self.process_event(handler).await;
self.update_connections(handler).await;
});
}

fn connection<'c>(
&'c self,
id: Self::ConnectionId,
) -> Option<<Self::Connection<'c> as transport_interface::ConnectionMut>::NonMut<'c>> {
todo!()
) -> Option<<Self::Connection<'c> as ConnectionMut>::NonMut<'c>> {
self.connections.get(&id)
}

fn connection_mut<'a>(&'a mut self, id: Self::ConnectionId) -> Option<Self::Connection<'a>> {
todo!()
fn connection_mut<'c>(&'c mut self, id: Self::ConnectionId) -> Option<Self::Connection<'c>> {
self.connections.get_mut(&id)
}

fn connect<'c>(
&'c mut self,
info: Self::ConnectDescription,
description: Self::ConnectDescription,
) -> Option<(Self::ConnectionId, Self::Connection<'c>)> {
todo!()
let connection = WebTransport::new(&description).ok()?;
let connection_id = WebTransportConnectionId::new();

let connection = WebTransportConnection::new(connection, connection_id);
assert!(
self.connections.insert(connection_id, connection).is_none(),
"Connection handle should not be a duplicate"
);

Some((connection_id, &mut self.connections[&connection_id]))
}

fn disconnect(&mut self, id: Self::ConnectionId) -> Result<(), ()> {
if let Some(mut connection) = self.connection_mut(id) {
connection.disconnect();
Ok(())
} else {
Err(())
}
}
}
1 change: 0 additions & 1 deletion crates/nevy_wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
mod connection;
mod endpoint;
mod reader;
mod stream;
68 changes: 0 additions & 68 deletions crates/nevy_wasm/src/reader.rs

This file was deleted.

Loading

0 comments on commit 84f2013

Please sign in to comment.