From 990892b09b60e11ed4d6cb4fb82557de8980ba1a Mon Sep 17 00:00:00 2001 From: Sehyo Chang Date: Wed, 6 Sep 2023 17:11:22 +0000 Subject: [PATCH] feature: add nonzero copy option (#3519) Add an option to disable zero-copy when sending file slices to the socket. This is needed when sending to a TLS socket, where zero-copy doesn't work. --- Cargo.lock | 2 + crates/fluvio-socket/Cargo.toml | 10 +- crates/fluvio-socket/src/sink.rs | 344 ++++++++++++++++++++++--------- 3 files changed, 259 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2199f35c91..046f819032 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3044,7 +3044,9 @@ dependencies = [ "event-listener", "fluvio-future", "fluvio-protocol", + "fluvio-types", "futures-util", + "nix 0.27.1", "once_cell", "pin-project", "portpicker", diff --git a/crates/fluvio-socket/Cargo.toml b/crates/fluvio-socket/Cargo.toml index b8d8ac2a75..1035e56608 100644 --- a/crates/fluvio-socket/Cargo.toml +++ b/crates/fluvio-socket/Cargo.toml @@ -30,6 +30,7 @@ async-trait = { workspace = true } pin-project = { workspace = true } thiserror = { workspace = true } semver = { workspace = true } +nix = { workspace = true, features = ["uio"]} # Fluvio dependencies fluvio-future = { workspace = true, features = ["net", "task", "retry"] } @@ -41,16 +42,19 @@ fluvio-protocol = { workspace = true, features = [ ] } [dev-dependencies] +portpicker = { workspace = true } +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +async-net = { workspace = true } + fluvio-future = { workspace = true, features = [ "fixture", "fs", "zero_copy", "native2_tls", ] } -portpicker = { workspace = true } +fluvio-types = { workspace = true, features = ["events"] } + -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -async-net = { workspace = true } [build-dependencies] built = "0.6.0" diff --git a/crates/fluvio-socket/src/sink.rs b/crates/fluvio-socket/src/sink.rs index 8d6976b296..21ee5ea98c 100644 --- 
a/crates/fluvio-socket/src/sink.rs +++ b/crates/fluvio-socket/src/sink.rs @@ -22,6 +22,7 @@ type SinkFrame = FramedWrite, FluvioCodec>; pub struct FluvioSink { inner: SinkFrame, fd: ConnectionFd, + enable_zero_copy: bool, } impl fmt::Debug for FluvioSink { @@ -49,10 +50,16 @@ impl FluvioSink { pub fn new(sink: BoxWriteConnection, fd: ConnectionFd) -> Self { Self { fd, + enable_zero_copy: true, inner: SinkFrame::new(sink.compat_write(), FluvioCodec::new()), } } + /// don't use zero copy + pub fn disable_zerocopy(&mut self) { + self.enable_zero_copy = false; + } + /// as client, send request to server #[instrument(level = "trace",skip(req_msg),fields(req=?req_msg))] pub async fn send_request(&mut self, req_msg: &RequestMessage) -> Result<(), SocketError> @@ -99,9 +106,12 @@ mod file { use std::io::Error as IoError; use std::io::ErrorKind; + use std::os::fd::BorrowedFd; use bytes::BytesMut; + use fluvio_future::task::spawn_blocking; use futures_util::AsyncWriteExt; + use nix::sys::uio::pread; use fluvio_protocol::store::{FileWrite, StoreValue}; use fluvio_future::zero_copy::ZeroCopy; @@ -159,24 +169,254 @@ mod file { f_slice.position(), f_slice.len() ); - let writer = ZeroCopy::raw(self.fd); - let bytes_written = - writer.copy_slice(&f_slice).await.map_err(|err| { - IoError::new( - ErrorKind::Other, - format!("zero copy failed: {err}"), - ) - })?; - trace!("finish writing file slice with {bytes_written} bytes"); - total_bytes_written += bytes_written; + if self.enable_zero_copy { + let writer = ZeroCopy::raw(self.fd); + let bytes_written = + writer.copy_slice(&f_slice).await.map_err(|err| { + IoError::new( + ErrorKind::Other, + format!("zero copy failed: {err}"), + ) + })?; + trace!("finish writing file slice with {bytes_written} bytes"); + total_bytes_written += bytes_written; + } else { + let offset = f_slice.position() as i64; + + #[cfg(all(target_pointer_width = "32", target_env = "gnu"))] + let offset: i32 = offset.try_into().unwrap(); + + let in_fd = 
f_slice.fd(); + trace!( + in_fd, + offset, + len = f_slice.len(), + "reading from file slice" + ); + let (read_result, mut buf) = spawn_blocking(move || { + let mut buf = BytesMut::with_capacity(f_slice.len() as usize); + buf.resize(f_slice.len() as usize, 0); + let fd = unsafe { BorrowedFd::borrow_raw(in_fd) }; + let read_size = pread(fd, &mut buf, offset).map_err(|err| { + IoError::new( + ErrorKind::Other, + format!("pread failed: {err}"), + ) + }); + (read_size, buf) + }) + .await; + + let read = read_result?; + buf.resize(read, 0); + + trace!(read, in_fd, buf_len = buf.len(), "status from file slice"); + + // write to socket + self.get_mut_tcp_sink() + .get_mut() + .get_mut() + .write_all(&buf) + .await?; + + total_bytes_written += read; + } } } } } + trace!(total_bytes_written, "finish writing store values"); Ok(total_bytes_written) } } + + #[cfg(test)] + mod tests { + + use std::io::Cursor; + use std::io::ErrorKind; + use std::sync::Arc; + use std::time::Duration; + use std::io::Error as IoError; + + use async_net::TcpListener; + use bytes::Buf; + use bytes::BufMut; + use bytes::BytesMut; + use futures_util::AsyncWriteExt; + use futures_util::future::join; + use futures_util::StreamExt; + use tracing::debug; + + use fluvio_future::file_slice::AsyncFileSlice; + use fluvio_protocol::Version; + use fluvio_protocol::store::FileWrite; + use fluvio_protocol::store::StoreValue; + use fluvio_future::fs::util; + use fluvio_future::fs::AsyncFileExtension; + use fluvio_future::timer::sleep; + use fluvio_protocol::{Decoder, Encoder}; + use fluvio_types::event::StickyEvent; + + use crate::FluvioSocket; + use crate::SocketError; + + // slice that outputs to socket with len and slice + #[derive(Debug, Default)] + struct SliceWrapper(AsyncFileSlice); + + impl SliceWrapper { + pub fn len(&self) -> usize { + self.0.len() as usize + } + + pub fn raw_slice(&self) -> AsyncFileSlice { + self.0.clone() + } + } + + impl Encoder for SliceWrapper { + fn write_size(&self, _version: 
Version) -> usize { + self.len() + 4 // include header + } + + fn encode(&self, src: &mut T, version: Version) -> Result<(), IoError> + where + T: BufMut, + { + // can only encode zero length + if self.len() == 0 { + let len: u32 = 0; + len.encode(src, version) + } else { + Err(IoError::new( + ErrorKind::InvalidInput, + format!("len {} is not zeo", self.len()), + )) + } + } + } + + impl Decoder for SliceWrapper { + fn decode(&mut self, _src: &mut T, _version: Version) -> Result<(), IoError> + where + T: Buf, + { + unimplemented!("file slice cannot be decoded in the ButMut") + } + } + + impl FileWrite for SliceWrapper { + fn file_encode( + &self, + _dest: &mut BytesMut, + data: &mut Vec, + _version: Version, + ) -> Result<(), IoError> { + // just push slice + data.push(StoreValue::FileSlice(self.raw_slice())); + Ok(()) + } + } + + async fn test_server( + addr: &str, + end: Arc, + disable_zc: bool, + ) -> Result<(), SocketError> { + let listener = TcpListener::bind(&addr).await.expect("bind"); + debug!("server is running"); + let mut incoming = listener.incoming(); + + end.notify(); + let incoming_stream = incoming.next().await; + debug!("server: got connection"); + let incoming_stream = incoming_stream.expect("next").expect("unwrap again"); + let mut socket: FluvioSocket = incoming_stream.into(); + + let raw_tcp_sink = socket.get_mut_sink().get_mut_tcp_sink(); + + const TEXT_LEN: u16 = 5; + + // directly encode total buffer with is 4 + 2 + string + let mut out = vec![]; + let len: i32 = TEXT_LEN as i32 + 2; // msg plus file + len.encode(&mut out, 0).expect("encode"); // codec len + out.put_u16(TEXT_LEN); // string message len + + raw_tcp_sink.get_mut().get_mut().write_all(&out).await?; + + // send out file + debug!("server: sending out file contents"); + let data_file = util::open("tests/test.txt").await.expect("open file"); + let fslice = data_file.as_slice(0, None).await.expect("slice"); + assert_eq!(fslice.len(), 5); + let wrapper = SliceWrapper(fslice); + + let 
(mut sink, _stream) = socket.split(); + // output file slice + if disable_zc { + sink.disable_zerocopy(); + } + sink.encode_file_slices(&wrapper, 0).await.expect("encode"); + + debug!("server: hanging on client to test"); + // just in case if we need to keep it on + sleep(Duration::from_millis(500)).await; + debug!("server: finish"); + Ok(()) + } + + async fn setup_client(addr: &str, end: Arc) -> Result<(), SocketError> { + debug!("waiting for server to start"); + while !end.is_set() { + end.listen().await; + } + debug!("client: trying to connect"); + let mut socket = FluvioSocket::connect(addr).await.expect("connect"); + debug!("client: connect to test server and waiting for server to send out"); + let stream = socket.get_mut_stream(); + debug!("client: waiting for bytes"); + let next_value = stream.get_mut_tcp_stream().next().await; + debug!("client: got bytes"); + let bytes = next_value.expect("next").expect("bytes"); + assert_eq!(bytes.len(), 7); + debug!("decoding values"); + let mut src = Cursor::new(&bytes); + let mut msg1 = String::new(); + msg1.decode(&mut src, 0).expect("decode should work"); + assert_eq!(msg1, "hello"); + + Ok(()) + } + + #[fluvio_future::test] + async fn test_sink_zero_copy() { + let port = portpicker::pick_unused_port().expect("No free ports left"); + let addr = format!("127.0.0.1:{port}"); + + let send_event = StickyEvent::shared(); + let _r = join( + setup_client(&addr, send_event.clone()), + test_server(&addr, send_event, false), + ) + .await; + } + + #[fluvio_future::test] + async fn test_sink_buffer_copy() { + let port = portpicker::pick_unused_port().expect("No free ports left"); + let addr = format!("127.0.0.1:{port}"); + + let send_event = StickyEvent::shared(); + let _r = join( + setup_client(&addr, send_event.clone()), + test_server(&addr, send_event, true), + ) + .await; + } + } } /// Multi-thread aware Sink. Only allow sending request one a time. 
@@ -236,87 +476,3 @@ impl Clone for ExclusiveFlvSink { } } } - -#[cfg(test)] -mod tests { - - use std::io::Cursor; - use std::time::Duration; - - use async_net::TcpListener; - use bytes::BufMut; - use futures_util::future::join; - use futures_util::StreamExt; - use futures_util::io::AsyncWriteExt; - use tracing::debug; - use tracing::info; - - use crate::FluvioSocket; - use crate::SocketError; - use fluvio_future::fs::util; - use fluvio_future::fs::AsyncFileExtension; - use fluvio_future::timer::sleep; - use fluvio_future::zero_copy::ZeroCopy; - use fluvio_protocol::{Decoder, Encoder}; - - async fn test_server(addr: &str) -> Result<(), SocketError> { - let listener = TcpListener::bind(&addr).await.expect("bind"); - debug!("server is running"); - let mut incoming = listener.incoming(); - let incoming_stream = incoming.next().await; - debug!("server: got connection"); - let incoming_stream = incoming_stream.expect("next").expect("unwrap again"); - let mut socket: FluvioSocket = incoming_stream.into(); - let raw_tcp_sink = socket.get_mut_sink().get_mut_tcp_sink(); - - const TEXT_LEN: u16 = 5; - - // encode text file length as string - let mut out = vec![]; - let len: i32 = TEXT_LEN as i32 + 2; // msg plus file - len.encode(&mut out, 0).expect("encode"); // codec len - out.put_u16(TEXT_LEN); // string message len - - raw_tcp_sink.get_mut().get_mut().write_all(&out).await?; - - // send out file - debug!("sending out file contents"); - let data_file = util::open("tests/test.txt").await.expect("open file"); - let fslice = data_file.as_slice(0, None).await.expect("slice"); - - let zerocopy = ZeroCopy::raw(socket.get_mut_sink().fd); - zerocopy.copy_slice(&fslice).await.expect("zero copy"); - - // just in case if we need to keep it on - sleep(Duration::from_millis(200)).await; - debug!("server: finish sending out"); - Ok(()) - } - - async fn setup_client(addr: &str) -> Result<(), SocketError> { - sleep(Duration::from_millis(50)).await; - debug!("client: trying to connect"); 
- let mut socket = FluvioSocket::connect(addr).await.expect("connect"); - info!("client: connect to test server and waiting..."); - let stream = socket.get_mut_stream(); - let next_value = stream.get_mut_tcp_stream().next().await; - debug!("client: got bytes"); - let bytes = next_value.expect("next").expect("bytes"); - assert_eq!(bytes.len(), 7); - debug!("decoding values"); - let mut src = Cursor::new(&bytes); - let mut msg1 = String::new(); - msg1.decode(&mut src, 0).expect("decode should work"); - assert_eq!(msg1, "hello"); - - Ok(()) - } - - #[fluvio_future::test] - async fn test_sink_copy() { - let port = portpicker::pick_unused_port().expect("No free ports left"); - let addr = format!("127.0.0.1:{port}"); - - let _r = join(setup_client(&addr), test_server(&addr)).await; - } -}