Skip to content

Commit

Permalink
Core: az affinity strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Adar Ovadia <[email protected]>
  • Loading branch information
Adar Ovadia committed Nov 12, 2024
1 parent c15c76f commit 47691cf
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 40 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ jobs:

- name: Run tests
working-directory: ./glide-core
# TODO remove the concurrency limit after we fix test flakyness.
run: cargo test --all-features --release -- --test-threads=1
run: cargo test --all-features --release -- --test-threads=1 # TODO remove the concurrency limit after we fix test flakyness.

- name: Run logger tests
working-directory: ./logger_core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use telemetrylib::Telemetry;

/// Count the number of connections in a connections_map object
Expand All @@ -23,12 +24,12 @@ macro_rules! count_connections {
}};
}

/// A struct that encapsulates a network connection along with its associated IP address.
/// A struct that encapsulates a network connection along with its associated IP address and AZ.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct ConnectionDetails<Connection> {
/// The actual connection
pub conn: Connection,
/// The IP associated with the connection
/// The IP and AZ associated with the connection
pub ip: Option<IpAddr>,
/// The AZ associated with the connection
pub az: Option<String>,
Expand Down Expand Up @@ -209,9 +210,7 @@ where
slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
.latest_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
let initial_index = slot_map_value.latest_used_replica.load(Ordering::Relaxed);
let mut check_count = 0;
loop {
check_count += 1;
Expand All @@ -225,8 +224,8 @@ where
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
initial_index,
index,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
Ordering::Relaxed,
Ordering::Relaxed,
);
return Some(connection);
}
Expand All @@ -239,16 +238,13 @@ where
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
.latest_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
let mut check_count = 0;
let initial_index = slot_map_value.latest_used_replica.load(Ordering::Relaxed);
let mut retries = 0usize;

loop {
check_count += 1;

retries = retries.saturating_add(1);
// Looped through all replicas; no connected replica found in the same AZ.
if check_count > addrs.replicas.len() {
if retries > addrs.replicas.len() {
// Attempt a fallback to any available replica in other AZs.
for replica in &addrs.replicas {
if let Some(connection) = self.connection_for_address(replica.as_str()) {
Expand All @@ -260,7 +256,7 @@ where
}

// Calculate index based on initial index and check count.
let index = (initial_index + check_count) % addrs.replicas.len();
let index = (initial_index + retries) % addrs.replicas.len();
let replica = &addrs.replicas[index];

// Check if this replica’s AZ matches the user’s AZ.
Expand All @@ -272,8 +268,8 @@ where
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
initial_index,
index,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
Ordering::Relaxed,
Ordering::Relaxed,
);
return Some((address, connection_details.conn));
}
Expand Down
13 changes: 9 additions & 4 deletions glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::{
connections_container::{ClusterNode, ConnectionDetails},
Connect,
};
use crate::cmd;
use crate::FromRedisValue;
use crate::InfoDict;
use crate::{
Expand All @@ -12,6 +11,7 @@ use crate::{
cluster_client::ClusterParams,
ErrorKind, RedisError, RedisResult,
};
use crate::{cluster_slotmap::ReadFromReplicaStrategy, cmd};
use std::net::SocketAddr;

use futures::prelude::*;
Expand Down Expand Up @@ -348,8 +348,8 @@ async fn setup_user_connection<C>(
where
C: ConnectionLike + Connect + Send + 'static,
{
let read_from_replicas = params.read_from_replicas
!= crate::cluster_slotmap::ReadFromReplicaStrategy::AlwaysFromPrimary;
let read_from_replicas =
params.read_from_replicas != ReadFromReplicaStrategy::AlwaysFromPrimary;
let connection_timeout = params.connection_timeout;
check_connection(&mut conn_details.conn, connection_timeout).await?;
if read_from_replicas {
Expand All @@ -359,7 +359,12 @@ where
.await?;
}

update_az_from_info(conn_details).await?;
if matches!(
params.read_from_replicas,
ReadFromReplicaStrategy::AZAffinity(_)
) {
update_az_from_info(conn_details).await?;
}
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2051,7 +2051,7 @@ where
.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.connection_for_route_with_params(&route, Some(core.cluster_params.clone()))
.connection_for_route(&route)
{
ConnectionCheck::Found((conn, address))
} else {
Expand Down
15 changes: 7 additions & 8 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,17 +387,16 @@ impl ClusterClientBuilder {
self
}

/// Sets the read strategy on all new connections, based on the specified policy.
/// Set the read strategy for this client.
///
/// Using the specified `read_strategy`, this function configures whether read queries will be
/// routed to replica nodes or primary nodes. If `ReadFromReplicaStrategy::AZAffinity` is set,
/// read requests will first attempt to access replicas in the same availability zone, falling
/// back to other replicas or the primary if needed. If `ReadFromReplicaStrategy::RoundRobin` is chosen, reads are distributed
/// across replicas for load balancing, while `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries
/// are directed to the primary node.
/// The parameter `read_strategy` can be one of:
/// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
/// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
///
/// # Parameters
/// - `read_strategy`: Defines the replica routing strategy.
/// - `read_strategy`: defines the replica routing strategy.
pub fn read_from(mut self, read_strategy: ReadFromReplicaStrategy) -> ClusterClientBuilder {
self.builder_params.read_from_replicas = read_strategy;
self
Expand Down
14 changes: 7 additions & 7 deletions glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ impl SlotMapValue {
}

#[derive(Debug, Default, Clone, PartialEq)]
/** Represents the client's read from strategy. */
/// Represents the client's read from strategy.
pub enum ReadFromReplicaStrategy {
#[default]
/** Always get from primary, in order to get the freshest data.*/
/// Always get from primary, in order to get the freshest data.
AlwaysFromPrimary,
/** Spread the read requests between all replicas in a round robin manner.
If no replica is available, route the requests to the primary.*/
/// Spread the read requests between all replicas in a round robin manner.
/// If no replica is available, route the requests to the primary.
RoundRobin,
/** Spread the read requests between replicas in the same client's AZ (Aviliablity zone) in a round robin manner,
falling back to other replicas or the primary if needed.*/
/// Spread the read requests between replicas in the same client's AZ (Aviliablity zone) in a round robin manner,
/// falling back to other replicas or the primary if needed.
AZAffinity(String),
}

Expand All @@ -60,7 +60,7 @@ fn get_address_from_slot(
% slot.addrs.replicas.len();
slot.addrs.replicas[index].as_str()
}
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // todo thrrow exception for sync client
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
}
}

Expand Down
3 changes: 2 additions & 1 deletion glide-core/redis-rs/redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ mod cluster_async {
#[cfg(feature = "valkey-gte-7-2")]
#[tokio::test]
async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_replicas() {
let replica_num: u16 = 3;
let replica_num: u16 = 4;
let primaries_num: u16 = 3;
let cluster =
TestClusterContext::new((replica_num * primaries_num) + primaries_num, replica_num);
Expand Down Expand Up @@ -260,6 +260,7 @@ mod cluster_async {
.unwrap();

let info_result = redis::from_owned_redis_value::<HashMap<String, String>>(info).unwrap();
println!("{:?}", info_result);
let get_cmdstat = format!("cmdstat_get:calls={}", n);
let client_az = format!("availability_zone:{}", az);

Expand Down

0 comments on commit 47691cf

Please sign in to comment.