feat(upgrade-job): add check for partial rebuild between versions 2.2 and 2.5

Signed-off-by: Niladri Halder <[email protected]>
niladrih committed Apr 12, 2024
1 parent 56c8b34 commit f3c623f
Showing 6 changed files with 110 additions and 99 deletions.
6 changes: 6 additions & 0 deletions k8s/upgrade/src/bin/upgrade-job/common/constants.rs
@@ -1,3 +1,5 @@
use semver::Version;

/// This is the name of the project that is being upgraded.
pub(crate) const PRODUCT: &str = "Mayastor";

@@ -34,6 +36,10 @@ pub(crate) const UMBRELLA_CHART_UPGRADE_DOCS_URL: &str =
/// the kubernetes api.
pub(crate) const KUBE_API_PAGE_SIZE: u32 = 500;

/// The Core chart version limits for requiring partial rebuild to be disabled for upgrade.
pub(crate) const PARTIAL_REBUILD_DISABLE_EXTENTS: (Version, Version) =
(Version::new(2, 2, 0), Version::new(2, 5, 0));

/// Version value for the earliest possible 2.0 release.
pub(crate) const TWO_DOT_O_RC_ONE: &str = "2.0.0-rc.1";

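For reference, a minimal, self-contained sketch (not part of this diff) of how these extents express an inclusive version range; the constant is repeated here so the snippet compiles on its own:

```rust
use semver::Version;

// Mirrors the constant added above; both bounds are inclusive.
const PARTIAL_REBUILD_DISABLE_EXTENTS: (Version, Version) =
    (Version::new(2, 2, 0), Version::new(2, 5, 0));

/// True when a source chart version falls within the inclusive extents.
fn needs_partial_rebuild_disabled(source: &Version) -> bool {
    source >= &PARTIAL_REBUILD_DISABLE_EXTENTS.0 && source <= &PARTIAL_REBUILD_DISABLE_EXTENTS.1
}

fn main() {
    assert!(needs_partial_rebuild_disabled(&Version::parse("2.3.1").unwrap()));
    assert!(!needs_partial_rebuild_disabled(&Version::parse("2.5.1").unwrap()));
}
```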
24 changes: 7 additions & 17 deletions k8s/upgrade/src/bin/upgrade-job/common/error.rs
@@ -477,23 +477,6 @@ pub(crate) enum Error {
#[snafu(display("Failed to send Event over the channel"))]
EventChannelSend,

/// Error for when no value is found for the version label on the helm chart.
#[snafu(display(
"Failed to get the value of the {} label in Pod {} in Namespace {}",
CHART_VERSION_LABEL_KEY,
pod_name,
namespace
))]
HelmChartVersionLabelHasNoValue { pod_name: String, namespace: String },

/// Error for when a pod does not have Namespace set on it.
#[snafu(display(
"Found None when trying to get Namespace for Pod {}, context: {}",
pod_name,
context
))]
NoNamespaceInPod { pod_name: String, context: String },

/// Error for when the Umbrella chart is not upgraded.
#[snafu(display(
"The {} helm chart is not upgraded to version {}: Upgrade for helm chart {} is not \
@@ -680,6 +663,13 @@ pub(crate) enum Error {

#[snafu(display("failed to list CustomResourceDefinitions: {source}"))]
ListCrds { source: kube::Error },

#[snafu(display("Partial rebuild must be disabled for upgrades from {chart_name} chart versions >= {lower_extent}, <= {upper_extent}"))]
PartialRebuildNotAllowed {
chart_name: String,
lower_extent: String,
upper_extent: String,
},
}

/// A wrapper type to remove repeated Result<T, Error> returns.
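As an aside, a compact sketch of how a snafu variant like `PartialRebuildNotAllowed` is raised through its generated context selector; the enum below is a stand-in, not this crate's real `Error`, and it assumes a snafu 0.7+ setup configured without the selector suffix, matching how the selector is used later in upgrade.rs:

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))] // selector struct keeps the variant's name, as used in upgrade.rs
enum Error {
    #[snafu(display(
        "Partial rebuild must be disabled for upgrades from {chart_name} chart versions >= {lower_extent}, <= {upper_extent}"
    ))]
    PartialRebuildNotAllowed {
        chart_name: String,
        lower_extent: String,
        upper_extent: String,
    },
}

fn check(partial_rebuild_is_enabled: bool) -> Result<(), Error> {
    if partial_rebuild_is_enabled {
        // `.fail()` builds the variant from the named fields and returns it as Err.
        return PartialRebuildNotAllowed {
            chart_name: "mayastor".to_string(), // hypothetical chart name
            lower_extent: "2.2.0".to_string(),
            upper_extent: "2.5.0".to_string(),
        }
        .fail();
    }
    Ok(())
}
```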
43 changes: 22 additions & 21 deletions k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs
@@ -27,7 +27,8 @@ use tracing::info;

/// HelmUpgradeRunner is returned after an upgrade is validated and dry-run-ed. Running
/// it carries out helm upgrade.
pub(crate) type HelmUpgradeRunner = Pin<Box<dyn Future<Output = Result<()>>>>;
pub(crate) type HelmUpgradeRunner =
Pin<Box<dyn Future<Output = Result<Box<dyn HelmValuesCollection>>>>>;

/// A trait object of type HelmUpgrader is either CoreHelmUpgrader or an UmbrellaHelmUpgrader.
/// They either deal with upgrading the Core helm chart or the Umbrella helm chart respectively.
@@ -42,9 +43,6 @@ pub(crate) trait HelmUpgrader {

/// Return the target helm chart version as a String.
fn target_version(&self) -> String;

/// Returns a deserialized struct with tools to gather information about the source helm values.
fn source_values(&self) -> &dyn HelmValuesCollection;
}

/// This is a builder for the Helm chart upgrade.
@@ -183,14 +181,11 @@ impl HelmUpgraderBuilder {
// Fail if the Umbrella chart isn't already upgraded.
ensure!(already_upgraded, UmbrellaChartNotUpgraded);

// Deserialize umbrella chart values yaml.
let source_values = UmbrellaValues::try_from(source_values_buf.as_slice())?;

Ok(Box::new(UmbrellaHelmUpgrader {
release_name,
client,
source_version,
target_version,
source_values,
}))
} else if Regex::new(core_regex.as_str())?.is_match(chart) {
// Skip upgrade-path validation and allow all upgrades for the Core helm chart, if
@@ -292,7 +287,9 @@ impl HelmUpgrader for CoreHelmUpgrader {
is the same as that of this upgrade-job's helm chart"
);

Ok(())
let source_values: Box<dyn HelmValuesCollection> = Box::new(self.source_values);

Ok(source_values)
}));
}

@@ -320,14 +317,20 @@ is the same as that of this upgrade-job's helm chart"
info!("Starting helm upgrade...");
self.client
.upgrade(
self.release_name,
self.release_name.as_str(),
self.chart_dir,
Some(self.helm_upgrade_extra_args),
)
.await?;
info!("Helm upgrade successful!");

Ok(())
let final_values_buf = self
.client
.get_values_as_yaml::<&str, String>(self.release_name.as_str(), None)?;
let final_values: Box<dyn HelmValuesCollection> =
Box::new(CoreValues::try_from(final_values_buf.as_slice())?);

Ok(final_values)
}))
}

@@ -338,19 +341,15 @@ is the same as that of this upgrade-job's helm chart"
fn target_version(&self) -> String {
self.target_version.to_string()
}

fn source_values(&self) -> &dyn HelmValuesCollection {
&self.source_values
}
}

/// This is a HelmUpgrader for the Umbrella chart. This gathers information, and doesn't
/// set up a helm upgrade or a dry-run in any way.
pub(crate) struct UmbrellaHelmUpgrader {
release_name: String,
client: HelmReleaseClient,
source_version: Version,
target_version: Version,
source_values: UmbrellaValues,
}

#[async_trait]
@@ -362,7 +361,13 @@ impl HelmUpgrader for UmbrellaHelmUpgrader {
self.release_name.as_str()
);

Ok(())
let final_values_buf = self
.client
.get_values_as_yaml::<&str, String>(self.release_name.as_str(), None)?;
let final_values: Box<dyn HelmValuesCollection> =
Box::new(UmbrellaValues::try_from(final_values_buf.as_slice())?);

Ok(final_values)
}))
}

@@ -373,8 +378,4 @@ impl HelmUpgrader for UmbrellaHelmUpgrader {
fn target_version(&self) -> String {
self.target_version.to_string()
}

fn source_values(&self) -> &dyn HelmValuesCollection {
&self.source_values
}
}
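To show what the new `HelmUpgradeRunner` contract looks like from the caller's side, here is a rough, self-contained sketch; the `HelmValuesCollection` trait below is a stand-in with only the two methods upgrade.rs relies on, and the error type is simplified:

```rust
use std::{future::Future, pin::Pin};

// Stand-in for the crate's HelmValuesCollection trait (assumed shape).
trait HelmValuesCollection {
    fn ha_is_enabled(&self) -> bool;
    fn partial_rebuild_is_enabled(&self) -> bool;
}

// Simplified error type for the sketch.
type Result<T> = std::result::Result<T, String>;

// Same shape as the updated type alias in this file.
type HelmUpgradeRunner = Pin<Box<dyn Future<Output = Result<Box<dyn HelmValuesCollection>>>>>;

async fn consume(run_helm_upgrade: HelmUpgradeRunner) -> Result<()> {
    // Awaiting the runner performs the helm upgrade and hands back the release's
    // final values, so callers no longer need a separate source_values() getter.
    let final_values = run_helm_upgrade.await?;
    let _ha_is_enabled = final_values.ha_is_enabled();
    let _partial_rebuild_is_enabled = final_values.partial_rebuild_is_enabled();
    Ok(())
}
```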
75 changes: 66 additions & 9 deletions k8s/upgrade/src/bin/upgrade-job/upgrade.rs
@@ -1,11 +1,23 @@
use crate::{
common::{constants::PRODUCT, error::Result},
common::{
constants::{
CHART_VERSION_LABEL_KEY, CORE_CHART_NAME, IO_ENGINE_LABEL,
PARTIAL_REBUILD_DISABLE_EXTENTS, PRODUCT,
},
error::{PartialRebuildNotAllowed, Result},
kube_client as KubeClient,
},
events::event_recorder::{EventAction, EventRecorder},
helm::upgrade::{HelmUpgradeRunner, HelmUpgraderBuilder},
opts::CliArgs,
};
use data_plane::upgrade_data_plane;

use k8s_openapi::api::core::v1::Pod;
use kube::ResourceExt;
use semver::Version;
use tracing::error;

/// Contains the data-plane upgrade logic.
pub(crate) mod data_plane;

@@ -54,9 +66,6 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()
event.set_source_version(source_version.clone());
event.set_target_version(target_version.clone());

// Capture HA state before helm upgrade is consumed.
let ha_is_enabled = helm_upgrader.source_values().ha_is_enabled();

// Dry-run helm upgrade.
let dry_run_result: Result<HelmUpgradeRunner> = helm_upgrader.dry_run().await;
let run_helm_upgrade = match dry_run_result {
@@ -82,10 +91,13 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()
.await?;

// Control plane containers are updated in this step.
if let Err(error) = run_helm_upgrade.await {
event.publish_unrecoverable(&error, false).await;
return Err(error);
}
let final_values = match run_helm_upgrade.await {
Ok(values) => values,
Err(error) => {
event.publish_unrecoverable(&error, false).await;
return Err(error);
}
};

event
.publish_normal(
@@ -96,6 +108,22 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()

// Data plane containers are updated in this step.
if !opts.skip_data_plane_restart() {
let yet_to_upgrade_io_engine_label = format!(
"{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}!={}",
target_version.as_str()
);
let yet_to_upgrade_io_engine_pods = KubeClient::list_pods(
opts.namespace(),
Some(yet_to_upgrade_io_engine_label.clone()),
None,
)
.await?;

partial_rebuild_check(
yet_to_upgrade_io_engine_pods.as_slice(),
final_values.partial_rebuild_is_enabled(),
)?;

event
.publish_normal(
format!("Upgrading {PRODUCT} data-plane"),
@@ -107,7 +135,9 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()
opts.namespace(),
opts.rest_endpoint(),
target_version,
ha_is_enabled,
final_values.ha_is_enabled(),
yet_to_upgrade_io_engine_label,
yet_to_upgrade_io_engine_pods,
)
.await
{
@@ -132,3 +162,30 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()

Ok(())
}

fn partial_rebuild_check(
yet_to_upgrade_io_engine_pods: &[Pod],
partial_rebuild_is_enabled: bool,
) -> Result<()> {
let partial_rebuild_disable_required = yet_to_upgrade_io_engine_pods
.iter()
.filter_map(|pod| pod.labels().get(CHART_VERSION_LABEL_KEY))
.any(|v| {
let version =
Version::parse(v).expect("failed to parse version from io-engine Pod label");
version.ge(&PARTIAL_REBUILD_DISABLE_EXTENTS.0)
& version.le(&PARTIAL_REBUILD_DISABLE_EXTENTS.1)
});

if partial_rebuild_disable_required && partial_rebuild_is_enabled {
error!("Partial rebuild must be disabled for upgrades from {CORE_CHART_NAME} chart versions >= {}, <= {}", PARTIAL_REBUILD_DISABLE_EXTENTS.0, PARTIAL_REBUILD_DISABLE_EXTENTS.1);
return PartialRebuildNotAllowed {
chart_name: CORE_CHART_NAME.to_string(),
lower_extent: PARTIAL_REBUILD_DISABLE_EXTENTS.0.to_string(),
upper_extent: PARTIAL_REBUILD_DISABLE_EXTENTS.1.to_string(),
}
.fail();
}

Ok(())
}
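A sketch of the kind of Pod the new `partial_rebuild_check` inspects, built by hand for illustration; the label key below is a guess, since the real `CHART_VERSION_LABEL_KEY` value is defined in constants.rs and not visible in this diff:

```rust
use std::collections::BTreeMap;

use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};
use semver::Version;

fn main() {
    // Hypothetical label key; the actual CHART_VERSION_LABEL_KEY lives in constants.rs.
    let chart_version_label_key = "openebs.io/version";

    // An io-engine Pod still labelled with a 2.3.x chart version, i.e. not yet upgraded.
    let pod = Pod {
        metadata: ObjectMeta {
            labels: Some(BTreeMap::from([(
                chart_version_label_key.to_string(),
                "2.3.0".to_string(),
            )])),
            ..Default::default()
        },
        ..Default::default()
    };

    // The same filter-and-range test that partial_rebuild_check() applies.
    let disable_required = pod
        .metadata
        .labels
        .as_ref()
        .and_then(|labels| labels.get(chart_version_label_key))
        .map(|v| {
            let version = Version::parse(v).expect("failed to parse version label");
            version >= Version::new(2, 2, 0) && version <= Version::new(2, 5, 0)
        })
        .unwrap_or(false);

    // 2.3.0 falls inside the 2.2.0..=2.5.0 extents, so partial rebuild must be off.
    assert!(disable_required);
}
```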
17 changes: 6 additions & 11 deletions k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs
@@ -13,8 +13,8 @@ use crate::{
rest_client::RestClientSet,
},
upgrade::utils::{
all_pods_are_ready, cordon_storage_node, data_plane_is_upgraded, list_all_volumes,
rebuild_result, uncordon_storage_node, RebuildResult,
all_pods_are_ready, cordon_storage_node, list_all_volumes, rebuild_result,
uncordon_storage_node, RebuildResult,
},
};
use k8s_openapi::api::core::v1::{Node, Pod};
@@ -35,22 +35,17 @@ pub(crate) async fn upgrade_data_plane(
rest_endpoint: String,
upgrade_target_version: String,
ha_is_enabled: bool,
yet_to_upgrade_io_engine_label: String,
yet_to_upgrade_io_engine_pods: Vec<Pod>,
) -> Result<()> {
// This makes data-plane upgrade idempotent.
let io_engine_label = format!("{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}");
let io_engine_pod_list =
KubeClient::list_pods(namespace.clone(), Some(io_engine_label), None).await?;

if data_plane_is_upgraded(&upgrade_target_version, &io_engine_pod_list).await? {
if yet_to_upgrade_io_engine_pods.is_empty() {
info!("Skipping data-plane upgrade: All data-plane Pods are already upgraded");
return Ok(());
}

// If here, then there is a need to proceed to data-plane upgrade.

let yet_to_upgrade_io_engine_label_selector =
format!("{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}!={upgrade_target_version}");

// Generate storage REST API client.
let rest_client = RestClientSet::new_with_url(rest_endpoint)?;

@@ -74,7 +69,7 @@ loop {
loop {
let initial_io_engine_pod_list: Vec<Pod> = KubeClient::list_pods(
namespace.clone(),
Some(yet_to_upgrade_io_engine_label_selector.clone()),
Some(yet_to_upgrade_io_engine_label.clone()),
None,
)
.await?;
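For clarity, a small sketch of the selector string that upgrade.rs now builds once and passes down as `yet_to_upgrade_io_engine_label`; both label values below are placeholders, since the real `IO_ENGINE_LABEL` and `CHART_VERSION_LABEL_KEY` constants are not shown in this diff:

```rust
fn main() {
    // Placeholder values standing in for IO_ENGINE_LABEL and CHART_VERSION_LABEL_KEY.
    let io_engine_label = "app=io-engine";
    let chart_version_label_key = "openebs.io/version";
    let target_version = "2.5.0";

    // Selects io-engine Pods whose chart-version label differs from the upgrade
    // target, i.e. Pods whose data-plane still needs an upgrade.
    let yet_to_upgrade_io_engine_label =
        format!("{io_engine_label},{chart_version_label_key}!={target_version}");

    assert_eq!(
        yet_to_upgrade_io_engine_label,
        "app=io-engine,openebs.io/version!=2.5.0"
    );

    // An empty Pod list for this selector is what lets upgrade_data_plane() return
    // early: every io-engine Pod already carries the target chart version.
}
```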
44 changes: 3 additions & 41 deletions k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs
@@ -1,15 +1,14 @@
use crate::common::{
constants::{CHART_VERSION_LABEL_KEY, PRODUCT},
constants::PRODUCT,
error::{
CordonStorageNode, EmptyStorageNodeSpec, GetStorageNode, HelmChartVersionLabelHasNoValue,
ListStorageVolumes, NoNamespaceInPod, Result, SemverParse, StorageNodeUncordon,
CordonStorageNode, EmptyStorageNodeSpec, GetStorageNode, ListStorageVolumes, Result,
StorageNodeUncordon,
},
rest_client::RestClientSet,
};
use k8s_openapi::api::core::v1::Pod;
use kube::ResourceExt;
use openapi::models::{CordonDrainState, Volume, VolumeStatus};
use semver::Version;
use snafu::ResultExt;
use std::{collections::HashSet, time::Duration};
use tracing::{info, warn};
@@ -176,43 +175,6 @@ pub(crate) fn all_pods_are_ready(pod_list: Vec<Pod>) -> bool {
true
}

/// Checks to see if all of io-engine Pods are already upgraded to the version of the local helm
/// chart.
pub(crate) async fn data_plane_is_upgraded(
target_version: &str,
io_engine_pod_list: &Vec<Pod>,
) -> Result<bool> {
let target_version_requirement: Version =
Version::parse(target_version).context(SemverParse {
version_string: target_version.to_string(),
})?;

for pod in io_engine_pod_list {
let version_str = pod.labels().get(CHART_VERSION_LABEL_KEY).ok_or(
HelmChartVersionLabelHasNoValue {
pod_name: pod.name_any(),
namespace: pod.namespace().ok_or(
NoNamespaceInPod {
pod_name: pod.name_any(),
context: "checking to see if data-plane Pods are already upgraded"
.to_string(),
}
.build(),
)?,
}
.build(),
)?;
let version = Version::parse(version_str).context(SemverParse {
version_string: version_str.clone(),
})?;
if !target_version_requirement.eq(&version) {
return Ok(false);
}
}

Ok(true)
}

/// Cordon storage node.
pub(crate) async fn cordon_storage_node(
node_id: &str,
