diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs
index c747cb0a96ca..d75f9a30b4e9 100644
--- a/benchmarks/src/bin/h2o.rs
+++ b/benchmarks/src/bin/h2o.rs
@@ -26,6 +26,7 @@ use datafusion::datasource::listing::{
 use datafusion::datasource::MemTable;
 use datafusion::prelude::CsvReadOptions;
 use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
+use datafusion_benchmarks::BenchmarkRun;
 use std::path::PathBuf;
 use std::sync::Arc;
 use structopt::StructOpt;
@@ -51,6 +52,9 @@ struct GroupBy {
     /// Load the data into a MemTable before executing the query
     #[structopt(short = "m", long = "mem-table")]
     mem_table: bool,
+    /// Path to machine-readable output file
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
 }
 
 #[tokio::main]
@@ -63,6 +67,7 @@ async fn main() -> Result<()> {
 }
 
 async fn group_by(opt: &GroupBy) -> Result<()> {
+    let mut rundata = BenchmarkRun::new();
     let path = opt.path.to_str().unwrap();
     let mut config = ConfigOptions::from_env()?;
     config.execution.batch_size = 65535;
@@ -94,7 +99,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
         ctx.register_csv("x", path, CsvReadOptions::default().schema(&schema))
             .await?;
     }
-
+    rundata.start_new_case(&opt.query.to_string());
     let sql = match opt.query {
         1 => "select id1, sum(v1) as v1 from x group by id1",
         2 => "select id1, id2, sum(v1) as v1 from x group by id1, id2",
@@ -113,13 +118,17 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
     let start = Instant::now();
     let df = ctx.sql(sql).await?;
     let batches = df.collect().await?;
-    let elapsed = start.elapsed().as_millis();
-
+    let elapsed = start.elapsed();
+    let numrows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
     if opt.debug {
         pretty::print_batches(&batches)?;
     }
-
-    println!("h2o groupby query {} took {} ms", opt.query, elapsed);
-
+    rundata.write_iter(elapsed, numrows);
+    println!(
+        "h2o groupby query {} took {} ms",
+        opt.query,
+        elapsed.as_secs_f64() * 1000.0
+    );
+    rundata.maybe_write_json(opt.output_path.as_ref())?;
     Ok(())
 }
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index 266335228332..1de490c905e5 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -29,6 +29,7 @@
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionConfig, SessionContext};
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
+use datafusion_benchmarks::BenchmarkRun;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -61,6 +62,10 @@ struct Opt {
     /// File format: `csv` or `parquet`
     #[structopt(short = "f", long = "format", default_value = "csv")]
     file_format: String,
+
+    /// Path to machine-readable output file
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
 }
 
 #[tokio::main]
@@ -91,42 +96,51 @@ async fn main() -> Result<()> {
         }
     }
 
-    datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await
+    datafusion_sql_benchmarks(&mut ctx, opt).await
 }
 
-async fn datafusion_sql_benchmarks(
-    ctx: &mut SessionContext,
-    iterations: usize,
-    debug: bool,
-) -> Result<()> {
+async fn datafusion_sql_benchmarks(ctx: &mut SessionContext, opt: Opt) -> Result<()> {
+    let iterations = opt.iterations;
+    let debug = opt.debug;
+    let output = opt.output_path;
+    let mut rundata = BenchmarkRun::new();
     let mut queries = HashMap::new();
     queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
     for (name, sql) in &queries {
         println!("Executing '{name}'");
+        rundata.start_new_case(name);
         for i in 0..iterations {
-            let start = Instant::now();
-            execute_sql(ctx, sql, debug).await?;
+            let (rows, elapsed) = execute_sql(ctx, sql, debug).await?;
             println!(
                 "Query '{}' iteration {} took {} ms",
                 name,
                 i,
-                start.elapsed().as_millis()
+                elapsed.as_secs_f64() * 1000.0
             );
+            rundata.write_iter(elapsed, rows);
        }
     }
+    rundata.maybe_write_json(output.as_ref())?;
     Ok(())
 }
 
-async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> {
+async fn execute_sql(
+    ctx: &SessionContext,
+    sql: &str,
+    debug: bool,
+) -> Result<(usize, std::time::Duration)> {
+    let start = Instant::now();
     let dataframe = ctx.sql(sql).await?;
     if debug {
         println!("Optimized logical plan:\n{:?}", dataframe.logical_plan());
     }
     let result = dataframe.collect().await?;
+    let elapsed = start.elapsed();
     if debug {
         pretty::print_batches(&result)?;
     }
-    Ok(())
+    let rowcount = result.iter().map(|b| b.num_rows()).sum();
+    Ok((rowcount, elapsed))
 }
 
 fn nyctaxi_schema() -> Schema {
diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs
index df1ce5dab583..658d924dfb2a 100644
--- a/benchmarks/src/bin/parquet.rs
+++ b/benchmarks/src/bin/parquet.rs
@@ -24,6 +24,7 @@ use datafusion::physical_plan::collect;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
 use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
+use datafusion_benchmarks::BenchmarkRun;
 use parquet::file::properties::WriterProperties;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -72,6 +73,10 @@ struct Opt {
     /// Total size of generated dataset. The default scale factor of 1.0 will generate a roughly 1GB parquet file
     #[structopt(short = "s", long = "scale-factor", default_value = "1.0")]
     scale_factor: f32,
+
+    /// Path to machine-readable output file
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
 }
 
 impl Opt {
     /// Initialize parquet test file given options.
@@ -114,6 +119,7 @@ async fn main() -> Result<()> {
 
 async fn run_sort_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> {
     use datafusion::physical_expr::expressions::col;
+    let mut rundata = BenchmarkRun::new();
     let schema = test_file.schema();
     let sort_cases = vec![
         (
@@ -195,22 +201,30 @@ async fn run_sort_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()
     ];
     for (title, expr) in sort_cases {
         println!("Executing '{title}' (sorting by: {expr:?})");
+        rundata.start_new_case(title);
         for i in 0..opt.iterations {
             let config = SessionConfig::new().with_target_partitions(opt.partitions);
             let ctx = SessionContext::with_config(config);
-            let start = Instant::now();
-            exec_sort(&ctx, &expr, test_file, opt.debug).await?;
-            println!(
-                "Iteration {} finished in {} ms",
-                i,
-                start.elapsed().as_millis()
-            );
+            let (rows, elapsed) = exec_sort(&ctx, &expr, test_file, opt.debug).await?;
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            println!("Iteration {i} finished in {ms} ms");
+            rundata.write_iter(elapsed, rows);
         }
         println!("\n");
     }
+    if let Some(path) = &opt.output_path {
+        std::fs::write(path, rundata.to_json())?;
+    }
     Ok(())
 }
 
+fn parquet_scan_disp(opts: &ParquetScanOptions) -> String {
+    format!(
+        "pushdown_filters={}, reorder_filters={}, page_index={}",
+        opts.pushdown_filters, opts.reorder_filters, opts.enable_page_index
+    )
+}
 async fn run_filter_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> {
+    let mut rundata = BenchmarkRun::new();
     let scan_options_matrix = vec![
         ParquetScanOptions {
             pushdown_filters: false,
@@ -230,54 +244,57 @@ async fn run_filter_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<
     ];
 
     let filter_matrix = vec![
-        // Selective-ish filter
-        col("request_method").eq(lit("GET")),
-        // Non-selective filter
-        col("request_method").not_eq(lit("GET")),
-        // Basic conjunction
-        col("request_method")
-            .eq(lit("POST"))
-            .and(col("response_status").eq(lit(503_u16))),
-        // Nested filters
-        col("request_method").eq(lit("POST")).and(or(
-            col("response_status").eq(lit(503_u16)),
-            col("response_status").eq(lit(403_u16)),
-        )),
-        // Many filters
-        disjunction([
+        ("Selective-ish filter", col("request_method").eq(lit("GET"))),
+        (
+            "Non-selective filter",
             col("request_method").not_eq(lit("GET")),
-            col("response_status").eq(lit(400_u16)),
-            col("service").eq(lit("backend")),
-        ])
-        .unwrap(),
-        // Filter everything
-        col("response_status").eq(lit(429_u16)),
-        // Filter nothing
-        col("response_status").gt(lit(0_u16)),
+        ),
+        (
+            "Basic conjunction",
+            col("request_method")
+                .eq(lit("POST"))
+                .and(col("response_status").eq(lit(503_u16))),
+        ),
+        (
+            "Nested filters",
+            col("request_method").eq(lit("POST")).and(or(
+                col("response_status").eq(lit(503_u16)),
+                col("response_status").eq(lit(403_u16)),
+            )),
+        ),
+        (
+            "Many filters",
+            disjunction([
+                col("request_method").not_eq(lit("GET")),
+                col("response_status").eq(lit(400_u16)),
+                col("service").eq(lit("backend")),
+            ])
+            .unwrap(),
+        ),
+        ("Filter everything", col("response_status").eq(lit(429_u16))),
+        ("Filter nothing", col("response_status").gt(lit(0_u16))),
     ];
 
-    for filter_expr in &filter_matrix {
-        println!("Executing with filter '{filter_expr}'");
+    for (name, filter_expr) in &filter_matrix {
+        println!("Executing '{name}' (filter: {filter_expr})");
         for scan_options in &scan_options_matrix {
             println!("Using scan options {scan_options:?}");
+            rundata
+                .start_new_case(&format!("{name}: {}", parquet_scan_disp(scan_options)));
            for i in 0..opt.iterations {
-                let start = Instant::now();
-
                 let config = scan_options.config().with_target_partitions(opt.partitions);
                 let ctx = SessionContext::with_config(config);
 
-                let rows =
+                let (rows, elapsed) =
                     exec_scan(&ctx, test_file, filter_expr.clone(), opt.debug).await?;
-                println!(
-                    "Iteration {} returned {} rows in {} ms",
-                    i,
-                    rows,
-                    start.elapsed().as_millis()
-                );
+                let ms = elapsed.as_secs_f64() * 1000.0;
+                println!("Iteration {} returned {} rows in {ms} ms", i, rows);
+                rundata.write_iter(elapsed, rows);
             }
         }
         println!("\n");
     }
+    rundata.maybe_write_json(opt.output_path.as_ref())?;
     Ok(())
 }
 
@@ -286,16 +303,18 @@ async fn exec_scan(
     test_file: &TestParquetFile,
     filter: Expr,
     debug: bool,
-) -> Result<usize> {
+) -> Result<(usize, std::time::Duration)> {
+    let start = Instant::now();
     let exec = test_file.create_scan(Some(filter)).await?;
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
-
+    let elapsed = start.elapsed();
     if debug {
         pretty::print_batches(&result)?;
     }
-    Ok(result.iter().map(|b| b.num_rows()).sum())
+    let rows = result.iter().map(|b| b.num_rows()).sum();
+    Ok((rows, elapsed))
 }
 
 async fn exec_sort(
@@ -303,15 +322,18 @@ async fn exec_sort(
     expr: &[PhysicalSortExpr],
     test_file: &TestParquetFile,
     debug: bool,
-) -> Result<()> {
+) -> Result<(usize, std::time::Duration)> {
+    let start = Instant::now();
     let scan = test_file.create_scan(None).await?;
     let exec = Arc::new(SortExec::try_new(expr.to_owned(), scan, None)?);
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
+    let elapsed = start.elapsed();
     if debug {
         pretty::print_batches(&result)?;
     }
-    Ok(())
+    let rows = result.iter().map(|b| b.num_rows()).sum();
+    Ok((rows, elapsed))
 }
 
 fn gen_data(
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b869490a527a..2922e584e41e 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,15 +17,7 @@
 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
-use std::{
-    fs::File,
-    io::Write,
-    iter::Iterator,
-    path::{Path, PathBuf},
-    sync::Arc,
-    time::{Instant, SystemTime},
-};
-
+use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat};
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::parquet::basic::Compression;
@@ -39,18 +31,14 @@ use datafusion::{
     arrow::util::pretty,
     datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
 };
-use datafusion::{
-    datasource::file_format::{csv::CsvFormat, FileFormat},
-    DATAFUSION_VERSION,
-};
-use datafusion_benchmarks::tpch::*;
+use datafusion_benchmarks::{tpch::*, BenchmarkRun};
+use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Instant};
 
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::scheduler::Scheduler;
 use futures::TryStreamExt;
-use serde::Serialize;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -95,7 +83,7 @@ struct DataFusionBenchmarkOpt {
     #[structopt(short = "m", long = "mem-table")]
     mem_table: bool,
 
-    /// Path to output directory where JSON summary file should be written to
+    /// Path to machine-readable output file
    #[structopt(parse(from_os_str), short = "o", long = "output")]
     output_path: Option<PathBuf>,
 
@@ -201,22 +189,22 @@ async fn benchmark_datafusion(
     let mut benchmark_run = BenchmarkRun::new();
     let mut results = vec![];
     for query_id in query_range {
+        benchmark_run.start_new_case(&format!("Query {query_id}"));
         let (query_run, result) = benchmark_query(&opt, query_id).await?;
         results.push(result);
-        benchmark_run.add_query(query_run);
-    }
-
-    if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
+        for iter in query_run {
+            benchmark_run.write_iter(iter.elapsed, iter.row_count);
+        }
     }
+    benchmark_run.maybe_write_json(opt.output_path.as_ref())?;
     Ok(results)
 }
 
 async fn benchmark_query(
     opt: &DataFusionBenchmarkOpt,
     query_id: usize,
-) -> Result<(QueryRun, Vec<RecordBatch>)> {
-    let mut benchmark_run = QueryRun::new(query_id);
+) -> Result<(Vec<QueryResult>, Vec<RecordBatch>)> {
+    let mut query_results = vec![];
     let config = SessionConfig::new()
         .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size)
@@ -252,19 +240,20 @@ async fn benchmark_query(
             }
         }
 
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed);
+        let elapsed = start.elapsed();
+        let ms = elapsed.as_secs_f64() * 1000.0;
+        millis.push(ms);
         let row_count = result.iter().map(|b| b.num_rows()).sum();
         println!(
-            "Query {query_id} iteration {i} took {elapsed:.1} ms and returned {row_count} rows"
+            "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
         );
-        benchmark_run.add_result(elapsed, row_count);
+        query_results.push(QueryResult { elapsed, row_count });
     }
 
     let avg = millis.iter().sum::<f64>() / millis.len() as f64;
     println!("Query {query_id} avg time: {avg:.2} ms");
 
-    Ok((benchmark_run, result))
+    Ok((query_results, result))
 }
 
 async fn register_tables(
@@ -302,20 +291,6 @@ async fn register_tables(
     Ok(())
 }
 
-fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
-    let json =
-        serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
-    let filename = format!("tpch-summary--{}.json", benchmark_run.context.start_time);
-    let path = path.join(filename);
-    println!(
-        "Writing summary file to {}",
-        path.as_os_str().to_str().unwrap()
-    );
-    let mut file = File::create(path)?;
-    file.write_all(json.as_bytes())?;
-    Ok(())
-}
-
 async fn execute_query(
     ctx: &SessionContext,
     sql: &str,
@@ -421,86 +396,8 @@ async fn get_table(
     Ok(Arc::new(ListingTable::try_new(config)?))
 }
 
-#[derive(Debug, Serialize)]
-struct RunContext {
-    /// Benchmark crate version
-    benchmark_version: String,
-    /// DataFusion crate version
-    datafusion_version: String,
-    /// Number of CPU cores
-    num_cpus: usize,
-    /// Start time
-    start_time: u64,
-    /// CLI arguments
-    arguments: Vec<String>,
-}
-
-impl RunContext {
-    fn new() -> Self {
-        Self {
-            benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
-            datafusion_version: DATAFUSION_VERSION.to_owned(),
-            num_cpus: num_cpus::get(),
-            start_time: SystemTime::now()
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .expect("current time is later than UNIX_EPOCH")
-                .as_secs(),
-            arguments: std::env::args().skip(1).collect::<Vec<String>>(),
-        }
-    }
-}
-
-#[derive(Debug, Serialize)]
-struct BenchmarkRun {
-    /// Information regarding the environment in which the benchmark was run
-    context: RunContext,
-    /// Per-query summaries
-    queries: Vec<QueryRun>,
-}
-
-impl BenchmarkRun {
-    fn new() -> Self {
-        Self {
-            context: RunContext::new(),
-            queries: vec![],
-        }
-    }
-
-    fn add_query(&mut self, query: QueryRun) {
-        self.queries.push(query)
-    }
-}
-
-#[derive(Debug, Serialize)]
-struct QueryRun {
-    /// query number
-    query: usize,
-    /// list of individual run times and row counts
-    iterations: Vec<QueryResult>,
-    /// Start time
-    start_time: u64,
-}
-
-impl QueryRun {
-    fn new(query: usize) -> Self {
-        Self {
-            query,
-            iterations: vec![],
-            start_time: SystemTime::now()
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .expect("current time is later than UNIX_EPOCH")
-                .as_secs(),
-        }
-    }
-
-    fn add_result(&mut self, elapsed: f64, row_count: usize) {
-        self.iterations.push(QueryResult { elapsed, row_count })
-    }
-}
-
-#[derive(Debug, Serialize)]
 struct QueryResult {
-    elapsed: f64,
+    elapsed: std::time::Duration,
     row_count: usize,
 }
 
@@ -508,7 +405,9 @@ struct QueryResult {
 mod tests {
     use super::*;
     use datafusion::sql::TableReference;
+    use std::fs::File;
     use std::io::{BufRead, BufReader};
+    use std::path::Path;
     use std::sync::Arc;
 
     #[tokio::test]
@@ -823,6 +722,8 @@ mod tests {
 #[cfg(test)]
 #[cfg(feature = "ci")]
 mod ci {
+    use std::path::Path;
+
     use super::*;
     use arrow::datatypes::{DataType, Field};
     use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index af1dd46fd42e..c2f4e876ce70 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -15,4 +15,143 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::{error::Result, DATAFUSION_VERSION};
+use serde::{Serialize, Serializer};
+use serde_json::Value;
+use std::{
+    collections::HashMap,
+    path::Path,
+    time::{Duration, SystemTime},
+};
+
 pub mod tpch;
+
+fn serialize_start_time<S>(start_time: &SystemTime, ser: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    ser.serialize_u64(
+        start_time
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("current time is later than UNIX_EPOCH")
+            .as_secs(),
+    )
+}
+fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let ms = elapsed.as_secs_f64() * 1000.0;
+    ser.serialize_f64(ms)
+}
+#[derive(Debug, Serialize)]
+pub struct RunContext {
+    /// Benchmark crate version
+    pub benchmark_version: String,
+    /// DataFusion crate version
+    pub datafusion_version: String,
+    /// Number of CPU cores
+    pub num_cpus: usize,
+    /// Start time
+    #[serde(serialize_with = "serialize_start_time")]
+    pub start_time: SystemTime,
+    /// CLI arguments
+    pub arguments: Vec<String>,
+}
+
+impl Default for RunContext {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RunContext {
+    pub fn new() -> Self {
+        Self {
+            benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
+            datafusion_version: DATAFUSION_VERSION.to_owned(),
+            num_cpus: num_cpus::get(),
+            start_time: SystemTime::now(),
+            arguments: std::env::args().skip(1).collect::<Vec<String>>(),
+        }
+    }
+}
+
+/// A single iteration of a benchmark query
+#[derive(Debug, Serialize)]
+struct QueryIter {
+    #[serde(serialize_with = "serialize_elapsed")]
+    elapsed: Duration,
+    row_count: usize,
+}
+/// A single benchmark case
+#[derive(Debug, Serialize)]
+pub struct BenchQuery {
+    query: String,
+    iterations: Vec<QueryIter>,
+    #[serde(serialize_with = "serialize_start_time")]
+    start_time: SystemTime,
+}
+
+/// Collects benchmark run data and serializes it to JSON at the end of the run
+pub struct BenchmarkRun {
+    context: RunContext,
+    queries: Vec<BenchQuery>,
+    current_case: Option<usize>,
+}
+
+impl Default for BenchmarkRun {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl BenchmarkRun {
+    /// Create a new, empty benchmark run
+    pub fn new() -> Self {
+        Self {
+            context: RunContext::new(),
+            queries: vec![],
+            current_case: None,
+        }
+    }
+    /// Begin a new case. Iterations added after this point are recorded in that case.
+    pub fn start_new_case(&mut self, id: &str) {
+        self.queries.push(BenchQuery {
+            query: id.to_owned(),
+            iterations: vec![],
+            start_time: SystemTime::now(),
+        });
+        if let Some(c) = self.current_case.as_mut() {
+            *c += 1;
+        } else {
+            self.current_case = Some(0);
+        }
+    }
+    /// Write a new iteration to the current case
+    pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
+        if let Some(idx) = self.current_case {
+            self.queries[idx]
+                .iterations
+                .push(QueryIter { elapsed, row_count })
+        } else {
+            panic!("no cases existed yet");
+        }
+    }
+
+    /// Stringify the collected data into pretty-printed JSON
+    pub fn to_json(&self) -> String {
+        let mut output = HashMap::<&str, Value>::new();
+        output.insert("context", serde_json::to_value(&self.context).unwrap());
+        output.insert("queries", serde_json::to_value(&self.queries).unwrap());
+        serde_json::to_string_pretty(&output).unwrap()
+    }
+
+    /// Write the collected data as JSON to the output path, if one was provided.
+    pub fn maybe_write_json(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
+        if let Some(path) = maybe_path {
+            std::fs::write(path, self.to_json())?;
+        };
+        Ok(())
+    }
+}
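For reference, the sketch below shows how a benchmark binary would drive the shared BenchmarkRun API added in benchmarks/src/lib.rs. It is a minimal illustration under stated assumptions, not code from this patch: the case name "example query", the iteration count, the fixed row count, and the "benchmark.json" output path are all made up for the example.

use std::time::Instant;

use datafusion::error::Result;
use datafusion_benchmarks::BenchmarkRun;

fn main() -> Result<()> {
    let mut run = BenchmarkRun::new();

    // One case per query; each iteration records its elapsed time and row count.
    run.start_new_case("example query"); // hypothetical case name
    for _ in 0..3 {
        let start = Instant::now();
        let row_count = 42; // stand-in for batches.iter().map(|b| b.num_rows()).sum()
        run.write_iter(start.elapsed(), row_count);
    }

    // Writes the pretty-printed JSON summary only if an output path was supplied.
    run.maybe_write_json(Some("benchmark.json"))?;
    Ok(())
}

Each start_new_case call opens a new entry in the "queries" array of the JSON summary, and maybe_write_json does nothing when it is given None, which is why the benchmark binaries above can call it unconditionally with opt.output_path.as_ref().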