Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Add more documentation about Partitioning #8022

Merged
merged 11 commits into from
Nov 5, 2023
38 changes: 34 additions & 4 deletions datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
@@ -15,14 +15,43 @@
// specific language governing permissions and limitations
// under the License.

//! [`Partitioning`] and [`Distribution`] for physical expressions
//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`

use std::fmt;
use std::sync::Arc;

use crate::{expr_list_eq_strict_order, EquivalenceProperties, PhysicalExpr};

/// Partitioning schemes supported by operators.
/// Partitioning schemes supported by [`ExecutionPlan`]s.
///
/// A partition represents an independent stream that an `ExecutionPlan` can
alamb marked this conversation as resolved.
Show resolved Hide resolved
/// produce in parallel. Each `ExecutionPlan` must produce at least one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each ExecutionPlan instance must produce at least one partition, and consume at most one partition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An ExecutionPlan can consume one or more partitions. I'll try and clarify that later today

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes please my understanding by trait .execute() signature is to run the method for 1 partition at most

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

/// partition, and the number of partitions varies based on the input and the
alamb marked this conversation as resolved.
Show resolved Hide resolved
/// operation performed.
///
/// ```text
/// ▲ ▲ ▲
/// │ │ │
/// │ │ │ An ExecutionPlan with 3
/// │ │ │ output partitions will
/// ┌───┴──────┴──────┴──┐ produce 3 streams of
/// │ ExecutionPlan │ RecordBatches that run in
/// └────────────────────┘ parallel.
alamb marked this conversation as resolved.
Show resolved Hide resolved
/// ```
///
/// # Examples
///
/// A simple `FileScanExec` might produce one output stream (partition) for each
/// file (note the actual DataFusion file scaners can read individual files in
/// parallel, potentially producing multiple partitions per file)
///
/// Plans such as `SortPreservingMerge` produce a single output stream
/// (1 output partition) by combining some number of input streams (input partitions)
///
/// Plans such as `FilterExec` produce the same number of output streams
/// (partitions) as input streams (partitions).
///
/// [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably can add that every partition is the future by itself, and execution plan node is considered as completed when each partition(future) is completed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably phrase it differently, like "the result of executing a Partition is a async stream (a kind of future)" or something. I'll try and clarify

#[derive(Debug, Clone)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
@@ -129,7 +158,8 @@ impl PartialEq for Partitioning {
}
}

/// Distribution schemes
/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
#[derive(Debug, Clone)]
pub enum Distribution {
/// Unspecified distribution
@@ -142,7 +172,7 @@ pub enum Distribution {
}

impl Distribution {
/// Creates a Partitioning for this Distribution to satisfy itself
/// Creates a `Partitioning` that satisfies this `Distribution`
pub fn create_partitioning(&self, partition_count: usize) -> Partitioning {
match self {
Distribution::UnspecifiedDistribution => {
Loading