Skip to content

Commit

Permalink
Add GSY Execution Engine. Introduce the new execution engine code wit…
Browse files Browse the repository at this point in the history
…h connectors, primitives, services, and utilities. This commit introduces placeholders for trade/measurement fetching, penalty computation, and a polling loop. Add Dockerfile. Update docker-compose
  • Loading branch information
keyanhov committed Jan 22, 2025
1 parent 179bb1d commit 408c696
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 0 deletions.
9 changes: 9 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ services:
condition: service_started
working_dir: /var/www/matching-engine

gsy-execution-engine:
container_name: gsy-execution-engine
build:
dockerfile: ./gsy-execution-engine/Dockerfile
depends_on:
gsy-orderbook:
condition: service_started
working_dir: /var/www/execution-engine

mongodb:
image: mongo:5.0
ports:
Expand Down
23 changes: 23 additions & 0 deletions gsy-execution-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "gsy-execution-engine"
version = "0.1.0"
authors = ["Grid Singularity <[email protected]>"]
edition = "2021"
description = "Execution Engine for the Grid Singularity Energy Exchange"

[dependencies]
anyhow = "1"
chrono = { version = "0.4.37", features = ["serde", "rustc-serialize"]}
clap = { version = "4", features = ["derive"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1", features = ["log"] }
tracing-bunyan-formatter = "0.3"
tracing-log = "0.2"
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }

[[bin]]
name = "gsy-execution-engine"
path = "src/main.rs"
9 changes: 9 additions & 0 deletions gsy-execution-engine/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM rust:1.80

WORKDIR /var/www/gsy-execution-engine

COPY . /var/www/gsy-execution-engine

RUN cargo build --release

ENTRYPOINT ["/var/www/gsy-execution-engine/target/release/gsy-execution-engine", "web3", "http://gsy-orderbook", "8080", "ws://gsy-node", "9944", "30"]
2 changes: 2 additions & 0 deletions gsy-execution-engine/src/connectors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod offchain_storage;
pub mod substrate_connector;
41 changes: 41 additions & 0 deletions gsy-execution-engine/src/connectors/offchain_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use anyhow::{Result, anyhow};
use reqwest::Client;
use crate::primitives::{
trades::Trade,
measurements::Measurement
};

pub async fn fetch_trades_and_measurements_for_timeslot(
base_url: &str,
timeslot: &str,
) -> Result<(Vec<Trade>, Vec<Measurement>)> {
let client = Client::new();

// TODO: we might have an endpoint like /trades?timeslot=YYYY-MM-DD-HH
let trades_url = format!("{}/trades?timeslot={}", base_url, timeslot);
let measurements_url = format!("{}/measurements?timeslot={}", base_url, timeslot);

// 1) Fetch trades
let trades_resp = client.get(&trades_url).send().await?;
if !trades_resp.status().is_success() {
return Err(anyhow!(
"Failed to fetch trades for timeslot {}: HTTP {}",
timeslot,
trades_resp.status()
));
}
let trades: Vec<Trade> = trades_resp.json().await?;

// 2) Fetch measurements
let measurements_resp = client.get(&measurements_url).send().await?;
if !measurements_resp.status().is_success() {
return Err(anyhow!(
"Failed to fetch measurements for timeslot {}: HTTP {}",
timeslot,
measurements_resp.status()
));
}
let measurements: Vec<Measurement> = measurements_resp.json().await?;

Ok((trades, measurements))
}
15 changes: 15 additions & 0 deletions gsy-execution-engine/src/connectors/substrate_connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use anyhow::Result;
use tracing::info;
use crate::primitives::penalty_calculator::Penalty;

pub async fn submit_penalties(
node_url: &str,
penalties: &[Penalty],
) -> Result<()> {
if penalties.is_empty() {
info!("No penalties to submit.");
return Ok(());
}
// TODO: actual extrinsic logic
Ok(())
}
49 changes: 49 additions & 0 deletions gsy-execution-engine/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
mod primitives;
mod services;
mod connectors;
mod utils;

use clap::Parser;
use tracing::{error, info};
use utils::cli::{Cli, Commands};
use utils::telemetry::{get_subscriber, init_subscriber};
use services::execution_orchestrator::run_execution_cycle;

#[tokio::main]
async fn main() {
let subscriber = get_subscriber("gsy-execution-engine", "info", std::io::stdout);
init_subscriber(subscriber);

let cli = Cli::parse();
match cli.command {
Commands::Web3 {
offchain_host,
offchain_port,
node_host,
node_port,
polling_interval
} => {
info!("Starting engine...");
let offchain_url = format!("{}:{}", offchain_host, offchain_port);
let node_url = format!("{}:{}", node_host, node_port);

loop {
let timeslot = generate_previous_timeslot();
if let Err(e) = run_execution_cycle(&offchain_url, &node_url, &timeslot).await {
error!("Cycle failed for {}: {:?}", timeslot, e);
}
info!("Sleeping for {}s...", polling_interval);
tokio::time::sleep(std::time::Duration::from_secs(polling_interval)).await;
}
}
}
}

fn generate_previous_timeslot() -> String {
use chrono::{Utc, Duration};

let now = Utc::now();
let prev = now - Duration::seconds(30);

prev.format("%Y-%m-%dT%H:%M:%SZ").to_string()
}
10 changes: 10 additions & 0 deletions gsy-execution-engine/src/primitives/measurements.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use serde::Deserialize;

/// TODO: Change with the offchain storage returned struct
#[derive(Debug, Deserialize)]
pub struct Measurement {
pub area_uuid: String,
pub energy: f32,
pub timestamp: String,
// ...
}
3 changes: 3 additions & 0 deletions gsy-execution-engine/src/primitives/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod trades;
pub mod measurements;
pub mod penalty_calculator;
17 changes: 17 additions & 0 deletions gsy-execution-engine/src/primitives/penalty_calculator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use super::{trades::Trade, measurements::Measurement};

#[derive(Debug)]
pub struct Penalty {
pub area_uuid: String,
pub penalty_energy: f32,
}

pub fn compute_penalties(
trades: &[Trade],
measurements: &[Measurement]
) -> Vec<Penalty> {
// TODO:
// e.g. penalty = measured_energy - traded_energy if it's > 0, etc.
// ...
Vec::new() // placeholder
}
11 changes: 11 additions & 0 deletions gsy-execution-engine/src/primitives/trades.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use serde::Deserialize;

/// TODO: Change with the offchain storage returned struct
#[derive(Debug, Deserialize)]
pub struct Trade {
pub area_uuid: String,
pub energy: f32,
pub price: f32,
pub time_slot: String,
// ...
}
41 changes: 41 additions & 0 deletions gsy-execution-engine/src/services/execution_orchestrator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use anyhow::Result;
use tracing::info;

use crate::{
primitives::{
penalty_calculator::{compute_penalties, Penalty},
trades::Trade,
measurements::Measurement,
},
connectors::{
offchain_storage::fetch_trades_and_measurements_for_timeslot,
substrate_connector::submit_penalties,
},
};

/// Higher-level function that does the repeated/polling logic
/// 1) fetch trades/measurements
/// 2) compute penalties
/// 3) submit them
pub async fn run_execution_cycle(
offchain_url: &str,
node_url: &str,
timeslot_str: &str,
) -> Result<()> {
// 1) fetch trades/measurements
let (trades, measurements) = fetch_trades_and_measurements_for_timeslot(offchain_url, timeslot_str).await?;
info!(
"Fetched {} trades, {} measurements for timeslot {}",
trades.len(),
measurements.len(),
timeslot_str
);

// 2) compute penalties
let penalties: Vec<Penalty> = compute_penalties(&trades, &measurements);
info!("Computed {} penalties", penalties.len());

// 3) submit penalties
submit_penalties(node_url, &penalties).await?;
Ok(())
}
1 change: 1 addition & 0 deletions gsy-execution-engine/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod execution_orchestrator;
28 changes: 28 additions & 0 deletions gsy-execution-engine/src/utils/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use clap::{Parser, Subcommand};

#[derive(Parser, Debug)]
#[clap(author, version, about)]
pub struct Cli {
#[clap(subcommand)]
pub command: Commands,
}

#[derive(Subcommand, Debug)]
pub enum Commands {
Web3 {
#[clap(default_value_t = String::from("http://127.0.0.1"))]
offchain_host: String,

#[clap(default_value_t = String::from("8080"))]
offchain_port: String,

#[clap(default_value_t = String::from("ws://127.0.0.1"))]
node_host: String,

#[clap(default_value_t = String::from("9944"))]
node_port: String,

#[clap(default_value_t = 30)]
polling_interval: u64,
},
}
2 changes: 2 additions & 0 deletions gsy-execution-engine/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod cli;
pub mod telemetry;
30 changes: 30 additions & 0 deletions gsy-execution-engine/src/utils/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use tracing::subscriber::set_global_default;
use tracing::Subscriber;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_log::LogTracer;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};

pub fn get_subscriber<Sink>(
name: &str,
env_filter: &str,
sink: Sink,
) -> impl Subscriber + Send + Sync
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
{
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));

let formatting_layer = BunyanFormattingLayer::new(name.to_string(), sink);

Registry::default()
.with(env_filter)
.with(JsonStorageLayer)
.with(formatting_layer)
}

pub fn init_subscriber(subscriber: impl Subscriber + Send + Sync) {
LogTracer::init().expect("Failed to initialize LogTracer");

set_global_default(subscriber).expect("Failed to set global tracing subscriber");
}

0 comments on commit 408c696

Please sign in to comment.