diff --git a/core/wave-autoscale/Cargo.toml b/core/wave-autoscale/Cargo.toml index 89db92a0..8503b3cf 100644 --- a/core/wave-autoscale/Cargo.toml +++ b/core/wave-autoscale/Cargo.toml @@ -76,6 +76,7 @@ strum = { version = "0.25.0" } strum_macros = { version = "0.25.0" } tracing = { version = "0.1.40" } tracing-subscriber = { version = "0.3.17" } +serde_json_path = { version = "0.6.2" } [dev-dependencies] handlebars = "4.3.7" diff --git a/core/wave-autoscale/src/metric_collector_manager/mod.rs b/core/wave-autoscale/src/metric_collector_manager/mod.rs index 2c02f9d6..0dee5134 100644 --- a/core/wave-autoscale/src/metric_collector_manager/mod.rs +++ b/core/wave-autoscale/src/metric_collector_manager/mod.rs @@ -2,6 +2,7 @@ use data_layer::MetricDefinition; use flate2::read::GzDecoder; use futures_util::StreamExt; use std::cmp::min; +use std::collections::HashMap; use std::fs::File; use std::io::Write; use tar::Archive; @@ -214,118 +215,8 @@ impl MetricCollectorManager { * ... * */ - let mut root_toml = "".to_string(); - // Create a TOML array representing the metric definitions - for metric_definition in metric_definitions { - if metric_definition.collector != "vector" { - continue; - } - - // metadata.sinks remove - let mut metadata = metric_definition.metadata.clone(); - metadata.remove("sinks"); - - // convert metric_definition.metadata to toml - let Ok(metadata_toml) = - serde_json::from_value::(serde_json::json!(metadata)) else { - error!("[vector] Failed to convert metadata to toml"); - continue; - }; - - let Some(sinks) = metric_definition.metadata.get("sinks") else { - error!("[vector] sinks not found"); - continue; - }; - - let Some(sinks_object) = sinks.as_object() else { - error!("[vector] Failed to convert metadata.sinks to as_object"); - continue; - }; - - // find sinks intput - let mut sinks_input = Vec::::new(); - sinks_object.keys().for_each(|key| { - let Some(key_object) = sinks.get(key) else { - error!("[vector] Failed to convert metadata.sinks to as_object for key: {}", key); - return; - }; - let Some(sinks_type_data) = key_object.get("type") else { - error!("[vector] missing metadata.sinks type"); - return; - }; - let Some(sinks_inputs_data) = key_object.get("inputs") else { - error!("[vector] missing metadata.sinks inputs"); - return; - }; - if sinks_type_data == "wave-autoscale" { - let Some(sinks_inputs_data_arr) = sinks_inputs_data.as_array() else { - error!("[vector] Failed to convert metadata.sinks.inputs to as_array"); - return; - }; - for arr_data in sinks_inputs_data_arr { - let Some(input_str) = arr_data.as_str() else { - error!("[vector] sinks > wave-autoscale > input is not string"); - return; - }; - sinks_input.push(toml::Value::String(input_str.to_string())); - } - } - }); - if sinks_input.is_empty() { - error!("[vector] missing sinks > type: wave-autoscale > inputs data"); - continue; - } - - // make new sinks - let mut sinks_metric = toml::value::Table::new(); - sinks_metric.insert("type".to_string(), toml::Value::String("http".to_string())); - sinks_metric.insert("inputs".to_string(), toml::Value::Array(sinks_input)); - sinks_metric.insert( - "uri".to_string(), - toml::Value::String(format!( - "{}?metric_id={}&collector=vector", - self.output_url, metric_definition.id - )), - ); - sinks_metric.insert( - "method".to_string(), - toml::Value::String("post".to_string()), - ); - sinks_metric.insert( - "compression".to_string(), - toml::Value::String("gzip".to_string()), - ); - sinks_metric.insert( - "payload_prefix".to_string(), - 
toml::Value::String("{\"metrics\": ".to_string()), - ); - sinks_metric.insert( - "payload_suffix".to_string(), - toml::Value::String("}".to_string()), - ); - let mut encoding = toml::value::Table::new(); - encoding.insert("codec".to_string(), toml::Value::String("json".to_string())); - sinks_metric.insert("encoding".to_string(), toml::Value::Table(encoding)); - - let sink_metric_id = format!("output_{}", metric_definition.id); - let mut sinks_toml = toml::value::Table::new(); - sinks_toml.insert(sink_metric_id, toml::Value::Table(sinks_metric)); - let mut root_sinks_toml = toml::value::Table::new(); - root_sinks_toml.insert("sinks".to_string(), toml::Value::Table(sinks_toml)); - - let Ok(metadata_toml_str) = toml::to_string(&metadata_toml) else { - error!("[vector] Failed to convert metadata to toml string"); - continue; - }; - let Ok(root_sinks_toml_str) = toml::to_string(&root_sinks_toml) else { - error!("[vector] Failed to convert metadata.sinks to toml string"); - continue; - }; - - root_toml = root_toml + "\n" + &metadata_toml_str + "\n" + &root_sinks_toml_str + "\n"; - } - - debug!("Vector config:\n{}", root_toml); + let root_toml = + convert_metric_definitions_to_vector_toml(metric_definitions, self.output_url.clone()); // Write the string to a file std::fs::write(config_path, root_toml).unwrap(); @@ -378,77 +269,10 @@ impl MetricCollectorManager { * metric_id = "cloudwatch_metrics" * */ - let mut root_toml = "".to_string(); - // Create a TOML array representing the metric definitions - for metric_definition in metric_definitions { - if metric_definition.collector != "telegraf" { - continue; - } - - // metadata.outputs remove - let mut metadata = metric_definition.metadata.clone(); - metadata.remove("outputs"); - - // convert metric_definition.metadata to toml - let Ok(metadata_toml) = - serde_json::from_value::(serde_json::json!(metadata)) else { - error!("[telegraf] Failed to convert metadata to toml"); - continue; - }; - - let mut outputs_toml = toml::value::Array::new(); - if metric_definition.collector == "telegraf" { - let Some(outputs) = metric_definition.metadata.get("outputs") else { - error!("[telegraf] outputs not found"); - continue; - }; - - // find output waveautoscale - if outputs.get("wave-autoscale").is_some() { - // make new output - let mut output_metric = toml::value::Table::new(); - output_metric.insert( - "url".to_string(), - toml::Value::String(format!( - "{}?metric_id={}&collector=telegraf", - self.output_url, metric_definition.id - )), - ); - output_metric.insert( - "method".to_string(), - toml::Value::String("POST".to_string()), - ); - output_metric.insert( - "data_format".to_string(), - toml::Value::String("json".to_string()), - ); - let Ok(required_tags) = serde_json::from_str::(serde_json::json!({ "metric_id" : [metric_definition.id.to_string()] }).to_string().as_str()) else { - continue; - }; - output_metric.insert("tagpass".to_string(), required_tags); - outputs_toml.push(toml::Value::Table(output_metric)); - } - } - // [[outputs]] - let mut outputs = toml::value::Table::new(); - outputs.insert("http".to_string(), toml::Value::Array(outputs_toml)); - let mut root_outputs = toml::value::Table::new(); - root_outputs.insert("outputs".to_string(), toml::Value::Table(outputs)); - - let Ok(metadata_toml_str) = toml::to_string(&metadata_toml) else { - error!("[telegraf] Failed to convert metadata to toml string"); - continue; - }; - let Ok(root_outputs_toml_str) = toml::to_string(&root_outputs) else { - error!("[telegraf] Failed to convert metadata.output to 
toml string"); - continue; - }; - - root_toml = - root_toml + "\n" + &metadata_toml_str + "\n" + &root_outputs_toml_str + "\n"; - } - - debug!("Telegraf config:\n{}", root_toml); + let root_toml = convert_metric_definitions_to_telegraf_toml( + metric_definitions, + self.output_url.clone(), + ); // Write the string to a file std::fs::write(config_path, root_toml).unwrap(); @@ -523,25 +347,430 @@ impl MetricCollectorManager { } } -fn add_telegraf_input_required_tags( - mut input_metric: toml::map::Map, - metric_id: String, -) -> toml::map::Map { - if input_metric.contains_key("tags") && input_metric.get("tags").is_some() { - let input_metric_tags = input_metric.get("tags").unwrap(); - let input_metric_tags = input_metric_tags.as_table().unwrap(); - - let mut new_map = toml::value::Table::new(); - new_map = input_metric_tags.clone(); - new_map.insert("metric_id".to_string(), toml::Value::String(metric_id)); - input_metric.insert("tags".to_string(), toml::Value::Table(new_map.clone())); - } else { - let Ok(required_tags) = serde_json::from_str::(serde_json::json!({ "metric_id" : metric_id }).to_string().as_str()) else { - return input_metric; +fn convert_metric_definitions_to_vector_toml( + metric_definitions: &Vec<&MetricDefinition>, + output_url: String, +) -> String { + let mut root_toml = "".to_string(); + // Create a TOML array representing the metric definitions + for metric_definition in metric_definitions { + if !validate_vector_definition(metric_definition) { + error!("[vector] Validation Failed"); + continue; + } + + // metadata.sinks remove + let mut metadata = metric_definition.metadata.clone(); + metadata.remove("sinks"); + + // convert metric_definition.metadata to toml + let Ok(metadata_toml) = + serde_json::from_value::(serde_json::json!(metadata)) else { + error!("[vector] Failed to convert metadata to toml"); + continue; }; - input_metric.insert("tags".to_string(), required_tags); + + let Some(sinks) = metric_definition.metadata.get("sinks") else { + error!("[vector] sinks not found"); + continue; + }; + + let Some(sinks_object) = sinks.as_object() else { + error!("[vector] Failed to convert metadata.sinks to as_object"); + continue; + }; + + // find sinks intput + let mut sinks_input = Vec::::new(); + sinks_object.keys().for_each(|key| { + let Some(key_object) = sinks.get(key) else { + error!("[vector] Failed to convert metadata.sinks to as_object for key: {}", key); + return; + }; + let Some(sinks_type_data) = key_object.get("type") else { + error!("[vector] missing metadata.sinks type"); + return; + }; + let Some(sinks_inputs_data) = key_object.get("inputs") else { + error!("[vector] missing metadata.sinks inputs"); + return; + }; + if sinks_type_data == "wave-autoscale" { + let Some(sinks_inputs_data_arr) = sinks_inputs_data.as_array() else { + error!("[vector] Failed to convert metadata.sinks.inputs to as_array"); + return; + }; + for arr_data in sinks_inputs_data_arr { + let Some(input_str) = arr_data.as_str() else { + error!("[vector] sinks > wave-autoscale > input is not string"); + return; + }; + sinks_input.push(toml::Value::String(input_str.to_string())); + } + } + }); + if sinks_input.is_empty() { + error!("[vector] missing sinks > type: wave-autoscale > inputs data"); + continue; + } + + // make new sinks + let mut sinks_metric = toml::value::Table::new(); + sinks_metric.insert("type".to_string(), toml::Value::String("http".to_string())); + sinks_metric.insert("inputs".to_string(), toml::Value::Array(sinks_input)); + sinks_metric.insert( + "uri".to_string(), + 
toml::Value::String(format!(
+                "{}?metric_id={}&collector=vector",
+                output_url, metric_definition.id
+            )),
+        );
+        sinks_metric.insert(
+            "method".to_string(),
+            toml::Value::String("post".to_string()),
+        );
+        sinks_metric.insert(
+            "compression".to_string(),
+            toml::Value::String("gzip".to_string()),
+        );
+        sinks_metric.insert(
+            "payload_prefix".to_string(),
+            toml::Value::String("{\"metrics\": ".to_string()),
+        );
+        sinks_metric.insert(
+            "payload_suffix".to_string(),
+            toml::Value::String("}".to_string()),
+        );
+        let mut encoding = toml::value::Table::new();
+        encoding.insert("codec".to_string(), toml::Value::String("json".to_string()));
+        sinks_metric.insert("encoding".to_string(), toml::Value::Table(encoding));
+
+        let sink_metric_id = format!("output_{}", metric_definition.id);
+        let mut sinks_toml = toml::value::Table::new();
+        sinks_toml.insert(sink_metric_id, toml::Value::Table(sinks_metric));
+        let mut root_sinks_toml = toml::value::Table::new();
+        root_sinks_toml.insert("sinks".to_string(), toml::Value::Table(sinks_toml));
+
+        let Ok(metadata_toml_str) = toml::to_string(&metadata_toml) else {
+            error!("[vector] Failed to convert metadata to toml string");
+            continue;
+        };
+        let Ok(root_sinks_toml_str) = toml::to_string(&root_sinks_toml) else {
+            error!("[vector] Failed to convert metadata.sinks to toml string");
+            continue;
+        };
+
+        root_toml = root_toml + "\n" + &metadata_toml_str + "\n" + &root_sinks_toml_str + "\n";
+    }
+
+    debug!("Vector config:\n{}", root_toml);
+
+    root_toml
+}
+
+fn convert_metric_definitions_to_telegraf_toml(
+    metric_definitions: &Vec<&MetricDefinition>,
+    output_url: String,
+) -> String {
+    let mut root_toml = "".to_string();
+    // Create a TOML array representing the metric definitions
+    for metric_definition in metric_definitions {
+        if !validate_telegraf_definition(metric_definition) {
+            error!("[telegraf] Validation Failed");
+            continue;
+        }
+
+        // metadata.outputs, metadata.inputs remove
+        let mut metadata = metric_definition.metadata.clone();
+        metadata.remove("outputs");
+        metadata.remove("inputs");
+
+        // convert metric_definition.metadata to toml
+        let Ok(metadata_toml) =
+            serde_json::from_value::<toml::Value>(serde_json::json!(metadata)) else {
+            error!("[telegraf] Failed to convert metadata to toml");
+            continue;
+        };
+
+        let mut outputs_toml = toml::value::Array::new();
+        let mut inputs_map =
+            HashMap::<String, Vec<serde_json::Map<String, serde_json::Value>>>::new();
+
+        // metadata.inputs
+        let Some(inputs) = metric_definition.metadata.get("inputs").and_then(|inputs| inputs.as_object()) else {
+            error!("[telegraf] Failed to convert metadata.inputs to as_object");
+            continue;
+        };
+
+        // Example:
+        // inputs: { // inputs
+        //   cloudwatch: [ // inputs_target
+        //     { // inputs_target_item
+        //       cloudwatch items
+        //     },
+        //     {
+        //       cloudwatch items
+        //     }
+        //   ]
+        // }
+        for (inputs_key, inputs_target) in inputs.iter() {
+            let Some(inputs_target) = inputs_target.as_array() else {
+                error!("[telegraf] Failed to convert metadata.inputs to as_array");
+                continue;
+            };
+
+            let mut transformed_inputs_target =
+                Vec::<serde_json::Map<String, serde_json::Value>>::new();
+
+            // insert required tags to inputs
+            inputs_target.iter().for_each(|inputs_target_item| {
+                let inputs_tags = inputs_target_item.get("tags");
+                let mut inputs_tags_append = serde_json::Map::new();
+                // keep user-defined tags; only log an error when "tags" exists but is not an object
+                if let Some(inputs_tags) = inputs_tags {
+                    if let Some(inputs_tags) = inputs_tags.as_object() {
+                        inputs_tags_append = inputs_tags.clone();
+                    } else {
+                        error!("[telegraf] Failed to convert metadata.inputs.tags to as_object");
+                    }
+                }
+                inputs_tags_append.insert(
+                    "metric_id".to_string(),
+                    
serde_json::json!(metric_definition.id.clone()), + ); + let Some(inputs_target_item) = inputs_target_item.as_object() else { + error!("[telegraf] Failed to convert metadata.inputs array item to as_object"); + return; + }; + + let mut tags_map = serde_json::Map::new(); + tags_map.insert("tags".to_string(), serde_json::json!(inputs_tags_append)); + let mut inputs_target_item_tags_append = inputs_target_item.clone(); + inputs_target_item_tags_append.append(&mut tags_map); + transformed_inputs_target.push(inputs_target_item_tags_append); + }); + + inputs_map.insert(inputs_key.to_string(), transformed_inputs_target); + } + + // metadata.outputs + let Some(outputs) = metric_definition.metadata.get("outputs") else { + error!("[telegraf] outputs not found"); + continue; + }; + + // find output waveautoscale + if outputs.get("wave-autoscale").is_none() { + error!("[telegraf] missing outputs > wave-autoscale data"); + continue; + } + + // make new output + let mut output_metric = toml::value::Table::new(); + output_metric.insert( + "url".to_string(), + toml::Value::String(format!( + "{}?metric_id={}&collector=telegraf", + output_url, metric_definition.id + )), + ); + output_metric.insert( + "method".to_string(), + toml::Value::String("POST".to_string()), + ); + output_metric.insert( + "data_format".to_string(), + toml::Value::String("json".to_string()), + ); + let Ok(required_tags) = serde_json::from_str::(serde_json::json!({ "metric_id" : [metric_definition.id.to_string()] }).to_string().as_str()) else { + continue; + }; + output_metric.insert("tagpass".to_string(), required_tags); + outputs_toml.push(toml::Value::Table(output_metric)); + + // [[inputs]] + let Ok(inputs_toml) = + serde_json::from_value::(serde_json::json!(inputs_map)) else { + error!("[telegraf] Failed to convert inputs to toml"); + continue; + }; + let mut root_inputs = toml::value::Table::new(); + root_inputs.insert("inputs".to_string(), inputs_toml); + let Ok(inputs_toml_str) = toml::to_string(&root_inputs) else { + error!("[telegraf] Failed to convert metadata to toml string"); + continue; + }; + + // [[outputs]] + let mut outputs = toml::value::Table::new(); + outputs.insert("http".to_string(), toml::Value::Array(outputs_toml)); + let mut root_outputs = toml::value::Table::new(); + root_outputs.insert("outputs".to_string(), toml::Value::Table(outputs)); + let Ok(root_outputs_toml_str) = toml::to_string(&root_outputs) else { + error!("[telegraf] Failed to convert metadata.output to toml string"); + continue; + }; + + // other metadata + let Ok(metadata_toml_str) = toml::to_string(&metadata_toml) else { + error!("[telegraf] Failed to convert metadata to toml string"); + continue; + }; + + root_toml = root_toml + + "\n" + + &metadata_toml_str + + "\n" + + &inputs_toml_str + + "\n" + + &root_outputs_toml_str + + "\n"; } - input_metric + + // [agent] + let Ok(agent_toml) = serde_json::from_value::(serde_json::json!({ + "agent": { + "interval": "1s", + "round_interval": true, + "metric_batch_size": 1000, + "metric_buffer_limit": 10000, + "collection_jitter": "0s", + "flush_interval": "1s", + "flush_jitter": "0s", + "precision": "0s", + "debug": false + } + })) else { + error!("[telegraf] Failed to convert agent to toml"); + return String::new(); + }; + let Ok(agent_toml_str) = toml::to_string(&agent_toml) else { + error!("[telegraf] Failed to convert agent to toml string"); + return String::new(); + }; + + root_toml = root_toml + "\n" + &agent_toml_str + "\n"; + + debug!("Telegraf config:\n{}", root_toml); + + root_toml +} + +fn 
validate_vector_definition(metric_definitions: &MetricDefinition) -> bool {
+    let metric_definition_json =
+        serde_json::to_value::<&MetricDefinition>(metric_definitions).unwrap();
+
+    // 1. check definition collector is "vector"
+    if metric_definitions.collector != "vector" {
+        return false;
+    }
+
+    // 2. check sinks type 'wave-autoscale'
+    let Ok(sinks_type_path) = serde_json_path::JsonPath::parse("$.metadata.sinks.*.type") else {
+        error!("[vector] no path - $.metadata.sinks.*.type");
+        return false;
+    };
+    let Ok(sinks_type) = sinks_type_path
+        .query(&metric_definition_json)
+        .exactly_one() else {
+        error!("[vector] sinks type not found");
+        return false;
+    };
+    if sinks_type != &serde_json::json!("wave-autoscale".to_string()) {
+        error!("[vector] sinks type is not 'wave-autoscale'");
+        return false;
+    }
+
+    // 3. sinks inputs validation
+    let mut sinks_inputs_target_ids = Vec::<String>::new();
+
+    // 3-1. get sources ids
+    let Ok(sources_ids_path) = serde_json_path::JsonPath::parse("$.metadata.sources") else {
+        error!("[vector] no path - $.metadata.sources");
+        return false;
+    };
+    let Ok(sources_ids) = sources_ids_path
+        .query(&metric_definition_json)
+        .exactly_one() else {
+        error!("[vector] sources ids not found");
+        return false;
+    };
+    let Some(sources_ids_object) = sources_ids.as_object() else {
+        error!("[vector] Failed to convert sources ids to as_object");
+        return false;
+    };
+    for sources_id in sources_ids_object.iter() {
+        sinks_inputs_target_ids.push(sources_id.0.to_string());
+    }
+
+    // 3-2. get transforms ids
+    let Ok(transforms_ids_path) = serde_json_path::JsonPath::parse("$.metadata.transforms") else {
+        error!("[vector] no path - $.metadata.transforms");
+        return false;
+    };
+    let transforms_ids = transforms_ids_path
+        .query(&metric_definition_json)
+        .exactly_one();
+    // transforms is optional
+    if transforms_ids.is_ok() {
+        let Some(transforms_ids_object) = transforms_ids.unwrap().as_object() else {
+            error!("[vector] Failed to convert transforms ids to as_object");
+            return false;
+        };
+        for transforms_id in transforms_ids_object.iter() {
+            sinks_inputs_target_ids.push(transforms_id.0.to_string());
+        }
+    }
+
+    // 3-3. check sinks inputs
+    let Ok(sinks_inputs_path) = serde_json_path::JsonPath::parse("$.metadata.sinks.*.inputs") else {
+        error!("[vector] no path - $.metadata.sinks.*.inputs");
+        return false;
+    };
+    let Ok(sinks_inputs) = sinks_inputs_path
+        .query(&metric_definition_json)
+        .exactly_one() else {
+        error!("[vector] sinks inputs not found");
+        return false;
+    };
+    let mut is_sinks_inputs_target_ids = false;
+    for sinks_input in sinks_inputs.as_array().unwrap() {
+        let Some(sinks_input_str) = sinks_input.as_str() else {
+            error!("[vector] sinks inputs is not string");
+            return false;
+        };
+        if sinks_inputs_target_ids.contains(&sinks_input_str.to_string()) {
+            is_sinks_inputs_target_ids = true;
+        }
+    }
+
+    is_sinks_inputs_target_ids
+}
+
+fn validate_telegraf_definition(metric_definitions: &MetricDefinition) -> bool {
+    let metric_definition_json =
+        serde_json::to_value::<&MetricDefinition>(metric_definitions).unwrap();
+
+    // 1. check definition collector is "telegraf"
+    if metric_definitions.collector != "telegraf" {
+        return false;
+    }
+
+    // 2. 
check outputs item contain 'wave-autoscale' + let Ok(outputs_items_path) = serde_json_path::JsonPath::parse("$.metadata.outputs") else { + error!("[telegraf] no path - $.metadata.outputs.wave-autoscale"); + return false; + }; + let Ok(outputs_items) = outputs_items_path + .query(&metric_definition_json) + .exactly_one() else { + error!("[telegraf] outputs.wave-autoscale not found"); + return false; + }; + let Some(outputs_items_object) = outputs_items.as_object() else { + error!("[telegraf] Failed to convert metadata.outputs to as_object"); + return false; + }; + + outputs_items_object.contains_key("wave-autoscale") } #[cfg(test)] @@ -563,43 +792,6 @@ mod tests { ) } - #[test] - #[traced_test] - fn test_add_telegraf_input_required_tags_empty_tags() { - let metric_id = "metric_id_1".to_string(); - let mut input_metric = toml::value::Table::new(); - input_metric = add_telegraf_input_required_tags(input_metric, metric_id.clone()); - let tags = input_metric.get("tags").unwrap(); - assert!(tags - .as_table() - .unwrap() - .get("metric_id") - .unwrap() - .eq(&toml::Value::String(metric_id))); - assert_eq!(tags.as_table().unwrap().len(), 1); - } - - #[test] - #[traced_test] - fn test_add_telegraf_input_required_tags_add_tags() { - let metric_id = "metric_id_1".to_string(); - let mut input_metric = toml::value::Table::new(); - - let mut input_tags = toml::value::Table::new(); - input_tags.insert("tag_1".to_string(), toml::Value::String(metric_id.clone())); - input_metric.insert("tags".to_string(), toml::Value::Table(input_tags.clone())); - - input_metric = add_telegraf_input_required_tags(input_metric, metric_id.clone()); - let tags = input_metric.get("tags").unwrap(); - assert!(tags - .as_table() - .unwrap() - .get("metric_id") - .unwrap() - .eq(&toml::Value::String(metric_id))); - assert_eq!(tags.as_table().unwrap().len(), 2); - } - // Test whether it fetchs the os and arch correctly #[test] fn test_get_os_arch() { @@ -889,8 +1081,8 @@ mod tests { #[test] #[traced_test] - fn test_vector_yaml_to_toml() { - let yaml = r#" + fn test_vector_yaml_to_toml_success() { + let metric_yaml_1 = r#" kind: Metric id: istio_request_duration_milliseconds_sum_1m collector: vector @@ -910,70 +1102,127 @@ mod tests { inputs: ["my_transforms_id_1"] "#; - let metric_definition = serde_yaml::from_str::(yaml).unwrap(); + let metric_yaml_2 = r#" + kind: Metric + id: istio_request_duration_milliseconds_sum_2m + collector: vector + metadata: + sources: + my_source_id_2: + type: http_client + query: + "query": ['rate(istio_request_duration_milliseconds_sum{destination_workload="node-server-dp",response_code="200",reporter="destination"}[2m])'] + sinks: + my_sinks_id_2: + type: wave-autoscale + inputs: ["my_source_id_2"] + "#; - // metadata.sinks remove - let mut metadata = metric_definition.metadata.clone(); - metadata.remove("sinks"); + let metric_definition_1 = serde_yaml::from_str::(metric_yaml_1).unwrap(); + let metric_definition_2 = serde_yaml::from_str::(metric_yaml_2).unwrap(); - // convert metric_definition.metadata to toml - let metadata_toml = - serde_json::from_value::(serde_json::json!(metadata)).unwrap(); + let root_toml = convert_metric_definitions_to_vector_toml( + &vec![&metric_definition_1, &metric_definition_2], + "output_url".to_string(), + ); - let mut root_sinks_toml = toml::value::Table::new(); - if metric_definition.collector == "vector" { - let sinks = metric_definition.metadata.get("sinks").unwrap(); - - // find sinks intput - let mut sinks_input = Vec::::new(); - 
sinks.as_object().unwrap().keys().for_each(|key| { - let sinks_type_data = sinks.get(key).unwrap().get("type"); - let sinks_inputs_data = sinks.get(key).unwrap().get("inputs"); - if sinks_type_data.unwrap() == "wave-autoscale" { - for arr_data in sinks_inputs_data.unwrap().as_array().unwrap() { - let Some(input_str) = arr_data.as_str() else { - error!("[vector] sinks > wave-autoscale > input is not string"); - return; - }; - sinks_input.push(toml::Value::String(input_str.to_string())); - } - } - }); + println!("root_toml:\n{}", root_toml); + assert!(root_toml.contains("[sources.my_source_id_1]")); + assert!(root_toml.contains("[sources.my_source_id_1.query]")); + assert!(root_toml.contains("[transforms.my_transforms_id_1]")); + assert!(root_toml.contains("inputs = [\"my_transforms_id_1\"]")); + assert!(root_toml.contains("inputs = [\"my_source_id_2\"]")); + assert!(root_toml.contains("method = \"post\"")); + } - // make new sinks - let mut sinks_metric = toml::value::Table::new(); - sinks_metric.insert("type".to_string(), toml::Value::String("http".to_string())); - sinks_metric.insert("inputs".to_string(), toml::Value::Array(sinks_input)); - sinks_metric.insert( - "uri".to_string(), - toml::Value::String(format!( - "{}?metric_id={}&collector=vector", - "output_url", metric_definition.id - )), - ); - sinks_metric.insert( - "method".to_string(), - toml::Value::String("post".to_string()), - ); - let mut encoding = toml::value::Table::new(); - encoding.insert("codec".to_string(), toml::Value::String("json".to_string())); - sinks_metric.insert("encoding".to_string(), toml::Value::Table(encoding)); - - let sink_metric_id = format!("output_{}", metric_definition.id); - let mut sinks_toml = toml::value::Table::new(); - sinks_toml.insert(sink_metric_id, toml::Value::Table(sinks_metric)); - root_sinks_toml.insert("sinks".to_string(), toml::Value::Table(sinks_toml)); - } + #[test] + #[traced_test] + fn test_vector_yaml_to_toml_validation() { + let metric_yaml_success_1 = r#" + kind: Metric + id: metric_id_1 + collector: vector + metadata: + sources: + my_source_id_1: + type: http_client + my_source_id_2: + type: http_client + transforms: + my_transforms_id_1: + inputs: ["my_source_id_1"] + type: remap + my_transforms_id_2: + inputs: ["my_transforms_id_1"] + type: remap + sinks: + my_sinks_id: + type: wave-autoscale + inputs: ["my_transforms_id_2"] + "#; + let metric_definition_success_1 = + serde_yaml::from_str::(metric_yaml_success_1).unwrap(); + assert!(validate_vector_definition(&metric_definition_success_1)); + + let metric_yaml_success_2 = r#" + kind: Metric + id: metric_id_1 + collector: vector + metadata: + sources: + my_source_id_2: + type: http_client + sinks: + my_sinks_id_2: + type: wave-autoscale + inputs: ["my_source_id_2"] + "#; + let metric_definition_success_2 = + serde_yaml::from_str::(metric_yaml_success_2).unwrap(); + assert!(validate_vector_definition(&metric_definition_success_2)); - let metadata_toml_str = toml::to_string(&metadata_toml).unwrap(); - debug!("metadata_toml:\n{}", metadata_toml_str); - assert!(metadata_toml_str.contains("[sources.my_source_id_1]")); - assert!(metadata_toml_str.contains("[sources.my_source_id_1.query]")); - assert!(metadata_toml_str.contains("[transforms.my_transforms_id_1]")); - let root_sinks_toml_str = toml::to_string(&root_sinks_toml).unwrap(); - debug!("sinks_toml:\n{}", root_sinks_toml_str); - assert!(root_sinks_toml_str.contains("inputs = [\"my_transforms_id_1\"]")); - assert!(root_sinks_toml_str.contains("method = \"post\"")); + let 
metric_yaml_fail_1 = r#" + kind: Metric + id: metric_id_1 + collector: vector + metadata: + sources: + my_source_id_2: + type: http_client + sinks: + my_sinks_id_2: + type: http_client + inputs: ["my_source_id_2"] + "#; + let metric_definition_fail_1 = + serde_yaml::from_str::(metric_yaml_fail_1).unwrap(); + assert!(!validate_vector_definition(&metric_definition_fail_1)); + + let metric_yaml_fail_2 = r#" + kind: Metric + id: metric_id_1 + collector: vector + metadata: + sources: + my_source_id_1: + type: http_client + my_source_id_2: + type: http_client + transforms: + my_transforms_id_1: + inputs: ["my_source_id_1"] + type: remap + my_transforms_id_2: + inputs: ["my_transforms_id_1"] + type: remap + sinks: + my_sinks_id: + type: wave-autoscale + inputs: ["my_transforms_id_3"] + "#; + let metric_definition_fail_2 = + serde_yaml::from_str::(metric_yaml_fail_2).unwrap(); + assert!(!validate_vector_definition(&metric_definition_fail_2)); } #[test] @@ -992,80 +1241,78 @@ mod tests { interval: "10s" namepass: ["process_cpu_seconds_*"] tags: - metric_id: prometheus_metrics + test: test_tag outputs: wave-autoscale: tagpass: metric_id: prometheus_metrics - agent: - interval: "1s" - metric_batch_size: 1000 - metric_buffer_limit: 10000 - flush_interval: "1s" + secretstores: + http: + - id: "secretstore" + url: "http://localhost/secrets" "#; let metric_definition = serde_yaml::from_str::(yaml).unwrap(); - // metadata.outputs remove - let mut metadata = metric_definition.metadata.clone(); - metadata.remove("outputs"); + let root_toml = convert_metric_definitions_to_telegraf_toml( + &vec![&metric_definition], + "output_url".to_string(), + ); - // convert metric_definition.metadata to toml - let metadata_toml = - serde_json::from_value::(serde_json::json!(metadata)).unwrap(); + debug!("root_toml:\n{}", root_toml); + assert!(root_toml.contains("[[secretstores.http]]")); + assert!(root_toml.contains("id = \"secretstore\"")); - let mut outputs_toml = toml::value::Array::new(); - if metric_definition.collector == "telegraf" { - let outputs = metric_definition.metadata.get("outputs").unwrap(); - - // find output waveautoscale - if outputs.get("wave-autoscale").is_some() { - // find tagpass - let tagpass = outputs - .get("wave-autoscale") - .unwrap() - .get("tagpass") - .unwrap(); - - // make new output - let mut output_metric = toml::value::Table::new(); - output_metric.insert( - "url".to_string(), - toml::Value::String(format!( - "{}?metric_id={}&collector=telegraf", - "output_url", metric_definition.id - )), - ); - output_metric.insert( - "method".to_string(), - toml::Value::String("POST".to_string()), - ); - output_metric.insert( - "data_format".to_string(), - toml::Value::String("json".to_string()), - ); - let Ok(output_tagpass) = serde_json::from_str::(serde_json::json!(tagpass).to_string().as_str()) else { - return - }; - output_metric.insert("tagpass".to_string(), output_tagpass); - outputs_toml.push(toml::Value::Table(output_metric)); - } - } - // [[outputs]] - let mut outputs = toml::value::Table::new(); - outputs.insert("http".to_string(), toml::Value::Array(outputs_toml)); - let mut root_outputs = toml::value::Table::new(); - root_outputs.insert("outputs".to_string(), toml::Value::Table(outputs)); + assert!(root_toml.contains("flush_interval = \"1s\"")); + + assert!(root_toml.contains("[[inputs.prometheus]]")); + assert!(root_toml.contains("namepass = [\"process_cpu_seconds_*\"]")); + assert!(root_toml.contains("[inputs.prometheus.tags]")); + assert!(root_toml.contains("metric_id = 
\"prometheus_metrics\"")); + assert!(root_toml.contains("test = \"test_tag\"")); + + assert!(root_toml.contains("[[outputs.http]]")); + assert!(root_toml.contains("[outputs.http.tagpass]")); + assert!(root_toml.contains("metric_id = \"prometheus_metrics\"")); + } + + #[test] + #[traced_test] + fn test_telegraf_yaml_to_toml_validation() { + let metric_yaml_success_1 = r#" + kind: Metric + id: prometheus_metrics + collector: telegraf + metadata: + inputs: + prometheus: + - urls: ["http://localhost:9090/metrics"] + period: "10s" + delay: "10s" + outputs: + wave-autoscale: {} + "#; + let metric_definition_success_1 = + serde_yaml::from_str::(metric_yaml_success_1).unwrap(); + assert!(validate_telegraf_definition(&metric_definition_success_1)); - let metadata_toml_str = toml::to_string(&metadata_toml).unwrap(); - debug!("metadata_toml:\n{}", metadata_toml_str); - assert!(metadata_toml_str.contains("flush_interval = \"1s\"")); - assert!(metadata_toml_str.contains("[[inputs.prometheus]]")); - assert!(metadata_toml_str.contains("namepass = [\"process_cpu_seconds_*\"]")); - let outputs_http_str = toml::to_string(&root_outputs).unwrap(); - debug!("output_toml:\n{}", outputs_http_str); - assert!(outputs_http_str.contains("[[outputs.http]]")); - assert!(outputs_http_str.contains("[outputs.http.tagpass]")); - assert!(outputs_http_str.contains("metric_id = \"prometheus_metrics\"")); + let metric_yaml_fail_1 = r#" + kind: Metric + id: prometheus_metrics + collector: telegraf + metadata: + inputs: + prometheus: + - urls: ["http://localhost:9090/metrics"] + period: "10s" + delay: "10s" + outputs: + http: + - url: "http://localhost/" + method: "POST" + "#; + let metric_definition_fail_1 = + serde_yaml::from_str::(metric_yaml_fail_1).unwrap(); + assert!(!validate_telegraf_definition(&metric_definition_fail_1)); } } diff --git a/core/wave-autoscale/tests/yaml/plan_k8s_istio_rds_sample.yaml b/core/wave-autoscale/tests/yaml/plan_k8s_istio_rds_sample.yaml index 1e6c4607..85a58710 100644 --- a/core/wave-autoscale/tests/yaml/plan_k8s_istio_rds_sample.yaml +++ b/core/wave-autoscale/tests/yaml/plan_k8s_istio_rds_sample.yaml @@ -16,8 +16,6 @@ metadata: dimensions: - name: DBInstanceIdentifier value: "wave-eks-istio-postgresql" - tags: - metric_id: cloudwatch_rds_metrics # add tags - region: ap-northeast-1 profile: "default" period: "1m" @@ -30,20 +28,8 @@ metadata: dimensions: - name: DBInstanceIdentifier value: "wave-eks-istio-postgresql" - tags: - metric_id: cloudwatch_rds_metrics # add tags outputs: wave-autoscale: {} - agent: - interval: "1s" - round_interval: true - metric_batch_size: 1000 - metric_buffer_limit: 10000 - collection_jitter: "0s" - flush_interval: "1s" - flush_jitter: "0s" - precision: "0s" - debug: false --- kind: ScalingComponent id: k8s_patch