From f805b5d3503e5ca0f85c22dabdeac3341618ec86 Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Sat, 10 Apr 2021 09:07:52 +0800 Subject: [PATCH] Fixed bug, concatenated buffer kept available before first write successfully --- Cargo.lock | 6 +- Cargo.toml | 4 +- crates/shadowsocks-service/Cargo.toml | 4 +- crates/shadowsocks/Cargo.toml | 2 +- .../src/relay/tcprelay/proxy_stream/client.rs | 201 +++++++++++------- .../src/relay/tcprelay/proxy_stream/server.rs | 53 +++-- debian/changelog | 22 ++ homebrew/shadowsocks-rust.rb | 4 +- 8 files changed, 197 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb35a00ccb22..ff90c705623f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1470,7 +1470,7 @@ dependencies = [ [[package]] name = "shadowsocks" -version = "1.10.1" +version = "1.10.2" dependencies = [ "arc-swap 1.2.0", "async-trait", @@ -1514,7 +1514,7 @@ dependencies = [ [[package]] name = "shadowsocks-rust" -version = "1.10.4" +version = "1.10.5" dependencies = [ "byte_string", "byteorder", @@ -1535,7 +1535,7 @@ dependencies = [ [[package]] name = "shadowsocks-service" -version = "1.10.3" +version = "1.10.4" dependencies = [ "async-trait", "byte_string", diff --git a/Cargo.toml b/Cargo.toml index f97e329f2363..503fce01e305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shadowsocks-rust" -version = "1.10.4" +version = "1.10.5" authors = ["Shadowsocks Contributors"] description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls." repository = "https://github.com/shadowsocks/shadowsocks-rust" @@ -122,7 +122,7 @@ mimalloc = { version = "0.1", optional = true } tcmalloc = { version = "0.3", optional = true } jemallocator = { version = "0.3", optional = true } -shadowsocks-service = { version = "1.10.3", path = "./crates/shadowsocks-service" } +shadowsocks-service = { version = "1.10.4", path = "./crates/shadowsocks-service" } [target.'cfg(unix)'.dependencies] daemonize = "0.4" diff --git a/crates/shadowsocks-service/Cargo.toml b/crates/shadowsocks-service/Cargo.toml index e6a08643e226..af8de87ec466 100644 --- a/crates/shadowsocks-service/Cargo.toml +++ b/crates/shadowsocks-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shadowsocks-service" -version = "1.10.3" +version = "1.10.4" authors = ["Shadowsocks Contributors"] description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls." repository = "https://github.com/shadowsocks/shadowsocks-rust" @@ -105,7 +105,7 @@ regex = "1.4" serde = { version = "1.0", features = ["derive"] } json5 = "0.3" -shadowsocks = { version = "1.10.1", path = "../shadowsocks" } +shadowsocks = { version = "1.10.2", path = "../shadowsocks" } strum = { version = "0.20", optional = true } strum_macros = { version = "0.20", optional = true } diff --git a/crates/shadowsocks/Cargo.toml b/crates/shadowsocks/Cargo.toml index 185273ac771a..d0f860985f17 100644 --- a/crates/shadowsocks/Cargo.toml +++ b/crates/shadowsocks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shadowsocks" -version = "1.10.1" +version = "1.10.2" authors = ["Shadowsocks Contributors"] description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls." repository = "https://github.com/shadowsocks/shadowsocks-rust" diff --git a/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs b/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs index e7e1462ccc80..2ebdd00cd0d8 100644 --- a/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs +++ b/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs @@ -10,6 +10,7 @@ use bytes::{BufMut, BytesMut}; use futures::ready; use log::trace; use once_cell::sync::Lazy; +use pin_project::pin_project; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, net::TcpStream, @@ -26,10 +27,18 @@ use crate::{ }, }; +enum ProxyClientStreamWriteState { + Connect(Address), + Connecting(BytesMut), + Connected, +} + /// A stream for sending / receiving data stream from remote server via shadowsocks' proxy server +#[pin_project] pub struct ProxyClientStream { + #[pin] stream: CryptoStream, - addr: Option
, + state: ProxyClientStreamWriteState, context: SharedContext, } @@ -140,7 +149,7 @@ where ProxyClientStream { stream, - addr: Some(addr), + state: ProxyClientStreamWriteState::Connect(addr), context, } } @@ -166,9 +175,9 @@ where S: AsyncRead + AsyncWrite + Unpin, { #[inline] - fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let context = unsafe { &*(self.context.as_ref() as *const _) }; - self.stream.poll_read_decrypted(cx, context, buf) + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let mut this = self.project(); + this.stream.poll_read_decrypted(cx, &this.context, buf) } } @@ -176,45 +185,61 @@ impl AsyncWrite for ProxyClientStream where S: AsyncRead + AsyncWrite + Unpin, { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - match self.addr { - None => { - // For all subsequence calls, just proxy it to self.stream - return self.stream.poll_write_encrypted(cx, buf); - } - Some(ref addr) => { - let addr_length = addr.serialized_len(); - - let mut buffer = BytesMut::with_capacity(addr_length + buf.len()); - addr.write_to_buf(&mut buffer); - buffer.put_slice(buf); - - ready!(self.stream.poll_write_encrypted(cx, &buffer))?; - - // fallthrough. take the self.addr out + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let mut this = self.project(); + + loop { + match this.state { + ProxyClientStreamWriteState::Connect(ref addr) => { + // Target Address should be sent with the first packet together, + // which would prevent from being detected by connection features. + + let addr_length = addr.serialized_len(); + + let mut buffer = BytesMut::with_capacity(addr_length + buf.len()); + addr.write_to_buf(&mut buffer); + buffer.put_slice(buf); + + // Save the concatenated buffer before it is written successfully. + // APIs require buffer to be kept alive before Poll::Ready + // + // Proactor APIs like IOCP on Windows, pointers of buffers have to be kept alive + // before IO completion. + *(this.state) = ProxyClientStreamWriteState::Connecting(buffer); + } + ProxyClientStreamWriteState::Connecting(ref buffer) => { + let n = ready!(this.stream.poll_write_encrypted(cx, &buffer))?; + + // In general, poll_write_encrypted should perform like write_all. + debug_assert!(n == buffer.len()); + + *(this.state) = ProxyClientStreamWriteState::Connected; + + // NOTE: + // poll_write will return Ok(0) if buf.len() == 0 + // But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address. + // + // https://github.com/shadowsocks/shadowsocks-rust/issues/232 + // + // For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages. + // This could be achieved by calling poll_write with an empty input buffer. + return Ok(buf.len()).into(); + } + ProxyClientStreamWriteState::Connected => { + return this.stream.poll_write_encrypted(cx, buf); + } } } - - let _ = self.addr.take(); - - // NOTE: - // poll_write will return Ok(0) if buf.len() == 0 - // But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address. - // - // https://github.com/shadowsocks/shadowsocks-rust/issues/232 - // - // For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages. - // This could be achieved by calling poll_write with an empty input buffer. - - Ok(buf.len()).into() } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.stream.poll_flush(cx) + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().stream.poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.stream.poll_shutdown(cx) + #[inline] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().stream.poll_shutdown(cx) } } @@ -222,7 +247,13 @@ impl ProxyClientStream where S: AsyncRead + AsyncWrite + Unpin, { + /// Splits into reader and writer halves pub fn into_split(self) -> (ProxyClientStreamReadHalf, ProxyClientStreamWriteHalf) { + // Cannot split if stream is still pending + assert!( + !matches!(self.state, ProxyClientStreamWriteState::Connecting(..)), + "stream is pending on writing the first packet" + ); let (reader, writer) = self.stream.into_split(); ( ProxyClientStreamReadHalf { @@ -231,13 +262,16 @@ where }, ProxyClientStreamWriteHalf { writer, - addr: self.addr, + state: self.state, }, ) } } +/// Owned read half produced by `ProxyClientStream::into_split` +#[pin_project] pub struct ProxyClientStreamReadHalf { + #[pin] reader: CryptoStreamReadHalf, context: SharedContext, } @@ -247,53 +281,78 @@ where S: AsyncRead + Unpin, { #[inline] - fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let context = unsafe { &*(self.context.as_ref() as *const _) }; - self.reader.poll_read_decrypted(cx, context, buf) + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let mut this = self.project(); + this.reader.poll_read_decrypted(cx, &this.context, buf) } } +/// Owned write half produced by `ProxyClientStream::into_split` +#[pin_project] pub struct ProxyClientStreamWriteHalf { + #[pin] writer: CryptoStreamWriteHalf, - addr: Option
, + state: ProxyClientStreamWriteState, } impl AsyncWrite for ProxyClientStreamWriteHalf where S: AsyncWrite + Unpin, { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - if self.addr.is_none() { - // For all subsequence calls, just proxy it to self.writer - return self.writer.poll_write_encrypted(cx, buf); + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let mut this = self.project(); + + loop { + match this.state { + ProxyClientStreamWriteState::Connect(ref addr) => { + // Target Address should be sent with the first packet together, + // which would prevent from being detected by connection features. + + let addr_length = addr.serialized_len(); + + let mut buffer = BytesMut::with_capacity(addr_length + buf.len()); + addr.write_to_buf(&mut buffer); + buffer.put_slice(buf); + + // Save the concatenated buffer before it is written successfully. + // APIs require buffer to be kept alive before Poll::Ready + // + // Proactor APIs like IOCP on Windows, pointers of buffers have to be kept alive + // before IO completion. + *(this.state) = ProxyClientStreamWriteState::Connecting(buffer); + } + ProxyClientStreamWriteState::Connecting(ref buffer) => { + let n = ready!(this.writer.poll_write_encrypted(cx, &buffer))?; + + // In general, poll_write_encrypted should perform like write_all. + debug_assert!(n == buffer.len()); + + *(this.state) = ProxyClientStreamWriteState::Connected; + + // NOTE: + // poll_write will return Ok(0) if buf.len() == 0 + // But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address. + // + // https://github.com/shadowsocks/shadowsocks-rust/issues/232 + // + // For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages. + // This could be achieved by calling poll_write with an empty input buffer. + return Ok(buf.len()).into(); + } + ProxyClientStreamWriteState::Connected => { + return this.writer.poll_write_encrypted(cx, buf); + } + } } - - let addr = self.addr.take().unwrap(); - let addr_length = addr.serialized_len(); - - let mut buffer = BytesMut::with_capacity(addr_length + buf.len()); - addr.write_to_buf(&mut buffer); - buffer.put_slice(buf); - - ready!(self.writer.poll_write_encrypted(cx, &buffer))?; - - // NOTE: - // poll_write will return Ok(0) if buf.len() == 0 - // But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address. - // - // https://github.com/shadowsocks/shadowsocks-rust/issues/232 - // - // For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages. - // This could be achieved by calling poll_write with an empty input buffer. - - Ok(buf.len()).into() } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.writer.poll_flush(cx) + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().writer.poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.writer.poll_shutdown(cx) + #[inline] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().writer.poll_shutdown(cx) } } diff --git a/crates/shadowsocks/src/relay/tcprelay/proxy_stream/server.rs b/crates/shadowsocks/src/relay/tcprelay/proxy_stream/server.rs index 5d0cc5590603..91da0b5ccd96 100644 --- a/crates/shadowsocks/src/relay/tcprelay/proxy_stream/server.rs +++ b/crates/shadowsocks/src/relay/tcprelay/proxy_stream/server.rs @@ -6,6 +6,7 @@ use std::{ task::{self, Poll}, }; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::{ @@ -15,7 +16,9 @@ use crate::{ }; /// A stream for communicating with shadowsocks' proxy client +#[pin_project] pub struct ProxyServerStream { + #[pin] stream: CryptoStream, context: SharedContext, } @@ -71,9 +74,10 @@ impl AsyncRead for ProxyServerStream where S: AsyncRead + AsyncWrite + Unpin, { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let context = unsafe { &*(self.context.as_ref() as *const _) }; - self.stream.poll_read_decrypted(cx, context, buf) + #[inline] + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let mut this = self.project(); + this.stream.poll_read_decrypted(cx, &this.context, buf) } } @@ -81,20 +85,26 @@ impl AsyncWrite for ProxyServerStream where S: AsyncRead + AsyncWrite + Unpin, { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - self.stream.poll_write_encrypted(cx, buf) + #[inline] + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + self.project().stream.poll_write_encrypted(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.stream.poll_flush(cx) + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().stream.poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.stream.poll_shutdown(cx) + #[inline] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().stream.poll_shutdown(cx) } } +/// Owned read half produced by `ProxyServerStream::into_split` +#[pin_project] pub struct ProxyServerStreamReadHalf { + #[pin] reader: CryptoStreamReadHalf, context: SharedContext, } @@ -103,13 +113,17 @@ impl AsyncRead for ProxyServerStreamReadHalf where S: AsyncRead + Unpin, { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let context = unsafe { &*(self.context.as_ref() as *const _) }; - self.reader.poll_read_decrypted(cx, context, buf) + #[inline] + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let mut this = self.project(); + this.reader.poll_read_decrypted(cx, &this.context, buf) } } +/// Owned write half produced by `ProxyServerStream::into_split` +#[pin_project] pub struct ProxyServerStreamWriteHalf { + #[pin] writer: CryptoStreamWriteHalf, } @@ -117,15 +131,18 @@ impl AsyncWrite for ProxyServerStreamWriteHalf where S: AsyncWrite + Unpin, { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - self.writer.poll_write_encrypted(cx, buf) + #[inline] + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + self.project().writer.poll_write_encrypted(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.writer.poll_flush(cx) + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().writer.poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.writer.poll_shutdown(cx) + #[inline] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().writer.poll_shutdown(cx) } } diff --git a/debian/changelog b/debian/changelog index cbf9a2a61689..4527a0c9ea59 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,25 @@ +shadowsocks-rust (1.10.5) unstable; urgency=medium + + ## BUG Fixed + + - `ProxyClientStream` should keep the concatenated first packet buffer alive before asynchronous `write()` finishes + +shadowsocks-rust (1.10.4) unstable; urgency=medium + + # Fixed BUG + + - `ProxyClientStream::poll_write` may lose the `Address` in the packet to be sent if socket returns `EAGAIN` + + # Features + + - Support `protocol` in basic configuration format + +shadowsocks-rust (1.10.3) unstable; urgency=medium + + ## BUG Fixed + + - #472 Fixed `SO_INCOMING_CPU` when building on some Linux targets. rust-lang/socket2#213 + shadowsocks-rust (1.10.2) unstable; urgency=medium ## BUG Fixed diff --git a/homebrew/shadowsocks-rust.rb b/homebrew/shadowsocks-rust.rb index a40ac48c374a..d9f15a137de1 100644 --- a/homebrew/shadowsocks-rust.rb +++ b/homebrew/shadowsocks-rust.rb @@ -1,8 +1,8 @@ class ShadowsocksRust < Formula desc "shadowsocks is a fast tunnel proxy that helps you bypass firewalls" homepage "https://github.com/shadowsocks/shadowsocks-rust" - url "https://github.com/shadowsocks/shadowsocks-rust/archive/refs/tags/v1.10.3.tar.gz" - version "1.10.3" + url "https://github.com/shadowsocks/shadowsocks-rust/archive/refs/tags/v1.10.5.tar.gz" + version "1.10.5" sha256 "00fb90b6f80d01c6b40f6cfeb49d70fbec9f659bfa268d6834e79fe1f670d55e" license "MIT"