Skip to content

Commit

Permalink
stream headers
Browse files Browse the repository at this point in the history
  • Loading branch information
DrewRidley committed Jun 16, 2024
1 parent a430cca commit 81ca091
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 88 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ bevy_interface.path = "./crates/bevy_interface"

[workspace]
members = ["crates/*"]
default-members = [
"crates/bevy_interface",
"crates/nevy_quic",
"crates/nevy_web_transport",
"crates/transport_interface",
]
resolver = "2"

[workspace.dependencies]
Expand Down
34 changes: 21 additions & 13 deletions crates/bevy_interface/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fn main() {
});

app.add_plugins(EndpointPlugin::default());
app.add_plugins(StreamHeaderPlugin::default());

app.add_systems(Startup, (spawn_endpoint, apply_deferred, connect).chain());
app.add_systems(Update, (log_events, send_message, send_stream_data));
Expand All @@ -27,7 +28,7 @@ struct ExampleEndpoint;
#[derive(Component)]
struct ExampleStream {
connection_entity: Entity,
stream_id: BevyStreamId,
stream_id: HeaderStreamId,
buffer: Vec<u8>,
}

Expand Down Expand Up @@ -133,12 +134,15 @@ fn send_message(

let mut connection = endpoint.connection_mut(connection_entity).unwrap();

let stream_id = connection
.open_stream(Description::new_open_description::<QuinnStreamId>(
let stream_id = HeaderStreamId::new(
&mut connection,
Description::new_open_description::<QuinnStreamId>(
nevy_quic::quinn_proto::Dir::Uni,
))
.unwrap()
.unwrap();
),
96,
)
.unwrap()
.unwrap();

debug!("Opened stream");

Expand All @@ -165,16 +169,20 @@ fn send_stream_data(
.connection_mut(stream_queue.connection_entity)
.unwrap();

let mut stream = connection
.send_stream(stream_queue.stream_id.clone())
.unwrap()
.unwrap();
let Some(stream_id) = stream_queue.stream_id.poll_ready(&mut connection).unwrap() else {
continue;
};

let mut stream = connection.send_stream(stream_id.clone()).unwrap().unwrap();

loop {
if stream_queue.buffer.len() == 0 {
stream.close(Description::new_send_close_description::<QuinnStreamId>(
None,
));
stream
.close(Description::new_send_close_description::<QuinnStreamId>(
None,
))
.unwrap()
.unwrap();
commands.entity(stream_entity).despawn();
break;
}
Expand Down
63 changes: 33 additions & 30 deletions crates/bevy_interface/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fn main() {
});

app.add_plugins(EndpointPlugin::default());
app.add_plugins(StreamHeaderPlugin::default());

app.add_systems(Startup, spawn_endpoint);
app.add_systems(Update, (log_events, spawn_streams, receive_data));
Expand Down Expand Up @@ -79,41 +80,40 @@ fn spawn_endpoint(mut commands: Commands) {
let endpoint =
QuinnEndpoint::new("0.0.0.0:27018".parse().unwrap(), None, Some(server_config)).unwrap();

commands.spawn((ExampleEndpoint, BevyEndpoint::new(endpoint)));
commands.spawn((
ExampleEndpoint,
EndpointStreamHeaders,
BevyEndpoint::new(endpoint),
));
}

fn spawn_streams(
mut commands: Commands,
connection_q: Query<Entity, With<BevyConnection>>,
mut connections: Connections,
endpoint_q: Query<(), With<ExampleEndpoint>>,
mut stream_event_r: EventReader<HeaderStreamEvent>,
) {
for connection_entity in connection_q.iter() {
let mut endpoint = connections
.connection_endpoint_mut(connection_entity)
.unwrap();
for HeaderStreamEvent {
endpoint_entity,
connection_entity,
stream_id,
event_type,
..
} in stream_event_r.read()
{
if !endpoint_q.contains(*endpoint_entity) {
continue;
}

let mut connection = endpoint.connection_mut(connection_entity).unwrap();

while let Some(BevyStreamEvent {
stream_id,
event_type,
..
}) = connection.poll_stream_events()
{
match event_type {
StreamEventType::NewSendStream => (),
StreamEventType::ClosedSendStream => (),
StreamEventType::NewRecvStream => {
commands
.spawn(ExampleStream {
connection_entity,
stream_id,
buffer: Vec::new(),
})
.set_parent(connection_entity);
}
StreamEventType::ClosedRecvStream => (),
}
if let HeaderStreamEventType::NewRecvStream(header) = event_type {
info!("new recv stream with header {}", header);

commands
.spawn(ExampleStream {
connection_entity: *connection_entity,
stream_id: stream_id.clone(),
buffer: Vec::new(),
})
.set_parent(*connection_entity);
}
}
}
Expand Down Expand Up @@ -148,7 +148,10 @@ fn receive_data(
}

if !stream.is_open() {
info!("message received: {:?}", example_stream.buffer);
info!(
"message received: {:?}",
String::from_utf8_lossy(&example_stream.buffer)
);

commands.entity(stream_entity).despawn();
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_interface/src/connections/stream_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use transport_interface::*;

use crate::{description::Description, MismatchedType};

pub trait StreamError {
pub trait StreamError: std::fmt::Debug {
fn is_fatal(&self) -> bool;

fn into_any(self: Box<Self>) -> Box<dyn Any>;
Expand Down
8 changes: 0 additions & 8 deletions crates/bevy_interface/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::any::Any;

use bevy::{prelude::*, utils::HashMap};
use transport_interface::*;

Expand All @@ -20,8 +18,6 @@ pub struct BevyEndpoint {
}

trait BevyEndpointType: Send + Sync {
fn endpoint_type_name(&self) -> &'static str;

fn update(&mut self, endpoint_entity: Entity, params: &mut UpdateHandlerParams);

fn connect(
Expand Down Expand Up @@ -111,10 +107,6 @@ where
E::ConnectionId: Send + Sync,
for<'a> <E::Connection<'a> as ConnectionMut<'a>>::StreamType: Send + Sync,
{
fn endpoint_type_name(&self) -> &'static str {
std::any::type_name::<E>()
}

fn update(&mut self, endpoint_entity: Entity, params: &mut UpdateHandlerParams) {
self.endpoint.update(&mut UpdateHandler {
params,
Expand Down
5 changes: 4 additions & 1 deletion crates/bevy_interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ pub mod prelude {
pub use crate::connections::{BevyStreamEvent, BevyStreamId};
pub use crate::description::{CloneableDescription, Description};
pub use crate::endpoint::{BevyConnection, BevyEndpoint, ConnectError, Connections};
pub use crate::stream_headers::StreamHeaderPlugin;
pub use crate::stream_headers::{
EndpointStreamHeaders, HeaderStreamEvent, HeaderStreamEventType, HeaderStreamId,
StreamHeaderPlugin,
};
pub use crate::{Connected, Disconnected, EndpointPlugin};
pub use transport_interface::StreamEventType;
}
Expand Down
Loading

0 comments on commit 81ca091

Please sign in to comment.