From a1d72d202053fa9d7eda1730f45feb25ee75a235 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 14 Jun 2023 21:56:12 +0200 Subject: [PATCH 1/4] add more debugging to TLS acceptor --- src/hp/derp/http/server.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/hp/derp/http/server.rs b/src/hp/derp/http/server.rs index 415a8e26e2..849ff3e2fd 100644 --- a/src/hp/derp/http/server.rs +++ b/src/hp/derp/http/server.rs @@ -514,6 +514,7 @@ impl DerpService { match tls_config { Some(tls_config) => self.tls_serve_connection(stream, tls_config).await, None => { + debug!("HTTP: serve connection"); self.serve_connection(MaybeTlsStreamServer::Plain(stream)) .await } @@ -526,18 +527,25 @@ impl DerpService { match acceptor { TlsAcceptor::LetsEncrypt(a) => match a.accept(stream).await? { None => { - info!("received TLS-ALPN-01 validation request"); + info!("TLS[acme]: received TLS-ALPN-01 validation request"); } Some(start_handshake) => { - let tls_stream = start_handshake.into_stream(config).await?; + debug!("TLS[acme]: start handshake"); + let tls_stream = start_handshake + .into_stream(config) + .await + .context("TLS[acme] handshake")?; self.serve_connection(MaybeTlsStreamServer::Tls(tls_stream)) - .await?; + .await + .context("TLS[acme] serve connection")?; } }, TlsAcceptor::Manual(a) => { - let tls_stream = a.accept(stream).await?; + debug!("TLS[manual]: accept"); + let tls_stream = a.accept(stream).await.context("TLS[manual] accept")?; self.serve_connection(MaybeTlsStreamServer::Tls(tls_stream)) - .await?; + .await + .context("TLS[manual] serve connection")?; } } Ok(()) From 8daccefd32d0a898a3acb9f595a679032041ec51 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 14 Jun 2023 22:22:06 +0200 Subject: [PATCH 2/4] fix(derp) client conn: ensure flushing on shutdown --- src/hp/derp/client_conn.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/hp/derp/client_conn.rs b/src/hp/derp/client_conn.rs index 4a92624f32..d451416dce 100644 --- a/src/hp/derp/client_conn.rs +++ b/src/hp/derp/client_conn.rs @@ -294,9 +294,11 @@ where loop { trace!("tick"); tokio::select! { + biased; + _ = done.cancelled() => { trace!("cancelled"); - return Ok(()); + break; } read_res = read_frame(&mut self.io, MAX_FRAME_SIZE, &mut read_buf) => { self.handle_read(read_res, &mut read_buf).await?; @@ -335,6 +337,8 @@ where // refactor to get something similar self.io.flush().await?; } + + Ok(()) } /// Send `FrameType::KeepAlive`, does not flush From d8a08cb883d270ee476904eabf16ec3722af25b3 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 14 Jun 2023 22:23:20 +0200 Subject: [PATCH 3/4] actual fix.. --- src/hp/derp/client_conn.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/hp/derp/client_conn.rs b/src/hp/derp/client_conn.rs index d451416dce..84fd197c02 100644 --- a/src/hp/derp/client_conn.rs +++ b/src/hp/derp/client_conn.rs @@ -298,7 +298,9 @@ where _ = done.cancelled() => { trace!("cancelled"); - break; + // final flush + self.io.flush().await?; + return Ok(()); } read_res = read_frame(&mut self.io, MAX_FRAME_SIZE, &mut read_buf) => { self.handle_read(read_res, &mut read_buf).await?; @@ -337,8 +339,6 @@ where // refactor to get something similar self.io.flush().await?; } - - Ok(()) } /// Send `FrameType::KeepAlive`, does not flush From e1f604693dc219c4fcfaad83dc62040960f1ef88 Mon Sep 17 00:00:00 2001 From: Kasey Date: Fri, 16 Jun 2023 03:12:26 -0400 Subject: [PATCH 4/4] `ClientConn` task should close on EOF --- src/hp/derp/client_conn.rs | 10 ++-------- src/hp/derp/http/server.rs | 31 ++++++++++++++++++------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/src/hp/derp/client_conn.rs b/src/hp/derp/client_conn.rs index 84fd197c02..f9eeeb57d9 100644 --- a/src/hp/derp/client_conn.rs +++ b/src/hp/derp/client_conn.rs @@ -303,6 +303,7 @@ where return Ok(()); } read_res = read_frame(&mut self.io, MAX_FRAME_SIZE, &mut read_buf) => { + trace!("handle read"); self.handle_read(read_res, &mut read_buf).await?; } peer = self.peer_gone.recv() => { @@ -500,14 +501,7 @@ where } Ok(()) } - Err(err) => { - if let Some(io_err) = err.downcast_ref::() { - if io_err.kind() == std::io::ErrorKind::UnexpectedEof { - return Ok(()); - } - } - Err(err) - } + Err(err) => Err(err), } } diff --git a/src/hp/derp/http/server.rs b/src/hp/derp/http/server.rs index 849ff3e2fd..a510093c7c 100644 --- a/src/hp/derp/http/server.rs +++ b/src/hp/derp/http/server.rs @@ -365,21 +365,26 @@ where // Note: This can't possibly be fulfilled until the 101 response // is returned below, so it's better to spawn this future instead // waiting for it to complete to then return a response. - tokio::task::spawn(async move { - match hyper::upgrade::on(&mut req).await { - Ok(upgraded) => { - if let Err(e) = - derp_connection_handler(&closure_conn_handler, upgraded).await - { - tracing::warn!( - "server \"{HTTP_UPGRADE_PROTOCOL}\" io error: {:?}", - e - ) - }; + tokio::task::spawn( + async move { + match hyper::upgrade::on(&mut req).await { + Ok(upgraded) => { + if let Err(e) = + derp_connection_handler(&closure_conn_handler, upgraded).await + { + tracing::warn!( + "server \"{HTTP_UPGRADE_PROTOCOL}\" io error: {:?}", + e + ); + } else { + tracing::info!("server \"{HTTP_UPGRADE_PROTOCOL}\" success"); + }; + } + Err(e) => tracing::warn!("upgrade error: {:?}", e), } - Err(e) => tracing::warn!("upgrade error: {:?}", e), } - }); + .instrument(tracing::debug_span!("derp_connection_handler")), + ); // Now return a 101 Response saying we agree to the upgrade to the // HTTP_UPGRADE_PROTOCOL