diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 2d3f7ba5bb..8e7e843d6d 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -136,5 +136,6 @@ jobs: bench: name: "Benchmark" - needs: [check] + # TODO + # needs: [check] uses: ./.github/workflows/bench.yml diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index ebbab98286..11600f0844 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -380,6 +380,7 @@ struct Runner<'a, H: Handler> { handler: H, timeout: Option>>, args: &'a Args, + recv_buf: Option>, } impl<'a, H: Handler> Runner<'a, H> { @@ -445,12 +446,26 @@ impl<'a, H: Handler> Runner<'a, H> { async fn process_multiple_input(&mut self) -> Res<()> { loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } + // TODO: big hack + let dgram = match self.socket.recv( + &self.local_addr, + self.recv_buf.take().expect("recv_buf not to be taken"), + ) { + Ok(Ok(d)) => d, + Ok(Err(recv_buf)) => { + self.recv_buf = Some(recv_buf); + break; + } + Err((e, recv_buf)) => { + self.recv_buf = Some(recv_buf); + return Err(e.into()); + } + }; + self.client - .process_multiple_input(dgrams.iter(), Instant::now()); + // TODO + .process_multiple_input(std::iter::once(&dgram), Instant::now()); + self.recv_buf = Some(dgram.into_recv_buf()); self.process_output().await?; } @@ -568,6 +583,7 @@ pub async fn client(mut args: Args) -> Res<()> { local_addr: real_local, socket: &mut socket, timeout: None, + recv_buf: Some(Vec::with_capacity(neqo_udp::RECV_BUF_SIZE)), } .run() .await? @@ -584,6 +600,7 @@ pub async fn client(mut args: Args) -> Res<()> { local_addr: real_local, socket: &mut socket, timeout: None, + recv_buf: Some(Vec::with_capacity(neqo_udp::RECV_BUF_SIZE)), } .run() .await? diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 3867cd4ecc..03a426e8a7 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -205,6 +205,7 @@ pub struct ServerRunner { server: Box, timeout: Option>>, sockets: Vec<(SocketAddr, crate::udp::Socket)>, + recv_buf: Option>, } impl ServerRunner { @@ -219,6 +220,7 @@ impl ServerRunner { server, timeout: None, sockets, + recv_buf: Some(Vec::with_capacity(neqo_udp::RECV_BUF_SIZE)), } } @@ -289,13 +291,23 @@ impl ServerRunner { match self.ready().await? { Ready::Socket(inx) => loop { let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let dgrams = socket.recv(host)?; - if dgrams.is_empty() { - break; - } - for dgram in dgrams { - self.process(Some(&dgram)).await?; - } + // TODO: big hack + let dgram = match socket.recv( + host, + self.recv_buf.take().expect("recv_buf not to be taken"), + ) { + Ok(Ok(d)) => d, + Ok(Err(recv_buf)) => { + self.recv_buf = Some(recv_buf); + break; + } + Err((e, recv_buf)) => { + self.recv_buf = Some(recv_buf); + return Err(e.into()); + } + }; + self.process(Some(&dgram)).await?; + self.recv_buf = Some(dgram.into_recv_buf()); }, Ready::Timeout => { self.timeout = None; diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index 94488032d7..51092d86fb 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -54,16 +54,35 @@ impl Socket { /// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with /// the provided local address. - pub fn recv(&self, local_address: &SocketAddr) -> Result, io::Error> { + pub fn recv( + &self, + local_address: &SocketAddr, + recv_buf: Vec, + ) -> Result>, (io::Error, Vec)> { + // TODO: big hack! + let mut recv_buf = Some(recv_buf); self.inner - .try_io(tokio::io::Interest::READABLE, || { - neqo_udp::recv_inner(local_address, &self.state, &self.inner) - }) + .try_io( + tokio::io::Interest::READABLE, + || match neqo_udp::recv_inner_2( + local_address, + &self.state, + &self.inner, + recv_buf.take().unwrap(), + ) { + Ok(d) => return Ok(d), + Err((e, buf)) => { + recv_buf = Some(buf); + return Err(e); + } + }, + ) + .map(Ok) .or_else(|e| { if e.kind() == io::ErrorKind::WouldBlock { - Ok(vec![]) + Ok(Err(recv_buf.take().unwrap())) } else { - Err(e) + Err((e, recv_buf.take().unwrap())) } }) } diff --git a/neqo-common/src/datagram.rs b/neqo-common/src/datagram.rs index b2d346d02a..cdd2939dd0 100644 --- a/neqo-common/src/datagram.rs +++ b/neqo-common/src/datagram.rs @@ -13,6 +13,9 @@ pub struct Datagram { src: SocketAddr, dst: SocketAddr, tos: IpTos, + /// The segment size if this transmission contains multiple datagrams. + /// This is `None` if the [`Datagram`] only contains a single datagram + segment_size: Option, d: Vec, } @@ -22,6 +25,23 @@ impl Datagram { src, dst, tos, + segment_size: None, + d: d.into(), + } + } + + pub fn new_with_segment_size>>( + src: SocketAddr, + dst: SocketAddr, + tos: IpTos, + segment_size: usize, + d: V, + ) -> Self { + Self { + src, + dst, + tos, + segment_size: Some(segment_size), d: d.into(), } } @@ -44,6 +64,14 @@ impl Datagram { pub fn set_tos(&mut self, tos: IpTos) { self.tos = tos; } + + pub fn into_recv_buf(self) -> Vec { + self.d + } + + pub fn segment_size(&self) -> Option { + self.segment_size + } } impl Deref for Datagram { diff --git a/neqo-crypto/build.rs b/neqo-crypto/build.rs index 5e8ac6fda9..1c57aca78c 100644 --- a/neqo-crypto/build.rs +++ b/neqo-crypto/build.rs @@ -333,11 +333,7 @@ fn setup_standalone(nss: &str) -> Vec { "cargo:rustc-link-search=native={}", nsslibdir.to_str().unwrap() ); - if is_debug() || env::consts::OS == "windows" { - static_link(); - } else { - dynamic_link(); - } + static_link(); let mut flags: Vec = Vec::new(); for i in includes { diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 2930af3997..aaf0ba2cb5 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1498,118 +1498,123 @@ impl Connection { } fn input_path(&mut self, path: &PathRef, d: &Datagram, now: Instant) -> Res<()> { - let mut slc = &d[..]; - let mut dcid = None; - - qtrace!([self], "{} input {}", path.borrow(), hex(&**d)); - let pto = path.borrow().rtt().pto(PacketNumberSpace::ApplicationData); - - // Handle each packet in the datagram. - while !slc.is_empty() { - self.stats.borrow_mut().packets_rx += 1; - let (packet, remainder) = - match PublicPacket::decode(slc, self.cid_manager.decoder().as_ref()) { - Ok((packet, remainder)) => (packet, remainder), - Err(e) => { - qinfo!([self], "Garbage packet: {}", e); - qtrace!([self], "Garbage packet contents: {}", hex(slc)); - self.stats.borrow_mut().pkt_dropped("Garbage packet"); - break; - } - }; - match self.preprocess_packet(&packet, path, dcid.as_ref(), now)? { - PreprocessResult::Continue => (), - PreprocessResult::Next => break, - PreprocessResult::End => return Ok(()), - } - - qtrace!([self], "Received unverified packet {:?}", packet); - - match packet.decrypt(&mut self.crypto.states, now + pto) { - Ok(payload) => { - // OK, we have a valid packet. - self.idle_timeout.on_packet_received(now); - dump_packet( - self, - path, - "-> RX", - payload.packet_type(), - payload.pn(), - &payload[..], - d.tos(), - d.len(), - ); + for mut slc in d + .as_slice() + .chunks(d.segment_size().unwrap_or(d.as_slice().len())) + { + let mut dcid = None; - #[cfg(feature = "build-fuzzing-corpus")] - if packet.packet_type() == PacketType::Initial { - let target = if self.role == Role::Client { - "server_initial" - } else { - "client_initial" - }; - neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]); - } + qtrace!([self], "{} input {}", path.borrow(), hex(&**d)); + let pto = path.borrow().rtt().pto(PacketNumberSpace::ApplicationData); - qlog::packet_received(&self.qlog, &packet, &payload); - let space = PacketNumberSpace::from(payload.packet_type()); - if let Some(space) = self.acks.get_mut(space) { - if space.is_duplicate(payload.pn()) { - qdebug!("Duplicate packet {}-{}", space, payload.pn()); - self.stats.borrow_mut().dups_rx += 1; - } else { - match self.process_packet(path, &payload, now) { - Ok(migrate) => { - self.postprocess_packet(path, d, &packet, migrate, now); - } - Err(e) => { - self.ensure_error_path(path, &packet, now); - return Err(e); - } - } + // Handle each packet in the datagram. + while !slc.is_empty() { + self.stats.borrow_mut().packets_rx += 1; + let (packet, remainder) = + match PublicPacket::decode(slc, self.cid_manager.decoder().as_ref()) { + Ok((packet, remainder)) => (packet, remainder), + Err(e) => { + qinfo!([self], "Garbage packet: {}", e); + qtrace!([self], "Garbage packet contents: {}", hex(slc)); + self.stats.borrow_mut().pkt_dropped("Garbage packet"); + break; } - } else { - qdebug!( - [self], - "Received packet {} for untracked space {}", - space, - payload.pn() - ); - return Err(Error::ProtocolViolation); - } + }; + match self.preprocess_packet(&packet, path, dcid.as_ref(), now)? { + PreprocessResult::Continue => (), + PreprocessResult::Next => break, + PreprocessResult::End => return Ok(()), } - Err(e) => { - match e { - Error::KeysPending(cspace) => { - // This packet can't be decrypted because we don't have the keys yet. - // Don't check this packet for a stateless reset, just return. - let remaining = slc.len(); - self.save_datagram(cspace, d, remaining, now); - return Ok(()); + + qtrace!([self], "Received unverified packet {:?}", packet); + + match packet.decrypt(&mut self.crypto.states, now + pto) { + Ok(payload) => { + // OK, we have a valid packet. + self.idle_timeout.on_packet_received(now); + dump_packet( + self, + path, + "-> RX", + payload.packet_type(), + payload.pn(), + &payload[..], + d.tos(), + d.len(), + ); + + #[cfg(feature = "build-fuzzing-corpus")] + if packet.packet_type() == PacketType::Initial { + let target = if self.role == Role::Client { + "server_initial" + } else { + "client_initial" + }; + neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]); } - Error::KeysExhausted => { - // Exhausting read keys is fatal. - return Err(e); + + qlog::packet_received(&self.qlog, &packet, &payload); + let space = PacketNumberSpace::from(payload.packet_type()); + if let Some(space) = self.acks.get_mut(space) { + if space.is_duplicate(payload.pn()) { + qdebug!("Duplicate packet {}-{}", space, payload.pn()); + self.stats.borrow_mut().dups_rx += 1; + } else { + match self.process_packet(path, &payload, now) { + Ok(migrate) => { + self.postprocess_packet(path, d, &packet, migrate, now); + } + Err(e) => { + self.ensure_error_path(path, &packet, now); + return Err(e); + } + } + } + } else { + qdebug!( + [self], + "Received packet {} for untracked space {}", + space, + payload.pn() + ); + return Err(Error::ProtocolViolation); } - Error::KeysDiscarded(cspace) => { - // This was a valid-appearing Initial packet: maybe probe with - // a Handshake packet to keep the handshake moving. - self.received_untracked |= - self.role == Role::Client && cspace == CryptoSpace::Initial; + } + Err(e) => { + match e { + Error::KeysPending(cspace) => { + // This packet can't be decrypted because we don't have the keys yet. + // Don't check this packet for a stateless reset, just return. + let remaining = slc.len(); + self.save_datagram(cspace, d, remaining, now); + return Ok(()); + } + Error::KeysExhausted => { + // Exhausting read keys is fatal. + return Err(e); + } + Error::KeysDiscarded(cspace) => { + // This was a valid-appearing Initial packet: maybe probe with + // a Handshake packet to keep the handshake moving. + self.received_untracked |= + self.role == Role::Client && cspace == CryptoSpace::Initial; + } + _ => (), } - _ => (), + // Decryption failure, or not having keys is not fatal. + // If the state isn't available, or we can't decrypt the packet, drop + // the rest of the datagram on the floor, but don't generate an error. + self.check_stateless_reset(path, d, dcid.is_none(), now)?; + self.stats.borrow_mut().pkt_dropped("Decryption failure"); + qlog::packet_dropped(&self.qlog, &packet); } - // Decryption failure, or not having keys is not fatal. - // If the state isn't available, or we can't decrypt the packet, drop - // the rest of the datagram on the floor, but don't generate an error. - self.check_stateless_reset(path, d, dcid.is_none(), now)?; - self.stats.borrow_mut().pkt_dropped("Decryption failure"); - qlog::packet_dropped(&self.qlog, &packet); } + slc = remainder; + dcid = Some(ConnectionId::from(packet.dcid())); } - slc = remainder; - dcid = Some(ConnectionId::from(packet.dcid())); + + self.check_stateless_reset(path, d, dcid.is_none(), now)?; } - self.check_stateless_reset(path, d, dcid.is_none(), now)?; Ok(()) } diff --git a/neqo-transport/src/connection/tests/mod.rs b/neqo-transport/src/connection/tests/mod.rs index e5ca47a6c0..ebd3378760 100644 --- a/neqo-transport/src/connection/tests/mod.rs +++ b/neqo-transport/src/connection/tests/mod.rs @@ -678,3 +678,32 @@ fn create_server() { // Server won't have a default path, so no RTT. assert_eq!(stats.rtt, Duration::from_secs(0)); } + +#[test] +fn segments() { + let mut client = default_client(); + let mut server = default_server(); + connect_force_idle(&mut client, &mut server); + + let before = server.stats().packets_rx; + + let stream_id = client.stream_create(StreamType::UniDi).unwrap(); + let (dgrams, _) = fill_cwnd(&mut client, stream_id, now()); + + let source = dgrams[0].source(); + let destination = dgrams[0].destination(); + let tos = dgrams[0].tos(); + let d: Vec = dgrams.into_iter().fold(Vec::new(), |mut acc, i| { + acc.extend_from_slice(i.as_slice()); + acc + }); + // TODO: segment size is wrong. + let large_dgram = Datagram::new_with_segment_size(source, destination, tos, 42, d); + server.process_input(&large_dgram, now()); + assert!( + server.stats().packets_rx > before + 1, + "before {}, after {}", + before, + server.stats().packets_rx + ); +} diff --git a/neqo-udp/src/lib.rs b/neqo-udp/src/lib.rs index 5f1fb3dbe6..5d1f4c8803 100644 --- a/neqo-udp/src/lib.rs +++ b/neqo-udp/src/lib.rs @@ -21,7 +21,7 @@ use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; /// Allows reading multiple datagrams in a single [`Socket::recv`] call. // // TODO: Experiment with different values across platforms. -const RECV_BUF_SIZE: usize = u16::MAX as usize; +pub const RECV_BUF_SIZE: usize = u16::MAX as usize; std::thread_local! { static RECV_BUF: RefCell> = RefCell::new(vec![0; RECV_BUF_SIZE]); @@ -116,6 +116,64 @@ pub fn recv_inner( Ok(dgrams) } +pub fn recv_inner_2( + local_address: &SocketAddr, + state: &UdpSocketState, + socket: impl SocketRef, + mut recv_buf: Vec, +) -> Result)> { + assert_eq!(recv_buf.capacity(), RECV_BUF_SIZE); + // TODO: unsafe worth it here? + unsafe { + recv_buf.set_len(RECV_BUF_SIZE); + } + + let mut meta; + + loop { + meta = RecvMeta::default(); + + if let Err(e) = state.recv( + (&socket).into(), + &mut [IoSliceMut::new(recv_buf.as_mut())], + slice::from_mut(&mut meta), + ) { + return Err((e, recv_buf)); + } + + if meta.len == 0 || meta.stride == 0 { + qdebug!( + "ignoring datagram from {} to {} len {} stride {}", + meta.addr, + local_address, + meta.len, + meta.stride + ); + continue; + } + + recv_buf.truncate(meta.len); + + break; + } + + qtrace!( + "received {} bytes from {} to {} with {} segments", + recv_buf.len(), + meta.addr, + local_address, + meta.len.div_ceil(meta.stride), + ); + + Ok(Datagram::new_with_segment_size( + meta.addr, + *local_address, + meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), + meta.stride, + recv_buf, + )) +} + /// A wrapper around a UDP socket, sending and receiving [`Datagram`]s. pub struct Socket { state: UdpSocketState,