Commit
Fixed throughput issues by fixing lack of guarantee on SCTP -> DTLS packet ordering (#513)

* Fixed lack of guarantee on SCTP -> DTLS packet ordering by removing tokio::spawn. The ordering mixup triggered SCTP's congestion control, severely limiting throughput in practice. (See the sketch after this list.)

* Formatted

* Fixed typo
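
The race described in the first bullet is easy to reproduce in miniature. The sketch below is illustrative rather than taken from this repository: send_batch is a hypothetical stand-in for the real marshal-and-net_conn.send path. Spawning a task per packet batch lets batches race each other to the socket, while awaiting each send inline serializes them; out-of-order arrival produces SACK gap reports that SCTP's congestion control treats like loss.

use tokio::task;

// Hypothetical stand-in for writing one gathered batch to the DTLS transport.
async fn send_batch(id: u32) {
    // Yield to the scheduler, as a real socket write would.
    task::yield_now().await;
    println!("sent batch {id}");
}

#[tokio::main]
async fn main() {
    // Old shape (order NOT guaranteed): each batch is its own task,
    // and tokio may poll them in either order.
    let first = tokio::spawn(send_batch(1));
    let second = tokio::spawn(send_batch(2));
    let _ = tokio::join!(first, second); // "sent batch 2" can print first

    // New shape (order guaranteed): each send is awaited before the
    // next one starts, so batches hit the wire in sequence.
    send_batch(1).await;
    send_batch(2).await;
}

The removed code bounded this concurrency with a Semaphore of up to 8 in-flight tasks (visible in the diff below), but bounding the number of racers never guaranteed their order.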
onnoowl authored Nov 15, 2023
1 parent d751e94 commit 361acc4
Showing 1 changed file with 25 additions and 29 deletions.
54 changes: 25 additions & 29 deletions sctp/src/association/mod.rs
@@ -14,7 +14,7 @@ use association_internal::*;
 use association_stats::*;
 use bytes::{Bytes, BytesMut};
 use rand::random;
-use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
+use tokio::sync::{broadcast, mpsc, Mutex};
 use util::Conn;
 
 use crate::chunk::chunk_abort::ChunkAbort;
@@ -487,18 +487,6 @@ impl Association {
         let done = Arc::new(AtomicBool::new(false));
         let name = Arc::new(name);
 
-        let limit = {
-            #[cfg(test)]
-            {
-                1
-            }
-            #[cfg(not(test))]
-            {
-                8
-            }
-        };
-
-        let sem = Arc::new(Semaphore::new(limit));
         while !done.load(Ordering::Relaxed) {
             //log::debug!("[{}] gather_outbound begin", name);
             let (packets, continue_loop) = {
@@ -507,35 +495,43 @@
             };
             //log::debug!("[{}] gather_outbound done with {}", name, packets.len());
 
-            // We schedule a new task here for a reason:
-            // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
-            // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
-            // Doing it this way, tokio schedules this to a new thread, this future is suspended, and the read_loop can make progress
             let net_conn = Arc::clone(&net_conn);
             let bytes_sent = Arc::clone(&bytes_sent);
             let name2 = Arc::clone(&name);
             let done2 = Arc::clone(&done);
-            let sem = Arc::clone(&sem);
-            sem.acquire().await.unwrap().forget();
-            tokio::task::spawn(async move {
-                let mut buf = BytesMut::with_capacity(16 * 1024);
-                for raw in packets {
-                    buf.clear();
-                    if let Err(err) = raw.marshal_to(&mut buf) {
-                        log::warn!("[{}] failed to serialize a packet: {:?}", name2, err);
-                    } else {
+            let mut buffer = None;
+            for raw in packets {
+                let mut buf = buffer
+                    .take()
+                    .unwrap_or_else(|| BytesMut::with_capacity(16 * 1024));
+
+                // We do the marshalling work in a blocking task here for a reason:
+                // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
+                // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
+                // Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress
+                match tokio::task::spawn_blocking(move || raw.marshal_to(&mut buf).map(|_| buf))
+                    .await
+                    .unwrap()
+                {
+                    Ok(mut buf) => {
                         let raw = buf.as_ref();
                         if let Err(err) = net_conn.send(raw.as_ref()).await {
                             log::warn!("[{}] failed to write packets on net_conn: {}", name2, err);
                             done2.store(true, Ordering::Relaxed)
                         } else {
                             bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
                         }
+
+                        // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take ownership.
+                        buf.clear();
+                        buffer = Some(buf);
                     }
-                    //log::debug!("[{}] sending {} bytes done", name, raw.len());
+                    Err(err) => {
+                        log::warn!("[{}] failed to serialize a packet: {:?}", name2, err);
+                    }
                 }
-                sem.add_permits(1);
-            });
+                //log::debug!("[{}] sending {} bytes done", name, raw.len());
+            }
 
             if !continue_loop {
                 break;
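
For reference, the pattern the new loop lands on can be shown in a self-contained sketch. Packet and its marshal_to below are hypothetical stand-ins for this crate's real types, and the socket write is replaced by a println!: serialization runs on tokio's blocking pool (so the runtime worker stays free to schedule the read_loop, per the comment in the diff), each result is awaited inline so writes stay ordered, and the BytesMut travels through an Option because spawn_blocking must take ownership of its captures.

use bytes::BytesMut;

// Hypothetical stand-ins for this crate's real packet type and serializer.
struct Packet(Vec<u8>);

impl Packet {
    fn marshal_to(&self, buf: &mut BytesMut) -> Result<(), std::io::Error> {
        buf.extend_from_slice(&self.0);
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let packets = vec![Packet(vec![1, 2]), Packet(vec![3, 4, 5])];

    // spawn_blocking can't borrow from this stack frame, so the reusable
    // buffer is moved in and handed back through the closure's return value.
    let mut buffer: Option<BytesMut> = None;
    for raw in packets {
        let mut buf = buffer
            .take()
            .unwrap_or_else(|| BytesMut::with_capacity(16 * 1024));

        // Marshal on the blocking pool; await the handle here, in order.
        match tokio::task::spawn_blocking(move || raw.marshal_to(&mut buf).map(|_| buf))
            .await
            .expect("marshalling task panicked")
        {
            Ok(mut buf) => {
                // In the real code this is net_conn.send(buf.as_ref()).await;
                // the sequential await is what restores packet ordering.
                println!("would send {} bytes", buf.len());
                buf.clear(); // keep the allocation for the next packet
                buffer = Some(buf);
            }
            Err(err) => eprintln!("failed to serialize a packet: {err:?}"),
        }
    }
}

Awaiting the JoinHandle before touching the next packet is the ordering guarantee; the blocking pool only relocates the CPU work, it does not reintroduce concurrency between sends.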

1 comment on commit 361acc4

@ris-work

Thank you for this. I experienced a speed-up of over 3x after this commit under relatively high bandwidth usage.
