
Data channel fails to send packets at high frequency when acting as a WebRTC server connected from a browser. #360

Closed
HsuJv opened this issue Dec 8, 2022 · 32 comments


@HsuJv

HsuJv commented Dec 8, 2022

Hi all,

I'm trying to test the bandwidth of the data channel based on your example.

I didn't change much of the code except the data channel callbacks.
I tried to send as much data as possible to the client once a data channel was created (code shown later).
With a controllable delay, I tried send frequencies of 10 Hz, 20 Hz, 25 Hz, 40 Hz, 50 Hz, etc.

The problem is that at 40 Hz and above, the data received in the browser no longer matches the data I'm trying to send.
With the send frequency set to 40 Hz (65535 bytes per send call), the throughput jitters sharply and the browser receives fewer packets than I sent.
(screenshot: throughput graph at 40 Hz)
The gap increases as the send frequency gets larger.
(Note: in the picture, 192.168.1.2 is my WebRTC server, which runs the code and sends the data.)

With the send frequency set to 25 Hz (also 65535 bytes per send call), the throughput is smooth and the browser receives all the data I sent.
(screenshot: throughput graph at 25 Hz)

I have no idea what is going wrong; can anyone here help?

P.S.
Below are my code snippets in case they are helpful.

I rewrote this code

peer_connection
    .on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
        let d_label = d.label().to_owned();
        let d_id = d.id();
        println!("New DataChannel {} {}", d_label, d_id);
        // Register channel opening handling
        Box::pin(async move {
            let d2 = Arc::clone(&d);
            let d_label2 = d_label.clone();
            let d_id2 = d_id;
            d.on_open(Box::new(move || {
                println!("Data channel '{}'-'{}' open. Random messages will now be sent to any connected DataChannels every 5 seconds", d_label2, d_id2);
                Box::pin(async move {
                    let mut result = Result::<usize>::Ok(0);
                    while result.is_ok() {
                        let timeout = tokio::time::sleep(Duration::from_secs(5));
                        tokio::pin!(timeout);
                        tokio::select! {
                            _ = timeout.as_mut() => {
                                let message = math_rand_alpha(15);
                                println!("Sending '{}'", message);
                                result = d2.send_text(message).await.map_err(Into::into);
                            }
                        };
                    }
                })
            }));
            // Register text message handling
            d.on_message(Box::new(move |msg: DataChannelMessage| {
                let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
                println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
                Box::pin(async {})
            }));
        })
    }));

to the following

    // Register data channel creation handling
    peer_connection.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
        let d_label = d.label().to_owned();
        let d_id = d.id();
        println!("New DataChannel {} {}", d_label, d_id);

        // Register channel opening handling
        Box::pin(async move {
            use tokio::time;
            let d2 = Arc::clone(&d);
            let d_label2 = d_label.clone();
            let d_id2 = d_id;

            d.on_open(Box::new(move || {
                println!("Data channel '{}'-'{}' open.", d_label2, d_id2);
                let mut buf = Vec::with_capacity(65535);
                let mut current = time::Instant::now();
                let mut pkt_num = 0_usize;
                unsafe {
                    buf.set_len(65535);
                }

                Box::pin(async move {
                    while d2.send(&buf.to_vec().into()).await.is_ok() {
                        pkt_num  += 1;
                        time::sleep(time::Duration::from_millis(40)).await;
                        if current.elapsed().as_secs() > 1 {
                            println!("current send {} packets", pkt_num);
                            current = time::Instant::now();
                        }
                    }
                })
            }));

            // Register text message handling
            d.on_message(Box::new(move |_: DataChannelMessage| {
                Box::pin(async move {})
            }));

            d.on_close(Box::new(|| std::process::exit(0)));
        })
    }));

And my JavaScript code is quite simple, as shown below

let pc = new RTCPeerConnection({
    iceServers: [
        {
            urls: 'stun:stun.l.google.com:19302'
        }
    ]
})

var recv_bytes = 0;
var pkts_num = 0;
let interval = null;

let sendChannel = pc.createDataChannel('foo')
sendChannel.onclose = () => {
    console.log('sendChannel has closed')
    clearInterval(interval);
}
sendChannel.onopen = () => {
    console.log('sendChannel has opened')
    interval = setInterval(() => {
        console.log("Recv " + pkts_num + " packets. Current rate: " + recv_bytes + " B/s");
        recv_bytes = 0;
    }, 1000);
}
sendChannel.onmessage = e => {
    recv_bytes += e.data.byteLength
    pkts_num += 1
    // sendChannel.send(e.data)
}

pc.oniceconnectionstatechange = e => console.log(pc.iceConnectionState)
pc.onnegotiationneeded = e =>
    pc.createOffer().then(d => {
        pc.setLocalDescription(d)
        fetch('/sdp', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            body: "client=" + btoa(JSON.stringify(d))
        }).then(response => response.text().then(response => {
            pc.setRemoteDescription(new RTCSessionDescription(JSON.parse(atob(response))))
        }))
    }).catch(console.log)
@KillingSpark
Contributor

There is issue #101 which might be what you are running up against?

@HsuJv
Author

HsuJv commented Dec 8, 2022

@KillingSpark ,
Thanks for the information!

I tried the example in the issue, and it works well with 1024 bytes per send call.
But when I increase the data to 16384 bytes per call, it still fails.
The buffered_amount of the data channel just doesn't go down unless I increase the sleep time to 10 ms (I tried 10 µs, 100 µs, and 1000 µs, which didn't help).

My question here is: what exactly keeps the buffered_amount from going down?

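For what it's worth, one way to keep buffered_amount bounded on the sending side is to throttle against it instead of sleeping a fixed interval. The sketch below is only an illustration: it assumes buffered_amount() is a synchronous accessor on the webrtc-rs data channel (mirroring the browser API) and the 1 MiB high-water mark is an arbitrary choice, so check both against the version you are using.

// Sketch only: back off while the channel still has a lot of data queued,
// so the SCTP-side buffer cannot grow without bound.
// Assumptions: `buffered_amount()` is a sync accessor here; 1 MiB is an arbitrary limit.
const MAX_BUFFERED_AMOUNT: usize = 1024 * 1024;

Box::pin(async move {
    let payload = bytes::Bytes::from(vec![0u8; 65535]);
    loop {
        // Wait until the queued amount drops below the high-water mark.
        while d2.buffered_amount() > MAX_BUFFERED_AMOUNT {
            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        }
        if d2.send(&payload).await.is_err() {
            break;
        }
    }
})

There is also a set_buffered_amount_low_threshold / on_buffered_amount_low pair (used in the library's flow-control example) that avoids polling, if it is available in your version.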

@KillingSpark
Contributor

I can only guess; I don't have deep knowledge of this library (I am just a user with a few small contributions :)). But the default for data channels is probably reliable transport, so if the bandwidth is maxed out it will start buffering data, just like TCP would.

Just out of interest, does this change if you open a data channel with unreliable transport? That should be a setting somewhere in the JavaScript API.
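For reference, a minimal sketch of the browser-side setting (standard RTCDataChannel init options; the values here are just an example):

// Negotiate an unordered, partially reliable channel from the browser.
// maxRetransmits: 0 means lost messages are never retransmitted
// (alternatively use maxPacketLifeTime; don't set both).
let sendChannel = pc.createDataChannel('foo', {
    ordered: false,
    maxRetransmits: 0
})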

@HsuJv
Author

HsuJv commented Dec 8, 2022

I have no idea, but I'll try it out later.
Anyway, thanks for the help.
Let's leave the issue open to see if anyone else would like to answer.

@HsuJv
Author

HsuJv commented Dec 9, 2022

It seems something is wrong within the SCTP layer.

I wrote a simple PoC to do a throughput test:

use clap::{App, AppSettings, Arg};
use std::io::Write;
use std::sync::Arc;
use tokio::net::UdpSocket;
use util::{conn::conn_disconnected_packet::DisconnectedPacketConn, Conn};
use webrtc_sctp::association::*;
use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
use webrtc_sctp::stream::*;
use webrtc_sctp::Error;

fn main() -> Result<(), Error> {
    env_logger::Builder::new()
        .format(|buf, record| {
            writeln!(
                buf,
                "{}:{} [{}] {} - {}",
                record.file().unwrap_or("unknown"),
                record.line().unwrap_or(0),
                record.level(),
                chrono::Local::now().format("%H:%M:%S.%6f"),
                record.args()
            )
        })
        .filter(None, log::LevelFilter::Warn)
        .init();

    let mut app = App::new("SCTP Throughput")
        .version("0.1.0")
        .about("An example of SCTP Server")
        .setting(AppSettings::DeriveDisplayOrder)
        .setting(AppSettings::SubcommandsNegateReqs)
        .arg(
            Arg::with_name("FULLHELP")
                .help("Prints more detailed help information")
                .long("fullhelp"),
        )
        .arg(
            Arg::with_name("port")
                .required_unless("FULLHELP")
                .takes_value(true)
                .long("port")
                .help("use port ."),
        );

    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        app.print_long_help().unwrap();
        std::process::exit(0);
    }

    let port1 = matches.value_of("port").unwrap().to_owned();
    let port2 = port1.clone();

    std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = DisconnectedPacketConn::new(Arc::new(
                    UdpSocket::bind(format!("127.0.0.1:{}", port1))
                        .await
                        .unwrap(),
                ));
                println!("listening {}...", conn.local_addr().unwrap());

                let config = Config {
                    net_conn: Arc::new(conn),
                    max_receive_buffer_size: 0,
                    max_message_size: 0,
                    name: "server".to_owned(),
                };
                let a = Association::server(config).await?;
                println!("created a server");

                let stream = a.accept_stream().await.unwrap();
                println!("accepted a stream");

                // set unordered = true and a 10 ms threshold for dropping packets
                stream.set_reliability_params(true, ReliabilityType::Timed, 10);

                let mut buff = vec![0u8; 65535];
                let mut recv = 0;
                let mut pkt_num = 0;
                let mut loop_num = 0;
                let mut now = tokio::time::Instant::now();
                while let Ok(n) = stream.read(&mut buff).await {
                    recv += n;
                    if n != 0 {
                        pkt_num += 1;
                    }
                    loop_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!(
                            "Throughput: {} Bytes/s, {} pkts, {} loops",
                            recv, pkt_num, loop_num
                        );
                        now = tokio::time::Instant::now();
                        recv = 0;
                        loop_num = 0;
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });

    std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
                conn.connect(format!("127.0.0.1:{}", port2)).await.unwrap();
                println!("connecting {}..", format!("127.0.0.1:{}", port2));

                let config = Config {
                    net_conn: conn,
                    max_receive_buffer_size: 0,
                    max_message_size: 0,
                    name: "client".to_owned(),
                };
                let a = Association::client(config).await.unwrap();
                println!("created a client");

                let stream = a
                    .open_stream(0, PayloadProtocolIdentifier::Binary)
                    .await
                    .unwrap();
                println!("opened a stream");

                let mut buf = Vec::with_capacity(65535);
                unsafe {
                    buf.set_len(65535);
                }

                let mut now = tokio::time::Instant::now();
                let mut pkt_num = 0;
                while stream.write(&buf.clone().into()).is_ok() {
                    pkt_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!("Send {} pkts", pkt_num);
                        now = tokio::time::Instant::now();
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });
    loop {}
}

And got logs like

listening 127.0.0.1:8888...
connecting 127.0.0.1:8888..
created a server
created a client
opened a stream
accepted a stream
Send 20245 pkts
Throughput: 4980660 Bytes/s, 76 pkts
Send 20135 pkts
Throughput: 5046195 Bytes/s, 77 pkts
Send 19996 pkts
Throughput: 5111730 Bytes/s, 78 pkts

The result is the same whether set_reliability_params is set to true or false.

But if I remove the SCTP layer and use a tokio UDP socket directly, everything is fine:

Connect to 127.0.0.1:8888
Connected
listening 127.0.0.1:8888
Send 197388 pkts
Throughput: 3233792000 Bytes/s, 197375 pkts
Send 227920 pkts
Throughput: 3734208512 Bytes/s, 227918 pkts
Send 242558 pkts
Throughput: 3973890048 Bytes/s, 242547 pkts
Send 214998 pkts
Throughput: 3522510848 Bytes/s, 214997 pkts
Send 212757 pkts
Throughput: 3485302784 Bytes/s, 212726 pkts

@KillingSpark
Contributor

KillingSpark commented Dec 9, 2022

Didn't test the UDP case, but I believe your numbers. The SCTP example you provided is very slow for me too, around 100-ish packets instead of your 76-ish. In a release build I can get up to 900 packets, though. That's still way less than it probably should be, but it goes to show how good the optimizer is :)

A profile I did on a release build shows this:

This stack (37% of total execution time) is concerned with marshalling packets (20% of total execution time) and unmarshalling packets (6% of total execution time).
(profiler screenshot)

This stack (about 29%) is... blocking on a threadpool? Seems like tokio internals, but maybe this is influenced by the usage of tokio in this library?
(profiler screenshot)

I performed this with ReliabilityType::Rexmit and a value of 0, so the sender performance should not depend on the receiver performance.

My logs look like this:

Send 71410 pkts
Throughput: 42728820 Bytes/s, 652 pkts, 652 loops
Send 68634 pkts
Throughput: 41155980 Bytes/s, 628 pkts, 628 loops
Send 71338 pkts
Throughput: 42794355 Bytes/s, 653 pkts, 653 loops
Send 68920 pkts
Throughput: 39386535 Bytes/s, 601 pkts, 601 loops
Send 68421 pkts
Throughput: 41024910 Bytes/s, 626 pkts, 626 loops
Send 69504 pkts
Throughput: 41549190 Bytes/s, 634 pkts, 634 loops

All in all this suggests a few things:

  • The marshalling could maybe be improved, but it taking a lot of CPU time is expected; it is pumping out 70k pkts/s.
  • The receiving part is doing SOMETHING wrong. About 99% of the packets seem to be dropped, judging by the mismatch between the number of sent and received packets.
  • Since packet unmarshalling only uses 6% of the total runtime, the packets seem to get dropped somewhere before processing of the data even begins.

My speculation is: for some reason tokio does not read packets from the socket fast enough, resulting in the processing code not being called often enough and many packets being dropped by the kernel because the socket queue is overflowing.

That there is that much time spent on blocking on a threadpool seems fishy and could be related. Not sure though.

Edit:

A quick look into netstat confirms that the receive queue is overflowing. Just gotta figure out why the packets are not retrieved fast enough.

Proto Recv-Q Send-Q  Local Address          Foreign Address        (state)
udp4    1680      0  localhost.28300        localhost.10000

I had a look around in the code and maybe there is just high contention on the reassembly queue lock? That would maybe explain the blocking on parked threads?

@HsuJv
Author

HsuJv commented Dec 9, 2022

My UDP snippet was like this:

fn main() -> Result<(), Error> {
    env_logger::Builder::new()
        .format(|buf, record| {
            writeln!(
                buf,
                "{}:{} [{}] {} - {}",
                record.file().unwrap_or("unknown"),
                record.line().unwrap_or(0),
                record.level(),
                chrono::Local::now().format("%H:%M:%S.%6f"),
                record.args()
            )
        })
        .filter(None, log::LevelFilter::Warn)
        .init();

    let mut app = App::new("SCTP Throughput")
        .version("0.1.0")
        .about("An example of SCTP Server")
        .setting(AppSettings::DeriveDisplayOrder)
        .setting(AppSettings::SubcommandsNegateReqs)
        .arg(
            Arg::with_name("FULLHELP")
                .help("Prints more detailed help information")
                .long("fullhelp"),
        )
        .arg(
            Arg::with_name("port")
                .required_unless("FULLHELP")
                .takes_value(true)
                .long("port")
                .help("use port ."),
        );

    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        app.print_long_help().unwrap();
        std::process::exit(0);
    }

    let port1 = matches.value_of("port").unwrap().to_owned();
    let port2 = port1.clone();

    std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = UdpSocket::bind(format!("127.0.0.1:{}", port1))
                    .await
                    .unwrap();
                println!("listening {}", format!("127.0.0.1:{}", port1));

                let mut buff = vec![0u8; 65535];
                let mut recv = 0;
                let mut pkt_num = 0;
                let mut now = tokio::time::Instant::now();
                while let Ok(n) = conn.recv(&mut buff).await {
                    recv += n;
                    if n != 0 {
                        pkt_num += 1;
                    }
                    if now.elapsed().as_secs() == 1 {
                        println!("Throughput: {} Bytes/s, {} pkts", recv, pkt_num);
                        now = tokio::time::Instant::now();
                        recv = 0;
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });

    std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = UdpSocket::bind("0.0.0.0:0").await.unwrap();
                println!("Connect to {}", format!("127.0.0.1:{}", port2));
                conn.connect(format!("127.0.0.1:{}", port2)).await.unwrap();
                println!("Connected");

                let mut buf = Vec::with_capacity(16384);
                unsafe {
                    buf.set_len(16384);
                }

                let mut now = tokio::time::Instant::now();
                let mut pkt_num = 0;
                while conn.send(&buf).await.is_ok() {
                    pkt_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!("Send {} pkts", pkt_num);
                        now = tokio::time::Instant::now();
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });
    loop {}
}

With this, I think we can rule out the possibility that tokio itself is at fault.
Hope this is helpful for you.
I will do some more research when I have time.

BRs.

@KillingSpark
Contributor

Ok so I did some more digging and need to take back what I wrote earlier. It is NOT the receive queue of the socket filling up.

Two things I found:

  1. The SCTP layer decides to limit how many packets are sent
  2. The pending queue grows indefinitely (partly because of 1., but it should apply backpressure to the code actually sending packets)

So there are two separate problems at play here:

  1. SCTP is limiting me to about 40 MBit/s in release mode. This seems way lower than it should be, considering the speeds that raw UDP can provide.
  2. The pending queue does not apply backpressure, causing these confusing logs. If backpressure had been applied, it would have been obvious that the sender is slow and not the reader. It's also important in general that this library doesn't fill up all the memory just because the application is writing faster than the connection is sending data (a minimal sketch of one way to bound the queue follows below).
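The sketch below illustrates the backpressure idea with a byte budget on the pending queue, enforced by a tokio semaphore. The type and constant names are hypothetical, not part of webrtc-sctp, and the 128 KiB budget is an arbitrary assumption.

// Hypothetical sketch: a writer must reserve budget before queueing a chunk;
// dropping the permit once the chunk leaves the queue frees the budget again.
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

const PENDING_BYTE_BUDGET: usize = 128 * 1024; // assumed 128 KiB limit

struct BoundedPendingQueue {
    budget: Arc<Semaphore>,
}

impl BoundedPendingQueue {
    fn new() -> Self {
        Self {
            budget: Arc::new(Semaphore::new(PENDING_BYTE_BUDGET)),
        }
    }

    /// Waits until the queue has room for `len` more bytes. The returned permit
    /// should be attached to the queued chunk and dropped when the chunk is
    /// sent/acknowledged, which unblocks waiting writers.
    async fn reserve(&self, len: usize) -> OwnedSemaphorePermit {
        self.budget
            .clone()
            .acquire_many_owned(len as u32)
            .await
            .expect("semaphore closed")
    }
}

With something along these lines, a write would block (asynchronously) once the in-flight data reaches the budget instead of letting the pending queue grow without bound.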

@KillingSpark
Contributor

KillingSpark commented Dec 12, 2022

More info: it seems like the ACK mechanism is borked:

I just did dumb println! debugging in association_internal.rs

  • "Handle" => "handle_data"
  • "Send" => "pop_sending_data_chunks_to_send"
  • "ACK" => "create_selective_ack_chunk"
  • "ACKRCV => "process_selective_ack"
Handle TSN: 438973196
Send   TSN: 438973199
Handle TSN: 438973197
Send   TSN: 438973200
Handle TSN: 438973198
ACK    TSN: 438973198
Send   TSN: 438973201
ACKRCV TSN: 438972331
Send   TSN: 438973202
Handle TSN: 438973199
Handle TSN: 438973200
Send   TSN: 438973203
Handle TSN: 438973201
ACK    TSN: 438973201
Send   TSN: 438973204
ACKRCV TSN: 438972334
Handle TSN: 438973202
Handle TSN: 438973203
Handle TSN: 438973204

Note that Send/Handle/ACK seem to match pretty well while ACKRCV is lagging behind significantly. So either

  1. The ACK is sent with a big delay
  2. The ACK is processed with a big delay

Further investigation shows: it is very likely option number 2. Now to find out why...

@KillingSpark
Contributor

KillingSpark commented Dec 12, 2022

Yup, it's lock contention. Specifically on the Mutex around the AssociationInternal struct.

"Occupied" time in the readloop measures the time used to process a SACK, and in the writeloop it measures the time spent gathering packets to send.

"Taking" time just measures how long it took to actually lock the mutex before doing the above operation.

It seems pretty clear to me that the SACK processing is stalled by the lock contention. The solution to this is probably non-trivial. Maybe instead of making the writeloop gather packets to send, the packets should be put into a channel either when they are queued and can be sent immediately within the rwnd, or when a SACK arrives that increases the rwnd?

Writeloop lock occupied: 1431us
readloop lock occupied:  43us
Writeloop lock occupied: 38us
Writeloop lock occupied: 1us
Writeloop lock occupied: 1548us
readloop lock occupied:  46us
Writeloop lock occupied: 17us
Writeloop lock occupied: 0us
Writeloop lock occupied: 1498us
readloop lock occupied:  47us
Writeloop lock occupied: 66us
Writeloop lock occupied: 1us
Writeloop lock occupied: 1412us
readloop lock occupied:  43us
Writeloop lock occupied: 19us
Writeloop lock occupied: 0us
Writeloop lock occupied: 1426us
readloop lock occupied:  43us
Writeloop lock occupied: 19us
Writeloop lock occupied: 1us
Writeloop lock occupied: 1552us
readloop lock occupied:  68us
Writeloop lock occupied: 19us
Writeloop lock occupied: 0us
Writeloop lock occupied: 1641us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  791us
Taking Writeloop lock: 66us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  825us
Taking Writeloop lock: 65us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  870us
Taking Writeloop lock: 68us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  861us
Taking Writeloop lock: 71us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  846us
Taking Writeloop lock: 71us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  863us
Taking Writeloop lock: 68us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  908us
Taking Writeloop lock: 69us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  810us
Taking Writeloop lock: 64us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  885us
Taking Writeloop lock: 72us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us

Modified code for better understanding of the measurements above:

    async fn read_loop(
        name: String,
        bytes_received: Arc<AtomicUsize>,
        net_conn: Arc<dyn Conn + Send + Sync>,
        mut close_loop_ch: broadcast::Receiver<()>,
        association_internal: Arc<Mutex<AssociationInternal>>,
    ) {
        log::debug!("[{}] read_loop entered", name);

        let mut buffer = vec![0u8; RECEIVE_MTU];
        let mut done = false;
        let mut n;
        while !done {
            tokio::select! {
                _ = close_loop_ch.recv() => break,
                result = net_conn.recv(&mut buffer) => {
                    match result {
                        Ok(m) => {
                            n=m;
                        }
                        Err(err) => {
                            log::warn!("[{}] failed to read packets on net_conn: {}", name, err);
                            break;
                        }
                    }
                }
            };

            // Make a buffer sized to what we read, then copy the data we
            // read from the underlying transport. We do this because the
            // user data is passed to the reassembly queue without
            // copying.
            log::debug!("[{}] recving {} bytes", name, n);
            let inbound = Bytes::from(buffer[..n].to_vec());
            bytes_received.fetch_add(n, Ordering::SeqCst);

            {
                let x = std::time::Instant::now();
                let mut ai = association_internal.lock().await;
                if inbound.len() < 1200 {
                    eprintln!("Taking readloop lock:  {}us", x.elapsed().as_micros());
                }

                let x = std::time::Instant::now();
                if let Err(err) = ai.handle_inbound(&inbound).await {
                    log::warn!("[{}] failed to handle_inbound: {:?}", name, err);
                    done = true;
                }
                if inbound.len() < 1200 {
                    eprintln!("readloop lock occupied:  {}us", x.elapsed().as_micros());
                }
            }
        }

        {
            let mut ai = association_internal.lock().await;
            if let Err(err) = ai.close().await {
                log::warn!("[{}] failed to close association: {:?}", name, err);
            }
        }

        log::debug!("[{}] read_loop exited", name);
    }

    async fn write_loop(
        name: String,
        bytes_sent: Arc<AtomicUsize>,
        net_conn: Arc<dyn Conn + Send + Sync>,
        mut close_loop_ch: broadcast::Receiver<()>,
        association_internal: Arc<Mutex<AssociationInternal>>,
        mut awake_write_loop_ch: mpsc::Receiver<()>,
    ) {
        log::debug!("[{}] write_loop entered", name);
        let mut done = false;
        while !done {
            //log::debug!("[{}] gather_outbound begin", name);
            let (raw_packets, mut ok) = {
                let x = std::time::Instant::now();
                let mut ai = association_internal.lock().await;
                eprintln!("Taking Writeloop lock: {}us", x.elapsed().as_micros());

                let x = std::time::Instant::now();
                let r = ai.gather_outbound().await;
                eprintln!("Writeloop lock occupied: {}us", x.elapsed().as_micros());
                r
            };
            //log::debug!("[{}] gather_outbound done with {}", name, raw_packets.len());

            for raw in &raw_packets {
                log::debug!("[{}] sending {} bytes", name, raw.len());
                if let Err(err) = net_conn.send(raw).await {
                    log::warn!("[{}] failed to write packets on net_conn: {}", name, err);
                    ok = false;
                    break;
                } else {
                    bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
                }
                //log::debug!("[{}] sending {} bytes done", name, raw.len());
            }

            if !ok {
                break;
            }

            if raw_packets.is_empty() {
                //log::debug!("[{}] wait awake_write_loop_ch", name);
                tokio::select! {
                    _ = awake_write_loop_ch.recv() =>{}
                    _ = close_loop_ch.recv() => {
                        done = true;
                    }
                };
            }
            //log::debug!("[{}] wait awake_write_loop_ch done", name);
        }

        {
            let mut ai = association_internal.lock().await;
            if let Err(err) = ai.close().await {
                log::warn!("[{}] failed to close association: {:?}", name, err);
            }
        }

        log::debug!("[{}] write_loop exited", name);
    }

Edit: My current suspicion is that the marshalling of the packets is the culprit, because it is done while holding the lock. I think it is easy enough to fix this, and maybe performance will be good enough without bigger changes to the architecture.

@KillingSpark
Contributor

KillingSpark commented Dec 13, 2022

Ok so my suspicion was right. Pulling the marshalling out from under the lock drastically reduces the total time the mutex is locked... but even if we trick tokio into not running the read_loop and write_loop on the same thread, we see something unfortunate:

sender Writeloop body took: 7789us for 537836 Bytes   <------- mostly Packet::marshal() + sending over socket
sender Writeloop slept for: 0us                       <------- The write_loop is never idling (as expected :))
sender Try taking writeloop lock
sender Taking Writeloop lock: 3us
sender Try taking readloop lock on ThreadId(23)
sender Writeloop lock occupied: 615us                 <------- Mostly AssociationInternal::gather_outbound()
sender Run write body on ThreadId(5)                    
sender Taking readloop lock:  837us                   <------- Waiting on write_loop to release lock

537836 Bytes in 7789 microseconds is 69,050,712 Bytes/s, and since the write_loop is never idle, send performance is entirely bottlenecked by:

  1. Packet::marshal()
  2. AssociationInternal::gather_outbound() (which I hope is only this slow because the queue builds up to this ridiculous size?)
  3. sending over socket

I can put together a PR that provides the behaviour above, which would allow optimizations on Packet::marshal to result in immediate throughput benefits.

Doing a few low-hanging optimizations on the marshalling code gets me to this throughput:

Throughput: 95550030 Bytes/s, 1458 pkts, 1458 loops

95 MByte/s is still not great, but it's something.

Changes to the write_loop:

    async fn write_loop(
        name: String,
        bytes_sent: Arc<AtomicUsize>,
        net_conn: Arc<dyn Conn + Send + Sync>,
        mut close_loop_ch: broadcast::Receiver<()>,
        association_internal: Arc<Mutex<AssociationInternal>>,
        mut awake_write_loop_ch: mpsc::Receiver<()>,
    ) {
        log::debug!("[{}] write_loop entered", name);
        let mut done = false;
        while !done {
            //log::debug!("[{}] gather_outbound begin", name);
            let (raw_packets, ok) = {
                let x = std::time::Instant::now();
                eprintln!("{name} Try taking writeloop lock");
                let mut ai = association_internal.lock().await;
                eprintln!(
                    "{name} Taking Writeloop lock: {}us",
                    x.elapsed().as_micros()
                );

                let x = std::time::Instant::now();
                let r = ai.gather_outbound().await;
                drop(ai);
                eprintln!(
                    "{name} Writeloop lock occupied: {}us",
                    x.elapsed().as_micros()
                );
                r
            };

            let x = std::time::Instant::now();
            eprintln!("{name} Run write body on {:?}", std::thread::current().id());

            let name2 = name.clone();
            let net_conn = Arc::clone(&net_conn);
            let bytes_sent = Arc::clone(&bytes_sent);

            // THIS IS IMPORTANT
            // This task::spawn makes tokio spawn this on another thread, allowing the read_loop
            // to make progress while we send out this batch of packets.
            // The task returns (bytes sent, ok) so that a send failure is propagated back to
            // this loop; mutating the captured `ok` inside the task would only change a copy.
            let (bytes_sent, ok) = tokio::task::spawn(async move {
                let mut b = 0;
                let mut ok = ok;
                for raw in raw_packets {
                    let raw = raw.marshal().unwrap();
                    if let Err(err) = net_conn.send(raw.as_ref()).await {
                        log::warn!("[{}] failed to write packets on net_conn: {}", name2, err);
                        ok = false;
                        break;
                    } else {
                        b += raw.len();
                        bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
                    }
                    //log::debug!("[{}] sending {} bytes done", name, raw.len());
                }
                //log::debug!("[{}] gather_outbound done with {}", name, raw_packets.len());
                (b, ok)
            })
            .await
            .unwrap();

            if !ok {
                break;
            }

            eprintln!(
                "{name} Writeloop body took: {}us for {bytes_sent}Bytes",
                x.elapsed().as_micros()
            );
            //log::debug!("[{}] wait awake_write_loop_ch", name);

            let x = std::time::Instant::now();
            tokio::select! {
                _ = awake_write_loop_ch.recv() =>{}
                _ = close_loop_ch.recv() => {
                    done = true;
                }
            };
            eprintln!("{name} Writeloop slept for: {}us", x.elapsed().as_micros());
            //log::debug!("[{}] wait awake_write_loop_ch done", name);
        }

        {
            let mut ai = association_internal.lock().await;
            if let Err(err) = ai.close().await {
                log::warn!("[{}] failed to close association: {:?}", name, err);
            }
        }

        log::debug!("[{}] write_loop exited", name);
    }

@KillingSpark
Contributor

KillingSpark commented Dec 13, 2022

TLDR for this whole issue:

  1. The lock around AssociationInternal is contended by the write and read loop
  2. If one of the loops is doing too much work in one iteration it chokes the other one; the write_loop is especially prone to doing this because it naturally batches its work
  3. This can be solved (or at least drastically improved) by pulling the marshalling code up into the write_loop, outside the critical section, making the total time spent in the critical section significantly smaller
  4. After this improvement, send performance is only bottlenecked by two functions
    1. Packet::marshal() <- There are some low-hanging optimizations here
    2. AssociationInternal::gather_outbound() <- This actually still chokes the read_loop, so optimizing this is also important

Separately, I noticed that the pending queue does not apply backpressure, so if the sender continuously writes faster than the connection is allowed to send, this queue will grow indefinitely and in the long run cause an OOM.

Way forward

@rainliu @k0nserv First of all, sorry for hijacking (and kinda spamming :D) an issue on your repo, I hope that's ok. I got a bit carried away.

Do you have any objections to the changes I did in the write_loop (see comment directly above)? Obviously the .unwrap()s need to be handled more gracefully.

If not, I'd prepare two PRs: one for the changes in the write_loop and one for the optimizations to the marshal code. And probably a new issue for the pending queue not applying backpressure.

@HsuJv
Author

HsuJv commented Dec 14, 2022

Awesome work!
I've been a bit busy for a while and haven't had time to research this issue.
From 14 MB/s to 95 MB/s, it looks like quite a significant improvement.
But we still seem to have a long way to go compared to webrtc-go.
Anyway, I appreciate your enthusiastic help on this issue; the problem I am currently facing seems to have been mostly solved this way.
I hope this change can be merged into a release.

BRs

@KillingSpark
Contributor

KillingSpark commented Dec 14, 2022

Even before the changes I could get ~70 MB/s in release mode, but the changes I did to get to 95 were pretty small; I'd guess there is even more potential for optimization. Still a nice improvement :)

@k0nserv
Member

k0nserv commented Dec 14, 2022

Excellent research! Sounds like your changes are promising, @KillingSpark. Please roll them up into PRs for review.

@KillingSpark
Contributor

KillingSpark commented Dec 14, 2022

The three PRs combined have this effect for me:

Current master

Throughput: 58522755 Bytes/s, 893 pkts, 893 loops
Send 89902 pkts
Throughput: 60030060 Bytes/s, 916 pkts, 916 loops
Send 90575 pkts
Throughput: 60161130 Bytes/s, 918 pkts, 918 loops
Send 89910 pkts
Throughput: 59374710 Bytes/s, 906 pkts, 906 loops
Send 89765 pkts
Throughput: 58850430 Bytes/s, 898 pkts, 898 loops

All three PRs combined

Throughput: 103938510 Bytes/s, 1586 pkts, 1586 loops
Send 83505 pkts
Throughput: 139196340 Bytes/s, 2124 pkts, 2124 loops
Send 80997 pkts
Throughput: 136247265 Bytes/s, 2079 pkts, 2079 loops
Send 83435 pkts
Throughput: 136706010 Bytes/s, 2086 pkts, 2086 loops
Send 84264 pkts
Throughput: 140244900 Bytes/s, 2140 pkts, 2140 loops

Still not at pion levels, but the next big bottleneck now seems to be the CRC32 implementation.
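For context, the checksum SCTP uses is CRC32c (the Castagnoli polynomial). Below is a tiny sketch of computing it with the crc crate; this is purely illustrative and not necessarily how webrtc-sctp implements it internally.

// Illustration only: SCTP's checksum is CRC32c (Castagnoli, CRC_32_ISCSI in the crc crate).
// Speeding this up usually means a hardware-accelerated implementation
// (e.g. the SSE4.2 / ARMv8 crc32c instructions) rather than a table-driven one.
use crc::{Crc, CRC_32_ISCSI};

const CRC32C: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

fn sctp_checksum(packet: &[u8]) -> u32 {
    // A real implementation zeroes the checksum field in the common header
    // before hashing; this sketch just hashes the given buffer.
    CRC32C.checksum(packet)
}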

@HsuJv If you want to try this out: I pushed a branch with the current state of the three PRs merged here: https://github.com/KillingSpark/webrtc/tree/merged

@k0nserv
Member

k0nserv commented Dec 14, 2022

@KillingSpark is there a specific order of merging these that would be simpler than the others?

@k0nserv
Member

k0nserv commented Dec 14, 2022

Also, can you please add changelog entries in your PRs?

@KillingSpark
Contributor

Will do!

Order of merging shouldn't matter; they should be independent.

But hold off on merging, I have one more idea about the write_loop behaviour I want to try.

@KillingSpark
Contributor

KillingSpark commented Dec 14, 2022

Ok, the changelogs have been done and my idea didn't work out, so you can merge if you want to.

@HsuJv
Author

HsuJv commented Dec 15, 2022

I tested the release build in my WSL2 Arch (another PC), and the SCTP throughput increased from 140-200 MB/s to 400-500 MB/s, awesome!
But it seems the send queue still grows indefinitely. Should we add some backpressure for these fast-producer/slow-consumer situations?

@KillingSpark
Contributor

KillingSpark commented Dec 15, 2022

Definitely should do that. I have some ideas for that as well (and prototyping shows it increases throughput even more) but I wanted to get these PRs merged before starting another one :)

Edit: never mind, I found a good solution and couldn't wait.

@k0nserv
Member

k0nserv commented Dec 15, 2022

@melekes would you mind helping with some reviews? You have more subject-matter expertise than me in this area.

@KillingSpark
Contributor

KillingSpark commented Dec 16, 2022

I am a bit lost here, to be honest.

If I use the optimized SCTP marshalling, the performance just tanks on my Linux PC and I cannot figure out why. On the Mac this improves performance a lot.

But it isn't really about the scheduling either, as far as I can tell.

tokio-console also doesn't reveal any glaring issues.

Does anyone have ideas? HsuJv mentioned that in WSL this brought big improvements too, so Windows and Mac seem to see an improvement, but it seems very weird that on Linux this would cause such a drastic performance decrease... In the end it is a pretty straightforward optimization...

If anyone wants to try: I am just running the throughput example on my "merged" branch https://github.com/KillingSpark/webrtc/tree/merged where all PRs of this issue have been merged together. Trial and error has shown that the optimized marshalling causes the performance drop.

@HsuJv
Author

HsuJv commented Dec 17, 2022

On my Ubuntu 20.04 server (i7-7700, 16 GB RAM),
the throughput increased from 200-400 MB/s to around 700 MB/s, but it jitters very sharply, like this:

listening 127.0.0.1:8888...
connecting 127.0.0.1:8888..
created a server
created a client
opened a stream
accepted a stream
Throughput: 15204120 Bytes/s, 232 pkts, 232 loops
Send 15505 pkts
Throughput: 127989855 Bytes/s, 1953 pkts, 1953 loops
Send 2089 pkts
Throughput: 23395995 Bytes/s, 357 pkts, 357 loops
Send 223 pkts
Throughput: 135788520 Bytes/s, 2072 pkts, 2072 loops
Send 2069 pkts
Throughput: 17170170 Bytes/s, 262 pkts, 262 loops
Send 265 pkts
Throughput: 43777380 Bytes/s, 668 pkts, 668 loops
Send 668 pkts
Throughput: 51117300 Bytes/s, 780 pkts, 780 loops
Send 780 pkts
Throughput: 132315165 Bytes/s, 2019 pkts, 2019 loops
Send 2015 pkts
Throughput: 139786155 Bytes/s, 2133 pkts, 2133 loops
Send 2137 pkts
Throughput: 3604425 Bytes/s, 55 pkts, 55 loops
Send 55 pkts
Throughput: 30801450 Bytes/s, 470 pkts, 470 loops
Send 470 pkts
Throughput: 107608470 Bytes/s, 1642 pkts, 1642 loops
Send 2063 pkts
Throughput: 88079040 Bytes/s, 1344 pkts, 1344 loops
Send 923 pkts

@KillingSpark
Contributor

This is somewhat consistent with what I see on my machine, though my spikes are much rarer. I am on a Ryzen 3800.

Does tokio employ different scheduling depending on the OS?

@KillingSpark
Contributor

Fixed it. @HsuJv could you try the current state of the merged branch one more time just to confirm this?

@HsuJv
Author

HsuJv commented Dec 19, 2022

Yes, it is now smooth at around 130 MB/s.
BTW, in my last comment the numbers should have been 20-40 MB/s & 70 MB/s; I did a wrong calculation.

accepted a stream
Throughput: 262140 Bytes/s, 4 pkts, 4 loops
Send 15279 pkts
Throughput: 95550030 Bytes/s, 1458 pkts, 1458 loops
Send 1850 pkts
Throughput: 130152510 Bytes/s, 1986 pkts, 1986 loops
Send 1981 pkts
Throughput: 131594280 Bytes/s, 2008 pkts, 2008 loops
Send 2008 pkts
Throughput: 132315165 Bytes/s, 2019 pkts, 2019 loops
Send 2030 pkts
Throughput: 131528745 Bytes/s, 2007 pkts, 2007 loops
Send 1995 pkts
Throughput: 131070000 Bytes/s, 2000 pkts, 2000 loops
Send 1996 pkts
Throughput: 129824835 Bytes/s, 1981 pkts, 1981 loops

(My server always runs some heavy tasks, so it is expected that the performance is lower than my WSL)

@melekes
Contributor

melekes commented Dec 21, 2022

@HsuJv @KillingSpark thanks for looking into this 👍 fantastic work

melekes pushed a commit that referenced this issue Jan 2, 2023
As discussed in #360 the lock on the internal association is very contended in high send-bandwidth situations. This PR achieves two things:

1. Pull the marshalling of packets outside of the critical section thus reducing the time the lock is taken by the write loop
2. Schedule the marshalling and sending as a new tokio::task::Task which makes tokio schedule this on another thread allowing the read loop to make progress in parallel while the write loop is working on that
This in itself does not really increase the bandwidth, but combined with improvements to the marshalling code itself, gains can be had now that were previously (at least partially) blocked by the lock contention.

It should also improve situations where both sides send a lot of data because then the write and read loops would both be very busy and fighting for the lock even more.
melekes pushed a commit that referenced this issue Jan 2, 2023
As discussed in #360, the marshalling code has become a bottleneck in high-bandwidth sending situations. I found two places that had a big effect on the performance; the hot path for this situation is marshalling packets with exactly one data chunk in them.

After this PR the marshalling is largely dominated by the CRC32 calculation which is... not easy to speed up.
melekes pushed a commit that referenced this issue Jan 2, 2023
As discussed in #360, gathering packets to send is a big chunk of the work the Association::write_loop is doing while in a critical section.

This PR improves the performance of this by making the payload queue cheaper to push to.

Previously this did a full merge sort (O(n log n)) on all the in-flight TSNs; now it does a binary search (O(log n)).
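A minimal sketch of that idea (illustrative only, not the actual payload-queue code):

// Keep the in-flight TSNs sorted and place each new one with a binary search
// instead of re-sorting the whole list on every push. The search is O(log n);
// the Vec insert still shifts elements, but the full sort is gone.
// (Real SCTP TSNs use serial-number arithmetic / wraparound, ignored here.)
fn push_tsn_sorted(in_flight: &mut Vec<u32>, tsn: u32) {
    if let Err(pos) = in_flight.binary_search(&tsn) {
        in_flight.insert(pos, tsn);
    }
    // Ok(_) means the TSN is already tracked; nothing to do.
}
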
melekes pushed a commit that referenced this issue Jan 2, 2023
As discussed in #360 the pending queue can grow indefinitely if the sender writes packets faster than the association is able to transmit them.

This PR solves this by enforcing a limit on the pending queue. This blocks the sender until enough space is free.
@k0nserv
Member

k0nserv commented Jan 4, 2023

Btw, worth keeping in mind how this work intersects with #136 and whether the potential move to a sans-IO version of SCTP helps improve the performance problems.

In any case, excellent work on this 👍🏼

@KillingSpark
Contributor

I think the only PR that really interferes with a sans-io implementation would be the changes in #367. Those would probably need to be lifted into the sctp-async part somehow. The other PRs should still be applicable to a sans-io refactored version.

Is this refactor still active? I would be interested in a pure Rust sans-IO SCTP implementation to integrate into our own project. We currently use usrsctp via FFI, which works but is not ideal.

@k0nserv
Member

k0nserv commented Jan 4, 2023

We were discussing it on Discord the other day and would like to move forward with switching out the implementation of SCTP with it. However, no one is committed to doing that work at the moment, so it would only happen when someone finds the time for it.
