Skip to content

Commit

Permalink
standalone az awareness
Browse files Browse the repository at this point in the history
Signed-off-by: Adar Ovadia <[email protected]>
  • Loading branch information
Adar Ovadia committed Nov 16, 2024
1 parent c9556e0 commit 2ab55f2
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 92 deletions.
12 changes: 5 additions & 7 deletions .github/workflows/redis-rs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,14 @@ jobs:
working-directory: ./glide-core/redis-rs/redis

- name: Test
run:
# TODO remove the concurrency limit after we fix test flakyness.
run: |
versions_to_compare=$(printf "%s\n" "${{ matrix.engine.version }}" "7.1" | sort -V)
# Sort versions and get the latest version between the current version and 7.1, if the latest version is 7.1, skip the test.
lt_version=$(echo "$versions_to_comare" | tail -n1)
if [[ "7.1" == "$lt_version" ]]; then
cargo test --release -- --test-threads=1 | tee ../test-results.xml
smallest_version=$(printf "%s\n" "${{ matrix.engine.version }}" "8.0" | sort -V | head -n1)
# Sort versions and get the smallest version between the current version and 8.0, if the version is small than 8.0, skip the test.
if [[ "8.0" == "$smallest_version" ]]; then
cargo test --release --features valkey-gte-8 -- --test-threads=1 | tee ../test-results.xml
else
cargo test --release --features valkey-gte-7-2 -- --test-threads=1 | tee ../test-results.xml
cargo test --release -- --test-threads=1 | tee ../test-results.xml
fi
echo "### Tests passed :v:" >> $GITHUB_STEP_SUMMARY
Expand Down
6 changes: 6 additions & 0 deletions glide-core/redis-rs/redis-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ impl AioConnectionLike for MockRedisConnection {
fn is_closed(&self) -> bool {
false
}

fn get_az(&self) -> Option<String> {
None
}

fn set_az(&mut self, _az: Option<String>) {}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ bigdecimal = ["dep:bigdecimal"]
num-bigint = []
uuid = ["dep:uuid"]
disable-client-setinfo = []
valkey-gte-7-2 = []
valkey-gte-8 = []

# Deprecated features
tls = ["tls-native-tls"] # use "tls-native-tls" instead
Expand Down
8 changes: 7 additions & 1 deletion glide-core/redis-rs/redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
pubsub: false,
protocol: connection_info.protocol,
};
setup_connection(connection_info, &mut rv).await?;
setup_connection(connection_info, &mut rv, false).await?;
Ok(rv)
}

Expand Down Expand Up @@ -260,6 +260,12 @@ where
// always false for AsyncRead + AsyncWrite (cant do better)
false
}

fn get_az(&self) -> Option<String> {
None
}

fn set_az(&mut self, _az: Option<String>) {}
}

/// Represents a `PubSub` connection.
Expand Down
6 changes: 6 additions & 0 deletions glide-core/redis-rs/redis/src/aio/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,10 @@ impl ConnectionLike for ConnectionManager {
// always return false due to automatic reconnect
false
}

fn get_az(&self) -> Option<String> {
None
}

fn set_az(&mut self, _az: Option<String>) {}
}
49 changes: 47 additions & 2 deletions glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::cmd::{cmd, Cmd};
use crate::connection::{
get_resp3_hello_command_error, PubSubSubscriptionKind, RedisConnectionInfo,
};
use crate::types::{ErrorKind, ProtocolVersion, RedisFuture, RedisResult, Value};
use crate::types::{
ErrorKind, FromRedisValue, InfoDict, ProtocolVersion, RedisError, RedisFuture, RedisResult,
Value,
};
use crate::PushKind;
use ::tokio::io::{AsyncRead, AsyncWrite};
use async_trait::async_trait;
Expand Down Expand Up @@ -84,6 +87,12 @@ pub trait ConnectionLike {

/// Returns the state of the connection
fn is_closed(&self) -> bool;

/// Get the connection availibility zone
fn get_az(&self) -> Option<String>;

/// Set the connection availibility zone
fn set_az(&mut self, az: Option<String>);
}

/// Implements ability to notify about disconnection events
Expand All @@ -105,8 +114,40 @@ impl Clone for Box<dyn DisconnectNotifier> {
}
}

// Helper function to extract and update availability zone from INFO command
async fn update_az_from_info<C>(con: &mut C) -> RedisResult<()>
where
C: ConnectionLike,
{
let info_res = con.req_packed_command(&cmd("INFO")).await;

match info_res {
Ok(value) => {
let info_dict: InfoDict = FromRedisValue::from_redis_value(&value)?;
if let Some(node_az) = info_dict.get::<String>("availability_zone") {
con.set_az(Some(node_az));
}
Ok(())
}
Err(e) => {
// Handle the error case for the INFO command
Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to execute INFO command. ",
format!("{:?}", e),
)))
}
}
}

// Initial setup for every connection.
async fn setup_connection<C>(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()>
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
where
C: ConnectionLike,
{
Expand Down Expand Up @@ -181,6 +222,10 @@ where
}
}

if discover_az {
update_az_from_info(con).await?;
}

// result is ignored, as per the command's instructions.
// https://redis.io/commands/client-setinfo/
#[cfg(not(feature = "disable-client-setinfo"))]
Expand Down
33 changes: 32 additions & 1 deletion glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct MultiplexedConnection {
response_timeout: Duration,
protocol: ProtocolVersion,
push_manager: PushManager,
availability_zone: Option<String>,
password: Option<String>,
}

Expand Down Expand Up @@ -479,11 +480,16 @@ impl MultiplexedConnection {
.with_push_manager(pm)
.with_protocol(connection_info.redis.protocol)
.with_password(connection_info.redis.password.clone())
.with_availability_zone(None)
.build()
.await?;

let driver = {
let auth = setup_connection(&connection_info.redis, &mut con);
let auth = setup_connection(
&connection_info.redis,
&mut con,
glide_connection_options.discover_az,
);

futures_util::pin_mut!(auth);

Expand Down Expand Up @@ -575,6 +581,11 @@ impl MultiplexedConnection {
self.pipeline.set_push_manager(push_manager).await;
}

/// For external visibilty (glide-core)
pub fn get_availability_zone(&self) -> Option<String> {
self.availability_zone.clone()
}

/// Replace the password used to authenticate with the server.
/// If `None` is provided, the password will be removed.
pub async fn update_connection_password(
Expand All @@ -599,6 +610,8 @@ pub struct MultiplexedConnectionBuilder {
push_manager: Option<PushManager>,
protocol: Option<ProtocolVersion>,
password: Option<String>,
/// Represents the node's availability zone
availability_zone: Option<String>,
}

impl MultiplexedConnectionBuilder {
Expand All @@ -611,6 +624,7 @@ impl MultiplexedConnectionBuilder {
push_manager: None,
protocol: None,
password: None,
availability_zone: None,
}
}

Expand Down Expand Up @@ -644,6 +658,12 @@ impl MultiplexedConnectionBuilder {
self
}

/// Sets the avazilability zone for the `MultiplexedConnectionBuilder`.
pub fn with_availability_zone(mut self, az: Option<String>) -> Self {
self.availability_zone = az;
self
}

/// Builds and returns a new `MultiplexedConnection` instance using the configured settings.
pub async fn build(self) -> RedisResult<MultiplexedConnection> {
let db = self.db.unwrap_or_default();
Expand All @@ -661,6 +681,7 @@ impl MultiplexedConnectionBuilder {
push_manager,
protocol,
password,
availability_zone: self.availability_zone,
};

Ok(con)
Expand Down Expand Up @@ -688,6 +709,16 @@ impl ConnectionLike for MultiplexedConnection {
fn is_closed(&self) -> bool {
self.pipeline.is_closed()
}

/// Get the node's availability zone
fn get_az(&self) -> Option<String> {
self.availability_zone.clone()
}

/// Set the node's availability zone
fn set_az(&mut self, az: Option<String>) {
self.availability_zone = az;
}
}
impl MultiplexedConnection {
/// Subscribes to a new channel.
Expand Down
3 changes: 3 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ macro_rules! count_connections {
pub struct ConnectionDetails<Connection> {
/// The actual connection
pub conn: Connection,
/// The IP and AZ associated with the connection
/// The IP associated with the connection
pub ip: Option<IpAddr>,
/// The AZ associated with the connection
/// The availability zone associated with the connection
pub az: Option<String>,
}

Expand Down Expand Up @@ -207,6 +207,13 @@ where
Telemetry::incr_total_connections(conn_count_after.saturating_sub(conn_count_before));
}

/// Returns the availability zone associated with the connection in address
pub(crate) fn az_for_address(&self, address: &str) -> Option<String> {
self.connection_map
.get(address)
.map(|item| item.value().user_connection.az.clone())?
}

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
Expand Down Expand Up @@ -240,7 +247,9 @@ where
}
}

pub(crate) fn round_robin_read_from_az_awareness_replica(
/// Returns the node's connection in the same availability zone as `client_az` in round robin strategy if exits,
/// if not, will fall back to any available replica or primary.
pub(crate) fn round_robin_read_from_replica_with_az_awareness(
&self,
slot_map_value: &SlotMapValue,
client_az: String,
Expand All @@ -251,27 +260,21 @@ where

loop {
retries = retries.saturating_add(1);
// Looped through all replicas; no connected replica found in the same AZ.
// Looped through all replicas; no connected replica found in the same availability zone.
if retries > addrs.replicas().len() {
// Attempt a fallback to any available replica in other AZs.
for replica in &addrs.replicas() {
if let Some(connection) = self.connection_for_address(replica.as_str()) {
return Some(connection);
}
}
// Fallback to the primary if no replica is connected.
return self.connection_for_address(addrs.primary().as_str());
// Attempt a fallback to any available replica or primary if needed.
return self.round_robin_read_from_replica(slot_map_value);
}

// Calculate index based on initial index and check count.
let index = (initial_index + retries) % addrs.replicas().len();
let replica = &addrs.replicas()[index];

// Check if this replica’s AZ matches the user’s AZ.
// Check if this replica’s availability zone matches the user’s availability zone.
if let Some((address, connection_details)) =
self.connection_details_for_address(replica.as_str())
{
if connection_details.az.as_deref() == Some(&client_az) {
if self.az_for_address(&address) == Some(client_az.clone()) {
// Attempt to update `latest_used_replica` with the index of this replica.
let _ = slot_map_value.last_used_replica.compare_exchange_weak(
initial_index,
Expand Down Expand Up @@ -303,15 +306,19 @@ where
ReadFromReplicaStrategy::RoundRobin => {
self.round_robin_read_from_replica(slot_map_value)
}
ReadFromReplicaStrategy::AZAffinity(az) => {
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
}
ReadFromReplicaStrategy::AZAffinity(az) => self
.round_robin_read_from_replica_with_az_awareness(
slot_map_value,
az.to_string(),
),
},
// when the user strategy per command is replica_preffered
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
ReadFromReplicaStrategy::AZAffinity(az) => {
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
}
ReadFromReplicaStrategy::AZAffinity(az) => self
.round_robin_read_from_replica_with_az_awareness(
slot_map_value,
az.to_string(),
),
_ => self.round_robin_read_from_replica(slot_map_value),
},
}
Expand Down Expand Up @@ -502,7 +509,6 @@ mod tests {
}

fn create_container_with_az_strategy(
// strategy: ReadFromReplicaStrategy,
use_management_connections: bool,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
Expand Down
Loading

0 comments on commit 2ab55f2

Please sign in to comment.