chore(upgrade): fix rebuild validation
Signed-off-by: sinhaashish <[email protected]>
sinhaashish committed Aug 22, 2023
1 parent b2bf668 commit 4101a29
Showing 2 changed files with 96 additions and 23 deletions.
16 changes: 12 additions & 4 deletions k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs
@@ -11,7 +11,7 @@ use crate::{
         kube_client::KubeClientSet,
         rest_client::RestClientSet,
     },
-    upgrade::utils::{all_pods_are_ready, data_plane_is_upgraded, is_rebuilding},
+    upgrade::utils::{all_pods_are_ready, data_plane_is_upgraded, rebuild_result, RebuildResult},
 };
 use k8s_openapi::api::core::v1::Pod;
 use kube::{
@@ -252,9 +252,17 @@ async fn verify_data_plane_pod_is_running(
 async fn wait_for_rebuild(node_name: &str, rest_client: &RestClientSet) -> Result<()> {
     // Wait for 60 seconds for any rebuilds to kick in.
     tokio::time::sleep(Duration::from_secs(60_u64)).await;
-    while is_rebuilding(rest_client).await? {
-        info!(node.name = %node_name, "Waiting for volume rebuilds to complete");
-        tokio::time::sleep(Duration::from_secs(10_u64)).await;
+
+    let mut result = RebuildResult::default();
+    loop {
+        let rebuild = rebuild_result(rest_client, &mut result.discarded_volumes).await?;
+
+        if rebuild.rebuilding {
+            info!(node.name = %node_name, "Waiting for volume rebuilds to complete");
+            tokio::time::sleep(Duration::from_secs(10_u64)).await;
+        } else {
+            break;
+        }
     }
     info!(node.name = %node_name, "No volume rebuilds in progress");
     Ok(())
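
In effect, wait_for_rebuild no longer polls a bare is_rebuilding boolean. Each pass now calls rebuild_result, which also accumulates discarded_volumes (volumes whose rebuild never started), so a volume that stays degraded without an active rebuild cannot keep the upgrade waiting indefinitely; see the utils.rs changes below and the sketch at the end of the diff.
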
103 changes: 84 additions & 19 deletions k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs
@@ -7,12 +7,67 @@ use crate::common::{
 };
 use k8s_openapi::api::core::v1::Pod;
 use kube::{api::ObjectList, ResourceExt};
+use openapi::models::{Volume, VolumeStatus};
 use semver::{Version, VersionReq};
 use snafu::ResultExt;
+use std::time::Duration;
 use tracing::{info, warn};
 
-/// Function to check for any volume rebuild in progress across the cluster
-pub(crate) async fn is_rebuilding(rest_client: &RestClientSet) -> Result<bool> {
+/// Contains the Rebuild Results.
+#[derive(Default)]
+pub(crate) struct RebuildResult {
+    pub(crate) rebuilding: bool,
+    pub(crate) discarded_volumes: Vec<Volume>,
+}
+
+/// Function to check for any volume rebuild in progress across the cluster.
+pub(crate) async fn rebuild_result(
+    rest_client: &RestClientSet,
+    stale_volumes: &mut Vec<Volume>,
+) -> Result<RebuildResult> {
+    loop {
+        let unhealthy_volumes = list_unhealthy_volumes(rest_client, stale_volumes).await?;
+        if unhealthy_volumes.is_empty() {
+            break;
+        }
+
+        for volume in unhealthy_volumes.iter() {
+            match replica_rebuild_count(volume.clone()).await {
+                0 => {
+                    for _i in 0 .. 11 {
+                        // wait for a minute for any rebuild to start
+                        tokio::time::sleep(Duration::from_secs(60_u64)).await;
+                        let count = replica_rebuild_count(volume.clone()).await;
+                        if count > 0 {
+                            return Ok(RebuildResult {
+                                rebuilding: true,
+                                discarded_volumes: stale_volumes.clone(),
+                            });
+                        }
+                    }
+                    stale_volumes.push(volume.clone());
+                }
+                _ => {
+                    return Ok(RebuildResult {
+                        rebuilding: true,
+                        discarded_volumes: stale_volumes.clone(),
+                    })
+                }
+            }
+        }
+    }
+    Ok(RebuildResult {
+        rebuilding: false,
+        discarded_volumes: stale_volumes.clone(),
+    })
+}
+
+/// Return the list of unhealthy volumes.
+pub(crate) async fn list_unhealthy_volumes(
+    rest_client: &RestClientSet,
+    discarded_volumes: &[Volume],
+) -> Result<Vec<Volume>> {
+    let mut unhealthy_volumes: Vec<Volume> = Vec::new();
     // The number of volumes to get per request.
     let max_entries = 200;
     let mut starting_token = Some(0_isize);
@@ -28,27 +83,37 @@ pub(crate) async fn is_rebuilding(rest_client: &RestClientSet) -> Result<bool> {
         let volumes = vols.into_body();
         starting_token = volumes.next_token;
         for volume in volumes.entries {
-            if let Some(target) = &volume.state.target {
-                let mut rebuild_count = 0;
-
-                for child in target.children.iter() {
-                    if child.rebuild_progress.is_some() {
-                        rebuild_count += 1;
-                    }
-                }
-                if rebuild_count > 0 {
-                    info!(
-                        "Rebuilding {} of {} replicas for volume {}",
-                        rebuild_count,
-                        target.children.len(),
-                        volume.spec.uuid
-                    );
-                    return Ok(true);
-                }
-            }
+            match volume.state.status {
+                VolumeStatus::Faulted | VolumeStatus::Degraded => {
+                    unhealthy_volumes.push(volume);
+                }
+                _ => continue,
+            }
         }
     }
-    Ok(false)
+    unhealthy_volumes.retain(|v| !discarded_volumes.contains(v));
+    Ok(unhealthy_volumes)
 }
 
+/// Count of number of replica rebuilding.
+pub(crate) async fn replica_rebuild_count(volume: Volume) -> i32 {
+    let mut rebuild_count = 0;
+    if let Some(target) = &volume.state.target {
+        for child in target.children.iter() {
+            if child.rebuild_progress.is_some() {
+                rebuild_count += 1;
+            }
+        }
+        if rebuild_count > 0 {
+            info!(
+                "Rebuilding {} of {} replicas for volume {}",
+                rebuild_count,
+                target.children.len(),
+                volume.spec.uuid
+            );
+        }
+    }
+    rebuild_count
+}
 
 /// This function returns 'true' only if all of the containers in the Pods contained in the
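
To see the new validation flow outside the diff, here is a minimal, self-contained sketch of the poll-and-discard pattern that rebuild_result implements. The Volume struct, the synchronous sleeps, and the shortened three-step retry window are stand-ins invented for this example only; the committed code runs asynchronously against openapi::models::Volume via the REST client and retries 11 times with 60-second sleeps.

use std::thread;
use std::time::Duration;

// Stand-in for the control-plane volume object; the real
// openapi::models::Volume carries much more state.
#[derive(Clone, PartialEq, Debug)]
struct Volume {
    uuid: &'static str,
    rebuilding_replicas: u32,
    healthy: bool,
}

// Mirrors the commit's RebuildResult.
#[derive(Default, Debug)]
struct RebuildResult {
    rebuilding: bool,
    discarded_volumes: Vec<Volume>,
}

// Simplified, synchronous rebuild_result: list unhealthy volumes, give each
// one a bounded window for a rebuild to start, and discard volumes whose
// rebuild never begins so they stop blocking the caller's wait loop.
fn rebuild_result(cluster: &[Volume], stale: &mut Vec<Volume>) -> RebuildResult {
    loop {
        // list_unhealthy_volumes equivalent: unhealthy and not yet discarded.
        let unhealthy: Vec<Volume> = cluster
            .iter()
            .filter(|v| !v.healthy && !stale.contains(*v))
            .cloned()
            .collect();
        if unhealthy.is_empty() {
            break;
        }
        for volume in unhealthy {
            if volume.rebuilding_replicas > 0 {
                // A rebuild is running: report it so the caller keeps waiting.
                return RebuildResult {
                    rebuilding: true,
                    discarded_volumes: stale.clone(),
                };
            }
            // Bounded wait for a rebuild to start (shortened here; the real
            // code re-fetches the volume state on every retry).
            for _ in 0..3 {
                thread::sleep(Duration::from_millis(10));
            }
            // No rebuild ever started: mark the volume stale so later passes
            // skip it instead of waiting on it forever.
            stale.push(volume);
        }
    }
    RebuildResult {
        rebuilding: false,
        discarded_volumes: stale.clone(),
    }
}

fn main() {
    let cluster = vec![
        Volume { uuid: "vol-1", rebuilding_replicas: 0, healthy: false },
        Volume { uuid: "vol-2", rebuilding_replicas: 0, healthy: true },
    ];
    let mut stale = Vec::new();
    let result = rebuild_result(&cluster, &mut stale);
    // vol-1 never started rebuilding, so it is discarded and the upgrade
    // can proceed rather than wait forever.
    println!(
        "rebuilding = {}, discarded = {:?}",
        result.rebuilding, result.discarded_volumes
    );
}
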
