From d42528526a45585f7dedcf832e5bebe0e23041a8 Mon Sep 17 00:00:00 2001 From: ohuu Date: Sun, 10 Nov 2024 20:05:41 +0000 Subject: [PATCH] added region and ci --- Cargo.toml | 1 + cardamon.bk.toml | 68 +++++++ cardamon.toml | 174 ++++++++++++------ fixtures/runs.sql | 8 +- src/carbon_intensity.rs | 147 +++++++++++++++ src/config.rs | 12 +- src/data.rs | 7 +- src/data/dataset.rs | 8 +- src/data/dataset_builder.rs | 34 ++-- src/entities/cpu.rs | 6 +- src/entities/iteration.rs | 2 +- src/entities/metrics.rs | 2 +- src/entities/mod.rs | 2 +- src/entities/power_curve.rs | 18 +- src/entities/prelude.rs | 2 +- src/entities/run.rs | 7 +- src/lib.rs | 19 +- src/main.rs | 88 +++++++-- src/metrics_logger.rs | 4 +- .../m20241109_180400_add_region_column.rs | 36 ++++ .../m20241110_191154_add_ci_column.rs | 40 ++++ src/migrations/mod.rs | 4 + src/models.rs | 62 +++---- src/server/routes.rs | 6 +- 24 files changed, 596 insertions(+), 161 deletions(-) create mode 100644 cardamon.bk.toml create mode 100644 src/carbon_intensity.rs create mode 100644 src/migrations/m20241109_180400_add_region_column.rs create mode 100644 src/migrations/m20241110_191154_add_ci_column.rs diff --git a/Cargo.toml b/Cargo.toml index 1712254..579a0d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ http = "1.1.0" itertools = "0.13.0" mime_guess = { version = "2.0.5" } num_cpus = "1.16.0" +phf = { version = "0.11", features = ["macros"] } reqwest = { version = "0.12.7", features = ["json"] } rust-embed = "8.5.0" sea-orm = { version = "1.0.0", features = [ diff --git a/cardamon.bk.toml b/cardamon.bk.toml new file mode 100644 index 0000000..d41489f --- /dev/null +++ b/cardamon.bk.toml @@ -0,0 +1,68 @@ +# CPU +# ######################################## +[cpu] +name = "AMD Ryzen 7 PRO 6850U with Radeon Graphics" +curve = [ + 7.627190097500079, + 0.07551567953624883, + 20.45110313049153, + -1.5261422759740344, +] + +# Processes +# ######################################## +[[process]] +name = "rand-api" +up = "./rand-api" +down = "kill {pid}" +redirect.to = "file" +process.type = "baremetal" + +[[process]] +name = "rand-api-docker" +up = "docker run -d --name c1 -p 4244:4243 rand-api" +down = "bash -c 'docker stop c1 && docker rm c1'" +redirect.to = "file" +process.type = "docker" +process.containers = ["c1"] + +# Scenarios +# ######################################## +[[scenario]] +name = "stress" +desc = "" +command = "sh scenario.sh" +iterations = 2 +processes = ["rand-api", "rand-api-docker"] + +[[scenario]] +name = "stress_metal" +desc = "" +command = "sh scenario.sh" +iterations = 2 +processes = ["rand-api"] + +[[scenario]] +name = "stress_docker" +desc = "" +command = "sh scenario.sh" +iterations = 2 +processes = ["rand-api-docker"] + +# Observations +# ######################################## +[[observation]] +name = "stress" +scenarios = ["stress"] + +[[observation]] +name = "stress_metal" +scenarios = ["stress_metal"] + +[[observation]] +name = "stress_docker" +scenarios = ["stress_docker"] + +[[observation]] +name = "live_monitor" +processes = ["test_proc1", "test_proc2"] diff --git a/cardamon.toml b/cardamon.toml index d41489f..1beee19 100644 --- a/cardamon.toml +++ b/cardamon.toml @@ -1,68 +1,138 @@ -# CPU -# ######################################## [cpu] name = "AMD Ryzen 7 PRO 6850U with Radeon Graphics" -curve = [ - 7.627190097500079, - 0.07551567953624883, - 20.45110313049153, - -1.5261422759740344, -] +curve = [7.627190097500079,0.07551567953624883,20.45110313049153,-1.5261422759740344] # Processes -# 
########################################
+# ---------
+# This array of tables describes the components of your application that you
+# would like cardamon to measure.
+#
+# processes contain the following properties:
+#   name:
+#     type     - string
+#     desc     - must be unique
+#     required - true
+#
+#   up:
+#     type     - string
+#     desc     - command to execute the process
+#     required - true
+#
+#   down:
+#     type     - string
+#     desc     - command to stop the process. In the case of bare-metal processes
+#                cardamon will pass the PID of the process to this command. You can
+#                use `{pid}` as a placeholder in the command e.g. `kill {pid}`.
+#     default  - empty string
+#     required - false
+#
+#   process.type:
+#     type     - "baremetal" | "docker"
+#     desc     - the type of process which is being executed
+#     required - true
+#
+#   process.containers:
+#     type     - string[]
+#     desc     - docker processes may initiate multiple containers from a single
+#                command, e.g. `docker compose up -d`. This is the list of
+#                containers started by this process that you would like cardamon
+#                to measure.
+#     required - true (if `process.type` is "docker")
+#
+#   redirect.to:
+#     type     - "null" | "parent" | "file"
+#     desc     - where to redirect this process's stdout and stderr. "null" ignores
+#                output, "parent" attaches the process's output to cardamon, "file"
+#                writes stdout and stderr to a file of the same name as this
+#                process e.g. db.stdout.
+#     default  - "file"
+#     required - false
+#
+# EXAMPLE
+# -------
+# [[process]]
+# name = "db"
+# up = "docker compose up -d"
+# down = "docker compose down"
+# redirect.to = "file"
+# process.type = "docker"
+# process.containers = ["postgres"]
+
 [[process]]
-name = "rand-api"
-up = "./rand-api"
+name = "test_proc"
+up = "bash -c \"while true; do shuf -i 0-1337 -n 1; done\""
 down = "kill {pid}"
 redirect.to = "file"
 process.type = "baremetal"
 
-[[process]]
-name = "rand-api-docker"
-up = "docker run -d --name c1 -p 4244:4243 rand-api"
-down = "bash -c 'docker stop c1 && docker rm c1'"
-redirect.to = "file"
-process.type = "docker"
-process.containers = ["c1"]
-
 # Scenarios
-# ########################################
+# ---------
+# This array of tables describes the scenarios that cardamon can run. They can
+# be any kind of executable and are designed to place your application under
+# consistent load each time they are run. Examples include bash scripts which
+# `curl` a REST endpoint or nodejs scripts using playwright.js to control a
+# webpage.
+#
+# scenarios contain the following properties:
+#   name:
+#     type     - string
+#     desc     - must be unique
+#     required - true
+#
+#   desc:
+#     type     - string
+#     desc     - a short description of the scenario to remind you what it does
+#     required - false
+#
+#   command:
+#     type     - string
+#     desc     - the command to execute this scenario
+#     required - true
+#
+#   iterations:
+#     type     - integer
+#     desc     - the number of times cardamon should execute this scenario per run.
+#                It's better to run scenarios multiple times and take an average.
+#     default  - 1
+#     required - false
+#
+#   processes:
+#     type     - string[]
+#     desc     - a list of the processes which need to be started before executing
+#                this scenario.
+#     required - true
+#
 [[scenario]]
-name = "stress"
-desc = ""
-command = "sh scenario.sh"
+name = "sleep"
+desc = "Sleeps for 10 seconds; a real scenario would call your app"
+command = "sleep 10"
 iterations = 2
-processes = ["rand-api", "rand-api-docker"]
+processes = ["test_proc"]
 
-[[scenario]]
-name = "stress_metal"
-desc = ""
-command = "sh scenario.sh"
-iterations = 2
-processes = ["rand-api"]
-
-[[scenario]]
-name = "stress_docker"
-desc = ""
-command = "sh scenario.sh"
-iterations = 2
-processes = ["rand-api-docker"]
 
 # Observations
-# ########################################
-[[observation]]
-name = "stress"
-scenarios = ["stress"]
-
-[[observation]]
-name = "stress_metal"
-scenarios = ["stress_metal"]
-
-[[observation]]
-name = "stress_docker"
-scenarios = ["stress_docker"]
-
+# ---------------
+# This array of tables allows you to group scenarios together to make it
+# easier to execute multiple scenarios in a single run.
+#
+# observations contain the following properties:
+#   name:
+#     type     - string
+#     desc     - a unique name
+#     required - true
+#
+#   scenarios:
+#     type     - string[]
+#     desc     - a list of scenarios to execute whilst observing the processes
+#                required to run all scenarios
+#     required - required if processes is not defined
+#
+#   processes:
+#     type     - string[]
+#     desc     - a list of processes to execute and observe. Running an
+#                observation with this property set runs Cardamon in Live mode.
+#     required - required if scenarios is not defined.
+#
 [[observation]]
-name = "live_monitor"
-processes = ["test_proc1", "test_proc2"]
+name = "test_obs"
+scenarios = ["sleep"]
\ No newline at end of file
diff --git a/fixtures/runs.sql b/fixtures/runs.sql
index 62ce81a..16d49e4 100644
--- a/fixtures/runs.sql
+++ b/fixtures/runs.sql
@@ -1,7 +1,7 @@
 DELETE FROM run;
 
-INSERT INTO run (id, is_live, cpu_id, start_time, stop_time)
+INSERT INTO run (id, is_live, cpu_id, region, start_time, stop_time)
 VALUES
-(1, false, 1, 1717507590000, 1717507601000),
-(2, false, 1, 1717507690000, 1717507699000),
-(3, false, 1, 1717507790000, 1717507795000);
+(1, false, 1, 'GB', 1717507590000, 1717507601000),
+(2, false, 1, 'GB', 1717507690000, 1717507699000),
+(3, false, 1, 'GB', 1717507790000, 1717507795000);
diff --git a/src/carbon_intensity.rs b/src/carbon_intensity.rs
new file mode 100644
index 0000000..33bfa53
--- /dev/null
+++ b/src/carbon_intensity.rs
@@ -0,0 +1,147 @@
+use anyhow::Context;
+use chrono::{DateTime, Datelike, Months, Utc};
+use phf::phf_map;
+use serde_json::Value;
+
+pub const GLOBAL_CI: f64 = 0.494; // g/Wh
+
+static ISO_3166: phf::Map<&'static str, &'static str> = phf_map!
{ + "AF" => "AFG", "AX" => "ALA", "AL" => "ALB", "DZ" => "DZA", "AS" => "ASM", "AD" => "AND", + "AO" => "AGO", "AI" => "AIA", "AQ" => "ATA", "AG" => "ATG", "AR" => "ARG", "AM" => "ARM", + "AW" => "ABW", "AU" => "AUS", "AT" => "AUT", "AZ" => "AZE", "BS" => "BHS", "BH" => "BHR", + "BD" => "BGD", "BB" => "BRB", "BY" => "BLR", "BE" => "BEL", "BZ" => "BLZ", "BJ" => "BEN", + "BM" => "BMU", "BT" => "BTN", "BO" => "BOL", "BQ" => "BES", "BA" => "BIH", "BW" => "BWA", + "BV" => "BVT", "BR" => "BRA", "IO" => "IOT", "BN" => "BRN", "BG" => "BGR", "BF" => "BFA", + "BI" => "BDI", "CV" => "CPV", "KH" => "KHM", "CM" => "CMR", "CA" => "CAN", "KY" => "CYM", + "CF" => "CAF", "TD" => "TCD", "CL" => "CHL", "CN" => "CHN", "CX" => "CXR", "CC" => "CCK", + "CO" => "COL", "KM" => "COM", "CG" => "COG", "CD" => "COD", "CK" => "COK", "CR" => "CRI", + "CI" => "CIV", "HR" => "HRV", "CU" => "CUB", "CW" => "CUW", "CY" => "CYP", "CZ" => "CZE", + "DK" => "DNK", "DJ" => "DJI", "DM" => "DMA", "DO" => "DOM", "EC" => "ECU", "EG" => "EGY", + "SV" => "SLV", "GQ" => "GNQ", "ER" => "ERI", "EE" => "EST", "SZ" => "SWZ", "ET" => "ETH", + "FK" => "FLK", "FO" => "FRO", "FJ" => "FJI", "FI" => "FIN", "FR" => "FRA", "GF" => "GUF", + "PF" => "PYF", "TF" => "ATF", "GA" => "GAB", "GM" => "GMB", "GE" => "GEO", "DE" => "DEU", + "GH" => "GHA", "GI" => "GIB", "GR" => "GRC", "GL" => "GRL", "GD" => "GRD", "GP" => "GLP", + "GU" => "GUM", "GT" => "GTM", "GG" => "GGY", "GN" => "GIN", "GW" => "GNB", "GY" => "GUY", + "HT" => "HTI", "HM" => "HMD", "VA" => "VAT", "HN" => "HND", "HK" => "HKG", "HU" => "HUN", + "IS" => "ISL", "IN" => "IND", "ID" => "IDN", "IR" => "IRN", "IQ" => "IRQ", "IE" => "IRL", + "IM" => "IMN", "IL" => "ISR", "IT" => "ITA", "JM" => "JAM", "JP" => "JPN", "JE" => "JEY", + "JO" => "JOR", "KZ" => "KAZ", "KE" => "KEN", "KI" => "KIR", "KP" => "PRK", "KR" => "KOR", + "KW" => "KWT", "KG" => "KGZ", "LA" => "LAO", "LV" => "LVA", "LB" => "LBN", "LS" => "LSO", + "LR" => "LBR", "LY" => "LBY", "LI" => "LIE", "LT" => "LTU", "LU" => "LUX", "MO" => "MAC", + "MG" => "MDG", "MW" => "MWI", "MY" => "MYS", "MV" => "MDV", "ML" => "MLI", "MT" => "MLT", + "MH" => "MHL", "MQ" => "MTQ", "MR" => "MRT", "MU" => "MUS", "YT" => "MYT", "MX" => "MEX", + "FM" => "FSM", "MD" => "MDA", "MC" => "MCO", "MN" => "MNG", "ME" => "MNE", "MS" => "MSR", + "MA" => "MAR", "MZ" => "MOZ", "MM" => "MMR", "NA" => "NAM", "NR" => "NRU", "NP" => "NPL", + "NL" => "NLD", "NC" => "NCL", "NZ" => "NZL", "NI" => "NIC", "NE" => "NER", "NG" => "NGA", + "NU" => "NIU", "NF" => "NFK", "MK" => "MKD", "MP" => "MNP", "NO" => "NOR", "OM" => "OMN", + "PK" => "PAK", "PW" => "PLW", "PS" => "PSE", "PA" => "PAN", "PG" => "PNG", "PY" => "PRY", + "PE" => "PER", "PH" => "PHL", "PN" => "PCN", "PL" => "POL", "PT" => "PRT", "PR" => "PRI", + "QA" => "QAT", "RE" => "REU", "RO" => "ROU", "RU" => "RUS", "RW" => "RWA", "BL" => "BLM", + "SH" => "SHN", "KN" => "KNA", "LC" => "LCA", "MF" => "MAF", "PM" => "SPM", "VC" => "VCT", + "WS" => "WSM", "SM" => "SMR", "ST" => "STP", "SA" => "SAU", "SN" => "SEN", "RS" => "SRB", + "SC" => "SYC", "SL" => "SLE", "SG" => "SGP", "SX" => "SXM", "SK" => "SVK", "SI" => "SVN", + "SB" => "SLB", "SO" => "SOM", "ZA" => "ZAF", "GS" => "SGS", "SS" => "SSD", "ES" => "ESP", + "LK" => "LKA", "SD" => "SDN", "SR" => "SUR", "SJ" => "SJM", "SE" => "SWE", "CH" => "CHE", + "SY" => "SYR", "TW" => "TWN", "TJ" => "TJK", "TZ" => "TZA", "TH" => "THA", "TL" => "TLS", + "TG" => "TGO", "TK" => "TKL", "TO" => "TON", "TT" => "TTO", "TN" => "TUN", "TR" => "TUR", + "TM" => "TKM", "TC" => "TCA", "TV" => "TUV", "UG" => 
"UGA", "UA" => "UKR", "AE" => "ARE", + "GB" => "GBR", "US" => "USA", "UM" => "UMI", "UY" => "URY", "UZ" => "UZB", "VU" => "VUT", + "VE" => "VEN", "VN" => "VNM", "VG" => "VGB", "VI" => "VIR", "WF" => "WLF", "EH" => "ESH", + "YE" => "YEM", "ZM" => "ZMB", "ZW" => "ZWE", +}; + +const EMBER_API_BASE_URL: &str = "https://api.ember-energy.org/v1/carbon-intensity"; +const EMBER_KEY: &str = "c5e07f2c-5d07-4b99-a78e-661097d874e6"; + +pub fn valid_region_code(code: &str) -> bool { + ISO_3166.get_key(code).is_some() +} + +fn try_parse_region(json_obj: Value) -> Option { + json_obj.get("country")?.as_str().map(|str| str.to_string()) +} + +pub async fn fetch_region_code() -> anyhow::Result { + let client = reqwest::Client::new(); + + let resp = client + .get("https://api.country.is/") + .header("Content-Type", "application/json") + .send() + .await?; + + let json_obj = resp.json().await?; + try_parse_region(json_obj).context("Error fetching region from IP") +} + +fn try_parse_ci(json_obj: &Value) -> Option { + json_obj + .get("stats")? + .get("query_value_range")? + .get("emissions_intensity_gco2_per_kwh")? + .get("max")? + .as_f64() + .map(|ci| ci / 1000.0) // g/kWh -> g/Wh +} + +/// Attempts to fetch carbon intensity for the given region from Ember. +pub async fn fetch_ci(code: &str, date: &DateTime) -> anyhow::Result { + let code = ISO_3166.get(code).context("Incorrect ISO 3166 code")?; + + let client = reqwest::Client::new(); + + let end = date; + let start = end + .checked_sub_months(Months::new(1)) + .context("Error parsing month")?; + + let start_date = format!("{}-{}", start.year(), start.month()); + let end_date = format!("{}-{}", end.year(), end.month()); + + let url = format!( + "{}/monthly?entity_code={}&start_date={}&end_date={}&api_key={}", + EMBER_API_BASE_URL, code, start_date, end_date, EMBER_KEY + ); + + let resp = client + .get(url) + .header("Content-Type", "application/json") + .send() + .await?; + + let json_obj = resp.json().await?; + try_parse_ci(&json_obj).context("Error parsing carbon intensity") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn can_fetch_region_ci() -> anyhow::Result<()> { + let now = Utc::now(); + let ci = fetch_ci("GB", &now).await?; + assert!(ci > 0.0); + Ok(()) + } + + #[tokio::test] + async fn incorrect_region_should_cause_error() -> anyhow::Result<()> { + let now = Utc::now(); + let ci = fetch_ci("ZZ", &now).await; + assert!(ci.is_err()); + Ok(()) + } + + #[tokio::test] + async fn can_fetch_ip() -> anyhow::Result<()> { + let region = fetch_region_code().await?; + assert!(!region.is_empty()); + + let now = Utc::now(); + let ci = fetch_ci(®ion, &now).await?; + assert!(ci > 0.0); + + Ok(()) + } +} diff --git a/src/config.rs b/src/config.rs index d83bc08..6fb1243 100644 --- a/src/config.rs +++ b/src/config.rs @@ -95,7 +95,7 @@ impl Config { pub fn find_scenarios(&self, scenario_names: &[&String]) -> anyhow::Result> { let mut scenarios = vec![]; for scenario_name in scenario_names { - let scenario = self.find_scenario(&scenario_name)?; + let scenario = self.find_scenario(scenario_name)?; scenarios.push(scenario); } Ok(scenarios) @@ -118,7 +118,7 @@ impl Config { fn find_processes(&self, proc_names: &[&String]) -> anyhow::Result> { let mut processes = vec![]; for proc_name in proc_names { - let proc = self.find_process(&proc_name)?; + let proc = self.find_process(proc_name)?; processes.push(proc); } Ok(processes) @@ -146,7 +146,7 @@ impl Config { if !external_only { let mut proc_set: HashSet = HashSet::new(); for scenario_name in 
-                let scenario = self.find_scenario(&scenario_name).context(format!(
+                let scenario = self.find_scenario(scenario_name).context(format!(
                     "Unable to find scenario with name {}",
                     scenario_name
                 ))?;
@@ -273,6 +273,7 @@ pub enum ProcessToObserve {
 pub enum ExecutionMode<'a> {
     Live,
     Observation(Vec<&'a Scenario>),
+    Trigger,
 }
 
 #[derive(Debug)]
@@ -303,10 +304,7 @@ impl<'a> ExecutionPlan<'a> {
     pub fn observe_external_process(&mut self, process_to_observe: ProcessToObserve) {
         match &mut self.external_processes_to_observe {
             None => self.external_processes_to_observe = Some(vec![process_to_observe]),
-            Some(vec) => {
-                vec.push(process_to_observe);
-                Some(vec);
-            }
+            Some(vec) => vec.push(process_to_observe),
         };
     }
 }
diff --git a/src/data.rs b/src/data.rs
index 2b28969..7e12ca8 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -38,15 +38,12 @@ impl std::ops::Add for Data {
 }
 
 impl Data {
     pub fn sum(data: &[&Data]) -> Self {
-        data.into_iter()
-            .fold(Data::default(), |acc, item| acc + *item)
+        data.iter().fold(Data::default(), |acc, item| acc + *item)
     }
 
     pub fn mean(data: &[&Data]) -> Self {
         let len = data.len() as f64;
-        let mut data = data
-            .into_iter()
-            .fold(Data::default(), |acc, item| acc + *item);
+        let mut data = data.iter().fold(Data::default(), |acc, item| acc + *item);
 
         data.pow /= len;
         data.co2 /= len;
diff --git a/src/data/dataset.rs b/src/data/dataset.rs
index bd03deb..03baed9 100644
--- a/src/data/dataset.rs
+++ b/src/data/dataset.rs
@@ -185,7 +185,7 @@ impl<'a> ScenarioDataset<'a> {
     pub async fn apply_model(
         &'a self,
         db: &DatabaseConnection,
-        model: &impl Fn(&Vec<&Metrics>, &Power) -> Data,
+        model: &impl Fn(&Vec<&Metrics>, &Power, f64) -> Data,
         aggregation_method: AggregationMethod,
     ) -> anyhow::Result<ScenarioData> {
         let mut all_run_data = vec![];
@@ -265,9 +265,9 @@ impl<'a> ScenarioRunDataset<'a> {
     pub async fn apply_model(
         &'a self,
         db: &DatabaseConnection,
-        model: &impl Fn(&Vec<&Metrics>, &Power) -> Data,
+        model: &impl Fn(&Vec<&Metrics>, &Power, f64) -> Data,
     ) -> anyhow::Result<RunData> {
-        let run = dao::run::fetch(self.run_id, &db).await?;
+        let run = dao::run::fetch(self.run_id, db).await?;
         let cpu = run
             .find_related(entities::cpu::Entity)
             .one(db)
             .await?
@@ -301,7 +301,7 @@
         for scenario_run_iteration_dataset in self.by_iteration() {
             for (proc_id, metrics) in scenario_run_iteration_dataset.by_process() {
                 // run the RAB model to get power and co2 emissions
-                let cardamon_data = model(&metrics, &power);
+                let cardamon_data = model(&metrics, &power, run.carbon_intensity);
 
                 // convert the metrics database model into metrics data
                 let proc_metrics = metrics
diff --git a/src/data/dataset_builder.rs b/src/data/dataset_builder.rs
index 7506068..008341c 100644
--- a/src/data/dataset_builder.rs
+++ b/src/data/dataset_builder.rs
@@ -99,6 +99,12 @@
     }
 }
 
+impl Default for DatasetBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 /// The DatasetRowPager defines an incomplete Dataset which includes set of scenarios (rows)
 /// without any runs.
 ///
@@ -243,7 +249,7 @@ impl DatasetBuilderFinal {
         let (scenario_names, scenario_pages) = match &self.scenario_selection {
             ScenarioSelection::All => dao::scenario::fetch_all(&self.scenario_page, db).await,
             ScenarioSelection::One(name) => {
-                let scenario_name = dao::scenario::fetch(&name, db)
+                let scenario_name = dao::scenario::fetch(name, db)
                     .await?
                     .context(format!("Error finding scenario with name {}", name))?;
 
                 // pages are not required!
} ScenarioSelection::Search(name) => { - dao::scenario::fetch_by_query(&name, &self.scenario_page, db).await + dao::scenario::fetch_by_query(name, &self.scenario_page, db).await } ScenarioSelection::InRun(run) => { - dao::scenario::fetch_in_run(&run, &self.scenario_page, db).await + dao::scenario::fetch_in_run(run, &self.scenario_page, db).await } ScenarioSelection::InRange { from, to } => { dao::scenario::fetch_in_range(*from, *to, &self.scenario_page, db).await @@ -273,11 +279,7 @@ impl DatasetBuilderFinal { let (scenarios, total_scenarios) = self.fetch_scenarios(db).await?; let (iterations, total_runs) = match self.run_selection { - RunSelection::All => { - let poop = dao::iteration::fetch_runs_all(&scenarios, None, db).await; - // println!("\n {:?}", poop); - poop - } + RunSelection::All => dao::iteration::fetch_runs_all(&scenarios, None, db).await, RunSelection::InRange { from, to } => { dao::iteration::fetch_runs_in_range(&scenarios, from, to, None, db).await @@ -513,7 +515,7 @@ mod tests { .build(&db) .await?; let scenario_datasets = dataset.by_scenario(LiveDataFilter::IncludeLive); - let run_datasets = scenario_datasets.get(0).unwrap().by_run(); + let run_datasets = scenario_datasets.first().unwrap().by_run(); // there should be three runs for scenario 3 returned in reverse chronological order // ie. [3, 2, 1] @@ -536,7 +538,7 @@ mod tests { // there should be 3 runs for scenario_3, 2 for scenario_2 and 1 for scenario_1 in reverse // chronological order // ie. scenario_3 = [3,2,1], scenario_2 = [2,1], scenario_1 = [1] - let scenario_dataset = scenario_datasets.get(0).unwrap(); + let scenario_dataset = scenario_datasets.first().unwrap(); let run_datasets = scenario_dataset.by_run(); let run_ids = run_datasets .iter() @@ -579,7 +581,7 @@ mod tests { .build(&db) .await?; let scenario_datasets = dataset.by_scenario(LiveDataFilter::IncludeLive); - let run_datasets = scenario_datasets.get(0).unwrap().by_run(); + let run_datasets = scenario_datasets.first().unwrap().by_run(); // there should be 2 runs for scenario 3 returned in reverse chronological order // ie. [3, 2] @@ -601,7 +603,7 @@ mod tests { // there should be 2 runs for scenario_3 and 1 for scenario_2 in reverse chronological order // ie. scenario_3 = [3,2], scenario_2 = [2] - let scenario_dataset = scenario_datasets.get(0).unwrap(); + let scenario_dataset = scenario_datasets.first().unwrap(); let run_datasets = scenario_dataset.by_run(); let run_ids = run_datasets .iter() @@ -635,7 +637,7 @@ mod tests { .build(&db) .await?; let scenario_datasets = dataset.by_scenario(LiveDataFilter::IncludeLive); - let run_datasets = scenario_datasets.get(0).unwrap().by_run(); + let run_datasets = scenario_datasets.first().unwrap().by_run(); // there should be 2 runs for scenario 3 returned in reverse chronological order // ie. [3, 2] @@ -657,7 +659,7 @@ mod tests { // there should be 2 runs for scenario_3, 2 for scenario_2 and 1 for scenario_1 in reverse chronological order // ie. 
scenario_3 = [3,2], scenario_2 = [2,1], scenario_1 = [1]
-        let scenario_dataset = scenario_datasets.get(0).unwrap();
+        let scenario_dataset = scenario_datasets.first().unwrap();
         let run_datasets = scenario_dataset.by_run();
         let run_ids = run_datasets
             .iter()
@@ -700,8 +702,8 @@
             .build(&db)
             .await?;
         let scenario_datasets = dataset.by_scenario(LiveDataFilter::IncludeLive);
-        let run_datasets = scenario_datasets.get(0).unwrap().by_run();
-        let run_dataset = run_datasets.get(0).unwrap();
+        let run_datasets = scenario_datasets.first().unwrap().by_run();
+        let run_dataset = run_datasets.first().unwrap();
 
         // there should be three runs for scenario 3 returned in reverse chronological order
         // ie. [3]
diff --git a/src/entities/cpu.rs b/src/entities/cpu.rs
index 456372b..79fb5d4 100644
--- a/src/entities/cpu.rs
+++ b/src/entities/cpu.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 use sea_orm::entity::prelude::*;
 
@@ -8,8 +8,8 @@ pub struct Model {
     #[sea_orm(primary_key)]
     pub id: i32,
     pub name: String,
-    #[sea_orm(column_type = "Float", nullable)]
-    pub tdp: Option<f32>,
+    #[sea_orm(column_type = "Double", nullable)]
+    pub tdp: Option<f64>,
     pub power_curve_id: Option<i32>,
 }
diff --git a/src/entities/iteration.rs b/src/entities/iteration.rs
index de99f1a..897fe85 100644
--- a/src/entities/iteration.rs
+++ b/src/entities/iteration.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 use sea_orm::entity::prelude::*;
diff --git a/src/entities/metrics.rs b/src/entities/metrics.rs
index d0ff5aa..d17f6ac 100644
--- a/src/entities/metrics.rs
+++ b/src/entities/metrics.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 use sea_orm::entity::prelude::*;
diff --git a/src/entities/mod.rs b/src/entities/mod.rs
index 0990486..3f2874c 100644
--- a/src/entities/mod.rs
+++ b/src/entities/mod.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 pub mod prelude;
diff --git a/src/entities/power_curve.rs b/src/entities/power_curve.rs
index 407cd1c..497858c 100644
--- a/src/entities/power_curve.rs
+++ b/src/entities/power_curve.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 use sea_orm::entity::prelude::*;
 
@@ -7,14 +7,14 @@ use sea_orm::entity::prelude::*;
 pub struct Model {
     #[sea_orm(primary_key)]
     pub id: i32,
-    #[sea_orm(column_type = "Float")]
-    pub a: f32,
-    #[sea_orm(column_type = "Float")]
-    pub b: f32,
-    #[sea_orm(column_type = "Float")]
-    pub c: f32,
-    #[sea_orm(column_type = "Float")]
-    pub d: f32,
+    #[sea_orm(column_type = "Double")]
+    pub a: f64,
+    #[sea_orm(column_type = "Double")]
+    pub b: f64,
+    #[sea_orm(column_type = "Double")]
+    pub c: f64,
+    #[sea_orm(column_type = "Double")]
+    pub d: f64,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/src/entities/prelude.rs b/src/entities/prelude.rs
index f77d2a8..c66e809 100644
--- a/src/entities/prelude.rs
+++ b/src/entities/prelude.rs
@@ -1,4 +1,4 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 pub use super::cpu::Entity as Cpu;
 pub use super::iteration::Entity as Iteration;
diff --git a/src/entities/run.rs b/src/entities/run.rs
index 545f663..4608833 100644
--- a/src/entities/run.rs
+++ b/src/entities/run.rs
@@ -1,8 +1,8 @@
-//! `SeaORM` Entity, @generated by sea-orm-codegen 1.0.0
+//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
 
 use sea_orm::entity::prelude::*;
 
-#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
 #[sea_orm(table_name = "run")]
 pub struct Model {
     #[sea_orm(primary_key)]
@@ -11,6 +11,9 @@
     pub cpu_id: i32,
     pub start_time: i64,
     pub stop_time: i64,
+    pub region: Option<String>,
+    #[sea_orm(column_type = "Double")]
+    pub carbon_intensity: f64,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/src/lib.rs b/src/lib.rs
index a1b8c19..7bcc7de 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod carbon_intensity;
 pub mod config;
 pub mod dao;
 pub mod data;
@@ -560,6 +561,8 @@ pub async fn run_live<'a>(
 
 pub async fn run<'a>(
     exec_plan: ExecutionPlan<'a>,
+    region: Option<String>,
+    ci: f64,
     db: &DatabaseConnection,
 ) -> anyhow::Result<DatasetRowPager> {
     let mut processes_to_observe = exec_plan.external_processes_to_observe.unwrap_or(vec![]); // external procs to observe are cloned here.
@@ -605,7 +608,7 @@
             cpu::ActiveModel {
                 id: ActiveValue::NotSet,
                 name: ActiveValue::Set(exec_plan.cpu.name),
-                tdp: ActiveValue::Set(Some(tdp as f32)),
+                tdp: ActiveValue::Set(Some(tdp)),
                 power_curve_id: ActiveValue::NotSet,
             }
             .save(db)
@@ -615,10 +618,10 @@
         Power::Curve(a, b, c, d) => {
             let power_curve = entities::power_curve::ActiveModel {
                 id: ActiveValue::NotSet,
-                a: ActiveValue::Set(a as f32),
-                b: ActiveValue::Set(b as f32),
-                c: ActiveValue::Set(c as f32),
-                d: ActiveValue::Set(d as f32),
+                a: ActiveValue::Set(a),
+                b: ActiveValue::Set(b),
+                c: ActiveValue::Set(c),
+                d: ActiveValue::Set(d),
             }
             .save(db)
             .await?
@@ -644,6 +647,8 @@
         id: ActiveValue::NotSet,
         is_live: ActiveValue::Set(is_live),
         cpu_id: ActiveValue::Set(cpu_id),
+        region: ActiveValue::Set(region),
+        carbon_intensity: ActiveValue::Set(ci),
         start_time: ActiveValue::Set(start_time),
         stop_time: ActiveValue::set(start_time), // set to start time for now we'll update it later
     }
@@ -670,6 +675,10 @@
         config::ExecutionMode::Live => {
             run_live(run_id, processes_to_observe.clone(), db).await?;
         }
+
+        config::ExecutionMode::Trigger => {
+            todo!()
+        }
     };
 
     let stop_time = Utc::now().timestamp_millis(); // Use UTC to avoid confusion, UI can handle
diff --git a/src/main.rs b/src/main.rs
index a792181..537188b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
 use anyhow::Context;
 use cardamon::{
+    carbon_intensity::{fetch_ci, fetch_region_code, valid_region_code, GLOBAL_CI},
     cleanup_stdout_stderr,
     config::{self, Config, ExecutionPlan, ProcessToObserve},
     data::{dataset::LiveDataFilter, dataset_builder::DatasetBuilder, Data},
@@ -34,6 +35,9 @@ pub enum Commands {
         #[arg(help = "Please provide an observation name")]
         name: String,
 
+        #[arg(value_name = "REGION", short, long)]
+        region: Option<String>,
+
         #[arg(value_name = "EXTERNAL PIDs", short, long, value_delimiter = ',')]
         pids: Option<Vec<String>>,
 
@@ -134,14 +138,79 @@ async fn main() -> anyhow::Result<()> {
 
         Commands::Run {
             name,
+            region,
             pids,
             containers,
             external_only,
         } => {
-            println!("\n{}", " Cardamon ".reversed().green());
             let config = load_config(&args.file)
                 .context("Error loading configuration, please run `cardamon init`")?;
 
+            let now = Utc::now();
+
+            let region_code = match region {
+                None => {
+                    print!("> fetching region from IP address");
+                    match fetch_region_code().await {
+                        Err(err) => {
+                            println!("\t{}", "✗".red());
+                            println!("\t{}", format!("- {}", err).bright_black());
+                            None
+                        }
+
+                        Ok(code) => {
+                            println!("\t{}", "✓".green());
+                            println!(
+                                "\t{}",
+                                format!("- using region code {}", code).bright_black()
+                            );
+                            Some(code)
+                        }
+                    }
+                }
+
+                Some(code) => {
+                    print!("> validating region code");
+                    if valid_region_code(&code) {
+                        println!("\t{}", "✓".green());
+                        Some(code)
+                    } else {
+                        println!("\t{}", "✗".red());
+                        None
+                    }
+                }
+            };
+
+            let ci = match &region_code {
+                Some(code) => {
+                    print!("> fetching carbon intensity for {}", code);
+                    match fetch_ci(&code, &now).await {
+                        Ok(ci) => {
+                            println!("\t{}", "✓".green());
+                            println!(
+                                "\t{}",
+                                format!("- using {:.3} g CO2eq/Wh", ci).bright_black()
+                            );
+                            ci
+                        }
+
+                        Err(_) => {
+                            println!("\t{}", "✗".red());
+                            println!("\t{}", "- using global avg 0.494 g CO2eq/Wh".bright_black());
+                            GLOBAL_CI
+                        }
+                    }
+                }
+
+                None => {
+                    print!(
+                        "> using global avg carbon intensity {} g CO2eq/Wh",
+                        "0.494".green()
+                    );
+                    GLOBAL_CI
+                }
+            };
+
             // create an execution plan
             let cpu = config.cpu.clone();
             let mut execution_plan = config.create_execution_plan(cpu, &name, external_only)?;
@@ -153,7 +222,7 @@
             cleanup_stdout_stderr()?;
 
             // run it!
- let observation_dataset_rows = run(execution_plan, &db_conn).await?; + let observation_dataset_rows = run(execution_plan, region_code, ci, &db_conn).await?; let observation_dataset = observation_dataset_rows .last_n_runs(5) .all() @@ -168,16 +237,15 @@ async fn main() -> anyhow::Result<()> { let run_datasets = scenario_dataset.by_run(); // execute model for current run - let f = rab_model(0.16); let (head, tail) = run_datasets .split_first() .expect("Dataset does not include recent run."); - let run_data = head.apply_model(&db_conn, &f).await?; + let run_data = head.apply_model(&db_conn, &rab_model).await?; // execute model for previous runs and calculate trend let mut tail_data = vec![]; for run_dataset in tail { - let run_data = run_dataset.apply_model(&db_conn, &f).await?; + let run_data = run_dataset.apply_model(&db_conn, &rab_model).await?; tail_data.push(run_data.data); } let tail_data = Data::mean(&tail_data.iter().collect_vec()); @@ -193,10 +261,7 @@ async fn main() -> anyhow::Result<()> { } }; - println!( - "{}:", - format!("{}", scenario_dataset.scenario_name()).green() - ); + println!("{}:", scenario_dataset.scenario_name().to_string().green()); let table = Table::builder() .rows(rows![ @@ -242,11 +307,10 @@ async fn main() -> anyhow::Result<()> { println!("\nno data found!"); } - let f = rab_model(0.16); for scenario_dataset in dataset.by_scenario(LiveDataFilter::IncludeLive) { println!( "Scenario {}:", - format!("{}", scenario_dataset.scenario_name()).green() + scenario_dataset.scenario_name().to_string().green() ); let mut table = Table::builder() @@ -262,7 +326,7 @@ async fn main() -> anyhow::Result<()> { // let mut points: Vec<(f32, f32)> = vec![]; // let mut run = 0.0; for run_dataset in scenario_dataset.by_run() { - let run_data = run_dataset.apply_model(&db_conn, &f).await?; + let run_data = run_dataset.apply_model(&db_conn, &rab_model).await?; let run_start_time = Utc.timestamp_opt(run_data.start_time / 1000, 0).unwrap(); let run_duration = (run_data.stop_time - run_data.start_time) as f64 / 1000.0; let _per_min_factor = 60.0 / run_duration; diff --git a/src/metrics_logger.rs b/src/metrics_logger.rs index 014afc6..fde7aae 100644 --- a/src/metrics_logger.rs +++ b/src/metrics_logger.rs @@ -60,9 +60,7 @@ impl StopHandle { /// /// A `Result` containing the metrics log for the given scenario or an `Error` if either /// the scenario failed to complete successfully or any of the loggers contained errors. 
-pub fn start_logging<'a>(
-    processes_to_observe: Vec<ProcessToObserve>,
-) -> anyhow::Result<StopHandle> {
+pub fn start_logging(processes_to_observe: Vec<ProcessToObserve>) -> anyhow::Result<StopHandle> {
     let metrics_log = MetricsLog::new();
     let metrics_log_mutex = Mutex::new(metrics_log);
     let shared_metrics_log = Arc::new(metrics_log_mutex);
diff --git a/src/migrations/m20241109_180400_add_region_column.rs b/src/migrations/m20241109_180400_add_region_column.rs
new file mode 100644
index 0000000..fa0520e
--- /dev/null
+++ b/src/migrations/m20241109_180400_add_region_column.rs
@@ -0,0 +1,36 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Run::Table)
+                    .add_column_if_not_exists(ColumnDef::new(Run::Region).string())
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Run::Table)
+                    .drop_column(Alias::new("region"))
+                    .to_owned(),
+            )
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+pub enum Run {
+    Table,
+    Region,
+    CarbonIntensity,
+}
diff --git a/src/migrations/m20241110_191154_add_ci_column.rs b/src/migrations/m20241110_191154_add_ci_column.rs
new file mode 100644
index 0000000..c91bada
--- /dev/null
+++ b/src/migrations/m20241110_191154_add_ci_column.rs
@@ -0,0 +1,40 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Run::Table)
+                    .add_column(
+                        ColumnDef::new(Run::CarbonIntensity)
+                            .double()
+                            .not_null()
+                            .default(0.494),
+                    )
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Run::Table)
+                    .drop_column(Alias::new("carbon_intensity"))
+                    .to_owned(),
+            )
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+enum Run {
+    Table,
+    CarbonIntensity,
+}
diff --git a/src/migrations/mod.rs b/src/migrations/mod.rs
index 3736409..593bdb9 100644
--- a/src/migrations/mod.rs
+++ b/src/migrations/mod.rs
@@ -1,6 +1,8 @@
 pub mod m20240822_095823_create_run_table;
 pub mod m20240822_095830_create_metrics_table;
 pub mod m20240822_095838_create_iteration_table;
+pub mod m20241109_180400_add_region_column;
+pub mod m20241110_191154_add_ci_column;
 
 pub use sea_orm_migration::prelude::*;
 
@@ -13,6 +15,8 @@ impl MigratorTrait for Migrator {
             Box::new(m20240822_095823_create_run_table::Migration),
             Box::new(m20240822_095830_create_metrics_table::Migration),
             Box::new(m20240822_095838_create_iteration_table::Migration),
+            Box::new(m20241109_180400_add_region_column::Migration),
+            Box::new(m20241110_191154_add_ci_column::Migration),
         ]
     }
 }
diff --git a/src/models.rs b/src/models.rs
index 1608524..8c01669 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -9,39 +9,37 @@
 fn boa_model(a: f64, b: f64, c: f64, d: f64) -> impl Fn(f64) -> f64 {
     move |workload| a * (b * (workload + c)).ln() + d
 }
 
-pub fn rab_model(ci_g_w: f32) -> impl Fn(&Vec<&Metrics>, &Power) -> Data {
-    return move |metrics, power| {
-        let data = metrics
-            .into_iter()
-            .sorted_by(|a, b| b.time_stamp.cmp(&a.time_stamp))
-            .tuple_windows()
-            .map(|(x, y)| {
-                match *power {
-                    Power::Curve(a, b, c, d) => {
-                        let cpu_util = 0.5 * (x.cpu_usage + y.cpu_usage) *
100.0; - let delta_t_h = (x.time_stamp - y.time_stamp) as f64 / 3_600_00.0; - - // boa_model(a, b, c, d)(cpu_util * delta_t_h) - boa_model(a, b, c, d)(cpu_util) * delta_t_h - } - - Power::Tdp(tdp) => { - let delta_t_h = (x.time_stamp - y.time_stamp) as f64 / 3_600_000.0; - - // taking the midpoint of the two datapoints and dividing by 50 because we're - // assuming tdp is at 50% utilization - (0.5 * (x.cpu_usage + y.cpu_usage)) / 50.0 * tdp * delta_t_h - } +pub fn rab_model(metrics: &Vec<&Metrics>, power: &Power, ci_g_wh: f64) -> Data { + let data = metrics + .iter() + .sorted_by(|a, b| b.time_stamp.cmp(&a.time_stamp)) + .tuple_windows() + .map(|(x, y)| { + match *power { + Power::Curve(a, b, c, d) => { + let cpu_util = 0.5 * (x.cpu_usage + y.cpu_usage) * 100.0; + let delta_t_h = (x.time_stamp - y.time_stamp) as f64 / 3_600_000.0; + + // boa_model(a, b, c, d)(cpu_util * delta_t_h) + boa_model(a, b, c, d)(cpu_util) * delta_t_h } - }) - .collect_vec(); - let pow_w = data.iter().fold(0_f64, |x, acc| x + acc); - let co2_g_w = pow_w * ci_g_w as f64; + Power::Tdp(tdp) => { + let delta_t_h = (x.time_stamp - y.time_stamp) as f64 / 3_600_000.0; - Data { - pow: pow_w, - co2: co2_g_w, - } - }; + // taking the midpoint of the two datapoints and dividing by 50 because we're + // assuming tdp is at 50% utilization + (0.5 * (x.cpu_usage + y.cpu_usage)) / 50.0 * tdp * delta_t_h + } + } + }) + .collect_vec(); + + let pow_w = data.iter().fold(0_f64, |x, acc| x + acc); + let co2_g_wh = pow_w * ci_g_wh; + + Data { + pow: pow_w, + co2: co2_g_wh, + } } diff --git a/src/server/routes.rs b/src/server/routes.rs index 2eba0e9..5d64d17 100644 --- a/src/server/routes.rs +++ b/src/server/routes.rs @@ -5,7 +5,7 @@ use crate::{ dataset_builder::DatasetBuilder, ProcessMetrics, ScenarioData, }, - models::{self, rab_model}, + models::rab_model, server::errors::ServerError, }; use anyhow::Context; @@ -95,7 +95,7 @@ pub async fn build_scenario_data( let mut scenario_data = vec![]; for scenario_dataset in dataset.by_scenario(LiveDataFilter::IncludeLive) { let data = scenario_dataset - .apply_model(&db, &models::rab_model(0.16), AggregationMethod::MostRecent) + .apply_model(db, &rab_model, AggregationMethod::MostRecent) .await?; scenario_data.push(data); } @@ -217,7 +217,7 @@ pub async fn get_runs( let mut runs = vec![]; for scenario_dataset in &dataset.by_scenario(LiveDataFilter::IncludeLive) { for run_dataset in scenario_dataset.by_run() { - let model_data = run_dataset.apply_model(&db, &rab_model(0.16)).await?; + let model_data = run_dataset.apply_model(&db, &rab_model).await?; let processes = model_data .process_data .iter()
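
For reference, here is the arithmetic the reworked rab_model performs for each pair of adjacent samples, as a standalone sketch. It is not part of the patch: the Sample struct and the numbers are illustrative stand-ins for cardamon's real Metrics rows, and it assumes cpu_usage is a 0-1 fraction (which the `* 100.0` scaling implies). Carbon intensity is applied in g CO2eq per Wh, matching GLOBAL_CI and the g/kWh -> g/Wh division in try_parse_ci.

    // Energy/CO2e estimate for one sample window, mirroring rab_model's
    // Power::Curve branch. Plain Rust, no dependencies.
    struct Sample {
        time_stamp: i64, // ms since epoch
        cpu_usage: f64,  // fraction of CPU used, 0.0..=1.0 (assumed)
    }

    fn main() {
        // Fitted power curve from the [cpu] section of cardamon.toml:
        // p(u) = a * ln(b * (u + c)) + d, in watts at u percent utilisation.
        let (a, b, c, d) = (
            7.627190097500079,
            0.07551567953624883,
            20.45110313049153,
            -1.5261422759740344,
        );
        let p = |u: f64| a * (b * (u + c)).ln() + d;

        // Two samples taken one second apart, at 40% and 60% CPU.
        let y = Sample { time_stamp: 0, cpu_usage: 0.40 }; // older
        let x = Sample { time_stamp: 1_000, cpu_usage: 0.60 }; // newer

        // One tuple_windows() step: midpoint utilisation (as a percentage)
        // times the window length in hours gives watt-hours.
        let cpu_util = 0.5 * (x.cpu_usage + y.cpu_usage) * 100.0; // 50.0
        let delta_t_h = (x.time_stamp - y.time_stamp) as f64 / 3_600_000.0;
        let energy_wh = p(cpu_util) * delta_t_h; // ~11.2 W over 1/3600 h

        // CO2e = energy * carbon intensity (g CO2eq per Wh).
        let ci_g_wh = 0.494; // GLOBAL_CI fallback
        let co2_g = energy_wh * ci_g_wh;

        println!("{energy_wh:.6} Wh, {co2_g:.6} g CO2eq at ~50% CPU for 1s");
    }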