diff --git a/core/utils/src/wave_config.rs b/core/utils/src/wave_config.rs index dd34e6fa..640844bf 100644 --- a/core/utils/src/wave_config.rs +++ b/core/utils/src/wave_config.rs @@ -1,6 +1,6 @@ use crate::config_path::find_file_in_wa; use serde::Deserialize; -use std::fs::File; +use std::{collections::HashMap, fs::File}; use tracing::{debug, error}; const CONFIG_FILE_NAME: &str = "wave-config.yaml"; @@ -18,6 +18,9 @@ const DEFAULT_WEB_UI: bool = true; const DEFAULT_WEB_UI_HOST: &str = "0.0.0.0"; const DEFAULT_WEB_UI_PORT: u16 = 3025; const DEFAULT_RESET_DEFINITIONS_ON_STARTUP: bool = false; +const DEFAULT_WEBHOOKS: Option> = None; +const DEFAULT_WEBHOOKS_URL: Option = None; +const DEFAULT_WEBHOOKS_HEADERS: Option> = None; fn default_debug() -> bool { DEFAULT_DEBUG @@ -58,6 +61,15 @@ fn default_web_ui_port() -> u16 { fn default_reset_definitions_on_startup() -> bool { DEFAULT_RESET_DEFINITIONS_ON_STARTUP } +fn default_webhooks() -> Option> { + DEFAULT_WEBHOOKS +} +fn default_webhooks_url() -> Option { + DEFAULT_WEBHOOKS_URL +} +fn default_webhooks_headers() -> Option> { + DEFAULT_WEBHOOKS_HEADERS +} #[derive(Debug, PartialEq, Deserialize, Default, Clone)] struct DownloadUrlDefinition { @@ -127,6 +139,31 @@ pub struct WaveConfig { vector: DownloadUrlDefinition, #[serde(default)] telegraf: DownloadUrlDefinition, + + // + // Web hooks + // + #[serde(default = "default_webhooks")] + pub webhooks: Option>, +} + +#[derive(PartialEq, Clone, Deserialize, Debug)] +pub struct Webhooks { + pub id: String, + pub webhook_type: WebhookType, + #[serde(default = "default_webhooks_url")] + pub url: Option, + #[serde(default = "default_webhooks_headers")] + pub headers: Option>, +} + +#[derive(Debug, PartialEq, Deserialize, Clone)] +pub enum WebhookType { + #[serde(alias = "Http", alias = "http")] + Http, + #[serde(alias = "SlackIncomingWebhook", alias = "slackincomingwebhook")] + SlackIncomingWebhook, + // SlackOauth, // TODO: To be developed. } impl Default for WaveConfig { @@ -147,6 +184,7 @@ impl Default for WaveConfig { web_ui_port: DEFAULT_WEB_UI_PORT, vector: DownloadUrlDefinition::default(), telegraf: DownloadUrlDefinition::default(), + webhooks: DEFAULT_WEBHOOKS, } } } @@ -227,5 +265,6 @@ mod tests { assert_eq!(wave_config.web_ui, DEFAULT_WEB_UI); assert_eq!(wave_config.web_ui_host, DEFAULT_WEB_UI_HOST); assert_eq!(wave_config.web_ui_port, DEFAULT_WEB_UI_PORT); + assert_eq!(wave_config.webhooks, DEFAULT_WEBHOOKS); } } diff --git a/core/wave-autoscale/src/app.rs b/core/wave-autoscale/src/app.rs index 9b8b265d..6f962050 100644 --- a/core/wave-autoscale/src/app.rs +++ b/core/wave-autoscale/src/app.rs @@ -33,6 +33,7 @@ impl App { shared_data_layer.clone(), shared_metric_updater.clone(), shared_scaling_component_manager.clone(), + wave_config.webhooks.clone(), ); // Create App diff --git a/core/wave-autoscale/src/scaling_planner/mod.rs b/core/wave-autoscale/src/scaling_planner/mod.rs index d1dff0ea..e55ae497 100644 --- a/core/wave-autoscale/src/scaling_planner/mod.rs +++ b/core/wave-autoscale/src/scaling_planner/mod.rs @@ -1,5 +1,6 @@ pub mod scaling_planner_manager; mod js_functions; +mod webhooks; use crate::{ metric_updater::SharedMetricUpdater, scaling_component::SharedScalingComponentManager, @@ -103,7 +104,11 @@ async fn create_autoscaling_history( expression_value_map: Option<&Vec>>>, scaling_components_metadata: Option<&Value>, fail_message: Option, + plan_webhooks: Option>, + webhooks: Option>, ) { + let plan_item_id = plan_item.clone().id; + let metric_values_json = if let Some(expression_value_map) = expression_value_map { json!(expression_value_map).to_string() } else { @@ -117,11 +122,11 @@ async fn create_autoscaling_history( }; let autoscaling_history: AutoscalingHistoryDefinition = AutoscalingHistoryDefinition::new( plan_db_id, - plan_id, - json!(plan_item).to_string(), + plan_id.clone(), + json!(plan_item.clone()).to_string(), metric_values_json, - metadata_values_json, - fail_message, + metadata_values_json.clone(), + fail_message.clone(), ); debug!( "[ScalingPlanner] autoscaling_history - {:?}", @@ -130,6 +135,14 @@ async fn create_autoscaling_history( let _ = data_layer .add_autoscaling_history(autoscaling_history) .await; + + let webhook_request_body = webhooks::WebhookRequestBody { + plan_id, + plan_item_id, + scaling_component_json_str: metadata_values_json, + fail_message: fail_message.clone(), + }; + webhooks::send_webhooks(webhooks, plan_webhooks, webhook_request_body); } pub struct ScalingPlanner { @@ -141,6 +154,7 @@ pub struct ScalingPlanner { last_cool_down: Arc>, data_layer: Arc, task: Option>, + webhooks: Option>, // For instant action action_task: Option>, last_plan_item_id_by_action: Arc>, @@ -153,6 +167,7 @@ impl<'a> ScalingPlanner { metric_updater: SharedMetricUpdater, scaling_component_manager: SharedScalingComponentManager, data_layer: Arc, + webhooks: Option>, ) -> Self { ScalingPlanner { definition, @@ -163,6 +178,7 @@ impl<'a> ScalingPlanner { last_cool_down: Arc::new(RwLock::new(0)), data_layer, task: None, + webhooks, action_task: None, last_plan_item_id_by_action: Arc::new(RwLock::new(String::new())), last_plan_timestamp_by_action: Arc::new(RwLock::new(None)), @@ -185,6 +201,7 @@ impl<'a> ScalingPlanner { let shared_last_plan_timestamp = self.last_plan_timestamp.clone(); let shared_last_cool_down = self.last_cool_down.clone(); let data_layer: Arc = self.data_layer.clone(); + let webhooks = self.webhooks.clone(); // PlanDefinition let scaling_plan_definition = self.definition.clone(); @@ -215,6 +232,25 @@ impl<'a> ScalingPlanner { } else { plan_interval }; + let plan_webhooks = match plan_metadata.get("webhooks") { + Some(plan_webhooks) => { + let Some(plan_webhooks) = plan_webhooks.as_array() else { + error!("[ScalingPlanner] Failed to get plan webhooks Not Array - {:?}", plan_webhooks); + return; + }; + let plan_webhooks = plan_webhooks + .iter() + .map(|webhook| + webhook.as_str().unwrap_or_else(|| { + error!("[ScalingPlanner] Failed to get plan webhook Not String - {:?}", webhook); + "" + }).to_string() + ) + .collect::>(); + Some(plan_webhooks) + } + None => None, + }; let plan_items = self.sort_plan_by_priority(); @@ -358,6 +394,8 @@ impl<'a> ScalingPlanner { None, None, Some("Failed to parse cron expression".to_string()), + plan_webhooks.clone(), + webhooks.clone(), ) .await; // Skip this plan @@ -440,6 +478,8 @@ impl<'a> ScalingPlanner { None, None, expression_result.message, + plan_webhooks.clone(), + webhooks.clone(), ) .await; } @@ -500,6 +540,8 @@ impl<'a> ScalingPlanner { Some(&expression_value_map_for_history), Some(&scaling_components_metadata[index]), fail_message, + plan_webhooks.clone(), + webhooks.clone(), ) .await; } @@ -712,6 +754,7 @@ mod tests { shared_metric_updater, scaling_component_manager, data_layer.clone(), + None, ); (data_layer, scaling_planner) } diff --git a/core/wave-autoscale/src/scaling_planner/scaling_planner_manager.rs b/core/wave-autoscale/src/scaling_planner/scaling_planner_manager.rs index e1c04caf..738af488 100644 --- a/core/wave-autoscale/src/scaling_planner/scaling_planner_manager.rs +++ b/core/wave-autoscale/src/scaling_planner/scaling_planner_manager.rs @@ -17,6 +17,7 @@ pub struct ScalingPlannerManager { data_layer: Arc, metric_updater: SharedMetricUpdater, scaling_component_manager: SharedScalingComponentManager, + webhooks: Option>, } impl ScalingPlannerManager { @@ -24,23 +25,27 @@ impl ScalingPlannerManager { data_layer: Arc, metric_updater: SharedMetricUpdater, scaling_component_manager: SharedScalingComponentManager, + webhooks: Option>, ) -> Self { ScalingPlannerManager { scaling_planners: HashMap::new(), data_layer, metric_updater, scaling_component_manager, + webhooks, } } pub fn new_shared( data_layer: Arc, metric_updater: SharedMetricUpdater, scaling_component_manager: SharedScalingComponentManager, + webhooks: Option>, ) -> SharedScalingPlannerManager { Arc::new(RwLock::new(ScalingPlannerManager::new( data_layer, metric_updater, scaling_component_manager, + webhooks, ))) } @@ -51,6 +56,7 @@ impl ScalingPlannerManager { self.metric_updater.clone(), self.scaling_component_manager.clone(), self.data_layer.clone(), + self.webhooks.clone(), )) } diff --git a/core/wave-autoscale/src/scaling_planner/webhooks.rs b/core/wave-autoscale/src/scaling_planner/webhooks.rs new file mode 100644 index 00000000..a61af9e8 --- /dev/null +++ b/core/wave-autoscale/src/scaling_planner/webhooks.rs @@ -0,0 +1,256 @@ +use reqwest::header::HeaderMap; +use reqwest::Client; +use std::collections::HashMap; +use tracing::error; +use utils::wave_config::WebhookType; +use utils::wave_config::Webhooks; + +#[derive(Clone)] +pub struct WebhookRequestBody { + pub plan_id: String, + pub plan_item_id: String, + pub scaling_component_json_str: String, + pub fail_message: Option, +} +impl WebhookRequestBody { + pub fn to_http(&self) -> Option { + let mut scaling_component = serde_json::json!(""); + if !self.scaling_component_json_str.is_empty() { + let Ok(scaling_component_json) = serde_json::from_str::(self.scaling_component_json_str.as_str()) else { + error!( + "[Webhook] Failed to send webhook: Failed to parse scaling_component_json_str (json)" + ); + return None; + }; + scaling_component = scaling_component_json; + } + Some(serde_json::json!({ + "timestamp": chrono::Utc::now().timestamp(), + "plan_id": self.plan_id, + "plan_item_id": self.plan_item_id, + "scaling_component": scaling_component, + "status": if self.fail_message.is_some() { PlanStatus::Fail.to_string() } else { PlanStatus::Success.to_string() }, + "fail_message": if self.fail_message.is_some() { self.fail_message.clone().unwrap() } else { "".to_string() }, + })) + } + + pub fn to_slack(&self) -> Option { + let slack_date_time = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + let mut scaling_component = "-\n".to_string(); + if !self.scaling_component_json_str.is_empty() { + let Ok(scaling_component_json) = serde_json::from_str::(self.scaling_component_json_str.as_str()) else { + error!( + "[Webhook] Failed to send webhook: Failed to parse scaling_component_json_str (json)" + ); + return None; + }; + let Ok(scaling_component_yaml) = serde_yaml::to_string(&scaling_component_json) else { + error!( + "[Webhook] Failed to send webhook: Failed to parse scaling_component_json_str (yaml)" + ); + return None; + }; + scaling_component = scaling_component_yaml; + } + + Some(serde_json::json!({ + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": if self.fail_message.is_some() { format!(":X: *FAIL*\nFail Message: *{}*", self.fail_message.clone().unwrap()) } else { ":white_check_mark: *SUCCESS*".to_string() } + } + }, + { + "type": "context", + "elements": [ + { + "text": format!("*{}* | Wave Autoscale - Scaling Plan History", slack_date_time), + "type": "mrkdwn" + } + ] + }, + { + "type": "divider" + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": format!("Plan ID: *{}*\nPlan Item ID: *{}*\nScaling Component:\n ```{}```", self.plan_id, self.plan_item_id, scaling_component) + } + } + ] + })) + } +} + +enum PlanStatus { + Success, + Fail, +} +impl std::fmt::Display for PlanStatus { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + PlanStatus::Success => write!(f, "SUCCESS"), + PlanStatus::Fail => write!(f, "FAIL"), + } + } +} + +pub fn send_webhooks( + webhooks: Option>, + plan_webhooks: Option>, + webhook_request_body: WebhookRequestBody, +) { + tokio::spawn(async move { + if let (Some(plan_webhooks), Some(webhooks)) = (plan_webhooks, webhooks) { + let plan_webhooks_map: HashMap = plan_webhooks + .iter() + .map(|webhook_id| (webhook_id.clone(), "".to_string())) + .collect(); + for webhook in webhooks { + if !(plan_webhooks_map.contains_key(webhook.id.as_str())) { + return; + } + match webhook.webhook_type { + WebhookType::Http => { + let _ = send_webhook_http(webhook, webhook_request_body.clone()).await; + } + WebhookType::SlackIncomingWebhook => { + let _ = send_webhook_slack_incoming_webhook( + webhook, + webhook_request_body.clone(), + ) + .await; + } + } + } + } + }); +} + +async fn send_webhook_http( + webhook: Webhooks, + webhook_request_body: WebhookRequestBody, +) -> Result<(), anyhow::Error> { + let Some(url) = webhook.url else { + error!("[Webhook] Failed to send webhook: url is not set"); + return Err(anyhow::anyhow!("[Webhook] Failed to send webhook: url is not set")); + }; + let client = Client::new(); + let mut headers = HeaderMap::new(); + if let Some(headers_map) = webhook.headers { + for (key, value) in headers_map { + headers.insert( + reqwest::header::HeaderName::from_bytes(key.as_bytes()).unwrap(), + reqwest::header::HeaderValue::from_str(value.as_str()).unwrap(), + ); + } + } + let Some(webhook_request_body_tohttp) = webhook_request_body.to_http() else { + error!("[Webhook] Failed to send webhook: Failed to parse webhook_request_body"); + return Err(anyhow::anyhow!("[Webhook] Failed to send webhook: Failed to parse webhook_request_body")); + }; + let response = client + .post(&url) + .headers(headers) + .json(&webhook_request_body_tohttp) + .send() + .await; + if let Err(e) = response { + error!("[Webhook] Failed to send webhook HTTP: {}", e); + return Err(anyhow::anyhow!("[Webhook] Failed to send webhook HTTP")); + } + Ok(()) +} + +async fn send_webhook_slack_incoming_webhook( + webhook: Webhooks, + webhook_request_body: WebhookRequestBody, +) -> Result<(), anyhow::Error> { + let Some(url) = webhook.url else { + error!("[Webhook] Failed to send webhook: url is not set"); + return Err(anyhow::anyhow!("[Webhook] Failed to send webhook: url is not set")); + }; + let client = Client::new(); + let Some(webhook_request_body_for_slack) = webhook_request_body.to_slack() else { + error!("[Webhook] Failed to send webhook: Failed to parse webhook_request_body"); + return Err(anyhow::anyhow!("[Webhook] Failed to send webhook: Failed to parse webhook_request_body")); + }; + let response = client + .post(&url) + .json(&webhook_request_body_for_slack) + .send() + .await; + if let Err(e) = response { + error!( + "[Webhook] Failed to send webhook Slack incoming webhook: {}", + e + ); + return Err(anyhow::anyhow!( + "[Webhook] Failed to send webhook Slack incoming webhook" + )); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_convert_json_str_to_yaml() { + let json_str = r#"{"component_id":"k8s_node_dp","replicas":"1"}"#; + let scaling_component_json = serde_json::from_str::(json_str).unwrap(); + let scaling_component_yaml = serde_yaml::to_string(&scaling_component_json).unwrap(); + assert_eq!( + scaling_component_yaml, + "component_id: k8s_node_dp\nreplicas: '1'\n" + ); + } + + #[ignore] + #[tokio::test] + async fn test_send_webhook_http() { + let webhooks = Webhooks { + id: "test".to_string(), + webhook_type: WebhookType::Http, + url: Some("http://localhost:3024/api/test".to_string()), + headers: None, + }; + let webhook_request_body = WebhookRequestBody { + plan_id: "test-plan-1".to_string(), + plan_item_id: "test-plan-item-1".to_string(), + scaling_component_json_str: r#"{"component_id":"k8s_node_dp","replicas":"1"}"# + .to_string(), + fail_message: None, + }; + let send_webhook_http = send_webhook_http(webhooks, webhook_request_body).await; + assert!(send_webhook_http.is_ok()); + } + + #[ignore] + #[tokio::test] + async fn test_send_webhook_slack_incoming_webhook() { + let webhooks = Webhooks { + id: "test".to_string(), + webhook_type: WebhookType::SlackIncomingWebhook, + url: Some( + "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX" + .to_string(), + ), + headers: None, + }; + let webhook_request_body = WebhookRequestBody { + plan_id: "test-plan-1".to_string(), + plan_item_id: "test-plan-item-1".to_string(), + scaling_component_json_str: r#""#.to_string(), + fail_message: None, + }; + let send_webhook_slack_incoming_webhook = + send_webhook_slack_incoming_webhook(webhooks, webhook_request_body).await; + assert!(send_webhook_slack_incoming_webhook.is_ok()); + } +}