diff --git a/docker-compose.yml b/docker-compose.yml index 6d31206..19b226b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,15 @@ services: condition: service_started working_dir: /var/www/matching-engine + gsy-execution-engine: + container_name: gsy-execution-engine + build: + dockerfile: ./gsy-execution-engine/Dockerfile + depends_on: + gsy-orderbook: + condition: service_started + working_dir: /var/www/execution-engine + mongodb: image: mongo:5.0 ports: diff --git a/gsy-execution-engine/Cargo.toml b/gsy-execution-engine/Cargo.toml new file mode 100644 index 0000000..26e6487 --- /dev/null +++ b/gsy-execution-engine/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "gsy-execution-engine" +version = "0.1.0" +authors = ["Grid Singularity "] +edition = "2021" +description = "Execution Engine for the Grid Singularity Energy Exchange" + +[dependencies] +anyhow = "1" +chrono = { version = "0.4.37", features = ["serde", "rustc-serialize"]} +clap = { version = "4", features = ["derive"] } +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +tracing = { version = "0.1", features = ["log"] } +tracing-bunyan-formatter = "0.3" +tracing-log = "0.2" +tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } + +[[bin]] +name = "gsy-execution-engine" +path = "src/main.rs" diff --git a/gsy-execution-engine/Dockerfile b/gsy-execution-engine/Dockerfile new file mode 100644 index 0000000..9de7955 --- /dev/null +++ b/gsy-execution-engine/Dockerfile @@ -0,0 +1,9 @@ +FROM rust:1.80 + +WORKDIR /var/www/gsy-execution-engine + +COPY . /var/www/gsy-execution-engine + +RUN cargo build --release + +ENTRYPOINT ["/var/www/gsy-execution-engine/target/release/gsy-execution-engine", "web3", "http://gsy-orderbook", "8080", "ws://gsy-node", "9944", "30"] diff --git a/gsy-execution-engine/src/connectors/mod.rs b/gsy-execution-engine/src/connectors/mod.rs new file mode 100644 index 0000000..f2e5666 --- /dev/null +++ b/gsy-execution-engine/src/connectors/mod.rs @@ -0,0 +1,2 @@ +pub mod offchain_storage; +pub mod substrate_connector; diff --git a/gsy-execution-engine/src/connectors/offchain_storage.rs b/gsy-execution-engine/src/connectors/offchain_storage.rs new file mode 100644 index 0000000..e85c714 --- /dev/null +++ b/gsy-execution-engine/src/connectors/offchain_storage.rs @@ -0,0 +1,41 @@ +use anyhow::{Result, anyhow}; +use reqwest::Client; +use crate::primitives::{ + trades::Trade, + measurements::Measurement +}; + +pub async fn fetch_trades_and_measurements_for_timeslot( + base_url: &str, + timeslot: &str, +) -> Result<(Vec, Vec)> { + let client = Client::new(); + + // TODO: we might have an endpoint like /trades?timeslot=YYYY-MM-DD-HH + let trades_url = format!("{}/trades?timeslot={}", base_url, timeslot); + let measurements_url = format!("{}/measurements?timeslot={}", base_url, timeslot); + + // 1) Fetch trades + let trades_resp = client.get(&trades_url).send().await?; + if !trades_resp.status().is_success() { + return Err(anyhow!( + "Failed to fetch trades for timeslot {}: HTTP {}", + timeslot, + trades_resp.status() + )); + } + let trades: Vec = trades_resp.json().await?; + + // 2) Fetch measurements + let measurements_resp = client.get(&measurements_url).send().await?; + if !measurements_resp.status().is_success() { + return Err(anyhow!( + "Failed to fetch measurements for timeslot {}: HTTP {}", + timeslot, + measurements_resp.status() + )); + } + let measurements: Vec = measurements_resp.json().await?; + + Ok((trades, measurements)) +} \ No newline at end of file diff --git a/gsy-execution-engine/src/connectors/substrate_connector.rs b/gsy-execution-engine/src/connectors/substrate_connector.rs new file mode 100644 index 0000000..6614d7f --- /dev/null +++ b/gsy-execution-engine/src/connectors/substrate_connector.rs @@ -0,0 +1,15 @@ +use anyhow::Result; +use tracing::info; +use crate::primitives::penalty_calculator::Penalty; + +pub async fn submit_penalties( + node_url: &str, + penalties: &[Penalty], +) -> Result<()> { + if penalties.is_empty() { + info!("No penalties to submit."); + return Ok(()); + } + // TODO: actual extrinsic logic + Ok(()) +} diff --git a/gsy-execution-engine/src/main.rs b/gsy-execution-engine/src/main.rs new file mode 100644 index 0000000..a87f78d --- /dev/null +++ b/gsy-execution-engine/src/main.rs @@ -0,0 +1,49 @@ +mod primitives; +mod services; +mod connectors; +mod utils; + +use clap::Parser; +use tracing::{error, info}; +use utils::cli::{Cli, Commands}; +use utils::telemetry::{get_subscriber, init_subscriber}; +use services::execution_orchestrator::run_execution_cycle; + +#[tokio::main] +async fn main() { + let subscriber = get_subscriber("gsy-execution-engine", "info", std::io::stdout); + init_subscriber(subscriber); + + let cli = Cli::parse(); + match cli.command { + Commands::Web3 { + offchain_host, + offchain_port, + node_host, + node_port, + polling_interval + } => { + info!("Starting engine..."); + let offchain_url = format!("{}:{}", offchain_host, offchain_port); + let node_url = format!("{}:{}", node_host, node_port); + + loop { + let timeslot = generate_previous_timeslot(); + if let Err(e) = run_execution_cycle(&offchain_url, &node_url, ×lot).await { + error!("Cycle failed for {}: {:?}", timeslot, e); + } + info!("Sleeping for {}s...", polling_interval); + tokio::time::sleep(std::time::Duration::from_secs(polling_interval)).await; + } + } + } +} + +fn generate_previous_timeslot() -> String { + use chrono::{Utc, Duration}; + + let now = Utc::now(); + let prev = now - Duration::seconds(30); + + prev.format("%Y-%m-%dT%H:%M:%SZ").to_string() +} diff --git a/gsy-execution-engine/src/primitives/measurements.rs b/gsy-execution-engine/src/primitives/measurements.rs new file mode 100644 index 0000000..d875569 --- /dev/null +++ b/gsy-execution-engine/src/primitives/measurements.rs @@ -0,0 +1,10 @@ +use serde::Deserialize; + +/// TODO: Change with the offchain storage returned struct +#[derive(Debug, Deserialize)] +pub struct Measurement { + pub area_uuid: String, + pub energy: f32, + pub timestamp: String, + // ... +} \ No newline at end of file diff --git a/gsy-execution-engine/src/primitives/mod.rs b/gsy-execution-engine/src/primitives/mod.rs new file mode 100644 index 0000000..32960ee --- /dev/null +++ b/gsy-execution-engine/src/primitives/mod.rs @@ -0,0 +1,3 @@ +pub mod trades; +pub mod measurements; +pub mod penalty_calculator; \ No newline at end of file diff --git a/gsy-execution-engine/src/primitives/penalty_calculator.rs b/gsy-execution-engine/src/primitives/penalty_calculator.rs new file mode 100644 index 0000000..46fe45c --- /dev/null +++ b/gsy-execution-engine/src/primitives/penalty_calculator.rs @@ -0,0 +1,17 @@ +use super::{trades::Trade, measurements::Measurement}; + +#[derive(Debug)] +pub struct Penalty { + pub area_uuid: String, + pub penalty_energy: f32, +} + +pub fn compute_penalties( + trades: &[Trade], + measurements: &[Measurement] +) -> Vec { + // TODO: + // e.g. penalty = measured_energy - traded_energy if it's > 0, etc. + // ... + Vec::new() // placeholder +} diff --git a/gsy-execution-engine/src/primitives/trades.rs b/gsy-execution-engine/src/primitives/trades.rs new file mode 100644 index 0000000..06bcb72 --- /dev/null +++ b/gsy-execution-engine/src/primitives/trades.rs @@ -0,0 +1,11 @@ +use serde::Deserialize; + +/// TODO: Change with the offchain storage returned struct +#[derive(Debug, Deserialize)] +pub struct Trade { + pub area_uuid: String, + pub energy: f32, + pub price: f32, + pub time_slot: String, + // ... +} diff --git a/gsy-execution-engine/src/services/execution_orchestrator.rs b/gsy-execution-engine/src/services/execution_orchestrator.rs new file mode 100644 index 0000000..9d543f5 --- /dev/null +++ b/gsy-execution-engine/src/services/execution_orchestrator.rs @@ -0,0 +1,41 @@ +use anyhow::Result; +use tracing::info; + +use crate::{ + primitives::{ + penalty_calculator::{compute_penalties, Penalty}, + trades::Trade, + measurements::Measurement, + }, + connectors::{ + offchain_storage::fetch_trades_and_measurements_for_timeslot, + substrate_connector::submit_penalties, + }, +}; + +/// Higher-level function that does the repeated/polling logic +/// 1) fetch trades/measurements +/// 2) compute penalties +/// 3) submit them +pub async fn run_execution_cycle( + offchain_url: &str, + node_url: &str, + timeslot_str: &str, +) -> Result<()> { + // 1) fetch trades/measurements + let (trades, measurements) = fetch_trades_and_measurements_for_timeslot(offchain_url, timeslot_str).await?; + info!( + "Fetched {} trades, {} measurements for timeslot {}", + trades.len(), + measurements.len(), + timeslot_str + ); + + // 2) compute penalties + let penalties: Vec = compute_penalties(&trades, &measurements); + info!("Computed {} penalties", penalties.len()); + + // 3) submit penalties + submit_penalties(node_url, &penalties).await?; + Ok(()) +} diff --git a/gsy-execution-engine/src/services/mod.rs b/gsy-execution-engine/src/services/mod.rs new file mode 100644 index 0000000..e2f62ba --- /dev/null +++ b/gsy-execution-engine/src/services/mod.rs @@ -0,0 +1 @@ +pub mod execution_orchestrator; diff --git a/gsy-execution-engine/src/utils/cli.rs b/gsy-execution-engine/src/utils/cli.rs new file mode 100644 index 0000000..e595f30 --- /dev/null +++ b/gsy-execution-engine/src/utils/cli.rs @@ -0,0 +1,28 @@ +use clap::{Parser, Subcommand}; + +#[derive(Parser, Debug)] +#[clap(author, version, about)] +pub struct Cli { + #[clap(subcommand)] + pub command: Commands, +} + +#[derive(Subcommand, Debug)] +pub enum Commands { + Web3 { + #[clap(default_value_t = String::from("http://127.0.0.1"))] + offchain_host: String, + + #[clap(default_value_t = String::from("8080"))] + offchain_port: String, + + #[clap(default_value_t = String::from("ws://127.0.0.1"))] + node_host: String, + + #[clap(default_value_t = String::from("9944"))] + node_port: String, + + #[clap(default_value_t = 30)] + polling_interval: u64, + }, +} diff --git a/gsy-execution-engine/src/utils/mod.rs b/gsy-execution-engine/src/utils/mod.rs new file mode 100644 index 0000000..acad601 --- /dev/null +++ b/gsy-execution-engine/src/utils/mod.rs @@ -0,0 +1,2 @@ +pub mod cli; +pub mod telemetry; \ No newline at end of file diff --git a/gsy-execution-engine/src/utils/telemetry.rs b/gsy-execution-engine/src/utils/telemetry.rs new file mode 100644 index 0000000..883c7b6 --- /dev/null +++ b/gsy-execution-engine/src/utils/telemetry.rs @@ -0,0 +1,30 @@ +use tracing::subscriber::set_global_default; +use tracing::Subscriber; +use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; +use tracing_log::LogTracer; +use tracing_subscriber::fmt::MakeWriter; +use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; + +pub fn get_subscriber( + name: &str, + env_filter: &str, + sink: Sink, +) -> impl Subscriber + Send + Sync +where + Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static, +{ + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter)); + + let formatting_layer = BunyanFormattingLayer::new(name.to_string(), sink); + + Registry::default() + .with(env_filter) + .with(JsonStorageLayer) + .with(formatting_layer) +} + +pub fn init_subscriber(subscriber: impl Subscriber + Send + Sync) { + LogTracer::init().expect("Failed to initialize LogTracer"); + + set_global_default(subscriber).expect("Failed to set global tracing subscriber"); +}