Skip to content

Commit

Permalink
Include endpoint token and metadata from cluster update (#359)
Browse files Browse the repository at this point in the history
* Include endpoint token and metadata from cluster update

Currently we're only passing along the endpoint address
while other info is empty

* test

* move out of function
  • Loading branch information
iffyio authored Aug 12, 2021
1 parent d853da8 commit 4f0eda5
Showing 1 changed file with 103 additions and 15 deletions.
118 changes: 103 additions & 15 deletions src/cluster/cluster_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,30 +106,37 @@ impl ClusterManager {
Ok(cluster_manager)
}

fn update_cluster_update_metrics(metrics: &Metrics, update: &ClusterUpdate) {
metrics.active_clusters.set(update.len() as i64);
metrics.active_endpoints.set(
Self::create_endpoints_from_update(update)
.map(|ep| ep.as_ref().len() as i64)
.unwrap_or_default(),
)
fn process_cluster_update(metrics: &Metrics, update: ClusterUpdate) -> Option<Endpoints> {
let num_clusters = update.len() as i64;
let update = Self::create_endpoints_from_update(update);
let num_endpoints = update
.as_ref()
.map(|ep| ep.as_ref().len() as i64)
.unwrap_or_default();
Self::update_cluster_update_metrics(metrics, num_clusters, num_endpoints);
update
}

fn update_cluster_update_metrics(metrics: &Metrics, num_clusters: i64, num_endpoints: i64) {
metrics.active_clusters.set(num_clusters);
metrics.active_endpoints.set(num_endpoints)
}

fn create_endpoints_from_update(update: &ClusterUpdate) -> Option<Endpoints> {
fn create_endpoints_from_update(update: ClusterUpdate) -> Option<Endpoints> {
// NOTE: We don't currently have support for consuming multiple clusters
// so here gather all endpoints into the same set, ignoring what cluster they
// belong to.
let endpoints = update
.iter()
.into_iter()
.fold(vec![], |mut endpoints, (_name, cluster)| {
let cluster_endpoints = cluster
.localities
.iter()
.into_iter()
.map(|(_, endpoints)| {
endpoints
.endpoints
.iter()
.map(|ep| Endpoint::from_address(ep.address))
.into_iter()
.map(|ep| Endpoint::new(ep.address, ep.tokens, ep.metadata))
})
.flatten();
endpoints.extend(cluster_endpoints);
Expand Down Expand Up @@ -158,10 +165,8 @@ impl ClusterManager {
update = cluster_updates_rx.recv() => {
match update {
Some(update) => {
Self::update_cluster_update_metrics(&metrics, &update);
let update = Self::create_endpoints_from_update(&update);
debug!(log, "Received a cluster update.");
cluster_manager.write().update(update);
cluster_manager.write().update(Self::process_cluster_update(&metrics, update));
}
None => {
warn!(log, "Exiting cluster update receive loop because the sender dropped the channel.");
Expand All @@ -188,6 +193,89 @@ mod tests {
use prometheus::Registry;
use tokio::sync::{mpsc, watch};

#[tokio::test]
async fn dynamic_cluster_manager_process_cluster_update() {
let (update_tx, update_rx) = mpsc::channel(3);
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx)
.unwrap();

let test_endpoints = vec![
Endpoint::new(
"127.0.0.1:80".parse().unwrap(),
vec!["abc-0".into(), "xyz-0".into()].into_iter().collect(),
Some(serde_json::json!({
"key-01": "value-01",
"key-02": "value-02",
})),
),
Endpoint::new(
"127.0.0.1:82".parse().unwrap(),
vec!["abc-2".into(), "xyz-2".into()].into_iter().collect(),
Some(serde_json::json!({
"key-01": "value-01",
"key-02": "value-02",
})),
),
Endpoint::new(
"127.0.0.1:83".parse().unwrap(),
vec!["abc-3".into(), "xyz-3".into()].into_iter().collect(),
None,
),
];

let update = vec![
(
"cluster-1".into(),
Cluster {
localities: vec![(
None,
LocalityEndpoints {
endpoints: vec![test_endpoints[0].clone()],
},
)]
.into_iter()
.collect(),
},
),
(
"cluster-2".into(),
Cluster {
localities: vec![(
None,
LocalityEndpoints {
endpoints: vec![test_endpoints[1].clone(), test_endpoints[2].clone()],
},
)]
.into_iter()
.collect(),
},
),
]
.into_iter()
.collect();
update_tx.send(update).await.unwrap();

// Check the processed update.
tokio::time::timeout(std::time::Duration::from_secs(3), async move {
// Wait for the update to be processed. Here just poll until there's
// a change we expect (or we will timeout from the enclosing future eventually.
loop {
let endpoints = { cm.read().endpoints.clone() };
if let Some(endpoints) = endpoints {
let mut endpoints = endpoints.as_ref().clone();
endpoints.sort_by(|a, b| a.address.cmp(&b.address));
assert_eq!(endpoints, test_endpoints);
break;
} else {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
};
}
})
.await
.unwrap();
}

#[test]
fn static_cluster_manager_metrics() {
let cm = ClusterManager::fixed(
Expand Down

0 comments on commit 4f0eda5

Please sign in to comment.