diff --git a/.github/workflows/redis-rs.yml b/.github/workflows/redis-rs.yml index 5aca39e88f..88577586a1 100644 --- a/.github/workflows/redis-rs.yml +++ b/.github/workflows/redis-rs.yml @@ -65,16 +65,14 @@ jobs: working-directory: ./glide-core/redis-rs/redis - name: Test - run: # TODO remove the concurrency limit after we fix test flakyness. run: | - versions_to_compare=$(printf "%s\n" "${{ matrix.engine.version }}" "7.1" | sort -V) - # Sort versions and get the latest version between the current version and 7.1, if the latest version is 7.1, skip the test. - lt_version=$(echo "$versions_to_comare" | tail -n1) - if [[ "7.1" == "$lt_version" ]]; then - cargo test --release -- --test-threads=1 | tee ../test-results.xml + smallest_version=$(printf "%s\n" "${{ matrix.engine.version }}" "8.0" | sort -V | head -n1) + # Sort versions and get the smallest version between the current version and 8.0, if the version is small than 8.0, skip the test. + if [[ "8.0" == "$smallest_version" ]]; then + cargo test --release --features valkey-gte-8 -- --test-threads=1 | tee ../test-results.xml else - cargo test --release --features valkey-gte-7-2 -- --test-threads=1 | tee ../test-results.xml + cargo test --release -- --test-threads=1 | tee ../test-results.xml fi echo "### Tests passed :v:" >> $GITHUB_STEP_SUMMARY diff --git a/glide-core/redis-rs/redis-test/src/lib.rs b/glide-core/redis-rs/redis-test/src/lib.rs index cafe8a347b..78d6db2657 100644 --- a/glide-core/redis-rs/redis-test/src/lib.rs +++ b/glide-core/redis-rs/redis-test/src/lib.rs @@ -292,6 +292,12 @@ impl AioConnectionLike for MockRedisConnection { fn is_closed(&self) -> bool { false } + + fn get_az(&self) -> Option { + None + } + + fn set_az(&mut self, _az: Option) {} } #[cfg(test)] diff --git a/glide-core/redis-rs/redis/Cargo.toml b/glide-core/redis-rs/redis/Cargo.toml index d091c92d71..bec0d577dc 100644 --- a/glide-core/redis-rs/redis/Cargo.toml +++ b/glide-core/redis-rs/redis/Cargo.toml @@ -153,7 +153,7 @@ bigdecimal = ["dep:bigdecimal"] num-bigint = [] uuid = ["dep:uuid"] disable-client-setinfo = [] -valkey-gte-7-2 = [] +valkey-gte-8 = [] # Deprecated features tls = ["tls-native-tls"] # use "tls-native-tls" instead diff --git a/glide-core/redis-rs/redis/src/aio/connection.rs b/glide-core/redis-rs/redis/src/aio/connection.rs index 7f471f8ee5..317b24ae81 100644 --- a/glide-core/redis-rs/redis/src/aio/connection.rs +++ b/glide-core/redis-rs/redis/src/aio/connection.rs @@ -65,7 +65,7 @@ where pubsub: false, protocol: connection_info.protocol, }; - setup_connection(connection_info, &mut rv).await?; + setup_connection(connection_info, &mut rv, false).await?; Ok(rv) } @@ -260,6 +260,12 @@ where // always false for AsyncRead + AsyncWrite (cant do better) false } + + fn get_az(&self) -> Option { + None + } + + fn set_az(&mut self, _az: Option) {} } /// Represents a `PubSub` connection. diff --git a/glide-core/redis-rs/redis/src/aio/connection_manager.rs b/glide-core/redis-rs/redis/src/aio/connection_manager.rs index 02e6976d15..affcc225a9 100644 --- a/glide-core/redis-rs/redis/src/aio/connection_manager.rs +++ b/glide-core/redis-rs/redis/src/aio/connection_manager.rs @@ -309,4 +309,10 @@ impl ConnectionLike for ConnectionManager { // always return false due to automatic reconnect false } + + fn get_az(&self) -> Option { + None + } + + fn set_az(&mut self, _az: Option) {} } diff --git a/glide-core/redis-rs/redis/src/aio/mod.rs b/glide-core/redis-rs/redis/src/aio/mod.rs index 34c098d600..688e1a669b 100644 --- a/glide-core/redis-rs/redis/src/aio/mod.rs +++ b/glide-core/redis-rs/redis/src/aio/mod.rs @@ -3,7 +3,10 @@ use crate::cmd::{cmd, Cmd}; use crate::connection::{ get_resp3_hello_command_error, PubSubSubscriptionKind, RedisConnectionInfo, }; -use crate::types::{ErrorKind, ProtocolVersion, RedisFuture, RedisResult, Value}; +use crate::types::{ + ErrorKind, FromRedisValue, InfoDict, ProtocolVersion, RedisError, RedisFuture, RedisResult, + Value, +}; use crate::PushKind; use ::tokio::io::{AsyncRead, AsyncWrite}; use async_trait::async_trait; @@ -84,6 +87,12 @@ pub trait ConnectionLike { /// Returns the state of the connection fn is_closed(&self) -> bool; + + /// Get the connection availibility zone + fn get_az(&self) -> Option; + + /// Set the connection availibility zone + fn set_az(&mut self, az: Option); } /// Implements ability to notify about disconnection events @@ -105,8 +114,40 @@ impl Clone for Box { } } +// Helper function to extract and update availability zone from INFO command +async fn update_az_from_info(con: &mut C) -> RedisResult<()> +where + C: ConnectionLike, +{ + let info_res = con.req_packed_command(&cmd("INFO")).await; + + match info_res { + Ok(value) => { + let info_dict: InfoDict = FromRedisValue::from_redis_value(&value)?; + if let Some(node_az) = info_dict.get::("availability_zone") { + con.set_az(Some(node_az)); + } + Ok(()) + } + Err(e) => { + // Handle the error case for the INFO command + Err(RedisError::from(( + ErrorKind::ResponseError, + "Failed to execute INFO command. ", + format!("{:?}", e), + ))) + } + } +} + // Initial setup for every connection. -async fn setup_connection(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()> +async fn setup_connection( + connection_info: &RedisConnectionInfo, + con: &mut C, + // This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity. + // An INFO command will be triggered in the connection's setup to update the 'availability_zone' property. + discover_az: bool, +) -> RedisResult<()> where C: ConnectionLike, { @@ -181,6 +222,10 @@ where } } + if discover_az { + update_az_from_info(con).await?; + } + // result is ignored, as per the command's instructions. // https://redis.io/commands/client-setinfo/ #[cfg(not(feature = "disable-client-setinfo"))] diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index b31c817817..98b3667cc9 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -417,6 +417,7 @@ pub struct MultiplexedConnection { response_timeout: Duration, protocol: ProtocolVersion, push_manager: PushManager, + availability_zone: Option, password: Option, } @@ -479,11 +480,16 @@ impl MultiplexedConnection { .with_push_manager(pm) .with_protocol(connection_info.redis.protocol) .with_password(connection_info.redis.password.clone()) + .with_availability_zone(None) .build() .await?; let driver = { - let auth = setup_connection(&connection_info.redis, &mut con); + let auth = setup_connection( + &connection_info.redis, + &mut con, + glide_connection_options.discover_az, + ); futures_util::pin_mut!(auth); @@ -575,6 +581,11 @@ impl MultiplexedConnection { self.pipeline.set_push_manager(push_manager).await; } + /// For external visibilty (glide-core) + pub fn get_availability_zone(&self) -> Option { + self.availability_zone.clone() + } + /// Replace the password used to authenticate with the server. /// If `None` is provided, the password will be removed. pub async fn update_connection_password( @@ -599,6 +610,8 @@ pub struct MultiplexedConnectionBuilder { push_manager: Option, protocol: Option, password: Option, + /// Represents the node's availability zone + availability_zone: Option, } impl MultiplexedConnectionBuilder { @@ -611,6 +624,7 @@ impl MultiplexedConnectionBuilder { push_manager: None, protocol: None, password: None, + availability_zone: None, } } @@ -644,6 +658,12 @@ impl MultiplexedConnectionBuilder { self } + /// Sets the avazilability zone for the `MultiplexedConnectionBuilder`. + pub fn with_availability_zone(mut self, az: Option) -> Self { + self.availability_zone = az; + self + } + /// Builds and returns a new `MultiplexedConnection` instance using the configured settings. pub async fn build(self) -> RedisResult { let db = self.db.unwrap_or_default(); @@ -661,6 +681,7 @@ impl MultiplexedConnectionBuilder { push_manager, protocol, password, + availability_zone: self.availability_zone, }; Ok(con) @@ -688,6 +709,16 @@ impl ConnectionLike for MultiplexedConnection { fn is_closed(&self) -> bool { self.pipeline.is_closed() } + + /// Get the node's availability zone + fn get_az(&self) -> Option { + self.availability_zone.clone() + } + + /// Set the node's availability zone + fn set_az(&mut self, az: Option) { + self.availability_zone = az; + } } impl MultiplexedConnection { /// Subscribes to a new channel. diff --git a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs index 81bac9ec9a..112fe8bd69 100644 --- a/glide-core/redis-rs/redis/src/client.rs +++ b/glide-core/redis-rs/redis/src/client.rs @@ -86,6 +86,9 @@ pub struct GlideConnectionOptions { #[cfg(feature = "aio")] /// Passive disconnect notifier pub disconnect_notifier: Option>, + /// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'. + /// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property. + pub discover_az: bool, } /// To enable async support you need to enable the feature: `tokio-comp` diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index eb9db88b3e..955d24d9e9 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -30,9 +30,9 @@ macro_rules! count_connections { pub struct ConnectionDetails { /// The actual connection pub conn: Connection, - /// The IP and AZ associated with the connection + /// The IP associated with the connection pub ip: Option, - /// The AZ associated with the connection + /// The availability zone associated with the connection pub az: Option, } @@ -207,6 +207,13 @@ where Telemetry::incr_total_connections(conn_count_after.saturating_sub(conn_count_before)); } + /// Returns the availability zone associated with the connection in address + pub(crate) fn az_for_address(&self, address: &str) -> Option { + self.connection_map + .get(address) + .map(|item| item.value().user_connection.az.clone())? + } + /// Returns true if the address represents a known primary node. pub(crate) fn is_primary(&self, address: &String) -> bool { self.connection_for_address(address).is_some() && self.slot_map.is_primary(address) @@ -240,7 +247,9 @@ where } } - pub(crate) fn round_robin_read_from_az_awareness_replica( + /// Returns the node's connection in the same availability zone as `client_az` in round robin strategy if exits, + /// if not, will fall back to any available replica or primary. + pub(crate) fn round_robin_read_from_replica_with_az_awareness( &self, slot_map_value: &SlotMapValue, client_az: String, @@ -251,27 +260,21 @@ where loop { retries = retries.saturating_add(1); - // Looped through all replicas; no connected replica found in the same AZ. + // Looped through all replicas; no connected replica found in the same availability zone. if retries > addrs.replicas().len() { - // Attempt a fallback to any available replica in other AZs. - for replica in &addrs.replicas() { - if let Some(connection) = self.connection_for_address(replica.as_str()) { - return Some(connection); - } - } - // Fallback to the primary if no replica is connected. - return self.connection_for_address(addrs.primary().as_str()); + // Attempt a fallback to any available replica or primary if needed. + return self.round_robin_read_from_replica(slot_map_value); } // Calculate index based on initial index and check count. let index = (initial_index + retries) % addrs.replicas().len(); let replica = &addrs.replicas()[index]; - // Check if this replica’s AZ matches the user’s AZ. + // Check if this replica’s availability zone matches the user’s availability zone. if let Some((address, connection_details)) = self.connection_details_for_address(replica.as_str()) { - if connection_details.az.as_deref() == Some(&client_az) { + if self.az_for_address(&address) == Some(client_az.clone()) { // Attempt to update `latest_used_replica` with the index of this replica. let _ = slot_map_value.last_used_replica.compare_exchange_weak( initial_index, @@ -303,15 +306,19 @@ where ReadFromReplicaStrategy::RoundRobin => { self.round_robin_read_from_replica(slot_map_value) } - ReadFromReplicaStrategy::AZAffinity(az) => { - self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string()) - } + ReadFromReplicaStrategy::AZAffinity(az) => self + .round_robin_read_from_replica_with_az_awareness( + slot_map_value, + az.to_string(), + ), }, // when the user strategy per command is replica_preffered SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy { - ReadFromReplicaStrategy::AZAffinity(az) => { - self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string()) - } + ReadFromReplicaStrategy::AZAffinity(az) => self + .round_robin_read_from_replica_with_az_awareness( + slot_map_value, + az.to_string(), + ), _ => self.round_robin_read_from_replica(slot_map_value), }, } @@ -502,7 +509,6 @@ mod tests { } fn create_container_with_az_strategy( - // strategy: ReadFromReplicaStrategy, use_management_connections: bool, ) -> ConnectionsContainer { let slot_map = SlotMap::new( diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs index 67828d033e..4f9b3f0d4e 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs @@ -2,8 +2,7 @@ use super::{ connections_container::{ClusterNode, ConnectionDetails}, Connect, }; -use crate::FromRedisValue; -use crate::InfoDict; +use crate::cluster_slotmap::ReadFromReplicaStrategy; use crate::{ aio::{ConnectionLike, DisconnectNotifier}, client::GlideConnectionOptions, @@ -11,7 +10,6 @@ use crate::{ cluster_client::ClusterParams, ErrorKind, RedisError, RedisResult, }; -use crate::{cluster_slotmap::ReadFromReplicaStrategy, cmd}; use std::net::SocketAddr; use futures::prelude::*; @@ -181,6 +179,11 @@ async fn connect_and_check_only_management_conn( where C: ConnectionLike + Connect + Send + Sync + 'static + Clone, { + let discover_az = matches!( + params.read_from_replicas, + crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_) + ); + match create_connection::( addr, params.clone(), @@ -189,6 +192,7 @@ where GlideConnectionOptions { push_sender: None, disconnect_notifier, + discover_az, }, ) .await @@ -359,38 +363,9 @@ where .await?; } - if matches!( - params.read_from_replicas, - ReadFromReplicaStrategy::AZAffinity(_) - ) { - update_az_from_info(conn_details).await?; - } Ok(()) } -// Helper function to extract and update availability zone from INFO command -async fn update_az_from_info(conn_details: &mut ConnectionDetails) -> RedisResult<()> -where - C: ConnectionLike + Send + 'static, -{ - let info_res = conn_details.conn.req_packed_command(&cmd("INFO")).await; - match info_res { - Ok(value) => { - let info_dict: InfoDict = FromRedisValue::from_redis_value(&value)?; - conn_details.az = info_dict.get::("availability_zone"); - Ok(()) - } - Err(e) => { - // Handle the error case for the INFO command - Err(RedisError::from(( - ErrorKind::ResponseError, - "Failed to execute INFO command. ", - format!("{:?}", e), - ))) - } - } -} - #[doc(hidden)] pub const MANAGEMENT_CONN_NAME: &str = "glide_management_connection"; @@ -434,7 +409,10 @@ where glide_connection_options, ) .await - .map(|conn| conn.into()) + .map(|conn| { + let az = conn.0.get_az(); + (conn.0, conn.1, az).into() + }) } /// The function returns None if the checked connection/s are healthy. Otherwise, it returns the type of the unhealthy connection/s. diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 0ee1a6001d..4519cac96c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1153,9 +1153,15 @@ where None }; + let discover_az = matches!( + cluster_params.read_from_replicas, + crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_) + ); + let glide_connection_options = GlideConnectionOptions { push_sender, disconnect_notifier, + discover_az, }; let connections = Self::create_initial_connections( @@ -1957,7 +1963,7 @@ where // Reset the current slot map and connection vector with the new ones let mut write_guard = inner.conn_lock.write().expect(MUTEX_WRITE_ERR); let read_from_replicas = inner - .get_cluster_param(|params| params.read_from_replicas) + .get_cluster_param(|params| params.read_from_replicas.clone()) .expect(MUTEX_READ_ERR); *write_guard = ConnectionsContainer::new( new_slots, @@ -2777,7 +2783,7 @@ where .expect(MUTEX_READ_ERR); let read_from_replicas = inner - .get_cluster_param(|params| params.read_from_replicas) + .get_cluster_param(|params| params.read_from_replicas.clone()) .expect(MUTEX_READ_ERR); ( calculate_topology( @@ -2823,6 +2829,12 @@ where fn is_closed(&self) -> bool { false } + + fn get_az(&self) -> Option { + None + } + + fn set_az(&mut self, _az: Option) {} } /// Implements the process of connecting to a Redis server diff --git a/glide-core/redis-rs/redis/src/cluster_slotmap.rs b/glide-core/redis-rs/redis/src/cluster_slotmap.rs index 82ff21adcf..88e7549323 100644 --- a/glide-core/redis-rs/redis/src/cluster_slotmap.rs +++ b/glide-core/redis-rs/redis/src/cluster_slotmap.rs @@ -29,7 +29,7 @@ pub enum ReadFromReplicaStrategy { /// Spread the read requests between all replicas in a round robin manner. /// If no replica is available, route the requests to the primary. RoundRobin, - /// Spread the read requests between replicas in the same client's AZ (Aviliablity zone) in a round robin manner, + /// Spread the read requests between replicas in the same client's Aviliablity zone in a round robin manner, /// falling back to other replicas or the primary if needed. AZAffinity(String), } @@ -52,7 +52,7 @@ fn get_address_from_slot( } match read_from_replica { ReadFromReplicaStrategy::AlwaysFromPrimary => addrs.primary(), - ReadFromReplicaStrategy::RoundRobin | ReadFromReplicaStrategy::AZAffinity => { + ReadFromReplicaStrategy::RoundRobin => { let index = slot .last_used_replica .fetch_add(1, std::sync::atomic::Ordering::Relaxed) diff --git a/glide-core/redis-rs/redis/tests/support/mock_cluster.rs b/glide-core/redis-rs/redis/tests/support/mock_cluster.rs index 3b7eb3837f..8c7369f788 100644 --- a/glide-core/redis-rs/redis/tests/support/mock_cluster.rs +++ b/glide-core/redis-rs/redis/tests/support/mock_cluster.rs @@ -372,6 +372,12 @@ impl aio::ConnectionLike for MockConnection { fn is_closed(&self) -> bool { false } + + fn get_az(&self) -> Option { + None + } + + fn set_az(&mut self, _az: Option) {} } impl redis::ConnectionLike for MockConnection { diff --git a/glide-core/redis-rs/redis/tests/test_cluster_async.rs b/glide-core/redis-rs/redis/tests/test_cluster_async.rs index 52cb7acba3..33bab65f44 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_async.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_async.rs @@ -121,7 +121,7 @@ mod cluster_async { .unwrap(); } - #[cfg(feature = "valkey-gte-7-2")] + #[cfg(feature = "valkey-gte-8")] #[tokio::test] async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_half_replicas() { let replica_num: u16 = 4; @@ -203,7 +203,7 @@ mod cluster_async { ); } - #[cfg(feature = "valkey-gte-7-2")] + #[cfg(feature = "valkey-gte-8")] #[tokio::test] async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_replicas() { let replica_num: u16 = 4; @@ -260,7 +260,7 @@ mod cluster_async { .unwrap(); let info_result = redis::from_owned_redis_value::>(info).unwrap(); - println!("{:?}", info_result); + // println!("{:?}", info_result); let get_cmdstat = format!("cmdstat_get:calls={}", n); let client_az = format!("availability_zone:{}", az); @@ -748,6 +748,12 @@ mod cluster_async { fn is_closed(&self) -> bool { true } + + fn get_az(&self) -> Option { + None + } + + fn set_az(&mut self, _az: Option) {} } #[test] diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index c567b6b5a6..39a4c1db62 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -113,6 +113,7 @@ async fn create_connection( connection_backend: ConnectionBackend, retry_strategy: RetryStrategy, push_sender: Option>, + discover_az: bool, ) -> Result { let client = &connection_backend.connection_info; let connection_options = GlideConnectionOptions { @@ -120,6 +121,7 @@ async fn create_connection( disconnect_notifier: Some::>(Box::new( TokioDisconnectNotifier::new(), )), + discover_az, }; let action = || async { get_multiplexed_connection(client, &connection_options) @@ -204,6 +206,7 @@ impl ReconnectingConnection { redis_connection_info: RedisConnectionInfo, tls_mode: TlsMode, push_sender: Option>, + discover_az: bool, ) -> Result { log_debug( "connection creation", @@ -216,7 +219,7 @@ impl ReconnectingConnection { connection_available_signal: ManualResetEvent::new(true), client_dropped_flagged: AtomicBool::new(false), }; - create_connection(backend, connection_retry_strategy, push_sender).await + create_connection(backend, connection_retry_strategy, push_sender, discover_az).await } pub(crate) fn node_address(&self) -> String { diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index ee554630cb..c5e69fd6dd 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -4,6 +4,7 @@ use super::get_redis_connection_info; use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection}; use super::{ConnectionRequest, NodeAddress, TlsMode}; +use crate::client::types::ReadFrom as ClientReadFrom; use crate::retry_strategies::RetryStrategy; use futures::{future, stream, StreamExt}; use logger_core::log_debug; @@ -13,6 +14,7 @@ use redis::aio::ConnectionLike; use redis::cluster_routing::{self, is_readonly_cmd, ResponsePolicy, Routable, RoutingInfo}; use redis::{PushInfo, RedisError, RedisResult, Value}; use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use telemetrylib::Telemetry; use tokio::sync::mpsc; @@ -22,11 +24,11 @@ use tokio::task; enum ReadFrom { Primary, PreferReplica { - latest_read_replica_index: Arc, + latest_read_replica_index: Arc, }, AZAffinity { client_az: String, - lasted_read_replica_index: Arc, + last_read_replica_index: Arc, }, } @@ -124,6 +126,11 @@ impl StandaloneClient { // randomize pubsub nodes, maybe a batter option is to always use the primary let pubsub_node_index = rand::thread_rng().gen_range(0..node_count); let pubsub_addr = &connection_request.addresses[pubsub_node_index]; + let discover_az = matches!( + connection_request.read_from, + Some(ClientReadFrom::AZAffinity(_)) + ); + let mut stream = stream::iter(connection_request.addresses.iter()) .map(|address| async { get_connection_and_replication_info( @@ -136,6 +143,7 @@ impl StandaloneClient { }, tls_mode.unwrap_or(TlsMode::NoTls), &push_sender, + discover_az, ) .await .map_err(|err| (format!("{}:{}", address.host, address.port), err)) @@ -225,7 +233,7 @@ impl StandaloneClient { &self, latest_read_replica_index: &Arc, ) -> &ReconnectingConnection { - let initial_index = latest_read_replica_index.load(std::sync::atomic::Ordering::Relaxed); + let initial_index = latest_read_replica_index.load(Ordering::Relaxed); let mut check_count = 0; loop { check_count += 1; @@ -245,15 +253,53 @@ impl StandaloneClient { let _ = latest_read_replica_index.compare_exchange_weak( initial_index, index, - std::sync::atomic::Ordering::Relaxed, - std::sync::atomic::Ordering::Relaxed, + Ordering::Relaxed, + Ordering::Relaxed, ); return connection; } } } - fn get_connection(&self, readonly: bool) -> &ReconnectingConnection { + async fn round_robin_read_from_replica_az_awareness( + &self, + latest_read_replica_index: &Arc, + client_az: String, + ) -> &ReconnectingConnection { + let initial_index = latest_read_replica_index.load(Ordering::Relaxed); + let mut retries = 0usize; + + loop { + retries = retries.saturating_add(1); + // Looped through all replicas; no connected replica found in the same AZ. + if retries > self.inner.nodes.len() { + // Attempt a fallback to any available replica in other AZs or primary. + return self.round_robin_read_from_replica(latest_read_replica_index); + } + + // Calculate index based on initial index and check count. + let index = (initial_index + retries) % self.inner.nodes.len(); + let replica = &self.inner.nodes[index]; + + // Attempt to get a connection and retrieve the replica's AZ. + if let Ok(connection) = replica.get_connection().await { + if let Some(replica_az) = connection.get_az().as_deref() { + if replica_az == client_az { + // Update `latest_used_replica` with the index of this replica. + let _ = latest_read_replica_index.compare_exchange_weak( + initial_index, + index, + Ordering::Relaxed, + Ordering::Relaxed, + ); + return replica; + } + } + } + } + } + + async fn get_connection(&self, readonly: bool) -> &ReconnectingConnection { if self.inner.nodes.len() == 1 || !readonly { return self.get_primary_connection(); } @@ -264,15 +310,14 @@ impl StandaloneClient { latest_read_replica_index, } => self.round_robin_read_from_replica(latest_read_replica_index), ReadFrom::AZAffinity { - #[allow(unused_variables)] client_az, - lasted_read_replica_index, + last_read_replica_index, } => { - log_warn( - "get_connection", - "AZAffinity is not yet supported for Standalone client, choosing a random node", - ); - self.round_robin_read_from_replica(lasted_read_replica_index) + self.round_robin_read_from_replica_az_awareness( + last_read_replica_index, + client_az.to_string(), + ) + .await } } } @@ -369,7 +414,7 @@ impl StandaloneClient { cmd: &redis::Cmd, readonly: bool, ) -> RedisResult { - let reconnecting_connection = self.get_connection(readonly); + let reconnecting_connection = self.get_connection(readonly).await; Self::send_request(cmd, reconnecting_connection).await } @@ -493,6 +538,7 @@ impl StandaloneClient { password: Option, ) -> RedisResult { self.get_connection(false) + .await .get_connection() .await? .update_connection_password(password.clone()) @@ -506,6 +552,7 @@ async fn get_connection_and_replication_info( connection_info: &redis::RedisConnectionInfo, tls_mode: TlsMode, push_sender: &Option>, + discover_az: bool, ) -> Result<(ReconnectingConnection, Value), (ReconnectingConnection, RedisError)> { let result = ReconnectingConnection::new( address, @@ -513,6 +560,7 @@ async fn get_connection_and_replication_info( connection_info.clone(), tls_mode, push_sender.clone(), + discover_az, ) .await; let reconnecting_connection = match result { @@ -549,7 +597,7 @@ fn get_read_from(read_from: Option) -> ReadFrom { }, Some(super::ReadFrom::AZAffinity(az)) => ReadFrom::AZAffinity { client_az: az, - lasted_read_replica_index: Default::default(), + last_read_replica_index: Default::default(), }, None => ReadFrom::Primary, } diff --git a/glide-core/src/client/types.rs b/glide-core/src/client/types.rs index fc3bc8b3e0..0c7680b3a6 100644 --- a/glide-core/src/client/types.rs +++ b/glide-core/src/client/types.rs @@ -106,7 +106,10 @@ impl From for ConnectionRequest { } else { log_warn( "types", - format!("Failed to convert AZ string: '{:?}'", value.client_az), + format!( + "Failed to convert availability zone string: '{:?}'. Falling back to `ReadFrom::PreferReplica`", + value.client_az + ), ); ReadFrom::PreferReplica } diff --git a/glide-core/tests/test_standalone_client.rs b/glide-core/tests/test_standalone_client.rs index 8001ccab0c..c118d6d28f 100644 --- a/glide-core/tests/test_standalone_client.rs +++ b/glide-core/tests/test_standalone_client.rs @@ -273,6 +273,18 @@ mod standalone_client_tests { }); } + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] + fn test_read_from_replica_az_affinity() { + test_read_from_replica(ReadFromReplicaTestConfig { + read_from: ReadFrom::AZAffinity, + expected_primary_reads: 0, + expected_replica_reads: vec![1, 1, 1], + ..Default::default() + }); + } + #[rstest] #[serial_test::serial] #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 68c48263a6..dcd9da4e0c 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -499,7 +499,7 @@ export type ReadFrom = /** Spread the requests between all replicas in a round robin manner. If no replica is available, route the requests to the primary.*/ | "preferReplica" - /** Spread the requests between replicas in the same client's AZ (Aviliablity zone) in a round robin manner. + /** Spread the requests between replicas in the same client's Aviliablity zone in a round robin manner. If no replica is available, route the requests to the primary.*/ | "AZAffinity"; diff --git a/package.json b/package.json index 2f59fcc5a8..3f61298feb 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "@eslint/js": "^9.10.0", "@types/eslint__js": "^8.42.3", "@types/eslint-config-prettier": "^6.11.3", - "eslint": "^9.10.0", + "eslint": "9.14.0", "eslint-config-prettier": "^9.1.0", "prettier": "^3.3.3", "typescript": "^5.6.2",