diff --git a/Cargo.lock b/Cargo.lock index ccc45daf7..e33c6f1ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1325,15 +1325,6 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -1883,6 +1874,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots", ] [[package]] @@ -1911,22 +1903,6 @@ dependencies = [ "tokio-native-tls", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper 1.4.1", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.7" @@ -1988,7 +1964,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.15" +version = "0.6.16" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -2014,7 +1990,6 @@ dependencies = [ "humantime", "keyring", "lazy_static", - "openssl", "passterm", "quinn", "regex", @@ -3304,38 +3279,37 @@ checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2", "http 1.1.0", "http-body 1.0.1", "http-body-util", "hyper 1.4.1", "hyper-rustls", - "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", - "system-configuration", "tokio", - "tokio-native-tls", + "tokio-rustls", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "windows-registry", ] @@ -3477,7 +3451,7 @@ dependencies = [ "hmac", "http 0.2.12", "hyper 0.14.30", - "hyper-tls 0.5.0", + "hyper-tls", "log", "maybe-async", "md5", @@ -3889,6 +3863,7 @@ dependencies = [ "jsonwebtoken", "log", "moka", + "openssl", "prometheus-client", "quinn", "rcgen", @@ -4174,27 +4149,6 @@ dependencies = [ "windows 0.57.0", ] -[[package]] -name = "system-configuration" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" -dependencies = [ - "bitflags 2.6.0", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tagptr" version = "0.2.0" diff --git a/integration/tests/examples/mod.rs b/integration/tests/examples/mod.rs index cd29ad148..b8739dce3 100644 --- a/integration/tests/examples/mod.rs +++ b/integration/tests/examples/mod.rs @@ -64,12 +64,10 @@ pub(crate) trait IggyExampleTestCase { ) { let server_addr = server.get_raw_tcp_addr().unwrap(); for stdout in [producer_stdout, consumer_stdout] { - assert!(stdout.contains( - format!("Iggy client is connecting to server: {}...", &server_addr).as_str() - )); - assert!(stdout.contains( - format!("Iggy client has connected to server: {}", &server_addr).as_str() - )); + assert!( + stdout.contains(format!("is connecting to server: {}...", &server_addr).as_str()) + ); + assert!(stdout.contains(format!("has connected to server: {}", &server_addr).as_str())); } } fn protocol(&self, server: &TestServer) -> Vec { diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 0220fbb14..112d3a461 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.15" +version = "0.6.16" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" @@ -39,11 +39,10 @@ futures-util = "0.3.30" humantime = "2.1.0" keyring = { version = "3.2.0", optional = true, features = ["sync-secret-service", "vendored"] } lazy_static = "1.4.0" -openssl = { version = "0.10.64", features = ["vendored"] } passterm = { version = "2.0.1", optional = true } quinn = { version = "0.11.5" } regex = "1.10.4" -reqwest = { version = "0.12.7", features = ["json"] } +reqwest = { version = "0.12.7", default-features = false, features = ["json", "rustls-tls"] } reqwest-middleware = { version = "0.3.2", features = ["json"] } reqwest-retry = "0.6.1" rustls = { version = "0.23.10", features = ["ring"] } diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index 8b920a312..43e18f301 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -397,7 +397,7 @@ impl IggyConsumer { continue; } - info!("Rejoining consumer group"); + info!("Rejoining consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}..."); if let Err(error) = Self::initialize_consumer_group( client.clone(), create_consumer_group_if_not_exists, @@ -409,10 +409,10 @@ impl IggyConsumer { ) .await { - error!("Failed to join consumer group: {error}"); + error!("Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. {error}"); continue; } - info!("Rejoined consumer group"); + info!("Rejoined consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}"); can_poll.store(true, ORDERING); } DiagnosticEvent::SignedOut => { diff --git a/sdk/src/quic/client.rs b/sdk/src/quic/client.rs index 7d47fe530..ec6af0675 100644 --- a/sdk/src/quic/client.rs +++ b/sdk/src/quic/client.rs @@ -98,7 +98,10 @@ impl BinaryTransport for QuicClient { } self.disconnect().await?; - info!("Reconnecting to the server..."); + info!( + "Reconnecting to the server: {}, by client: {}", + self.config.server_address, self.config.client_address + ); self.connect().await?; self.send_raw(code, payload).await } @@ -281,7 +284,7 @@ impl QuicClient { return Err(IggyError::CannotEstablishConnection); } - connection = connection_result.unwrap(); + connection = connection_result?; remote_address = connection.remote_address(); break; } @@ -299,19 +302,25 @@ impl QuicClient { Ok(()) } AutoLogin::Enabled(credentials) => { - info!("{NAME} client is signing in..."); + info!( + "{NAME} client: {} is signing in...", + self.config.client_address + ); self.set_state(ClientState::Authenticating).await; match credentials { Credentials::UsernamePassword(username, password) => { self.login_user(username, password).await?; self.publish_event(DiagnosticEvent::SignedIn).await; - info!("{NAME} client has signed in with the user credentials, username: {username}",); + info!("{NAME} client: {} has signed in with the user credentials, username: {username}", self.config.client_address); Ok(()) } Credentials::PersonalAccessToken(token) => { self.login_with_personal_access_token(token).await?; self.publish_event(DiagnosticEvent::SignedIn).await; - info!("{NAME} client has signed in with a personal access token.",); + info!( + "{NAME} client: {} has signed in with a personal access token.", + self.config.client_address + ); Ok(()) } } @@ -324,24 +333,36 @@ impl QuicClient { return Ok(()); } - info!("{NAME} client is disconnecting from server..."); + info!( + "{NAME} client: {} is disconnecting from server...", + self.config.client_address + ); self.set_state(ClientState::Disconnected).await; self.connection.lock().await.take(); self.endpoint.wait_idle().await; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); - info!("{NAME} client has disconnected from server at: {now}."); + info!( + "{NAME} client: {} has disconnected from server at: {now}.", + self.config.client_address + ); Ok(()) } async fn send_raw(&self, code: u32, payload: Bytes) -> Result { match self.get_state().await { ClientState::Disconnected => { - trace!("Cannot send data. Client is not connected."); + trace!( + "Cannot send data. Client: {} is not connected.", + self.config.client_address + ); return Err(IggyError::NotConnected); } ClientState::Connecting => { - trace!("Cannot send data. Client is still connecting."); + trace!( + "Cannot send data. Client: {} is still connecting.", + self.config.client_address + ); return Err(IggyError::NotConnected); } _ => {} diff --git a/sdk/src/tcp/client.rs b/sdk/src/tcp/client.rs index 9ce909cce..14c4b9448 100644 --- a/sdk/src/tcp/client.rs +++ b/sdk/src/tcp/client.rs @@ -13,6 +13,7 @@ use async_broadcast::{broadcast, Receiver, Sender}; use async_trait::async_trait; use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::fmt::Debug; +use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; @@ -35,6 +36,7 @@ pub struct TcpClient { pub(crate) stream: Mutex>>, pub(crate) config: Arc, pub(crate) state: Mutex, + client_address: Mutex>, events: (Sender, Receiver), connected_at: Mutex>, } @@ -51,14 +53,16 @@ pub(crate) trait ConnectionStream: Debug + Sync + Send { #[derive(Debug)] struct TcpConnectionStream { + client_address: SocketAddr, reader: BufReader, writer: BufWriter, } impl TcpConnectionStream { - pub fn new(stream: TcpStream) -> Self { + pub fn new(client_address: SocketAddr, stream: TcpStream) -> Self { let (reader, writer) = stream.into_split(); Self { + client_address, reader: BufReader::new(reader), writer: BufWriter::new(writer), } @@ -67,9 +71,19 @@ impl TcpConnectionStream { #[derive(Debug)] pub(crate) struct TcpTlsConnectionStream { + client_address: SocketAddr, stream: TlsStream, } +impl TcpTlsConnectionStream { + pub fn new(client_address: SocketAddr, stream: TlsStream) -> Self { + Self { + client_address, + stream, + } + } +} + unsafe impl Send for TcpConnectionStream {} unsafe impl Sync for TcpConnectionStream {} @@ -80,35 +94,65 @@ unsafe impl Sync for TcpTlsConnectionStream {} impl ConnectionStream for TcpConnectionStream { async fn read(&mut self, buf: &mut [u8]) -> Result { self.reader.read_exact(buf).await.map_err(|error| { - error!("Failed to read data from the TCP connection: {error}"); + error!( + "Failed to read data by client: {} from the TCP connection: {error}", + self.client_address + ); IggyError::from(error) }) } async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError> { - Ok(self.writer.write_all(buf).await?) + self.writer.write_all(buf).await.map_err(|error| { + error!( + "Failed to write data by client: {} to the TCP connection: {error}", + self.client_address + ); + IggyError::from(error) + }) } async fn flush(&mut self) -> Result<(), IggyError> { - Ok(self.writer.flush().await?) + self.writer.flush().await.map_err(|error| { + error!( + "Failed to flush data by client: {} to the TCP connection: {error}", + self.client_address + ); + IggyError::from(error) + }) } } #[async_trait] impl ConnectionStream for TcpTlsConnectionStream { async fn read(&mut self, buf: &mut [u8]) -> Result { - self.stream.read_exact(buf).await.map_err(|error| { - error!("Failed to read data from the TCP connection: {error}"); + self.stream.read(buf).await.map_err(|error| { + error!( + "Failed to read data by client: {} from the TCP TLS connection: {error}", + self.client_address + ); IggyError::from(error) }) } async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError> { - Ok(self.stream.write_all(buf).await?) + self.stream.write_all(buf).await.map_err(|error| { + error!( + "Failed to write data by client: {} to the TCP TLS connection: {error}", + self.client_address + ); + IggyError::from(error) + }) } async fn flush(&mut self) -> Result<(), IggyError> { - Ok(()) + self.stream.flush().await.map_err(|error| { + error!( + "Failed to flush data by client: {} to the TCP TLS connection: {error}", + self.client_address + ); + IggyError::from(error) + }) } } @@ -168,7 +212,15 @@ impl BinaryTransport for TcpClient { } self.disconnect().await?; - info!("Reconnecting to the server..."); + + { + let client_address = self.get_client_address_value().await; + info!( + "Reconnecting to the server: {} by client: {client_address}...", + self.config.server_address + ); + } + self.connect().await?; self.send_raw(code, payload).await } @@ -217,6 +269,7 @@ impl TcpClient { pub fn create(config: Arc) -> Result { Ok(Self { config, + client_address: Mutex::new(None), stream: Mutex::new(None), state: Mutex::new(ClientState::Disconnected), events: broadcast(1000), @@ -282,7 +335,8 @@ impl TcpClient { async fn connect(&self) -> Result<(), IggyError> { match self.get_state().await { ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { - trace!("Client is already connected."); + let client_address = self.get_client_address_value().await; + trace!("Client: {client_address} is already connected."); return Ok(()); } ClientState::Connecting => { @@ -312,6 +366,7 @@ impl TcpClient { let mut retry_count = 0; let connection_stream: Box; let remote_address; + let client_address; loop { info!( "{NAME} client is connecting to server: {}...", @@ -354,11 +409,13 @@ impl TcpClient { return Err(IggyError::CannotEstablishConnection); } - let stream = connection.unwrap(); + let stream = connection?; + client_address = stream.local_addr()?; remote_address = stream.peer_addr()?; + self.client_address.lock().await.replace(client_address); if !tls_enabled { - connection_stream = Box::new(TcpConnectionStream::new(stream)); + connection_stream = Box::new(TcpConnectionStream::new(client_address, stream)); break; } @@ -379,12 +436,14 @@ impl TcpClient { IggyError::CannotEstablishConnection })?; - connection_stream = Box::new(TcpTlsConnectionStream { stream }); + connection_stream = Box::new(TcpTlsConnectionStream::new(client_address, stream)); break; } let now = IggyTimestamp::now(); - info!("{NAME} client has connected to server: {remote_address} at: {now}",); + info!( + "{NAME} client: {client_address} has connected to server: {remote_address} at: {now}", + ); self.stream.lock().await.replace(connection_stream); self.set_state(ClientState::Connected).await; self.connected_at.lock().await.replace(now); @@ -395,17 +454,17 @@ impl TcpClient { Ok(()) } AutoLogin::Enabled(credentials) => { - info!("{NAME} client is signing in..."); + info!("{NAME} client: {client_address} is signing in..."); self.set_state(ClientState::Authenticating).await; match credentials { Credentials::UsernamePassword(username, password) => { self.login_user(username, password).await?; - info!("{NAME} client has signed in with the user credentials, username: {username}",); + info!("{NAME} client: {client_address} has signed in with the user credentials, username: {username}",); Ok(()) } Credentials::PersonalAccessToken(token) => { self.login_with_personal_access_token(token).await?; - info!("{NAME} client has signed in with a personal access token.",); + info!("{NAME} client: {client_address} has signed in with a personal access token.",); Ok(()) } } @@ -418,12 +477,13 @@ impl TcpClient { return Ok(()); } - info!("{NAME} client is disconnecting from server..."); + let client_address = self.get_client_address_value().await; + info!("{NAME} client: {client_address} is disconnecting from server..."); self.set_state(ClientState::Disconnected).await; self.stream.lock().await.take(); self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); - info!("{NAME} client has disconnected from server at: {now}."); + info!("{NAME} client: {client_address} has disconnected from server at: {now}."); Ok(()) } @@ -473,4 +533,13 @@ impl TcpClient { error!("Cannot send data. Client is not connected."); Err(IggyError::NotConnected) } + + async fn get_client_address_value(&self) -> String { + let client_address = self.client_address.lock().await; + if let Some(client_address) = &*client_address { + client_address.to_string() + } else { + "unknown".to_string() + } + } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 4342c9841..b22229678 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -33,6 +33,7 @@ iggy = { path = "../sdk" } jsonwebtoken = "9.3.0" log = "0.4.20" moka = { version = "0.12.5", features = ["future"] } +openssl = { version = "0.10.66", features = ["vendored"] } prometheus-client = "0.22.2" quinn = { version = "0.11.5" } rcgen = "0.13.1" diff --git a/server/src/streaming/cache/buffer.rs b/server/src/streaming/cache/buffer.rs index 5b2b8016e..da48ddde5 100644 --- a/server/src/streaming/cache/buffer.rs +++ b/server/src/streaming/cache/buffer.rs @@ -87,11 +87,10 @@ where /// Extends the buffer with the given elements, and always adding the elements, /// even if it exceeds the memory limit. pub fn extend(&mut self, elements: impl IntoIterator) { - let elements = elements.into_iter().map(|element| { + let elements = elements.into_iter().inspect(|element| { let element_size = element.get_size_bytes() as u64; self.memory_tracker.increment_used_memory(element_size); self.current_size += element_size; - element }); self.buffer.extend(elements); } diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 9283e0a54..04be8dd6e 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -318,19 +318,17 @@ impl SegmentStorage for FileSegmentStorage { let mut indexes = Vec::with_capacity(indexes_count); let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); for idx_num in 0..indexes_count { - let offset = reader.read_u32_le().await.map_err(|error| { + let offset = reader.read_u32_le().await.inspect_err(|error| { error!( "Cannot read offset from index file for index number: {}. Error: {}", idx_num, &error - ); - error + ) })?; - let position = reader.read_u32_le().await.map_err(|error| { + let position = reader.read_u32_le().await.inspect_err(|error| { error!( "Cannot read position from index file for offset: {}. Error: {}", offset, &error - ); - error + ) })?; indexes.push(Index { relative_offset: offset, @@ -488,19 +486,17 @@ impl SegmentStorage for FileSegmentStorage { let mut indexes = Vec::with_capacity(indexes_count); let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); for idx_num in 0..indexes_count { - let offset = reader.read_u32_le().await.map_err(|error| { + let offset = reader.read_u32_le().await.inspect_err(|error| { error!( "Cannot read offset from index file for offset: {}. Error: {}", idx_num, &error - ); - error + ) })?; - let timestamp = reader.read_u64().await.map_err(|error| { + let timestamp = reader.read_u64().await.inspect_err(|error| { error!( "Cannot read timestamp from index file for offset: {}. Error: {}", offset, &error - ); - error + ) })?; indexes.push(TimeIndex { relative_offset: offset,