Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new optional scheduler, using Morsel-driven Parallelism + rayon (#2199) #2226

Merged
merged 28 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0cf6d72
Morsel-driven Parallelism using rayon (#2199)
tustvold Apr 13, 2022
288cf71
Fix LIFO spawn ordering
tustvold Apr 14, 2022
c3af260
Further docs for ExecutionPipeline
tustvold Apr 14, 2022
ab8eb6e
Deduplicate concurrent wakes
tustvold Apr 14, 2022
a4372a4
Add license headers
tustvold Apr 14, 2022
4419313
Sort Cargo.toml
tustvold Apr 14, 2022
a59e765
Revert accidental change to ParquetExec
tustvold Apr 14, 2022
d9385b7
Handle wakeups triggered by other threads
tustvold Apr 14, 2022
8ac481e
Use SeqCst memory ordering
tustvold Apr 15, 2022
c08f427
Review feedback
tustvold Apr 15, 2022
8a30019
Add panic handler
tustvold Apr 15, 2022
b0839a4
Merge remote-tracking branch 'upstream/master' into datafusion-scheduler
tustvold Apr 19, 2022
73e10e3
Cleanup structs
tustvold Apr 19, 2022
6e6de14
Review feedback
tustvold Apr 19, 2022
b5a13b6
Merge remote-tracking branch 'upstream/master' into datafusion-scheduler
tustvold Apr 21, 2022
ead0427
Use BatchPartitioner
tustvold Apr 21, 2022
f2e2059
Clarify shutdown characteristics
tustvold Apr 21, 2022
0f32bbb
Fix racy test_panic
tustvold Apr 21, 2022
6c636d3
Don't overload Query nomenclature
tustvold Apr 22, 2022
6074a8a
Rename QueryResults to ExecutionResults
tustvold Apr 22, 2022
d6fb3dd
Further review feedback
tustvold Apr 22, 2022
57f4446
Merge remote-tracking branch 'upstream/master' into datafusion-scheduler
tustvold May 3, 2022
a325b37
Merge scheduler into datafusion/core
tustvold May 3, 2022
c21cabc
Review feedback
tustvold May 3, 2022
505e880
Fix partitioned execution
tustvold May 3, 2022
9c00a50
Format
tustvold May 3, 2022
e81131a
Format Cargo.toml
tustvold May 3, 2022
8344e51
Fix doc link
tustvold May 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
cargo bench --bench parquet_query_sql
```

These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/core/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.

If the environment variable `PARQUET_FILE` is set, the benchmark will run queries against this file instead of a randomly generated one. This can be useful for performing multiple runs, potentially with different code, against the same source data, or for testing against a custom dataset.

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

[workspace]
members = [
"datafusion/core",
"datafusion/common",
"datafusion/core",
"datafusion/expr",
"datafusion/jit",
"datafusion/physical-expr",
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
readme = "../README.md"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
keywords = ["arrow", "query", "sql"]
include = [
"benches/*.rs",
"src/**/*.rs",
Expand All @@ -50,6 +50,8 @@ pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
# Used to enable row format experiment
row = ["datafusion-row"]
# Used to enable scheduler
scheduler = ["rayon"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/regex_expressions"]

Expand All @@ -75,9 +77,10 @@ ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "12", features = ["arrow"] }
paste = "^1.0"
pin-project-lite= "^0.2.7"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
rand = "0.8"
rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
sqlparser = "0.16"
tempfile = "3"
Expand All @@ -88,6 +91,7 @@ uuid = { version = "1.0", features = ["v4"] }
[dev-dependencies]
criterion = "0.3"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }

[[bench]]
Expand Down Expand Up @@ -121,6 +125,7 @@ name = "physical_plan"
[[bench]]
tustvold marked this conversation as resolved.
Show resolved Hide resolved
harness = false
name = "parquet_query_sql"
required-features = ["scheduler"]

[[bench]]
harness = false
Expand Down
65 changes: 50 additions & 15 deletions datafusion/core/benches/parquet_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::scheduler::Scheduler;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
Expand All @@ -37,7 +39,6 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt;

/// The number of batches to write
const NUM_BATCHES: usize = 2048;
Expand Down Expand Up @@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
assert!(Path::new(&file_path).exists(), "path not found");
println!("Using parquet file {}", file_path);

let context = SessionContext::new();
let partitions = 4;
let config = SessionConfig::new().with_target_partitions(partitions);
let context = SessionContext::with_config(config);

let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(context.register_parquet(
"t",
file_path.as_str(),
ParquetReadOptions::default(),
))
.unwrap();
let scheduler = Scheduler::new(partitions);

let local_rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let query_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(partitions)
.build()
.unwrap();

local_rt
.block_on(context.register_parquet("t", file_path.as_str(), Default::default()))
.unwrap();

// We read the queries from a file so they can be changed without recompiling the benchmark
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
Expand All @@ -220,17 +230,42 @@ fn criterion_benchmark(c: &mut Criterion) {
continue;
}

let query = query.as_str();
c.bench_function(query, |b| {
c.bench_function(&format!("tokio: {}", query), |b| {
b.iter(|| {
let query = query.clone();
let context = context.clone();
rt.block_on(async move {
let query = context.sql(query).await.unwrap();
let (sender, mut receiver) = futures::channel::mpsc::unbounded();

// Spawn work to a separate tokio thread pool
query_rt.spawn(async move {
let query = context.sql(&query).await.unwrap();
let mut stream = query.execute_stream().await.unwrap();
while criterion::black_box(stream.next().await).is_some() {}

while let Some(next) = stream.next().await {
sender.unbounded_send(next).unwrap();
}
});

local_rt.block_on(async {
while receiver.next().await.transpose().unwrap().is_some() {}
})
});
});

c.bench_function(&format!("scheduled: {}", query), |b| {
b.iter(|| {
let query = query.clone();
let context = context.clone();

local_rt.block_on(async {
let query = context.sql(&query).await.unwrap();
let plan = query.create_physical_plan().await.unwrap();
let mut stream =
scheduler.schedule(plan, context.task_ctx()).unwrap();
while stream.next().await.transpose().unwrap().is_some() {}
});
});
});
}

// Temporary file must outlive the benchmarks, it is deleted when dropped
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ pub mod physical_optimizer;
pub mod physical_plan;
pub mod prelude;
pub mod scalar;
#[cfg(feature = "scheduler")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pub mod scheduler;
pub mod sql;
pub mod variable;

Expand Down
Loading