[SCTP] limit the bytes in the PendingQueue by using a semaphore #367

Merged 23 commits (limit_pending_queue into master) on Jan 2, 2023

Commits (23)
e2a329c
limit the bytes in the PendingQueue by using a semaphore
KillingSpark Dec 15, 2022
73c81d6
changelog
KillingSpark Dec 15, 2022
4357935
implement semaphore with async primitives
KillingSpark Dec 15, 2022
4151527
if one aquire did not use all the remaining capacity, notify another …
KillingSpark Dec 15, 2022
1c1b62d
use right changelog
KillingSpark Dec 15, 2022
c172dbc
use tokio semaphore
KillingSpark Dec 15, 2022
16bf4e2
Add try_write API to stream that is still sync and returns Err:TryAga…
KillingSpark Dec 15, 2022
fd966da
fix tests and issues around asyncifying the pending queue
KillingSpark Dec 15, 2022
a243845
changelog
KillingSpark Dec 15, 2022
1a7b28f
fix last tests that were ignored on my machine
KillingSpark Dec 15, 2022
45dbeeb
fix AsyncWrite impl again. I think the original thing was the correct…
KillingSpark Dec 20, 2022
9e7b46d
make poll_flush a no-op like tokio tcp does and give poll_shutdown a …
KillingSpark Dec 20, 2022
44e3044
limit pending queue to 128kB
KillingSpark Dec 21, 2022
b32c367
Fix blocking for ever on appends to the pendign queue larger than the…
KillingSpark Dec 21, 2022
cbc831f
Merge branch 'limit_pending_queue' of https://github.com/KillingSpark…
KillingSpark Dec 21, 2022
5e93120
typo
KillingSpark Dec 21, 2022
90fc599
Update sctp/CHANGELOG.md
KillingSpark Dec 21, 2022
d33dd76
go back to AsyncWrite implementation from before stream.write was sync
KillingSpark Dec 22, 2022
a26097f
remove unneeded sync try_write API on sctp stream
KillingSpark Dec 23, 2022
f8e11eb
Merge branch 'master' into limit_pending_queue
melekes Jan 2, 2023
5ad531b
Fix copypaste error
KillingSpark Jan 2, 2023
f4cf779
Fix copypaste error
KillingSpark Jan 2, 2023
ee22bc5
make clippy happy
KillingSpark Jan 2, 2023
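The commit sequence above converges on a byte cap for the PendingQueue enforced with tokio's Semaphore ("use tokio semaphore", "limit pending queue to 128kB"). The shape of that mechanism can be sketched with std-only blocking primitives, a `Mutex` plus `Condvar` standing in for the async semaphore; all names here are illustrative, not the crate's actual types:

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// A pending queue whose total payload bytes are capped. `push` blocks until
/// enough capacity is free, back-pressuring producers that outrun the sender.
struct BoundedPendingQueue {
    inner: Mutex<Inner>,
    capacity_freed: Condvar,
    limit: usize, // e.g. 128 * 1024, mirroring the PR's 128 kB cap
}

struct Inner {
    queue: VecDeque<Vec<u8>>,
    queued_bytes: usize,
}

impl BoundedPendingQueue {
    fn new(limit: usize) -> Self {
        BoundedPendingQueue {
            inner: Mutex::new(Inner { queue: VecDeque::new(), queued_bytes: 0 }),
            capacity_freed: Condvar::new(),
            limit,
        }
    }

    /// Block until `chunk` fits under the byte limit, then enqueue it.
    /// A chunk larger than the whole limit is admitted when the queue is
    /// empty, so it cannot block forever waiting for capacity that will
    /// never exist.
    fn push(&self, chunk: Vec<u8>) {
        let mut inner = self.inner.lock().unwrap();
        while inner.queued_bytes + chunk.len() > self.limit && !inner.queue.is_empty() {
            inner = self.capacity_freed.wait(inner).unwrap();
        }
        inner.queued_bytes += chunk.len();
        inner.queue.push_back(chunk);
    }

    /// Dequeue a chunk and wake one blocked producer, since its bytes are
    /// now free capacity.
    fn pop(&self) -> Option<Vec<u8>> {
        let mut inner = self.inner.lock().unwrap();
        let chunk = inner.queue.pop_front()?;
        inner.queued_bytes -= chunk.len();
        self.capacity_freed.notify_one();
        Some(chunk)
    }

    fn queued_bytes(&self) -> usize {
        self.inner.lock().unwrap().queued_bytes
    }
}
```

The empty-queue escape hatch in `push` matters: commit b32c367 ("Fix blocking for ever on appends to the pendign queue larger than the…") addresses exactly that corner case, and commit 4151527 notes the related fairness concern that a waiter which cannot use all freed capacity should pass the notification on.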
14 changes: 10 additions & 4 deletions data/src/data_channel/mod.rs
@@ -118,7 +118,9 @@ impl DataChannel {
             })
             .marshal()?;

-            stream.write_sctp(&msg, PayloadProtocolIdentifier::Dcep)?;
+            stream
+                .write_sctp(&msg, PayloadProtocolIdentifier::Dcep)
+                .await?;
         }
         Ok(DataChannel::new(stream, config))
     }
@@ -284,10 +286,13 @@ impl DataChannel {
         };

         let n = if data_len == 0 {
-            let _ = self.stream.write_sctp(&Bytes::from_static(&[0]), ppi)?;
+            let _ = self
+                .stream
+                .write_sctp(&Bytes::from_static(&[0]), ppi)
+                .await?;
             0
         } else {
-            let n = self.stream.write_sctp(data, ppi)?;
+            let n = self.stream.write_sctp(data, ppi).await?;
             self.bytes_sent.fetch_add(n, Ordering::SeqCst);
             n
         };
@@ -300,7 +305,8 @@ impl DataChannel {
         let ack = Message::DataChannelAck(DataChannelAck {}).marshal()?;
         Ok(self
             .stream
-            .write_sctp(&ack, PayloadProtocolIdentifier::Dcep)?)
+            .write_sctp(&ack, PayloadProtocolIdentifier::Dcep)
+            .await?)
     }

     /// Close closes the DataChannel and the underlying SCTP stream.
12 changes: 8 additions & 4 deletions sctp/CHANGELOG.md
@@ -2,10 +2,14 @@

 ## Unreleased

-* Performance improvements
-    * improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
-    * reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
-    * The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)
+* Limit the bytes in the PendingQueue to avoid packets accumulating there uncontrollably [367](https://github.com/webrtc-rs/webrtc/pull/367)
+* Improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
+* Reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
+* The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)

+### Breaking
+
+* Make `sctp::Stream::write` & `sctp::Stream::write_sctp` async again [367](https://github.com/webrtc-rs/webrtc/pull/367)

 ## v0.7.0
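The breaking change in this changelog entry means every caller of `write`/`write_sctp` gains an `.await`, as the example diffs below show. The mechanics can be illustrated with a mock stream (not the crate's real types) whose immediately-ready future we drive by hand with a no-op waker, so no executor is needed:

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical stand-in for sctp::Stream: `write_sctp` now returns a Future,
/// so call sites must `.await` it (inside tokio in the real crate).
struct MockStream {
    sent: Vec<u8>,
}

impl MockStream {
    async fn write_sctp(&mut self, data: &[u8]) -> Result<usize, ()> {
        // A real implementation would first await semaphore capacity here.
        self.sent.extend_from_slice(data);
        Ok(data.len())
    }
}

/// Build a Waker that does nothing; enough to poll a future that is ready
/// on the first poll.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Poll a future once and panic if it is not immediately ready.
fn block_on_ready<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}
```

Against the real crate the future is not always immediately ready: when the pending queue is at its byte limit, the write suspends until capacity frees up, which is precisely the backpressure this PR adds and why the sync signature had to go.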
2 changes: 1 addition & 1 deletion sctp/examples/ping.rs
@@ -83,7 +83,7 @@ async fn main() -> Result<(), Error> {
     while ping_seq_num < 10 {
         let ping_msg = format!("ping {}", ping_seq_num);
         println!("sent: {}", ping_msg);
-        stream_tx.write(&Bytes::from(ping_msg))?;
+        stream_tx.write(&Bytes::from(ping_msg)).await?;

         ping_seq_num += 1;
     }
2 changes: 1 addition & 1 deletion sctp/examples/pong.rs
@@ -86,7 +86,7 @@ async fn main() -> Result<(), Error> {

         let pong_msg = format!("pong [{}]", ping_msg);
         println!("sent: {}", pong_msg);
-        stream2.write(&Bytes::from(pong_msg))?;
+        stream2.write(&Bytes::from(pong_msg)).await?;

         tokio::time::sleep(Duration::from_secs(1)).await;
     }
2 changes: 1 addition & 1 deletion sctp/examples/throughput.rs
@@ -136,7 +136,7 @@ fn main() -> Result<(), Error> {

     let mut now = tokio::time::Instant::now();
     let mut pkt_num = 0;
-    while stream.write(&buf.clone().into()).is_ok() {
+    while stream.write(&buf.clone().into()).await.is_ok() {
         pkt_num += 1;
         if now.elapsed().as_secs() == 1 {
             println!("Send {} pkts", pkt_num);
20 changes: 11 additions & 9 deletions sctp/src/association/association_internal.rs
@@ -347,7 +347,7 @@ impl AssociationInternal {
     ) -> Vec<Packet> {
         // Pop unsent data chunks from the pending queue to send as much as
         // cwnd and rwnd allow.
-        let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send();
+        let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send().await;
         if !chunks.is_empty() {
             // Start timer. (noop if already started)
             log::trace!("[{}] T3-rtx timer start (pt1)", self.name);
@@ -1717,7 +1717,7 @@ impl AssociationInternal {
         self.handle_peer_last_tsn_and_acknowledgement(false)
     }

-    fn send_reset_request(&mut self, stream_identifier: u16) -> Result<()> {
+    async fn send_reset_request(&mut self, stream_identifier: u16) -> Result<()> {
         let state = self.get_state();
         if state != AssociationState::Established {
             return Err(Error::ErrResetPacketInStateNotExist);
@@ -1733,7 +1733,7 @@ impl AssociationInternal {
             ..Default::default()
         };

-        self.pending_queue.push(c);
+        self.pending_queue.push(c).await;
         self.awake_write_loop();

         Ok(())
@@ -1798,7 +1798,7 @@ impl AssociationInternal {
     }

     /// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
-    fn move_pending_data_chunk_to_inflight_queue(
+    async fn move_pending_data_chunk_to_inflight_queue(
         &mut self,
         beginning_fragment: bool,
         unordered: bool,
@@ -1840,7 +1840,7 @@ impl AssociationInternal {

     /// pop_pending_data_chunks_to_send pops chunks from the pending queues as many as
     /// the cwnd and rwnd allows to send.
-    fn pop_pending_data_chunks_to_send(&mut self) -> (Vec<ChunkPayloadData>, Vec<u16>) {
+    async fn pop_pending_data_chunks_to_send(&mut self) -> (Vec<ChunkPayloadData>, Vec<u16>) {
         let mut chunks = vec![];
         let mut sis_to_reset = vec![]; // stream identifiers to reset

@@ -1885,8 +1885,9 @@ impl AssociationInternal {

             self.rwnd -= data_len as u32;

-            if let Some(chunk) =
-                self.move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
+            if let Some(chunk) = self
+                .move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
+                .await
             {
                 chunks.push(chunk);
             }
@@ … @@ impl AssociationInternal {
             if let Some(c) = self.pending_queue.peek() {
                 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);

-                if let Some(chunk) =
-                    self.move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
+                if let Some(chunk) = self
+                    .move_pending_data_chunk_to_inflight_queue(beginning_fragment, unordered)
+                    .await
                 {
                     chunks.push(chunk);
                 }
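The doc comment on `pop_pending_data_chunks_to_send` above describes the sending constraint: chunks leave the pending queue only while both the congestion window (cwnd) and the peer's advertised receive window (rwnd) have room, with rwnd decremented per chunk just as the diff shows for `self.rwnd`. A std-only sketch of that window arithmetic, using chunk lengths in place of real `ChunkPayloadData` (all names illustrative):

```rust
/// Illustrative window state; the real fields live on AssociationInternal.
struct Windows {
    cwnd: u32,           // congestion window, bytes
    rwnd: u32,           // peer's advertised receive window, bytes
    inflight_bytes: u32, // bytes sent but not yet acknowledged
}

/// Pop chunk lengths from `pending` while both windows allow sending,
/// mirroring the loop structure of pop_pending_data_chunks_to_send.
fn pop_sendable(pending: &mut Vec<u32>, w: &mut Windows) -> Vec<u32> {
    let mut out = Vec::new();
    while let Some(&len) = pending.first() {
        // Stop if this chunk would overflow the congestion window...
        if w.inflight_bytes + len > w.cwnd {
            break;
        }
        // ...or the receiver has advertised too little space.
        if len > w.rwnd {
            break;
        }
        w.rwnd -= len;
        w.inflight_bytes += len;
        out.push(len);
        pending.remove(0); // the real code moves the chunk to the inflight queue
    }
    out
}
```

With the PendingQueue now byte-limited upstream of this loop, a slow or congested path stalls the writer instead of letting the pending queue grow without bound.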
@@ -437,8 +437,8 @@ async fn test_assoc_handle_init() -> Result<()> {
     Ok(())
 }

-#[test]
-fn test_assoc_max_message_size_default() -> Result<()> {
+#[tokio::test]
+async fn test_assoc_max_message_size_default() -> Result<()> {
     let mut a = create_association_internal(Config {
         net_conn: Arc::new(DumbConn {}),
         max_receive_buffer_size: 0,
@@ -458,7 +458,7 @@ fn test_assoc_max_message_size_default() -> Result<()> {
     let p = Bytes::from(vec![0u8; 65537]);
     let ppi = PayloadProtocolIdentifier::from(s.default_payload_type.load(Ordering::SeqCst));

-    if let Err(err) = s.write_sctp(&p.slice(..65536), ppi) {
+    if let Err(err) = s.write_sctp(&p.slice(..65536), ppi).await {
         assert_ne!(
             Error::ErrOutboundPacketTooLarge,
             err,
@@ … @@
         assert!(false, "should be error");
     }

-    if let Err(err) = s.write_sctp(&p.slice(..65537), ppi) {
+    if let Err(err) = s.write_sctp(&p.slice(..65537), ppi).await {
         assert_eq!(
             Error::ErrOutboundPacketTooLarge,
             err,
@@ … @@
     Ok(())
 }

-#[test]
-fn test_assoc_max_message_size_explicit() -> Result<()> {
+#[tokio::test]
+async fn test_assoc_max_message_size_explicit() -> Result<()> {
     let mut a = create_association_internal(Config {
         net_conn: Arc::new(DumbConn {}),
         max_receive_buffer_size: 0,
@@ -504,7 +504,7 @@ fn test_assoc_max_message_size_explicit() -> Result<()> {
     let p = Bytes::from(vec![0u8; 30001]);
     let ppi = PayloadProtocolIdentifier::from(s.default_payload_type.load(Ordering::SeqCst));

-    if let Err(err) = s.write_sctp(&p.slice(..30000), ppi) {
+    if let Err(err) = s.write_sctp(&p.slice(..30000), ppi).await {
         assert_ne!(
             Error::ErrOutboundPacketTooLarge,
             err,
@@ … @@
         assert!(false, "should be error");
     }

-    if let Err(err) = s.write_sctp(&p.slice(..30001), ppi) {
+    if let Err(err) = s.write_sctp(&p.slice(..30001), ppi).await {
         assert_eq!(
             Error::ErrOutboundPacketTooLarge,
             err,
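The tests above only change how `write_sctp` is invoked (now awaited); the behavior they assert stays the same: a write at exactly `max_message_size` (65536 default, 30000 explicit) succeeds, and one byte more fails with `ErrOutboundPacketTooLarge`. The guard itself is a plain boundary check; a hypothetical standalone version (not the crate's API):

```rust
/// Illustrative error type mirroring Error::ErrOutboundPacketTooLarge.
#[derive(Debug, PartialEq)]
enum WriteError {
    OutboundPacketTooLarge,
}

/// Reject payloads strictly larger than the association's max message size;
/// payloads at exactly the limit are allowed, as the tests assert.
fn check_outbound_size(payload_len: usize, max_message_size: usize) -> Result<usize, WriteError> {
    if payload_len > max_message_size {
        return Err(WriteError::OutboundPacketTooLarge);
    }
    Ok(payload_len)
}
```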