Skip to content

Commit

Permalink
TPC-H benchmark can optionally write JSON output file with benchmark …
Browse files Browse the repository at this point in the history
…summary (#1766)

* use ordered-float 2.10

Signed-off-by: Andy Grove <[email protected]>

* Add DATAFUSION_VERSION constant

Signed-off-by: Andy Grove <[email protected]>

* Add option to write JSON summary file with benchmark results

* update test

* Clippy fix

Co-authored-by: Daniël Heres <[email protected]>
  • Loading branch information
andygrove and Dandandan authored Feb 9, 2022
1 parent ecd0081 commit 1431ef3
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 4 deletions.
3 changes: 3 additions & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
rand = "0.8.4"
serde = "1.0.136"
serde_json = "1.0.78"
num_cpus = "1.13.0"

[dev-dependencies]
ballista-core = { path = "../ballista/rust/core" }
105 changes: 101 additions & 4 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use futures::future::join_all;
use rand::prelude::*;
use std::ops::Div;
use std::{
fs,
fs::{self, File},
io::Write,
iter::Iterator,
path::{Path, PathBuf},
sync::Arc,
time::Instant,
time::{Instant, SystemTime},
};

use ballista::context::BallistaContext;
Expand All @@ -42,6 +43,7 @@ use datafusion::prelude::*;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datasource::file_format::{csv::CsvFormat, FileFormat},
DATAFUSION_VERSION,
};
use datafusion::{
arrow::record_batch::RecordBatch, datasource::file_format::parquet::ParquetFormat,
Expand All @@ -56,6 +58,7 @@ use datafusion::{

use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use serde::Serialize;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -105,6 +108,10 @@ struct BallistaBenchmarkOpt {
/// Ballista executor port
#[structopt(long = "port")]
port: Option<u16>,

/// Path to output directory where JSON summary file should be written to
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
}

#[derive(Debug, StructOpt, Clone)]
Expand Down Expand Up @@ -140,6 +147,10 @@ struct DataFusionBenchmarkOpt {
/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,

/// Path to output directory where JSON summary file should be written to
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
}

#[derive(Debug, StructOpt, Clone)]
Expand Down Expand Up @@ -261,6 +272,7 @@ async fn main() -> Result<()> {

async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
println!("Running benchmarks with the following options: {:?}", opt);
let mut benchmark_run = BenchmarkRun::new(opt.query);
let config = ExecutionConfig::new()
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
Expand Down Expand Up @@ -302,17 +314,27 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
result = execute_query(&mut ctx, &plan, opt.debug).await?;
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
opt.query, i, elapsed, row_count
);
benchmark_run.add_result(elapsed, row_count);
}

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {} avg time: {:.2} ms", opt.query, avg);

if let Some(path) = &opt.output_path {
write_summary_json(&mut benchmark_run, path)?;
}

Ok(result)
}

async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
println!("Running benchmarks with the following options: {:?}", opt);
let mut benchmark_run = BenchmarkRun::new(opt.query);

let config = BallistaConfig::builder()
.set(
Expand Down Expand Up @@ -350,7 +372,12 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.unwrap();
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
let row_count = batches.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
opt.query, i, elapsed, row_count
);
benchmark_run.add_result(elapsed, row_count);
if opt.debug {
pretty::print_batches(&batches)?;
}
Expand All @@ -359,6 +386,27 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {} avg time: {:.2} ms", opt.query, avg);

if let Some(path) = &opt.output_path {
write_summary_json(&mut benchmark_run, path)?;
}

Ok(())
}

/// Serializes `benchmark_run` as pretty-printed JSON and writes it to a file
/// named `tpch-q{query}-{start_time}.json` inside the directory `path`.
///
/// The directory itself is not created here; `File::create` only creates
/// (or truncates) the summary file.
///
/// # Errors
///
/// Returns an error if the summary file cannot be created or written.
///
/// # Panics
///
/// Panics if `benchmark_run` cannot be serialized to JSON.
fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
    let json =
        serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
    let filename = format!(
        "tpch-q{}-{}.json",
        benchmark_run.query, benchmark_run.start_time
    );
    let path = path.join(filename);
    // `Path::display` renders non-UTF-8 paths lossily instead of panicking;
    // the output path is parsed from an OS string, so it need not be UTF-8.
    println!("Writing summary file to {}", path.display());
    let mut file = File::create(path)?;
    file.write_all(json.as_bytes())?;
    Ok(())
}

Expand Down Expand Up @@ -779,6 +827,54 @@ fn get_schema(table: &str) -> Schema {
}
}

/// Summary of one benchmark invocation: environment details plus the
/// per-iteration results, serializable with serde.
#[derive(Debug, Serialize)]
struct BenchmarkRun {
/// Benchmark crate version (`CARGO_PKG_VERSION` of this crate at compile time)
benchmark_version: String,
/// DataFusion crate version (taken from `DATAFUSION_VERSION`)
datafusion_version: String,
/// Number of CPU cores as reported by `num_cpus::get()`
num_cpus: usize,
/// Start time, in whole seconds since the UNIX epoch
start_time: u64,
/// CLI arguments, excluding the program name
arguments: Vec<String>,
/// TPC-H query number
query: usize,
/// List of individual run times and row counts, one entry per iteration
iterations: Vec<QueryResult>,
}

impl BenchmarkRun {
    /// Creates a run record for query number `query` with no iterations yet.
    ///
    /// Captures the benchmark and DataFusion crate versions, the CPU count,
    /// the current wall-clock time (seconds since the UNIX epoch) and the
    /// process's command-line arguments without the program name.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the UNIX epoch.
    fn new(query: usize) -> Self {
        let start_time = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("current time is later than UNIX_EPOCH")
            .as_secs();

        // `std::env::args()` panics when an argument is not valid Unicode.
        // Path options are parsed from OS strings, so such arguments are
        // legal; record them lossily rather than aborting the benchmark.
        let arguments: Vec<String> = std::env::args_os()
            .skip(1)
            .map(|arg| arg.to_string_lossy().into_owned())
            .collect();

        Self {
            benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
            datafusion_version: DATAFUSION_VERSION.to_owned(),
            num_cpus: num_cpus::get(),
            start_time,
            arguments,
            query,
            iterations: vec![],
        }
    }

    /// Appends the outcome of one iteration: its elapsed time and the number
    /// of rows it returned.
    fn add_result(&mut self, elapsed: f64, row_count: usize) {
        self.iterations.push(QueryResult { elapsed, row_count });
    }
}

/// Result of a single benchmark iteration.
#[derive(Debug, Serialize)]
struct QueryResult {
/// Elapsed time of the iteration; callers in this file pass milliseconds
elapsed: f64,
/// Total number of rows returned, summed over all result batches
row_count: usize,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1235,6 +1331,7 @@ mod tests {
path: PathBuf::from(path.to_string()),
file_format: "tbl".to_string(),
mem_table: false,
output_path: None,
};
let actual = benchmark_datafusion(opt).await?;

Expand Down
3 changes: 3 additions & 0 deletions datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@
//! cargo run --example simple_udf
//! ```
/// DataFusion crate version
pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");

extern crate sqlparser;

pub mod avro_to_arrow;
Expand Down

0 comments on commit 1431ef3

Please sign in to comment.