Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

add webhooks function (Http, SlackIncomingWebhook) #259

Merged
merged 3 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion core/utils/src/wave_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config_path::find_file_in_wa;
use serde::Deserialize;
use std::fs::File;
use std::{collections::HashMap, fs::File};
use tracing::{debug, error};

const CONFIG_FILE_NAME: &str = "wave-config.yaml";
Expand All @@ -18,6 +18,10 @@ const DEFAULT_WEB_UI: bool = true;
const DEFAULT_WEB_UI_HOST: &str = "0.0.0.0";
const DEFAULT_WEB_UI_PORT: u16 = 3025;
const DEFAULT_RESET_DEFINITIONS_ON_STARTUP: bool = false;
const DEFAULT_WEBHOOKS: Option<Vec<Webhooks>> = None;
const DEFAULT_WEBHOOKS_URL: Option<String> = None;
const DEFAULT_WEBHOOKS_HEADERS: Option<HashMap<String, String>> = None;
const DEFAULT_WEBHOOKS_WEBHOOK_URL: Option<String> = None;

fn default_debug() -> bool {
DEFAULT_DEBUG
Expand Down Expand Up @@ -58,6 +62,18 @@ fn default_web_ui_port() -> u16 {
fn default_reset_definitions_on_startup() -> bool {
DEFAULT_RESET_DEFINITIONS_ON_STARTUP
}
fn default_webhooks() -> Option<Vec<Webhooks>> {
DEFAULT_WEBHOOKS
}
fn default_webhooks_url() -> Option<String> {
DEFAULT_WEBHOOKS_URL
}
fn default_webhooks_headers() -> Option<HashMap<String, String>> {
DEFAULT_WEBHOOKS_HEADERS
}
fn default_webhooks_webhook_url() -> Option<String> {
DEFAULT_WEBHOOKS_WEBHOOK_URL
}

#[derive(Debug, PartialEq, Deserialize, Default, Clone)]
struct DownloadUrlDefinition {
Expand Down Expand Up @@ -127,6 +143,31 @@ pub struct WaveConfig {
vector: DownloadUrlDefinition,
#[serde(default)]
telegraf: DownloadUrlDefinition,

//
// Web hooks
//
#[serde(default = "default_webhooks")]
pub webhooks: Option<Vec<Webhooks>>,
}

#[derive(PartialEq, Clone, Deserialize, Debug)]
pub struct Webhooks {
pub id: String,
pub webhook_type: WebhookType,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore the upper or the lower cases

#[serde(default = "default_webhooks_url")]
pub url: Option<String>,
#[serde(default = "default_webhooks_headers")]
pub headers: Option<HashMap<String, String>>,
#[serde(default = "default_webhooks_webhook_url")]
pub webhook_url: Option<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge webhook_url with url

}

#[derive(Debug, PartialEq, Deserialize, Clone)]
pub enum WebhookType {
Http,
SlackIncomingWebhook,
// SlackOauth, // TODO: To be developed.
}

impl Default for WaveConfig {
Expand All @@ -147,6 +188,7 @@ impl Default for WaveConfig {
web_ui_port: DEFAULT_WEB_UI_PORT,
vector: DownloadUrlDefinition::default(),
telegraf: DownloadUrlDefinition::default(),
webhooks: DEFAULT_WEBHOOKS,
}
}
}
Expand Down Expand Up @@ -227,5 +269,6 @@ mod tests {
assert_eq!(wave_config.web_ui, DEFAULT_WEB_UI);
assert_eq!(wave_config.web_ui_host, DEFAULT_WEB_UI_HOST);
assert_eq!(wave_config.web_ui_port, DEFAULT_WEB_UI_PORT);
assert_eq!(wave_config.webhooks, DEFAULT_WEBHOOKS);
}
}
7 changes: 6 additions & 1 deletion core/wave-autoscale/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ pub struct App {
}

impl App {
pub async fn new(wave_config: WaveConfig, shared_data_layer: Arc<DataLayer>) -> Self {
pub async fn new(
wave_config: WaveConfig,
shared_data_layer: Arc<DataLayer>,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the webhooks and reuse the wave_config

) -> Self {
// Create MetricUpdater
let shared_metric_updater = MetricUpdater::new_shared(shared_data_layer.clone(), 1000);

Expand All @@ -33,6 +37,7 @@ impl App {
shared_data_layer.clone(),
shared_metric_updater.clone(),
shared_scaling_component_manager.clone(),
webhooks,
);

// Create App
Expand Down
7 changes: 6 additions & 1 deletion core/wave-autoscale/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ async fn main() {
}

// Run the main application(controller)
let mut app = app::App::new(wave_config.clone(), shared_data_layer.clone()).await;
let mut app = app::App::new(
wave_config.clone(),
shared_data_layer.clone(),
wave_config.webhooks.clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the line

)
.await;

//
// Run some jobs (Autoscaling History Remover, Reset definitions on startup, Watch the definition file, and the main application(controller))
Expand Down
51 changes: 47 additions & 4 deletions core/wave-autoscale/src/scaling_planner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod scaling_planner_manager;
mod js_functions;
mod webhooks;

use crate::{
metric_updater::SharedMetricUpdater, scaling_component::SharedScalingComponentManager,
Expand Down Expand Up @@ -103,7 +104,11 @@ async fn create_autoscaling_history(
expression_value_map: Option<&Vec<HashMap<String, Option<f64>>>>,
scaling_components_metadata: Option<&Value>,
fail_message: Option<String>,
plan_webhooks: Option<Vec<String>>,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
) {
let plan_item_id = plan_item.clone().id;

let metric_values_json = if let Some(expression_value_map) = expression_value_map {
json!(expression_value_map).to_string()
} else {
Expand All @@ -117,11 +122,11 @@ async fn create_autoscaling_history(
};
let autoscaling_history: AutoscalingHistoryDefinition = AutoscalingHistoryDefinition::new(
plan_db_id,
plan_id,
json!(plan_item).to_string(),
plan_id.clone(),
json!(plan_item.clone()).to_string(),
metric_values_json,
metadata_values_json,
fail_message,
metadata_values_json.clone(),
fail_message.clone(),
);
debug!(
"[ScalingPlanner] autoscaling_history - {:?}",
Expand All @@ -130,6 +135,14 @@ async fn create_autoscaling_history(
let _ = data_layer
.add_autoscaling_history(autoscaling_history)
.await;

let webhook_response = webhooks::WebhookResponse {
plan_id,
plan_item_id,
scaling_component_json_str: metadata_values_json,
fail_message: fail_message.clone(),
};
webhooks::send_webhooks(webhooks, plan_webhooks, webhook_response);
}

pub struct ScalingPlanner {
Expand All @@ -141,6 +154,7 @@ pub struct ScalingPlanner {
last_cool_down: Arc<RwLock<u64>>,
data_layer: Arc<DataLayer>,
task: Option<JoinHandle<()>>,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
// For instant action
action_task: Option<JoinHandle<()>>,
last_plan_item_id_by_action: Arc<RwLock<String>>,
Expand All @@ -153,6 +167,7 @@ impl<'a> ScalingPlanner {
metric_updater: SharedMetricUpdater,
scaling_component_manager: SharedScalingComponentManager,
data_layer: Arc<DataLayer>,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
) -> Self {
ScalingPlanner {
definition,
Expand All @@ -163,6 +178,7 @@ impl<'a> ScalingPlanner {
last_cool_down: Arc::new(RwLock::new(0)),
data_layer,
task: None,
webhooks,
action_task: None,
last_plan_item_id_by_action: Arc::new(RwLock::new(String::new())),
last_plan_timestamp_by_action: Arc::new(RwLock::new(None)),
Expand All @@ -185,6 +201,7 @@ impl<'a> ScalingPlanner {
let shared_last_plan_timestamp = self.last_plan_timestamp.clone();
let shared_last_cool_down = self.last_cool_down.clone();
let data_layer: Arc<DataLayer> = self.data_layer.clone();
let webhooks = self.webhooks.clone();

// PlanDefinition
let scaling_plan_definition = self.definition.clone();
Expand Down Expand Up @@ -215,6 +232,25 @@ impl<'a> ScalingPlanner {
} else {
plan_interval
};
let plan_webhooks = match plan_metadata.get("webhooks") {
Some(plan_webhooks) => {
let Some(plan_webhooks) = plan_webhooks.as_array() else {
error!("[ScalingPlanner] Failed to get plan webhooks Not Array - {:?}", plan_webhooks);
return;
};
let plan_webhooks = plan_webhooks
.iter()
.map(|webhook|
webhook.as_str().unwrap_or_else(|| {
error!("[ScalingPlanner] Failed to get plan webhook Not String - {:?}", webhook);
""
}).to_string()
)
.collect::<Vec<String>>();
Some(plan_webhooks)
}
None => None,
};

let plan_items = self.sort_plan_by_priority();

Expand Down Expand Up @@ -358,6 +394,8 @@ impl<'a> ScalingPlanner {
None,
None,
Some("Failed to parse cron expression".to_string()),
plan_webhooks.clone(),
webhooks.clone(),
)
.await;
// Skip this plan
Expand Down Expand Up @@ -440,6 +478,8 @@ impl<'a> ScalingPlanner {
None,
None,
expression_result.message,
plan_webhooks.clone(),
webhooks.clone(),
)
.await;
}
Expand Down Expand Up @@ -500,6 +540,8 @@ impl<'a> ScalingPlanner {
Some(&expression_value_map_for_history),
Some(&scaling_components_metadata[index]),
fail_message,
plan_webhooks.clone(),
webhooks.clone(),
)
.await;
}
Expand Down Expand Up @@ -712,6 +754,7 @@ mod tests {
shared_metric_updater,
scaling_component_manager,
data_layer.clone(),
None,
);
(data_layer, scaling_planner)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,35 @@ pub struct ScalingPlannerManager {
data_layer: Arc<DataLayer>,
metric_updater: SharedMetricUpdater,
scaling_component_manager: SharedScalingComponentManager,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
}

impl ScalingPlannerManager {
pub fn new(
data_layer: Arc<DataLayer>,
metric_updater: SharedMetricUpdater,
scaling_component_manager: SharedScalingComponentManager,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
) -> Self {
ScalingPlannerManager {
scaling_planners: HashMap::new(),
data_layer,
metric_updater,
scaling_component_manager,
webhooks,
}
}
pub fn new_shared(
data_layer: Arc<DataLayer>,
metric_updater: SharedMetricUpdater,
scaling_component_manager: SharedScalingComponentManager,
webhooks: Option<Vec<utils::wave_config::Webhooks>>,
) -> SharedScalingPlannerManager {
Arc::new(RwLock::new(ScalingPlannerManager::new(
data_layer,
metric_updater,
scaling_component_manager,
webhooks,
)))
}

Expand All @@ -51,6 +56,7 @@ impl ScalingPlannerManager {
self.metric_updater.clone(),
self.scaling_component_manager.clone(),
self.data_layer.clone(),
self.webhooks.clone(),
))
}

Expand Down
Loading
Loading