diff --git a/.dockerignore b/.dockerignore index 9a64a123f735..8cd6a89645c3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -23,4 +23,6 @@ ci dev +testing +parquet-testing **/target/* diff --git a/Cargo.toml b/Cargo.toml index 0947beadac0d..2f34babdb247 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ members = [ "datafusion", "datafusion-examples", "benchmarks", - "ballista/rust/benchmarks/tpch", "ballista/rust/client", "ballista/rust/core", "ballista/rust/executor", diff --git a/ballista/rust/benchmarks/tpch/Cargo.toml b/ballista/rust/benchmarks/tpch/Cargo.toml deleted file mode 100644 index 9311f23ad886..000000000000 --- a/ballista/rust/benchmarks/tpch/Cargo.toml +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "tpch" -version = "0.5.0-SNAPSHOT" -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -authors = ["Apache Arrow "] -license = "Apache-2.0" -edition = "2018" - -[dependencies] -ballista = { path="../../client" } -datafusion = { path = "../../../../datafusion" } - -arrow = { git = "https://github.com/apache/arrow-rs", rev = "c3fe3bab9905739fdda75301dab07a18c91731bd" } -parquet = { git = "https://github.com/apache/arrow-rs", rev = "c3fe3bab9905739fdda75301dab07a18c91731bd" } - -env_logger = "0.8" -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -structopt = "0.3" diff --git a/ballista/rust/benchmarks/tpch/README.md b/ballista/rust/benchmarks/tpch/README.md deleted file mode 100644 index 20c4fc71de35..000000000000 --- a/ballista/rust/benchmarks/tpch/README.md +++ /dev/null @@ -1,104 +0,0 @@ - - -# TPC-H Benchmarks - -TPC-H is an industry standard benchmark for testing databases and query engines. A command-line tool is available that -can generate the raw test data at any given scale factor (scale factor refers to the amount of data to be generated). - -## Generating Test Data - -TPC-H data can be generated using the `tpch-gen.sh` script, which creates a Docker image containing the TPC-DS data -generator. - -```bash -./tpch-gen.sh -``` - -Data will be generated into the `data` subdirectory and will not be checked in because this directory has been added -to the `.gitignore` file. - -## Running the Benchmarks - -To run the benchmarks it is necessary to have at least one Ballista scheduler and one Ballista executor running. - -To run the scheduler from source: - -```bash -cd $ARROW_HOME/ballista/rust/scheduler -RUST_LOG=info cargo run --release -``` - -By default the scheduler will bind to `0.0.0.0` and listen on port 50050. - -To run the executor from source: - -```bash -cd $ARROW_HOME/ballista/rust/executor -RUST_LOG=info cargo run --release -``` - -By default the executor will bind to `0.0.0.0` and listen on port 50051. - -You can add SIMD/snmalloc/LTO flags to improve speed (with longer build times): - -``` -RUST_LOG=info RUSTFLAGS='-C target-cpu=native -C lto -C codegen-units=1 -C embed-bitcode' cargo run --release --bin executor --features "simd snmalloc" --target x86_64-unknown-linux-gnu -``` - -To run the benchmarks: - -```bash -cd $ARROW_HOME/ballista/rust/benchmarks/tpch -cargo run --release benchmark --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl -``` - -## Running the Benchmarks on docker-compose - -To start a Rust scheduler and executor using Docker Compose: - -```bash -cd $BALLISTA_HOME -./dev/build-rust.sh -cd $BALLISTA_HOME/rust/benchmarks/tpch -docker-compose up -``` - -Then you can run the benchmark with: - -```bash -docker-compose run ballista-client cargo run benchmark --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl -``` - -## Expected output - -The result of query 1 should produce the following output when executed against the SF=1 dataset. - -``` -+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+ -| l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order | -+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+ -| A | F | 37734107 | 56586554400.73001 | 53758257134.870026 | 55909065222.82768 | 25.522005853257337 | 38273.12973462168 | 0.049985295838396455 | 1478493 | -| N | F | 991417 | 1487504710.3799996 | 1413082168.0541 | 1469649223.1943746 | 25.516471920522985 | 38284.467760848296 | 0.05009342667421622 | 38854 | -| N | O | 74476023 | 111701708529.50996 | 106118209986.10472 | 110367023144.56622 | 25.502229680934594 | 38249.1238377803 | 0.049996589476752576 | 2920373 | -| R | F | 37719753 | 56568041380.90001 | 53741292684.60399 | 55889619119.83194 | 25.50579361269077 | 38250.854626099666 | 0.05000940583012587 | 1478870 | -+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+ -Query 1 iteration 0 took 1956.1 ms -Query 1 avg time: 1956.11 ms -``` diff --git a/ballista/rust/benchmarks/tpch/src/main.rs b/ballista/rust/benchmarks/tpch/src/main.rs deleted file mode 100644 index 1ba46ea1826a..000000000000 --- a/ballista/rust/benchmarks/tpch/src/main.rs +++ /dev/null @@ -1,360 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. -//! -//! This is a modified version of the DataFusion version of these benchmarks. - -use std::collections::HashMap; -use std::fs; -use std::path::{Path, PathBuf}; -use std::time::Instant; - -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::util::pretty; -use ballista::prelude::*; -use datafusion::prelude::*; -use parquet::basic::Compression; -use parquet::file::properties::WriterProperties; -use structopt::StructOpt; - -#[derive(Debug, StructOpt)] -struct BenchmarkOpt { - /// Ballista executor host - #[structopt(long = "host")] - host: String, - - /// Ballista executor port - #[structopt(long = "port")] - port: u16, - - /// Query number - #[structopt(long)] - query: usize, - - /// Activate debug mode to see query results - #[structopt(long)] - debug: bool, - - /// Number of iterations of each test run - #[structopt(long = "iterations", default_value = "1")] - iterations: usize, - - /// Batch size when reading CSV or Parquet files - #[structopt(long = "batch-size", default_value = "32768")] - batch_size: usize, - - /// Path to data files - #[structopt(parse(from_os_str), required = true, long = "path")] - path: PathBuf, - - /// File format: `csv`, `tbl` or `parquet` - #[structopt(long = "format")] - file_format: String, -} - -#[derive(Debug, StructOpt)] -struct ConvertOpt { - /// Path to csv files - #[structopt(parse(from_os_str), required = true, short = "i", long = "input")] - input_path: PathBuf, - - /// Output path - #[structopt(parse(from_os_str), required = true, short = "o", long = "output")] - output_path: PathBuf, - - /// Output file format: `csv` or `parquet` - #[structopt(short = "f", long = "format")] - file_format: String, - - /// Compression to use when writing Parquet files - #[structopt(short = "c", long = "compression", default_value = "snappy")] - compression: String, - - /// Number of partitions to produce - #[structopt(short = "p", long = "partitions", default_value = "1")] - partitions: usize, - - /// Batch size when reading CSV or Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "4096")] - batch_size: usize, -} - -#[derive(Debug, StructOpt)] -#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")] -enum TpchOpt { - Benchmark(BenchmarkOpt), - Convert(ConvertOpt), -} - -const TABLES: &[&str] = &[ - "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region", -]; - -#[tokio::main] -async fn main() -> Result<()> { - env_logger::init(); - match TpchOpt::from_args() { - TpchOpt::Benchmark(opt) => benchmark(opt).await.map(|_| ()), - TpchOpt::Convert(opt) => convert_tbl(opt).await, - } -} - -async fn benchmark(opt: BenchmarkOpt) -> Result<()> { - println!("Running benchmarks with the following options: {:?}", opt); - - let mut settings = HashMap::new(); - settings.insert("batch.size".to_owned(), format!("{}", opt.batch_size)); - - let ctx = BallistaContext::remote(opt.host.as_str(), opt.port, settings); - - // register tables with Ballista context - let path = opt.path.to_str().unwrap(); - let file_format = opt.file_format.as_str(); - for table in TABLES { - match file_format { - // dbgen creates .tbl ('|' delimited) files without header - "tbl" => { - let path = format!("{}/{}.tbl", path, table); - let schema = get_schema(table); - let options = CsvReadOptions::new() - .schema(&schema) - .delimiter(b'|') - .has_header(false) - .file_extension(".tbl"); - ctx.register_csv(table, &path, options)?; - } - "csv" => { - let path = format!("{}/{}", path, table); - let schema = get_schema(table); - let options = CsvReadOptions::new().schema(&schema).has_header(true); - ctx.register_csv(table, &path, options)?; - } - "parquet" => { - let path = format!("{}/{}", path, table); - ctx.register_parquet(table, &path)?; - } - other => { - unimplemented!("Invalid file format '{}'", other); - } - } - } - - let mut millis = vec![]; - - // run benchmark - let sql = get_query_sql(opt.query)?; - println!("Running benchmark with query {}:\n {}", opt.query, sql); - for i in 0..opt.iterations { - let start = Instant::now(); - let df = ctx.sql(&sql)?; - let mut batches = vec![]; - let mut stream = df.collect().await?; - while let Some(result) = stream.next().await { - let batch = result?; - batches.push(batch); - } - let elapsed = start.elapsed().as_secs_f64() * 1000.0; - millis.push(elapsed as f64); - println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed); - if opt.debug { - pretty::print_batches(&batches)?; - } - } - - let avg = millis.iter().sum::() / millis.len() as f64; - println!("Query {} avg time: {:.2} ms", opt.query, avg); - - Ok(()) -} - -fn get_query_sql(query: usize) -> Result { - if query > 0 && query < 23 { - let filename = format!("queries/q{}.sql", query); - Ok(fs::read_to_string(&filename).expect("failed to read query")) - } else { - Err(BallistaError::General( - "invalid query. Expected value between 1 and 22".to_owned(), - )) - } -} - -async fn convert_tbl(opt: ConvertOpt) -> Result<()> { - let output_root_path = Path::new(&opt.output_path); - for table in TABLES { - let start = Instant::now(); - let schema = get_schema(table); - - let input_path = format!("{}/{}.tbl", opt.input_path.to_str().unwrap(), table); - let options = CsvReadOptions::new() - .schema(&schema) - .delimiter(b'|') - .file_extension(".tbl"); - - let config = ExecutionConfig::new().with_batch_size(opt.batch_size); - let mut ctx = ExecutionContext::with_config(config); - - // build plan to read the TBL file - let mut csv = ctx.read_csv(&input_path, options)?; - - // optionally, repartition the file - if opt.partitions > 1 { - csv = csv.repartition(Partitioning::RoundRobinBatch(opt.partitions))? - } - - // create the physical plan - let csv = csv.to_logical_plan(); - let csv = ctx.optimize(&csv)?; - let csv = ctx.create_physical_plan(&csv)?; - - let output_path = output_root_path.join(table); - let output_path = output_path.to_str().unwrap().to_owned(); - - println!( - "Converting '{}' to {} files in directory '{}'", - &input_path, &opt.file_format, &output_path - ); - match opt.file_format.as_str() { - "csv" => ctx.write_csv(csv, output_path).await?, - "parquet" => { - let compression = match opt.compression.as_str() { - "none" => Compression::UNCOMPRESSED, - "snappy" => Compression::SNAPPY, - "brotli" => Compression::BROTLI, - "gzip" => Compression::GZIP, - "lz4" => Compression::LZ4, - "lz0" => Compression::LZO, - "zstd" => Compression::ZSTD, - other => { - return Err(BallistaError::NotImplemented(format!( - "Invalid compression format: {}", - other - ))) - } - }; - let props = WriterProperties::builder() - .set_compression(compression) - .build(); - ctx.write_parquet(csv, output_path, Some(props)).await? - } - other => { - return Err(BallistaError::NotImplemented(format!( - "Invalid output format: {}", - other - ))) - } - } - println!("Conversion completed in {} ms", start.elapsed().as_millis()); - } - - Ok(()) -} - -fn get_schema(table: &str) -> Schema { - // note that the schema intentionally uses signed integers so that any generated Parquet - // files can also be used to benchmark tools that only support signed integers, such as - // Apache Spark - - match table { - "part" => Schema::new(vec![ - Field::new("p_partkey", DataType::Int32, false), - Field::new("p_name", DataType::Utf8, false), - Field::new("p_mfgr", DataType::Utf8, false), - Field::new("p_brand", DataType::Utf8, false), - Field::new("p_type", DataType::Utf8, false), - Field::new("p_size", DataType::Int32, false), - Field::new("p_container", DataType::Utf8, false), - Field::new("p_retailprice", DataType::Float64, false), - Field::new("p_comment", DataType::Utf8, false), - ]), - - "supplier" => Schema::new(vec![ - Field::new("s_suppkey", DataType::Int32, false), - Field::new("s_name", DataType::Utf8, false), - Field::new("s_address", DataType::Utf8, false), - Field::new("s_nationkey", DataType::Int32, false), - Field::new("s_phone", DataType::Utf8, false), - Field::new("s_acctbal", DataType::Float64, false), - Field::new("s_comment", DataType::Utf8, false), - ]), - - "partsupp" => Schema::new(vec![ - Field::new("ps_partkey", DataType::Int32, false), - Field::new("ps_suppkey", DataType::Int32, false), - Field::new("ps_availqty", DataType::Int32, false), - Field::new("ps_supplycost", DataType::Float64, false), - Field::new("ps_comment", DataType::Utf8, false), - ]), - - "customer" => Schema::new(vec![ - Field::new("c_custkey", DataType::Int32, false), - Field::new("c_name", DataType::Utf8, false), - Field::new("c_address", DataType::Utf8, false), - Field::new("c_nationkey", DataType::Int32, false), - Field::new("c_phone", DataType::Utf8, false), - Field::new("c_acctbal", DataType::Float64, false), - Field::new("c_mktsegment", DataType::Utf8, false), - Field::new("c_comment", DataType::Utf8, false), - ]), - - "orders" => Schema::new(vec![ - Field::new("o_orderkey", DataType::Int32, false), - Field::new("o_custkey", DataType::Int32, false), - Field::new("o_orderstatus", DataType::Utf8, false), - Field::new("o_totalprice", DataType::Float64, false), - Field::new("o_orderdate", DataType::Date32, false), - Field::new("o_orderpriority", DataType::Utf8, false), - Field::new("o_clerk", DataType::Utf8, false), - Field::new("o_shippriority", DataType::Int32, false), - Field::new("o_comment", DataType::Utf8, false), - ]), - - "lineitem" => Schema::new(vec![ - Field::new("l_orderkey", DataType::Int32, false), - Field::new("l_partkey", DataType::Int32, false), - Field::new("l_suppkey", DataType::Int32, false), - Field::new("l_linenumber", DataType::Int32, false), - Field::new("l_quantity", DataType::Float64, false), - Field::new("l_extendedprice", DataType::Float64, false), - Field::new("l_discount", DataType::Float64, false), - Field::new("l_tax", DataType::Float64, false), - Field::new("l_returnflag", DataType::Utf8, false), - Field::new("l_linestatus", DataType::Utf8, false), - Field::new("l_shipdate", DataType::Date32, false), - Field::new("l_commitdate", DataType::Date32, false), - Field::new("l_receiptdate", DataType::Date32, false), - Field::new("l_shipinstruct", DataType::Utf8, false), - Field::new("l_shipmode", DataType::Utf8, false), - Field::new("l_comment", DataType::Utf8, false), - ]), - - "nation" => Schema::new(vec![ - Field::new("n_nationkey", DataType::Int32, false), - Field::new("n_name", DataType::Utf8, false), - Field::new("n_regionkey", DataType::Int32, false), - Field::new("n_comment", DataType::Utf8, false), - ]), - - "region" => Schema::new(vec![ - Field::new("r_regionkey", DataType::Int32, false), - Field::new("r_name", DataType::Utf8, false), - Field::new("r_comment", DataType::Utf8, false), - ]), - - _ => unimplemented!(), - } -} diff --git a/ballista/rust/benchmarks/tpch/.dockerignore b/benchmarks/.dockerignore similarity index 100% rename from ballista/rust/benchmarks/tpch/.dockerignore rename to benchmarks/.dockerignore diff --git a/ballista/rust/benchmarks/tpch/.gitignore b/benchmarks/.gitignore similarity index 100% rename from ballista/rust/benchmarks/tpch/.gitignore rename to benchmarks/.gitignore diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 6eb6ab9f89d6..35622661eaaa 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -34,6 +34,7 @@ snmalloc = ["snmalloc-rs"] arrow = { git = "https://github.com/apache/arrow-rs", rev = "c3fe3bab9905739fdda75301dab07a18c91731bd" } parquet = { git = "https://github.com/apache/arrow-rs", rev = "c3fe3bab9905739fdda75301dab07a18c91731bd" } datafusion = { path = "../datafusion" } +ballista = { path = "../ballista/rust/client" } structopt = { version = "0.3", default-features = false } tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] } futures = "0.3" diff --git a/benchmarks/README.md b/benchmarks/README.md index 7460477db4e9..e003d9687c9c 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -17,53 +17,47 @@ under the License. --> -# Apache Arrow Rust Benchmarks +# DataFusion and Ballista Benchmarks This crate contains benchmarks based on popular public data sets and open source benchmark suites, making it easy to run real-world benchmarks to help with performance and scalability testing and for comparing performance with other Arrow implementations as well as other query engines. -Currently, only DataFusion benchmarks exist, but the plan is to add benchmarks for the arrow, flight, and parquet -crates as well. - ## Benchmark derived from TPC-H These benchmarks are derived from the [TPC-H][1] benchmark. -Data for this benchmark can be generated using the [tpch-dbgen][2] command-line tool. Run the following commands to -clone the repository and build the source code. +## Generating Test Data + +TPC-H data can be generated using the `tpch-gen.sh` script, which creates a Docker image containing the TPC-DS data +generator. ```bash -git clone git@github.com:databricks/tpch-dbgen.git -cd tpch-dbgen -make -export TPCH_DATA=$(pwd) +./tpch-gen.sh ``` -Data can now be generated with the following command. Note that `-s 1` means use Scale Factor 1 or ~1 GB of -data. This value can be increased to generate larger data sets. +Data will be generated into the `data` subdirectory and will not be checked in because this directory has been added +to the `.gitignore` file. -```bash -./dbgen -vf -s 1 -``` +## Running the DataFusion Benchmarks -The benchmark can then be run (assuming the data created from `dbgen` is in `/mnt/tpch-dbgen`) with a command such as: +The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as: ```bash -cargo run --release --bin tpch -- benchmark --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096 +cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` You can enable the features `simd` (to use SIMD instructions) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`: ``` -cargo run --release --features "simd mimalloc" --bin tpch -- benchmark --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096 +cargo run --release --features "simd mimalloc" --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl` (generated by the `dbgen` utility) to CSV and Parquet. ```bash -cargo run --release --bin tpch -- convert --input /mnt/tpch-dbgen --output /mnt/tpch-parquet --format parquet +cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet ``` This utility does not yet provide support for changing the number of partitions when performing the conversion. Another @@ -97,9 +91,78 @@ docker run -v /mnt:/mnt -it ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT \ --partitions 64 ``` +## Running the Ballista Benchmarks + +To run the benchmarks it is necessary to have at least one Ballista scheduler and one Ballista executor running. + +To run the scheduler from source: + +```bash +cd $ARROW_HOME/ballista/rust/scheduler +RUST_LOG=info cargo run --release +``` + +By default the scheduler will bind to `0.0.0.0` and listen on port 50050. + +To run the executor from source: + +```bash +cd $ARROW_HOME/ballista/rust/executor +RUST_LOG=info cargo run --release +``` + +By default the executor will bind to `0.0.0.0` and listen on port 50051. + +You can add SIMD/snmalloc/LTO flags to improve speed (with longer build times): + +``` +RUST_LOG=info RUSTFLAGS='-C target-cpu=native -C lto -C codegen-units=1 -C embed-bitcode' cargo run --release --bin executor --features "simd snmalloc" --target x86_64-unknown-linux-gnu +``` + +To run the benchmarks: + +```bash +cd $ARROW_HOME/ballista/rust/benchmarks/tpch +cargo run --release benchmark --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl +``` + +## Running the Ballista Benchmarks on docker-compose + +To start a Rust scheduler and executor using Docker Compose: + +```bash +cd $BALLISTA_HOME +./dev/build-rust.sh +cd $BALLISTA_HOME/rust/benchmarks/tpch +docker-compose up +``` + +Then you can run the benchmark with: + +```bash +docker-compose run ballista-client cargo run benchmark --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl +``` + +## Expected output + +The result of query 1 should produce the following output when executed against the SF=1 dataset. + +``` ++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+ +| l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order | ++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+ +| A | F | 37734107 | 56586554400.73001 | 53758257134.870026 | 55909065222.82768 | 25.522005853257337 | 38273.12973462168 | 0.049985295838396455 | 1478493 | +| N | F | 991417 | 1487504710.3799996 | 1413082168.0541 | 1469649223.1943746 | 25.516471920522985 | 38284.467760848296 | 0.05009342667421622 | 38854 | +| N | O | 74476023 | 111701708529.50996 | 106118209986.10472 | 110367023144.56622 | 25.502229680934594 | 38249.1238377803 | 0.049996589476752576 | 2920373 | +| R | F | 37719753 | 56568041380.90001 | 53741292684.60399 | 55889619119.83194 | 25.50579361269077 | 38250.854626099666 | 0.05000940583012587 | 1478870 | ++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+ +Query 1 iteration 0 took 1956.1 ms +Query 1 avg time: 1956.11 ms +``` + ## NYC Taxi Benchmark -These benchmarks are based on the [New York Taxi and Limousine Commission][3] data set. +These benchmarks are based on the [New York Taxi and Limousine Commission][2] data set. ```bash cargo run --release --bin nyctaxi -- --iterations 3 --path /mnt/nyctaxi/csv --format csv --batch-size 4096 @@ -116,5 +179,4 @@ Query 'fare_amt_by_passenger' iteration 2 took 7969 ms ``` [1]: http://www.tpc.org/tpch/ -[2]: https://github.com/databricks/tpch-dbgen -[3]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page +[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page diff --git a/ballista/rust/benchmarks/tpch/docker-compose.yaml b/benchmarks/docker-compose.yaml similarity index 90% rename from ballista/rust/benchmarks/tpch/docker-compose.yaml rename to benchmarks/docker-compose.yaml index f872ce16e2d8..6015dbac2cc2 100644 --- a/ballista/rust/benchmarks/tpch/docker-compose.yaml +++ b/benchmarks/docker-compose.yaml @@ -20,7 +20,7 @@ services: image: quay.io/coreos/etcd:v3.4.9 command: "etcd -advertise-client-urls http://etcd:2379 -listen-client-urls http://0.0.0.0:2379" ballista-scheduler: - image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT + image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT command: "/scheduler --config-backend etcd --etcd-urls etcd:2379 --bind-host 0.0.0.0 --port 50050" environment: - RUST_LOG=ballista=debug @@ -29,7 +29,7 @@ services: depends_on: - etcd ballista-executor-1: - image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT + image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT command: "/executor --bind-host 0.0.0.0 --port 50051 --external-host ballista-executor-1 --scheduler-host ballista-scheduler" environment: - RUST_LOG=info @@ -38,7 +38,7 @@ services: depends_on: - ballista-scheduler ballista-executor-2: - image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT + image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT command: "/executor --bind-host 0.0.0.0 --port 50052 --external-host ballista-executor-2 --scheduler-host ballista-scheduler" environment: - RUST_LOG=info @@ -47,7 +47,7 @@ services: depends_on: - ballista-scheduler ballista-client: - image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT + image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT command: "/bin/sh" # do nothing working_dir: /ballista/benchmarks/tpch environment: diff --git a/ballista/rust/benchmarks/tpch/entrypoint.sh b/benchmarks/entrypoint.sh similarity index 100% rename from ballista/rust/benchmarks/tpch/entrypoint.sh rename to benchmarks/entrypoint.sh diff --git a/ballista/rust/benchmarks/tpch/queries/q1.sql b/benchmarks/queries/q1.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q1.sql rename to benchmarks/queries/q1.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q10.sql b/benchmarks/queries/q10.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q10.sql rename to benchmarks/queries/q10.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q11.sql b/benchmarks/queries/q11.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q11.sql rename to benchmarks/queries/q11.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q12.sql b/benchmarks/queries/q12.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q12.sql rename to benchmarks/queries/q12.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q13.sql b/benchmarks/queries/q13.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q13.sql rename to benchmarks/queries/q13.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q14.sql b/benchmarks/queries/q14.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q14.sql rename to benchmarks/queries/q14.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q16.sql b/benchmarks/queries/q16.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q16.sql rename to benchmarks/queries/q16.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q17.sql b/benchmarks/queries/q17.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q17.sql rename to benchmarks/queries/q17.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q18.sql b/benchmarks/queries/q18.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q18.sql rename to benchmarks/queries/q18.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q19.sql b/benchmarks/queries/q19.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q19.sql rename to benchmarks/queries/q19.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q2.sql b/benchmarks/queries/q2.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q2.sql rename to benchmarks/queries/q2.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q20.sql b/benchmarks/queries/q20.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q20.sql rename to benchmarks/queries/q20.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q21.sql b/benchmarks/queries/q21.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q21.sql rename to benchmarks/queries/q21.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q22.sql b/benchmarks/queries/q22.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q22.sql rename to benchmarks/queries/q22.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q3.sql b/benchmarks/queries/q3.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q3.sql rename to benchmarks/queries/q3.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q4.sql b/benchmarks/queries/q4.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q4.sql rename to benchmarks/queries/q4.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q5.sql b/benchmarks/queries/q5.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q5.sql rename to benchmarks/queries/q5.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q6.sql b/benchmarks/queries/q6.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q6.sql rename to benchmarks/queries/q6.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q7.sql b/benchmarks/queries/q7.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q7.sql rename to benchmarks/queries/q7.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q8.sql b/benchmarks/queries/q8.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q8.sql rename to benchmarks/queries/q8.sql diff --git a/ballista/rust/benchmarks/tpch/queries/q9.sql b/benchmarks/queries/q9.sql similarity index 100% rename from ballista/rust/benchmarks/tpch/queries/q9.sql rename to benchmarks/queries/q9.sql diff --git a/ballista/rust/benchmarks/tpch/run.sh b/benchmarks/run.sh similarity index 99% rename from ballista/rust/benchmarks/tpch/run.sh rename to benchmarks/run.sh index c8a36b6013cd..fd97ff9a9a6a 100755 --- a/ballista/rust/benchmarks/tpch/run.sh +++ b/benchmarks/run.sh @@ -19,6 +19,7 @@ set -e # This bash script is meant to be run inside the docker-compose environment. Check the README for instructions +cd / for query in 1 3 5 6 10 12 do /tpch benchmark --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index b203ceb3f741..fd9f0525987d 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -17,21 +17,26 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. -use std::time::Instant; use std::{ + collections::HashMap, + fs, + iter::Iterator, path::{Path, PathBuf}, sync::Arc, + time::Instant, }; +use futures::StreamExt; + use arrow::datatypes::{DataType, Field, Schema}; use arrow::util::pretty; +use ballista::context::BallistaContext; use datafusion::datasource::parquet::ParquetTable; use datafusion::datasource::{CsvFile, MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::collect; use datafusion::prelude::*; - use parquet::basic::Compression; use parquet::file::properties::WriterProperties; use structopt::StructOpt; @@ -44,7 +49,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -#[derive(Debug, StructOpt)] +#[derive(Debug, StructOpt, Clone)] struct BenchmarkOpt { /// Query number #[structopt(short, long)] @@ -81,6 +86,14 @@ struct BenchmarkOpt { /// Number of partitions to create when using MemTable as input #[structopt(short = "n", long = "partitions", default_value = "8")] partitions: usize, + + /// Ballista executor host + #[structopt(long = "host")] + host: Option, + + /// Ballista executor port + #[structopt(long = "port")] + port: Option, } #[derive(Debug, StructOpt)] @@ -125,12 +138,20 @@ const TABLES: &[&str] = &[ async fn main() -> Result<()> { env_logger::init(); match TpchOpt::from_args() { - TpchOpt::Benchmark(opt) => benchmark(opt).await.map(|_| ()), + TpchOpt::Benchmark(opt) => { + if opt.host.is_some() && opt.port.is_some() { + benchmark_ballista(opt).await.map(|_| ()) + } else { + benchmark_datafusion(opt).await.map(|_| ()) + } + } TpchOpt::Convert(opt) => convert_tbl(opt).await, } } -async fn benchmark(opt: BenchmarkOpt) -> Result> { +async fn benchmark_datafusion( + opt: BenchmarkOpt, +) -> Result> { println!("Running benchmarks with the following options: {:?}", opt); let config = ExecutionConfig::new() .with_concurrency(opt.concurrency) @@ -181,832 +202,97 @@ async fn benchmark(opt: BenchmarkOpt) -> Result Result { - match query { - // original - // 1 => ctx.create_logical_plan( - // "select - // l_returnflag, - // l_linestatus, - // sum(l_quantity) as sum_qty, - // sum(l_extendedprice) as sum_base_price, - // sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - // sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - // avg(l_quantity) as avg_qty, - // avg(l_extendedprice) as avg_price, - // avg(l_discount) as avg_disc, - // count(*) as count_order - // from - // lineitem - // where - // l_shipdate <= date '1998-12-01' - interval '90' day (3) - // group by - // l_returnflag, - // l_linestatus - // order by - // l_returnflag, - // l_linestatus;" - // ), - 1 => ctx.create_logical_plan( - "select - l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order - from - lineitem - where - l_shipdate <= date '1998-09-02' - group by - l_returnflag, - l_linestatus - order by - l_returnflag, - l_linestatus;", - ), - - 2 => ctx.create_logical_plan( - "select - s_acctbal, - s_name, - n_name, - p_partkey, - p_mfgr, - s_address, - s_phone, - s_comment - from - part, - supplier, - partsupp, - nation, - region - where - p_partkey = ps_partkey - and s_suppkey = ps_suppkey - and p_size = 15 - and p_type like '%BRASS' - and s_nationkey = n_nationkey - and n_regionkey = r_regionkey - and r_name = 'EUROPE' - and ps_supplycost = ( - select - min(ps_supplycost) - from - partsupp, - supplier, - nation, - region - where - p_partkey = ps_partkey - and s_suppkey = ps_suppkey - and s_nationkey = n_nationkey - and n_regionkey = r_regionkey - and r_name = 'EUROPE' - ) - order by - s_acctbal desc, - n_name, - s_name, - p_partkey;", - ), - - 3 => ctx.create_logical_plan( - "select - l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - o_orderdate, - o_shippriority - from - customer, - orders, - lineitem - where - c_mktsegment = 'BUILDING' - and c_custkey = o_custkey - and l_orderkey = o_orderkey - and o_orderdate < date '1995-03-15' - and l_shipdate > date '1995-03-15' - group by - l_orderkey, - o_orderdate, - o_shippriority - order by - revenue desc, - o_orderdate;", - ), - - 4 => ctx.create_logical_plan( - "select - o_orderpriority, - count(*) as order_count - from - orders - where - o_orderdate >= '1993-07-01' - and o_orderdate < date '1993-07-01' + interval '3' month - and exists ( - select - * - from - lineitem - where - l_orderkey = o_orderkey - and l_commitdate < l_receiptdate - ) - group by - o_orderpriority - order by - o_orderpriority;", - ), - - // original - // 5 => ctx.create_logical_plan( - // "select - // n_name, - // sum(l_extendedprice * (1 - l_discount)) as revenue - // from - // customer, - // orders, - // lineitem, - // supplier, - // nation, - // region - // where - // c_custkey = o_custkey - // and l_orderkey = o_orderkey - // and l_suppkey = s_suppkey - // and c_nationkey = s_nationkey - // and s_nationkey = n_nationkey - // and n_regionkey = r_regionkey - // and r_name = 'ASIA' - // and o_orderdate >= date '1994-01-01' - // and o_orderdate < date '1994-01-01' + interval '1' year - // group by - // n_name - // order by - // revenue desc;" - // ), - 5 => ctx.create_logical_plan( - "select - n_name, - sum(l_extendedprice * (1 - l_discount)) as revenue - from - customer, - orders, - lineitem, - supplier, - nation, - region - where - c_custkey = o_custkey - and l_orderkey = o_orderkey - and l_suppkey = s_suppkey - and c_nationkey = s_nationkey - and s_nationkey = n_nationkey - and n_regionkey = r_regionkey - and r_name = 'ASIA' - and o_orderdate >= date '1994-01-01' - and o_orderdate < date '1995-01-01' - group by - n_name - order by - revenue desc;", - ), - - // original - // 6 => ctx.create_logical_plan( - // "select - // sum(l_extendedprice * l_discount) as revenue - // from - // lineitem - // where - // l_shipdate >= date '1994-01-01' - // and l_shipdate < date '1994-01-01' + interval '1' year - // and l_discount between .06 - 0.01 and .06 + 0.01 - // and l_quantity < 24;" - // ), - 6 => ctx.create_logical_plan( - "select - sum(l_extendedprice * l_discount) as revenue - from - lineitem - where - l_shipdate >= date '1994-01-01' - and l_shipdate < date '1995-01-01' - and l_discount between .06 - 0.01 and .06 + 0.01 - and l_quantity < 24;", - ), - - 7 => ctx.create_logical_plan( - "select - supp_nation, - cust_nation, - l_year, - sum(volume) as revenue - from - ( - select - n1.n_name as supp_nation, - n2.n_name as cust_nation, - extract(year from l_shipdate) as l_year, - l_extendedprice * (1 - l_discount) as volume - from - supplier, - lineitem, - orders, - customer, - nation n1, - nation n2 - where - s_suppkey = l_suppkey - and o_orderkey = l_orderkey - and c_custkey = o_custkey - and s_nationkey = n1.n_nationkey - and c_nationkey = n2.n_nationkey - and ( - (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY') - or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE') - ) - and l_shipdate between date '1995-01-01' and date '1996-12-31' - ) as shipping - group by - supp_nation, - cust_nation, - l_year - order by - supp_nation, - cust_nation, - l_year;", - ), - - 8 => ctx.create_logical_plan( - "select - o_year, - sum(case - when nation = 'BRAZIL' then volume - else 0 - end) / sum(volume) as mkt_share - from - ( - select - extract(year from o_orderdate) as o_year, - l_extendedprice * (1 - l_discount) as volume, - n2.n_name as nation - from - part, - supplier, - lineitem, - orders, - customer, - nation n1, - nation n2, - region - where - p_partkey = l_partkey - and s_suppkey = l_suppkey - and l_orderkey = o_orderkey - and o_custkey = c_custkey - and c_nationkey = n1.n_nationkey - and n1.n_regionkey = r_regionkey - and r_name = 'AMERICA' - and s_nationkey = n2.n_nationkey - and o_orderdate between date '1995-01-01' and date '1996-12-31' - and p_type = 'ECONOMY ANODIZED STEEL' - ) as all_nations - group by - o_year - order by - o_year;", - ), - - 9 => ctx.create_logical_plan( - "select - nation, - o_year, - sum(amount) as sum_profit - from - ( - select - n_name as nation, - extract(year from o_orderdate) as o_year, - l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount - from - part, - supplier, - lineitem, - partsupp, - orders, - nation - where - s_suppkey = l_suppkey - and ps_suppkey = l_suppkey - and ps_partkey = l_partkey - and p_partkey = l_partkey - and o_orderkey = l_orderkey - and s_nationkey = n_nationkey - and p_name like '%green%' - ) as profit - group by - nation, - o_year - order by - nation, - o_year desc;", - ), - - // 10 => ctx.create_logical_plan( - // "select - // c_custkey, - // c_name, - // sum(l_extendedprice * (1 - l_discount)) as revenue, - // c_acctbal, - // n_name, - // c_address, - // c_phone, - // c_comment - // from - // customer, - // orders, - // lineitem, - // nation - // where - // c_custkey = o_custkey - // and l_orderkey = o_orderkey - // and o_orderdate >= date '1993-10-01' - // and o_orderdate < date '1993-10-01' + interval '3' month - // and l_returnflag = 'R' - // and c_nationkey = n_nationkey - // group by - // c_custkey, - // c_name, - // c_acctbal, - // c_phone, - // n_name, - // c_address, - // c_comment - // order by - // revenue desc;" - // ), - 10 => ctx.create_logical_plan( - "select - c_custkey, - c_name, - sum(l_extendedprice * (1 - l_discount)) as revenue, - c_acctbal, - n_name, - c_address, - c_phone, - c_comment - from - customer, - orders, - lineitem, - nation - where - c_custkey = o_custkey - and l_orderkey = o_orderkey - and o_orderdate >= date '1993-10-01' - and o_orderdate < date '1994-01-01' - and l_returnflag = 'R' - and c_nationkey = n_nationkey - group by - c_custkey, - c_name, - c_acctbal, - c_phone, - n_name, - c_address, - c_comment - order by - revenue desc;", - ), - - 11 => ctx.create_logical_plan( - "select - ps_partkey, - sum(ps_supplycost * ps_availqty) as value - from - partsupp, - supplier, - nation - where - ps_suppkey = s_suppkey - and s_nationkey = n_nationkey - and n_name = 'GERMANY' - group by - ps_partkey having - sum(ps_supplycost * ps_availqty) > ( - select - sum(ps_supplycost * ps_availqty) * 0.0001 - from - partsupp, - supplier, - nation - where - ps_suppkey = s_suppkey - and s_nationkey = n_nationkey - and n_name = 'GERMANY' - ) - order by - value desc;", - ), - - // original - // 12 => ctx.create_logical_plan( - // "select - // l_shipmode, - // sum(case - // when o_orderpriority = '1-URGENT' - // or o_orderpriority = '2-HIGH' - // then 1 - // else 0 - // end) as high_line_count, - // sum(case - // when o_orderpriority <> '1-URGENT' - // and o_orderpriority <> '2-HIGH' - // then 1 - // else 0 - // end) as low_line_count - // from - // orders, - // lineitem - // where - // o_orderkey = l_orderkey - // and l_shipmode in ('MAIL', 'SHIP') - // and l_commitdate < l_receiptdate - // and l_shipdate < l_commitdate - // and l_receiptdate >= date '1994-01-01' - // and l_receiptdate < date '1994-01-01' + interval '1' year - // group by - // l_shipmode - // order by - // l_shipmode;" - // ), - 12 => ctx.create_logical_plan( - "select - l_shipmode, - sum(case - when o_orderpriority = '1-URGENT' - or o_orderpriority = '2-HIGH' - then 1 - else 0 - end) as high_line_count, - sum(case - when o_orderpriority <> '1-URGENT' - and o_orderpriority <> '2-HIGH' - then 1 - else 0 - end) as low_line_count - from - lineitem - join - orders - on - l_orderkey = o_orderkey - where - l_shipmode in ('MAIL', 'SHIP') - and l_commitdate < l_receiptdate - and l_shipdate < l_commitdate - and l_receiptdate >= date '1994-01-01' - and l_receiptdate < date '1995-01-01' - group by - l_shipmode - order by - l_shipmode;", - ), - - 13 => ctx.create_logical_plan( - "select - c_count, - count(*) as custdist - from - ( - select - c_custkey, - count(o_orderkey) - from - customer left outer join orders on - c_custkey = o_custkey - and o_comment not like '%special%requests%' - group by - c_custkey - ) as c_orders (c_custkey, c_count) - group by - c_count - order by - custdist desc, - c_count desc;", - ), - - 14 => ctx.create_logical_plan( - "select - 100.00 * sum(case - when p_type like 'PROMO%' - then l_extendedprice * (1 - l_discount) - else 0 - end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue - from - lineitem, - part - where - l_partkey = p_partkey - and l_shipdate >= date '1995-09-01' - and l_shipdate < date '1995-10-01';", - ), - - 15 => ctx.create_logical_plan( - "create view revenue0 (supplier_no, total_revenue) as - select - l_suppkey, - sum(l_extendedprice * (1 - l_discount)) - from - lineitem - where - l_shipdate >= date '1996-01-01' - and l_shipdate < date '1996-01-01' + interval '3' month - group by - l_suppkey; - - select - s_suppkey, - s_name, - s_address, - s_phone, - total_revenue - from - supplier, - revenue0 - where - s_suppkey = supplier_no - and total_revenue = ( - select - max(total_revenue) - from - revenue0 - ) - order by - s_suppkey; - - drop view revenue0;", - ), - - 16 => ctx.create_logical_plan( - "select - p_brand, - p_type, - p_size, - count(distinct ps_suppkey) as supplier_cnt - from - partsupp, - part - where - p_partkey = ps_partkey - and p_brand <> 'Brand#45' - and p_type not like 'MEDIUM POLISHED%' - and p_size in (49, 14, 23, 45, 19, 3, 36, 9) - and ps_suppkey not in ( - select - s_suppkey - from - supplier - where - s_comment like '%Customer%Complaints%' - ) - group by - p_brand, - p_type, - p_size - order by - supplier_cnt desc, - p_brand, - p_type, - p_size;", - ), - - 17 => ctx.create_logical_plan( - "select - sum(l_extendedprice) / 7.0 as avg_yearly - from - lineitem, - part - where - p_partkey = l_partkey - and p_brand = 'Brand#23' - and p_container = 'MED BOX' - and l_quantity < ( - select - 0.2 * avg(l_quantity) - from - lineitem - where - l_partkey = p_partkey - );", - ), - - 18 => ctx.create_logical_plan( - "select - c_name, - c_custkey, - o_orderkey, - o_orderdate, - o_totalprice, - sum(l_quantity) - from - customer, - orders, - lineitem - where - o_orderkey in ( - select - l_orderkey - from - lineitem - group by - l_orderkey having - sum(l_quantity) > 300 - ) - and c_custkey = o_custkey - and o_orderkey = l_orderkey - group by - c_name, - c_custkey, - o_orderkey, - o_orderdate, - o_totalprice - order by - o_totalprice desc, - o_orderdate;", - ), - - 19 => ctx.create_logical_plan( - "select - sum(l_extendedprice* (1 - l_discount)) as revenue - from - lineitem, - part - where - ( - p_partkey = l_partkey - and p_brand = 'Brand#12' - and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') - and l_quantity >= 1 and l_quantity <= 1 + 10 - and p_size between 1 and 5 - and l_shipmode in ('AIR', 'AIR REG') - and l_shipinstruct = 'DELIVER IN PERSON' - ) - or - ( - p_partkey = l_partkey - and p_brand = 'Brand#23' - and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') - and l_quantity >= 10 and l_quantity <= 10 + 10 - and p_size between 1 and 10 - and l_shipmode in ('AIR', 'AIR REG') - and l_shipinstruct = 'DELIVER IN PERSON' - ) - or - ( - p_partkey = l_partkey - and p_brand = 'Brand#34' - and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') - and l_quantity >= 20 and l_quantity <= 20 + 10 - and p_size between 1 and 15 - and l_shipmode in ('AIR', 'AIR REG') - and l_shipinstruct = 'DELIVER IN PERSON' - );", - ), - - 20 => ctx.create_logical_plan( - "select - s_name, - s_address - from - supplier, - nation - where - s_suppkey in ( - select - ps_suppkey - from - partsupp - where - ps_partkey in ( - select - p_partkey - from - part - where - p_name like 'forest%' - ) - and ps_availqty > ( - select - 0.5 * sum(l_quantity) - from - lineitem - where - l_partkey = ps_partkey - and l_suppkey = ps_suppkey - and l_shipdate >= date '1994-01-01' - and l_shipdate < 'date 1994-01-01' + interval '1' year - ) - ) - and s_nationkey = n_nationkey - and n_name = 'CANADA' - order by - s_name;", - ), - - 21 => ctx.create_logical_plan( - "select - s_name, - count(*) as numwait - from - supplier, - lineitem l1, - orders, - nation - where - s_suppkey = l1.l_suppkey - and o_orderkey = l1.l_orderkey - and o_orderstatus = 'F' - and l1.l_receiptdate > l1.l_commitdate - and exists ( - select - * - from - lineitem l2 - where - l2.l_orderkey = l1.l_orderkey - and l2.l_suppkey <> l1.l_suppkey - ) - and not exists ( - select - * - from - lineitem l3 - where - l3.l_orderkey = l1.l_orderkey - and l3.l_suppkey <> l1.l_suppkey - and l3.l_receiptdate > l3.l_commitdate - ) - and s_nationkey = n_nationkey - and n_name = 'SAUDI ARABIA' - group by - s_name - order by - numwait desc, - s_name;", - ), - - 22 => ctx.create_logical_plan( - "select - cntrycode, - count(*) as numcust, - sum(c_acctbal) as totacctbal - from - ( - select - substring(c_phone from 1 for 2) as cntrycode, - c_acctbal - from - customer - where - substring(c_phone from 1 for 2) in - ('13', '31', '23', '29', '30', '18', '17') - and c_acctbal > ( - select - avg(c_acctbal) - from - customer - where - c_acctbal > 0.00 - and substring(c_phone from 1 for 2) in - ('13', '31', '23', '29', '30', '18', '17') - ) - and not exists ( - select - * - from - orders - where - o_custkey = c_custkey - ) - ) as custsale - group by - cntrycode - order by - cntrycode;", - ), - - _ => unimplemented!("invalid query. Expected value between 1 and 22"), +async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> { + println!("Running benchmarks with the following options: {:?}", opt); + + let mut settings = HashMap::new(); + settings.insert("batch.size".to_owned(), format!("{}", opt.batch_size)); + + let ctx = + BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), settings); + + // register tables with Ballista context + let path = opt.path.to_str().unwrap(); + let file_format = opt.file_format.as_str(); + for table in TABLES { + match file_format { + // dbgen creates .tbl ('|' delimited) files without header + "tbl" => { + let path = format!("{}/{}.tbl", path, table); + let schema = get_schema(table); + let options = CsvReadOptions::new() + .schema(&schema) + .delimiter(b'|') + .has_header(false) + .file_extension(".tbl"); + ctx.register_csv(table, &path, options) + .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + } + "csv" => { + let path = format!("{}/{}", path, table); + let schema = get_schema(table); + let options = CsvReadOptions::new().schema(&schema).has_header(true); + ctx.register_csv(table, &path, options) + .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + } + "parquet" => { + let path = format!("{}/{}", path, table); + ctx.register_parquet(table, &path) + .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + } + other => { + unimplemented!("Invalid file format '{}'", other); + } + } } + + let mut millis = vec![]; + + // run benchmark + let sql = get_query_sql(opt.query)?; + println!("Running benchmark with query {}:\n {}", opt.query, sql); + for i in 0..opt.iterations { + let start = Instant::now(); + let df = ctx + .sql(&sql) + .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + let mut batches = vec![]; + let mut stream = df + .collect() + .await + .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + while let Some(result) = stream.next().await { + let batch = result?; + batches.push(batch); + } + let elapsed = start.elapsed().as_secs_f64() * 1000.0; + millis.push(elapsed as f64); + println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed); + if opt.debug { + pretty::print_batches(&batches)?; + } + } + + let avg = millis.iter().sum::() / millis.len() as f64; + println!("Query {} avg time: {:.2} ms", opt.query, avg); + + Ok(()) +} + +fn get_query_sql(query: usize) -> Result { + if query > 0 && query < 23 { + let filename = format!("queries/q{}.sql", query); + Ok(fs::read_to_string(&filename).expect("failed to read query")) + } else { + Err(DataFusionError::Plan( + "invalid query. Expected value between 1 and 22".to_owned(), + )) + } +} + +fn create_logical_plan(ctx: &mut ExecutionContext, query: usize) -> Result { + let sql = get_query_sql(query)?; + ctx.create_logical_plan(&sql) } async fn execute_query( @@ -1668,8 +954,10 @@ mod tests { file_format: "tbl".to_string(), mem_table: false, partitions: 16, + host: None, + port: None, }; - let actual = benchmark(opt).await?; + let actual = benchmark_datafusion(opt).await?; // assert schema equality without comparing nullable values assert_eq!( diff --git a/ballista/rust/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch-gen.sh similarity index 97% rename from ballista/rust/benchmarks/tpch/tpch-gen.sh rename to benchmarks/tpch-gen.sh index f5147f55f2f6..fef3480c612c 100755 --- a/ballista/rust/benchmarks/tpch/tpch-gen.sh +++ b/benchmarks/tpch-gen.sh @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. -BALLISTA_VERSION=0.4.2-SNAPSHOT +BALLISTA_VERSION=0.5.0-SNAPSHOT #set -e diff --git a/ballista/rust/benchmarks/tpch/tpchgen.dockerfile b/benchmarks/tpchgen.dockerfile similarity index 100% rename from ballista/rust/benchmarks/tpch/tpchgen.dockerfile rename to benchmarks/tpchgen.dockerfile diff --git a/dev/build-rust-base.sh b/dev/build-rust-base.sh index e424909fb6f1..1bedbd880b44 100755 --- a/dev/build-rust-base.sh +++ b/dev/build-rust-base.sh @@ -16,6 +16,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -BALLISTA_VERSION=0.4.2-SNAPSHOT +BALLISTA_VERSION=0.5.0-SNAPSHOT set -e docker build -t ballistacompute/rust-base:$BALLISTA_VERSION -f dev/docker/rust-base.dockerfile . diff --git a/dev/build-rust.sh b/dev/build-rust.sh index d31c5241c6f1..5777d1eb253b 100755 --- a/dev/build-rust.sh +++ b/dev/build-rust.sh @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -BALLISTA_VERSION=0.4.2-SNAPSHOT +BALLISTA_VERSION=0.5.0-SNAPSHOT set -e diff --git a/dev/docker/rust.dockerfile b/dev/docker/rust.dockerfile index 6505f3c1660a..ba713b15e90c 100644 --- a/dev/docker/rust.dockerfile +++ b/dev/docker/rust.dockerfile @@ -22,7 +22,7 @@ # as a mounted directory. ARG RELEASE_FLAG=--release -FROM ballistacompute/rust-base:0.4.2-SNAPSHOT AS base +FROM ballistacompute/rust-base:0.5.0-SNAPSHOT AS base WORKDIR /tmp/ballista RUN apt-get -y install cmake RUN cargo install cargo-chef @@ -73,7 +73,7 @@ ENV RELEASE_FLAG=${RELEASE_FLAG} RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi # Copy the binary into a new container for a smaller docker image -FROM ballistacompute/rust-base:0.4.0-20210213 +FROM ballistacompute/rust-base:0.5.0-SNAPSHOT COPY --from=builder /executor / @@ -81,6 +81,10 @@ COPY --from=builder /scheduler / COPY --from=builder /tpch / +ADD benchmarks/run.sh / +RUN mkdir /queries +COPY benchmarks/queries/ /queries/ + ENV RUST_LOG=info ENV RUST_BACKTRACE=full diff --git a/dev/integration-tests.sh b/dev/integration-tests.sh index 6ed764ecda8a..06ab108c2931 100755 --- a/dev/integration-tests.sh +++ b/dev/integration-tests.sh @@ -19,11 +19,11 @@ set -e ./dev/build-rust-base.sh ./dev/build-rust.sh -pushd ballista/rust/benchmarks/tpch +pushd benchmarks ./tpch-gen.sh docker-compose up -d -docker-compose run ballista-client ./run.sh +docker-compose run ballista-client /run.sh docker-compose down popd diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index f9eca7a2ed45..cef0a91eb00b 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -100,6 +100,6 @@ requirements.txt *.scss .gitattributes rust-toolchain -ballista/rust/benchmarks/tpch/queries/q*.sql +benchmarks/queries/q*.sql ballista/rust/scheduler/testdata/* ballista/ui/scheduler/yarn.lock