Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: add az awareness to read strategy #2539

Merged
merged 10 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/redis-rs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
working-directory: ./glide-core/redis-rs/redis

- name: Test
# TODO remove the concurrency limit after we fix test flakyness.
run: |
cargo test --release -- --test-threads=1 | tee ../test-results.xml
echo "### Tests passed :v:" >> $GITHUB_STEP_SUMMARY
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@
* Node: Add `JSON.STRLEN` and `JSON.STRAPPEND` command ([#2537](https://github.com/valkey-io/valkey-glide/pull/2537))
* Node: Add `FT.SEARCH` ([#2551](https://github.com/valkey-io/valkey-glide/pull/2551))
* Python: Fix example ([#2556](https://github.com/valkey-io/valkey-glide/issues/2556))
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587]https://github.com/valkey-io/valkey-glide/pull/2587)
* Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587](https://github.com/valkey-io/valkey-glide/pull/2587))
* Node: Add `JSON.DEBUG` command ([#2572](https://github.com/valkey-io/valkey-glide/pull/2572))
* Node: Add `JSON.NUMINCRBY` and `JSON.NUMMULTBY` command ([#2555](https://github.com/valkey-io/valkey-glide/pull/2555))
* Core: Add support to Availability Zone Affinity read strategy ([#2539](https://github.com/valkey-io/valkey-glide/pull/2539))
* Core: Fix list of readonly commands ([#2634](https://github.com/valkey-io/valkey-glide/pull/2634), [#2649](https://github.com/valkey-io/valkey-glide/pull/2649))
* Core: Improve retry logic and update unmaintained dependencies for Rust lint CI ([#2673](https://github.com/valkey-io/valkey-glide/pull/2643))
* Core: Release the read lock while creating connections in `refresh_connections` ([#2630](https://github.com/valkey-io/valkey-glide/issues/2630))
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ uuid = { version = "1.6.1", optional = true }

telemetrylib = { path = "../../telemetry" }

lazy_static = "1"

[features]
default = [
"acl",
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
pubsub: false,
protocol: connection_info.protocol,
};
setup_connection(connection_info, &mut rv).await?;
setup_connection(connection_info, &mut rv, false).await?;
Ok(rv)
}

Expand Down
51 changes: 49 additions & 2 deletions glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::cmd::{cmd, Cmd};
use crate::connection::{
get_resp3_hello_command_error, PubSubSubscriptionKind, RedisConnectionInfo,
};
use crate::types::{ErrorKind, ProtocolVersion, RedisFuture, RedisResult, Value};
use crate::types::{
ErrorKind, FromRedisValue, InfoDict, ProtocolVersion, RedisError, RedisFuture, RedisResult,
Value,
};
use crate::PushKind;
use ::tokio::io::{AsyncRead, AsyncWrite};
use async_trait::async_trait;
Expand Down Expand Up @@ -84,6 +87,14 @@ pub trait ConnectionLike {

/// Returns the state of the connection
fn is_closed(&self) -> bool;

/// Get the connection availibility zone
fn get_az(&self) -> Option<String> {
None
}

/// Set the connection availibility zone
fn set_az(&mut self, _az: Option<String>) {}
}

/// Implements ability to notify about disconnection events
Expand All @@ -105,8 +116,40 @@ impl Clone for Box<dyn DisconnectNotifier> {
}
}

// Helper function to extract and update availability zone from INFO command
async fn update_az_from_info<C>(con: &mut C) -> RedisResult<()>
where
C: ConnectionLike,
{
let info_res = con.req_packed_command(&cmd("INFO")).await;

match info_res {
Ok(value) => {
let info_dict: InfoDict = FromRedisValue::from_redis_value(&value)?;
if let Some(node_az) = info_dict.get::<String>("availability_zone") {
con.set_az(Some(node_az));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also add a debug message stating that the property "availability_zone" could not be found for this redis server

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eifrah-aws I dont think its required - not having AZ is ok (redis/older versions)

}
Ok(())
}
Err(e) => {
// Handle the error case for the INFO command
Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to execute INFO command. ",
format!("{:?}", e),
)))
}
}
}

// Initial setup for every connection.
async fn setup_connection<C>(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()>
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
where
C: ConnectionLike,
{
Expand Down Expand Up @@ -181,6 +224,10 @@ where
}
}

if discover_az {
update_az_from_info(con).await?;
}

// result is ignored, as per the command's instructions.
// https://redis.io/commands/client-setinfo/
#[cfg(not(feature = "disable-client-setinfo"))]
Expand Down
33 changes: 32 additions & 1 deletion glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct MultiplexedConnection {
response_timeout: Duration,
protocol: ProtocolVersion,
push_manager: PushManager,
availability_zone: Option<String>,
password: Option<String>,
}

Expand Down Expand Up @@ -479,11 +480,16 @@ impl MultiplexedConnection {
.with_push_manager(pm)
.with_protocol(connection_info.redis.protocol)
.with_password(connection_info.redis.password.clone())
.with_availability_zone(None)
.build()
.await?;

let driver = {
let auth = setup_connection(&connection_info.redis, &mut con);
let auth = setup_connection(
&connection_info.redis,
&mut con,
glide_connection_options.discover_az,
);

futures_util::pin_mut!(auth);

Expand Down Expand Up @@ -575,6 +581,11 @@ impl MultiplexedConnection {
self.pipeline.set_push_manager(push_manager).await;
}

/// For external visibilty (glide-core)
pub fn get_availability_zone(&self) -> Option<String> {
self.availability_zone.clone()
}

/// Replace the password used to authenticate with the server.
/// If `None` is provided, the password will be removed.
pub async fn update_connection_password(
Expand All @@ -599,6 +610,8 @@ pub struct MultiplexedConnectionBuilder {
push_manager: Option<PushManager>,
protocol: Option<ProtocolVersion>,
password: Option<String>,
/// Represents the node's availability zone
availability_zone: Option<String>,
}

impl MultiplexedConnectionBuilder {
Expand All @@ -611,6 +624,7 @@ impl MultiplexedConnectionBuilder {
push_manager: None,
protocol: None,
password: None,
availability_zone: None,
}
}

Expand Down Expand Up @@ -644,6 +658,12 @@ impl MultiplexedConnectionBuilder {
self
}

/// Sets the avazilability zone for the `MultiplexedConnectionBuilder`.
pub fn with_availability_zone(mut self, az: Option<String>) -> Self {
self.availability_zone = az;
self
}

/// Builds and returns a new `MultiplexedConnection` instance using the configured settings.
pub async fn build(self) -> RedisResult<MultiplexedConnection> {
let db = self.db.unwrap_or_default();
Expand All @@ -661,6 +681,7 @@ impl MultiplexedConnectionBuilder {
push_manager,
protocol,
password,
availability_zone: self.availability_zone,
};

Ok(con)
Expand Down Expand Up @@ -688,6 +709,16 @@ impl ConnectionLike for MultiplexedConnection {
fn is_closed(&self) -> bool {
self.pipeline.is_closed()
}

/// Get the node's availability zone
fn get_az(&self) -> Option<String> {
self.availability_zone.clone()
}

/// Set the node's availability zone
fn set_az(&mut self, az: Option<String>) {
self.availability_zone = az;
}
}
impl MultiplexedConnection {
/// Subscribes to a new channel.
Expand Down
5 changes: 4 additions & 1 deletion glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down Expand Up @@ -164,7 +167,7 @@ impl Client {
/// For Unix connections, returns (async connection, None)
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
pub async fn get_multiplexed_async_connection_and_ip(
pub async fn get_multiplexed_async_connection_ip(
&self,
glide_connection_options: GlideConnectionOptions,
) -> RedisResult<(crate::aio::MultiplexedConnection, Option<IpAddr>)> {
Expand Down
7 changes: 5 additions & 2 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ where
) -> RedisResult<Self> {
let connection = Self {
connections: RefCell::new(HashMap::new()),
slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)),
slots: RefCell::new(SlotMap::new(
vec![],
cluster_params.read_from_replicas.clone(),
)),
auto_reconnect: RefCell::new(true),
cluster_params,
read_timeout: RefCell::new(None),
Expand Down Expand Up @@ -384,7 +387,7 @@ where
"can't parse node address",
)))?;
match parse_and_count_slots(&value, self.cluster_params.tls, addr).map(|slots_data| {
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas)
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas.clone())
}) {
Ok(new_slots) => {
result = Ok(new_slots);
Expand Down
Loading
Loading