From 045db8d413087dda565b7bf4abd7ebe32b618be3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 Jan 2023 16:05:38 +0000 Subject: [PATCH 1/3] Handle trailing tbl column in TPCH benchmarks --- benchmarks/src/bin/tpch.rs | 12 ++++++------ benchmarks/src/tpch.rs | 21 ++++++++++++++++++++- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index bff7999cdb16..d1bc62d804d9 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -17,6 +17,7 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. +use arrow::datatypes::{DataType, Field}; use std::{ fs::File, io::Write, @@ -402,7 +403,6 @@ async fn get_table( unimplemented!("Invalid file format '{}'", other); } }; - let schema = Arc::new(get_tpch_table_schema(table)); let options = ListingOptions::new(format) .with_file_extension(extension) @@ -412,10 +412,11 @@ async fn get_table( let table_path = ListingTableUrl::parse(path)?; let config = ListingTableConfig::new(table_path).with_listing_options(options); - let config = if table_format == "parquet" { - config.infer_schema(&state).await? - } else { - config.with_schema(schema) + let config = match table_format { + "parquet" => config.infer_schema(&state).await?, + "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))), + "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))), + _ => unreachable!(), }; Ok(Arc::new(ListingTable::try_new(config)?)) @@ -1086,7 +1087,6 @@ mod ci { /// * the correct number of rows are returned /// * the content of the rows is correct async fn verify_query(n: usize) -> Result<()> { - use datafusion::arrow::datatypes::{DataType, Field}; use datafusion::common::ScalarValue; use datafusion::logical_expr::expr::Cast; diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs index deaecdd93db1..47a69f62d198 100644 --- a/benchmarks/src/tpch.rs +++ b/benchmarks/src/tpch.rs @@ -43,6 +43,15 @@ pub const TPCH_TABLES: &[&str] = &[ "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region", ]; +/// The `.tbl` file contains a trailing column +pub fn get_tbl_tpch_table_schema(table: &str) -> Schema { + let mut schema = get_tpch_table_schema(table); + schema + .fields + .push(Field::new("__placeholder", DataType::Utf8, false)); + schema +} + /// Get the schema for the benchmarks derived from TPC-H pub fn get_tpch_table_schema(table: &str) -> Schema { // note that the schema intentionally uses signed integers so that any generated Parquet @@ -331,7 +340,7 @@ pub async fn convert_tbl( let output_root_path = Path::new(output_path); for table in TPCH_TABLES { let start = Instant::now(); - let schema = get_tpch_table_schema(table); + let schema = get_tbl_tpch_table_schema(table); let input_path = format!("{input_path}/{table}.tbl"); let options = CsvReadOptions::new() @@ -346,6 +355,16 @@ pub async fn convert_tbl( // build plan to read the TBL file let mut csv = ctx.read_csv(&input_path, options).await?; + // Select all apart from the padding column + let selection = csv + .schema() + .fields() + .iter() + .take(schema.fields.len() - 1) + .map(|d| Expr::Column(d.qualified_column())) + .collect(); + + csv = csv.select(selection)?; // optionally, repartition the file if partitions > 1 { csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))? From e646bfe9aff31587c4eca50a523a60548b8d35f8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 Jan 2023 16:31:31 +0000 Subject: [PATCH 2/3] Clippy --- benchmarks/src/bin/tpch.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index d1bc62d804d9..a4e144d28dec 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -17,7 +17,6 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. -use arrow::datatypes::{DataType, Field}; use std::{ fs::File, io::Write, From 7124770ba3271b6b8437de4c52722941483601f4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 Jan 2023 18:16:31 +0000 Subject: [PATCH 3/3] Fix benchmarks --- benchmarks/src/bin/tpch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index a4e144d28dec..63eb9127ece8 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -827,6 +827,7 @@ mod tests { #[cfg(feature = "ci")] mod ci { use super::*; + use arrow::datatypes::{DataType, Field}; use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; async fn serde_round_trip(query: usize) -> Result<()> { @@ -1213,7 +1214,8 @@ mod ci { } fn get_tpch_data_path() -> Result { - let path = std::env::var("TPCH_DATA").unwrap_or("benchmarks/data".to_string()); + let path = + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); if !Path::new(&path).exists() { return Err(DataFusionError::Execution(format!( "Benchmark data not found (set TPCH_DATA env var to override): {}",