
Introduce new optional scheduler, using Morsel-driven Parallelism + rayon (#2199) #2226

Merged · 28 commits merged into apache:master on May 4, 2022

Conversation

@tustvold (Contributor, author) commented on Apr 13, 2022

Which issue does this PR close?

Closes #2199.

Rationale for this change

See ticket

What changes are included in this PR?

Adds a new datafusion-scheduler crate that provides an implementation of a push-based, morsel-driven scheduler based on rayon. More details, background, and discussion can be found in the proposal document here; please feel free to comment there.

I would describe the current implementation as an MVP, and there are definitely things that could be improved, but I would like to propose we get something merged and then iterate on it further. My hope is that we can iteratively refine the implementation, and then, once we are happy with it, begin the process of refactoring the operators to better align with it.
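For orientation, here is a rough end-to-end sketch of how the scheduler is driven, assembled from the doc-comment fragments quoted later in this review (`create_physical_plan`, `task_ctx`); treat `Scheduler::new` and `schedule` as approximations of the crate's API rather than its exact signatures:

```rust
use datafusion::prelude::*;
use datafusion_scheduler::Scheduler;
use futures::TryStreamExt;

#[tokio::main]
async fn main() {
    // A rayon-backed worker pool with 4 threads (size is illustrative)
    let scheduler = Scheduler::new(4);

    // Plan the query with DataFusion exactly as before
    let context = SessionContext::new();
    context
        .register_csv("example", "example.csv", CsvReadOptions::new())
        .await
        .unwrap();
    let plan = context
        .sql("SELECT MIN(b) FROM example GROUP BY a")
        .await
        .unwrap()
        .create_physical_plan()
        .await
        .unwrap();

    // The scheduler compiles the plan into pipelines, pushes morsels
    // through them on the rayon workers, and surfaces the output back
    // to async code as a stream of RecordBatches
    let task = context.task_ctx();
    let stream = scheduler.schedule(plan, task).unwrap();
    let _batches: Vec<_> = stream.try_collect().await.unwrap();
}
```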

Are there any user-facing changes?

No, this is an entirely new crate, and does not require changes to any others.

@github-actions bot added the datafusion (Changes in the datafusion crate) label on Apr 13, 2022
let mut context = context.clone();
let (sender, mut receiver) = futures::channel::mpsc::unbounded();

// Spawn work to a separate tokio thread pool
@tustvold (author) commented:

This makes the performance comparison fairer: dispatch to another thread pool is necessary in both approaches, since you don't want to run long-running query workloads on the same tokio pool as, say, network IO. That dispatch can also have a non-negligible impact on the simpler queries.
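For context, the pattern being compared looks roughly like this (a self-contained sketch with stand-in data, not the benchmark's actual code): run the CPU-heavy work on its own runtime and bridge the results back over a channel.

```rust
use futures::{channel::mpsc, stream, StreamExt};

fn main() {
    // Runtime that would normally be servicing network IO
    let io_pool = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .build()
        .unwrap();
    // Dedicated runtime for long-running, CPU-bound query execution
    let query_pool = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(8)
        .build()
        .unwrap();

    let (sender, receiver) = mpsc::unbounded();
    query_pool.spawn(async move {
        // Stand-in for driving the query to completion batch-by-batch
        let mut batches = stream::iter(["batch0", "batch1"]);
        while let Some(batch) = batches.next().await {
            if sender.unbounded_send(batch).is_err() {
                break; // receiver dropped, stop producing
            }
        }
        // sender dropped here, which terminates the receiver stream
    });

    // Consume the results from the IO runtime
    let collected: Vec<_> = io_pool.block_on(receiver.collect());
    assert_eq!(collected, vec!["batch0", "batch1"]);
}
```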

@tustvold marked this pull request as draft on April 13, 2022 18:29
@tustvold force-pushed the datafusion-scheduler branch from f43b114 to 0cf6d72 on April 13, 2022 18:32
@alamb (Contributor) commented on Apr 13, 2022

Can you please provide benchmark results as well? I am pretty sure you have them so I figure this is a light ask :)

@tustvold (author) commented on Apr 13, 2022

Good shout. I'll need to re-run them, as some things have changed since then; will do first thing tomorrow. 👍

The big ROI will come when operators start exploiting rayon and we simplify ExecutionPlan to match, but last time I ran the benchmarks we were seeing some nice performance improvements despite the shimming costs.

@tustvold (author) commented on Apr 14, 2022

Unfortunately I've discovered a bug with this implementation this morning, concerning concurrent wakeups. This could result in the same Task being scheduled concurrently, potentially leading to output reordering. I need to think a bit more on how to handle this. I have updated the description accordingly.

Once fixed I will capture the benchmarks again 😄

Edit: this should now be fixed, but I want to write some tests to be sure

@tustvold (author) commented on Apr 14, 2022

OK, as promised, some benchmarks. It should be noted that these come with some pretty big disclaimers:

  • Until we make changes to ExecutionPlan, the scheduler cannot introduce additional parallelism within a partition, as it is constrained by the current pull-based interface. Removing this constraint will be a key performance unlock.
  • The Parquet SQL benchmarks are massively dominated by the performance of the parquet decoders, which may not be representative of all query workloads.
  • Currently DataFusion uses tokio::spawn_blocking in the tokio case. Aside from giving tokio more threads to play with, this also results in perfect thread-locality for the parquet decoder. I have therefore collected results both with and without it enabled.
  • My focus thus far has been on getting something working, not on squeezing out as much performance as possible; there is likely a lot that could be improved.
  • These benchmarks were run on my local machine with a Ryzen 5950x, 64GB RAM, and SSD storage, which may not be representative of server hardware.

That all being said, in like-for-like comparisons (i.e. without spawn_blocking) we are actually seeing a slight performance improvement from the scheduler 🎉. I've not looked into why this is, but the only thing I can think of that might have improved performance is the switch to rayon; everything else is either the same or would make it slower.

@alamb changed the title from "Morsel-driven Parallelism using rayon (#2199)" to "Introduce new optional scheduler, using Morsel-driven Parallelism + rayon (#2199)" on Apr 14, 2022
@alamb (Contributor) left a review:

I started going through it -- very cool @tustvold. Sorry for the myriad of comments -- it is mostly me making sure I understand, rather than anything I see needing to change.

  • It would be cool to be able to visualize (in text or graphviz form) whatever structures are created in this scheduler (e.g. Pipeline / Task / contents of a Task)

Can you give some thoughts on how the classic "pipeline breaker" operators fit into this model? It seems like this code will handle them implicitly, as all the Tasks above the pipeline breaker will not be ready to run until the tasks below it are finished.

Also, I plan to try a POC of this scheduler in IOx to see how it shakes out on our workloads.


/// Spawn a [`Task`] onto the local workers thread pool
///
/// There is no guaranteed order of execution, as workers may steal at any time. However,
Reviewer (Contributor):

👍

@andygrove (Member) commented:

I am interested in this and would like to review, but realistically I won't be able to get to this until sometime next week.

@tustvold (author) commented:
No rush at all, I am away much of next week, and this is a bit of a chonker, so happy to wait and give people adequate time to review 👍

@alamb (Contributor) commented on Apr 21, 2022

I plan to review this carefully tomorrow morning

@alamb (Contributor) left a review:

Nice! I think this looks like a great step forward and an exciting direction to push DataFusion. Thank you very much @tustvold

I did a POC of using this scheduler in IOx (https://github.com/influxdata/influxdb_iox/pull/4397). Some changes/notes based on that attempt:

RecordBatchStream

Implementing RecordBatchStream for QueryResults would make the migration much easier.

Here is a PR showing what I had to do: tustvold#37
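For illustration, the adapter needed is along these lines (a sketch assuming the RecordBatchStream trait shape at the time -- a Stream of ArrowResult<RecordBatch> plus a schema() accessor; the wrapper name is hypothetical):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;

/// Pairs the scheduler's output stream with its schema so it can be
/// handed to code that expects a RecordBatchStream
struct QueryResultsStream<S> {
    schema: SchemaRef,
    inner: S,
}

impl<S> Stream for QueryResultsStream<S>
where
    S: Stream<Item = ArrowResult<RecordBatch>> + Unpin,
{
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Delegate to the wrapped stream (S: Unpin makes this safe)
        Pin::new(&mut self.get_mut().inner).poll_next(cx)
    }
}

impl<S> RecordBatchStream for QueryResultsStream<S>
where
    S: Stream<Item = ArrowResult<RecordBatch>> + Unpin,
{
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
```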

Feature of the core

Making the scheduler an optional feature of the datafusion crate would ease integration as well (so that I didn't have to add a new explicit dependency in IOx).

Execute individual Partitions

I couldn't find a way to ask the scheduler to run a single partition -- in compact plans IOx wants to get the output of each partition individually (rather than getting the combined version) as it stores them as individual Parquet files:

https://github.com/influxdata/influxdb_iox/blob/e8bfd7a/compactor/src/compact.rs#L690-L700

Thoughts on replacing tokio

"to get the maximum performance when doing fully CPU bound work, DataFusion needed full control for task scheduling. tokio served us well for several years and still is a great choice for CPU bound tasks for the reasons mentioned in this blog"

cc @yjshen @houqp @jimexist @Dandandan @thinkharderdev

#[derive(Debug, Default)]
struct InputPartition {
buffer: VecDeque<RecordBatch>,
wait_list: Vec<Waker>,
Reviewer (Contributor):

Maybe calling it wake_list would be better, to align with the list of wakers.

Suggested change:
- wait_list: Vec<Waker>,
+ wake_list: Vec<Waker>,

@tustvold (author):

This is the "standard" nomenclature for a list of waiting tasks: https://docs.rs/tokio/latest/src/tokio/sync/notify.rs.html#121 and https://docs.rs/futures-util/0.3.21/src/futures_util/lock/mutex.rs.html#22

I guess the idea is that the Waker is the representation of a waiting task, maybe??

Reviewer (Contributor):

Those two links seem to point at a list called waiters (rather than wait_list) so not quite what is in this PR either :)

I don't feel strongly about this
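To make the mechanics concrete, here is a sketch of how such a waker list is typically used (a reconstruction of the idea, not the PR's exact code): consumers that find the buffer empty park their Waker, and the producer wakes them all on push.

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};

#[derive(Debug, Default)]
struct InputPartition<T> {
    buffer: VecDeque<T>,
    wait_list: Vec<Waker>,
}

impl<T> InputPartition<T> {
    /// Producer side: buffer a value and wake every parked consumer
    fn push(&mut self, value: T) {
        self.buffer.push_back(value);
        for waker in self.wait_list.drain(..) {
            waker.wake();
        }
    }

    /// Consumer side: take a value, or park this task's waker until
    /// a later push re-schedules it
    fn poll_pop(&mut self, cx: &mut Context<'_>) -> Poll<T> {
        match self.buffer.pop_front() {
            Some(value) => Poll::Ready(value),
            None => {
                self.wait_list.push(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```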

plan: Arc<dyn ExecutionPlan>,
parent: Option<OutputLink>,
) -> Result<()> {
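// RepartitionExec (and CoalescePartitionsExec) are recognized here and handled
// natively by the scheduler, rather than being wrapped into an ExecutionPipeline
// (see the review discussion below)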
if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
Reviewer (Contributor):

Is it worth some commentary here noting that RepartitionExec and CoalescePartitionsExec are handled natively by the scheduler and thus not directly added into a Pipeline?

@tustvold (author):

They aren't added into an ExecutionPipeline; technically they are still a Pipeline...

@thinkharderdev (Contributor) commented:

I think there is an issue with filtering parquet sources (for some reason it is specific to parquet). PR'd a failing test case tustvold#38.

Projection seems to work, but for some reason FilterExec results in an empty result.

@tustvold (author) commented on May 3, 2022

I think this is now ready; I'll leave it up for a few more days in case anyone else wants to take a look -- perhaps @yjshen or @houqp?

@yjshen (Member) commented on May 3, 2022

Thanks @tustvold, I'll finish my review tomorrow.

@alamb (Contributor) commented on May 4, 2022

I'll also try and give it another read tomorrow ~ 12:00 UTC

@yjshen (Member) left a review:

Epic work and well-documented; it's really great to read. Thanks @tustvold!

Two minor questions:

  1. How does the new scheduler-based execution's performance compare to the old one? Are there benchmark results available now?
  2. Can we provide some diagrams to illustrate how a tree of ExecutionPlan is broken up into pipelines, and how pipelines are run to completion?

//! .unwrap()
//! .create_physical_plan()
//! .await
//! .unwrap();
Reviewer (Member):

nit: the above four lines are indented one space less than the rest.

//! .await
//! .unwrap();
//!
//! let task = context.task_ctx();
Reviewer (Member):

The name task here is slightly misleading, as it is easily confused with the pipeline's Task.

}

#[derive(Debug, Clone)]
pub struct Spawner {
Reviewer (Member):

A doc comment for the Spawner? We can check clippy with:

cargo clippy --all-targets --features=scheduler --workspace -- -D warnings

///
/// Longer term we will likely want to introduce new traits that differentiate between
/// pipeline-able operators like filters, and pipeline-breakers like aggregations, and
/// are better aligned with a push-based execution model.
Reviewer (Member):

👍

/// An aggregation pipeline which combines data from one or more input partitions into
/// a single output partition. [`Pipeline::push`] would eagerly update the computed
/// aggregates, and the final [`Pipeline::close`] triggers flushing these to the output.
/// It would also be possible to flush once the partial aggregates reach a certain size
Reviewer (Member):

👍

///
/// An aggregation pipeline which combines data from one or more input partitions into
/// a single output partition. [`Pipeline::push`] would eagerly update the computed
/// aggregates, and the final [`Pipeline::close`] triggers flushing these to the output.
Reviewer (Member):

close marks the end of input, and poll_partition flushes aggregate states to the output?
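Putting those pieces together, the interface under discussion looks roughly like this (reconstructed from the doc comments quoted in this review; the exact signatures in the PR may differ):

```rust
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

pub trait Pipeline: Send + Sync {
    /// Push a morsel to the given input (child, partition) of this pipeline;
    /// an aggregation would eagerly fold it into its accumulated state
    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;

    /// Mark an input partition as exhausted; once all inputs are closed an
    /// aggregation can finalize and flush its accumulated state
    fn close(&self, child: usize, partition: usize);

    /// The number of output partitions this pipeline produces
    fn output_partitions(&self) -> usize;

    /// Poll an output partition for data, registering the task's waker
    /// if none is ready yet
    fn poll_partition(
        &self,
        cx: &mut Context<'_>,
        partition: usize,
    ) -> Poll<Option<Result<RecordBatch>>>;
}
```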

pipeline: usize,

/// The partition of the pipeline within `query` to poll
partition: usize,
Reviewer (Member):

👍 Wake up the waker and re-enqueue the task.
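A sketch of that wakeup path (type names assumed for illustration, not the PR's exact code): the Waker installed for a (pipeline, partition) task simply re-enqueues that task onto the worker pool.

```rust
use std::sync::Arc;

use futures::task::{waker, ArcWake};

/// Stand-ins for the scheduler's types
#[derive(Clone)]
struct Spawner;
impl Spawner {
    fn spawn(&self, task: Task) {
        // The real scheduler would push this onto the rayon worker pool
        println!("re-enqueued pipeline {} partition {}", task.pipeline, task.partition);
    }
}
struct Task {
    pipeline: usize,
    partition: usize,
}

/// Waking re-enqueues the (pipeline, partition) pair for another poll
struct TaskWaker {
    spawner: Spawner,
    pipeline: usize,
    partition: usize,
}

impl ArcWake for TaskWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.spawner.spawn(Task {
            pipeline: arc_self.pipeline,
            partition: arc_self.partition,
        });
    }
}

fn main() {
    let w = waker(Arc::new(TaskWaker { spawner: Spawner, pipeline: 0, partition: 1 }));
    w.wake(); // prints: re-enqueued pipeline 0 partition 1
}
```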

// Add the operator to the current group of operators to be combined
// into a single [`ExecutionPipeline`].
//
// TODO: More sophisticated policy, just because we can combine them doesn't mean we should
Reviewer (Member):

The policy will be best combined with a new push-aware ExecutionPlan API.

/// to one or more output partitions. As a [`Pipeline`] may draw on input from
/// more than one upstream [`Pipeline`], input partitions are identified by both
/// a child index, and a partition index, whereas output partitions are only
/// identified by a partition index.
Reviewer (Member):

Pipeline inputs are identified by both an upstream index and a partition index, whereas pipeline outputs are only identified by a partition index.

///
/// There is no guaranteed order of execution, as workers may steal at any time. However,
/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from
/// the front of their queue, and steal tasks from the back of other workers' queues
Reviewer (Member):

👍 TIL
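The property being cited is observable with plain rayon (the scheduler's pool wraps similar machinery). On a single-threaded pool, where no stealing can occur, locally spawned tasks typically run newest-first:

```rust
fn main() {
    // One worker, so no work-stealing can reorder the tasks
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build()
        .unwrap();

    pool.scope(|s| {
        for i in 0..3 {
            // Spawns from inside the pool push onto the front of the current
            // worker's deque, and the worker pops from the front, so these
            // typically print in reverse order: task 2, task 1, task 0
            s.spawn(move |_| println!("task {i}"));
        }
    });
}
```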

@alamb (Contributor) commented on May 4, 2022

> Can we provide some diagrams to illustrate how a tree of ExecutionPlan is broken up into pipelines, and how pipelines are run to completion?

I volunteer to help create ascii art for this. If you want to meet, @tustvold, we can work through some examples and I can ascii-ify them 😎

@alamb (Contributor) left a review:

🚀

@@ -218,6 +218,8 @@ pub mod physical_optimizer;
pub mod physical_plan;
pub mod prelude;
pub mod scalar;
#[cfg(feature = "scheduler")]
Reviewer (Contributor):

👍
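For downstream users this means opting in from the consumer's Cargo.toml; something like the following (version number illustrative):

```toml
[dependencies]
datafusion = { version = "8", features = ["scheduler"] }
```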

@alamb merged commit dc76ec1 into apache:master on May 4, 2022
@alamb (Contributor) commented on May 4, 2022

Thanks @tustvold and @yjshen -- I agree this is a great step forward. I expect we'll keep iterating, but I think this is a good foundation.
