"Task has lost its waker" and freeze when sending large packets #92
Comments
I don't think this is a tws issue, as we don't do anything with wakers but pass them along. It seems to instead be this old issue about locking inside of [...].

I've been thinking about adding a true duplex API (created from two separate streams), but I don't know if that's something anyone wants, considering you'd be unable to use TLS; see rustls/rustls#288 about its challenges.
I tried without using [...]:

```rust
use bytes::Bytes;
use futures::SinkExt;
use tokio_websockets::Message;

#[tokio::main]
async fn main() {
    console_subscriber::init();

    // Connect to the local echo server.
    let mut ws = tokio_websockets::ClientBuilder::new()
        .uri("ws://127.0.0.1:3000")
        .unwrap()
        .connect()
        .await
        .unwrap()
        .0;

    // Message size is passed as the first CLI argument.
    let len = std::env::args().nth(1).unwrap().parse().unwrap();
    let x = Bytes::from(vec![0; len]);
    let mut cnt = 0usize;

    // Send the same payload in a tight loop, counting the sends.
    loop {
        let x = Message::binary(x.clone());
        ws.send(x).await.unwrap();
        println!("sent {cnt}");
        cnt += 1;
    }
}
```

EDIT: [...]
Also, [...]
You cannot just keep sending data to a remote server that sends messages back without ever reading them. If I run your reproducer and check [...]

Note the [...]
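To make that point concrete, here is a minimal sketch (not from the thread) of one way to drain the echoed messages while writing, using futures' `StreamExt::split` on the connected stream. The `split()` helper is a futures-crate API rather than something tokio-websockets provides, the payload size and the choice to simply discard echoed frames are assumptions for illustration, and, per the rest of this thread, concurrent reading and writing could still trigger the waker bug on versions before the fix:

```rust
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use tokio_websockets::Message;

#[tokio::main]
async fn main() {
    // Connect exactly like the reproducer does.
    let (ws, _) = tokio_websockets::ClientBuilder::new()
        .uri("ws://127.0.0.1:3000")
        .unwrap()
        .connect()
        .await
        .unwrap();

    // futures' BiLock-based split gives independent read and write halves
    // of the same connection.
    let (mut tx, mut rx) = ws.split();

    // Reader task: keep draining whatever the echo server sends back, so
    // neither side's buffers fill up while we write.
    tokio::spawn(async move {
        while let Some(msg) = rx.next().await {
            drop(msg); // the echoed frames are not interesting here
        }
    });

    let payload = Bytes::from(vec![0; 51205]);
    let mut cnt = 0usize;
    loop {
        tx.send(Message::binary(payload.clone())).await.unwrap();
        println!("sent {cnt}");
        cnt += 1;
    }
}
```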
I think the issue might be a combination of that and how [...]
Ah, that makes a ton of sense, actually. And it would be a strong reason for proper full-duplex streams in the future, once the TLS libraries support it. As a workaround, try wrapping the call in a timeout and retry later to give the receiver task some time?
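A rough sketch of that workaround (a hypothetical helper, not code from the thread): wrap each send in a timeout and back off before retrying. Note that dropping a send future part-way through may leave a frame partially buffered on the sink, so this is only a stopgap, not a fix:

```rust
use std::time::Duration;

use bytes::Bytes;
use futures::SinkExt;
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_websockets::{MaybeTlsStream, Message, WebSocketStream};

/// Hypothetical helper wrapping the reproducer's send call: if a send does
/// not finish within a second, drop it, sleep briefly so the read side can
/// make progress, and try again with a fresh message.
async fn send_with_backoff(
    ws: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
    payload: &Bytes,
) -> Result<(), tokio_websockets::Error> {
    loop {
        let msg = Message::binary(payload.clone());
        match timeout(Duration::from_secs(1), ws.send(msg)).await {
            // The send completed, successfully or with a protocol error.
            Ok(result) => return result,
            // Timed out: the send future is dropped here. A frame may already
            // be partially buffered, so this is only a stopgap.
            Err(_elapsed) => tokio::time::sleep(Duration::from_millis(50)).await,
        }
    }
}
```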
I seem to have mixed up two different issues, sorry. The [...]
Here's the new reproducer:

```toml
[package]
name = "tokio-ws-reproduce"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1.9.0"
console-subscriber = "0.4.1"
futures = { version = "0.3.31", default-features = false, features = ["std", "async-await"] }
parking_lot = "0.12.3"
tokio = { version = "1.43.0", features = ["full"] }
tokio-websockets = { version = "0.11.0", features = ["client", "simd", "sha1_smol", "rand", "native-tls"] }
```

```rust
use std::{pin::Pin, sync::Arc, task::Poll};

use bytes::Bytes;
use futures::{future::join, Sink, SinkExt, Stream, StreamExt};
use parking_lot::FairMutex;
use tokio::net::TcpStream;
use tokio_websockets::{MaybeTlsStream, Message, WebSocketStream};

// Read half: forwards poll_next to the shared, mutex-guarded stream.
pub struct SplitRead(Arc<FairMutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>);

impl Stream for SplitRead {
    type Item = Result<Message, tokio_websockets::Error>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.0.lock().poll_next_unpin(cx)
    }
}

// Write half: forwards the Sink methods to the same shared stream.
pub struct SplitWrite(Arc<FairMutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>);

impl Sink<Message> for SplitWrite {
    type Error = tokio_websockets::Error;

    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.0.lock().poll_ready_unpin(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
        self.0.lock().start_send_unpin(item)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.0.lock().poll_flush_unpin(cx)
    }

    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.0.lock().poll_close_unpin(cx)
    }
}

#[tokio::main]
async fn main() {
    console_subscriber::init();

    let inner = Arc::new(FairMutex::new(
        tokio_websockets::ClientBuilder::new()
            .uri("ws://127.0.0.1:3000")
            .unwrap()
            .connect()
            .await
            .unwrap()
            .0,
    ));
    let mut rx = SplitRead(inner.clone());
    let mut tx = SplitWrite(inner);

    // Message size is passed as the first CLI argument.
    let len = std::env::args().nth(1).unwrap().parse().unwrap();

    // Writer task: send the same payload in a tight loop.
    let a = tokio::spawn(async move {
        let x = Bytes::from(vec![0; len]);
        let mut cnt = 0usize;
        loop {
            let x = Message::binary(x.clone());
            tx.send(x).await.unwrap();
            println!("sent {cnt}");
            cnt += 1;
        }
    });

    // Reader task: drain everything the echo server sends back.
    let b = tokio::spawn(async move {
        loop {
            rx.next().await;
        }
    });

    join(a, b).await.0.unwrap();
}
```
Commenting out this code seems to fix it, as the reproducer hasn't locked up after a couple of minutes of constant writing: tokio-websockets/src/proto/stream.rs, lines 164 to 167 in 3fe4767.
The reader task calling writer functions on the inner stream while the writer task was writing made the inner stream replace and drop the waker for the writer task. I'm not sure what the proper solution for this is, though.
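For illustration only (this is not tokio-websockets' actual code), here is a minimal example of the hazard being described: a future that keeps a single stored waker slot silently drops the previously registered task's waker whenever a different task polls it, which produces exactly the "task has lost its waker" symptom:

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
};

/// Illustration only: a shared resource with a single stored waker slot.
#[derive(Default)]
struct SharedState {
    ready: bool,
    waker: Option<Waker>,
}

struct WaitReady(Arc<Mutex<SharedState>>);

impl Future for WaitReady {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.lock().unwrap();
        if state.ready {
            Poll::Ready(())
        } else {
            // Whoever polls last wins the slot: if the writer task registered
            // its waker here and the reader task polls next, the writer's
            // waker is silently replaced and dropped.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Marking the state ready only wakes the task that registered last; a task
/// whose waker was overwritten is never woken again.
fn notify(state: &Arc<Mutex<SharedState>>) {
    let mut guard = state.lock().unwrap();
    guard.ready = true;
    if let Some(waker) = guard.waker.take() {
        waker.wake();
    }
}
```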
Right... Because [...]. This quote and discussion from [...]
Can you try if e8d517e fixes the problem for you?
Yes, that commit fixes the problem.
Release version 0.11.1 includes the fix, thanks for reporting!
When sending large packets very quickly, there is a chance the future completely locks up and loses its waker according to tokio-console and console-subscriber. I am using rustc 1.86.0-nightly (1e9b0177d 2025-01-24).

Reproduction code:

Run the echo-server example from this repo (commit b8b3418):

```sh
cargo r -r --example echo_server -F server,sha1_smol
```

Then run the code:

```sh
RUSTFLAGS='--cfg tokio_unstable' cargo r -r -- 51205
```

On my machine, it locks up after around 100-150 messages with a size of 51205. It also locks up with smaller message sizes, but not quickly or reliably.

Once it locks up, check tokio-console:

[tokio-console screenshots]

The task location matches up with the tokio::spawn for the writer task.