
Unbounded SortExec (and Top-K) Implementation When Req's Are Satisfied #12174

Merged · 12 commits · Aug 29, 2024
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/metrics/baseline.rs
@@ -56,7 +56,7 @@ pub struct BaselineMetrics {
}

impl BaselineMetrics {
/// Create a new BaselineMetric structure, and set `start_time` to now
/// Create a new BaselineMetric structure, and set `start_time` to now
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
start_time.record();
311 changes: 244 additions & 67 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::common::spawn_buffered;
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
@@ -51,6 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};
@@ -737,9 +739,22 @@ impl SortExec {
/// This can reduce the memory pressure required by the sort
/// operation since rows that are not going to be included
/// can be dropped.
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
self.fetch = fetch;
self
pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
let mut cache = self.cache.clone();
if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded {
// When a theoretically unnecessary sort becomes a top-K (which
// sometimes arises as an intermediate state before full removal),
// its execution mode should become `Bounded`.
cache.execution_mode = ExecutionMode::Bounded;
}

Contributor:
I don't fully understand how a top-K sort would become bounded. I may misunderstand what `ExecutionMode` means, but it seems like TopK could not complete until its input completed; if its input were unbounded, the sort itself would therefore also be unbounded.

Contributor:
In general you are right -- however, you are missing that we turn it into Bounded only when the sort requirement is already satisfied. This happens when a sort "becomes" unnecessary during one of the plan optimization steps (and it will eventually get removed).

Contributor Author (@berkaysynnada, Aug 27, 2024):
My assumption about the top-K implementation was wrong. Could you please take a look at the new idea? I will update the PR title and body.

Contributor:
FWIW, I think the comments explain what is going on here well. Thank you.
SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
fetch,
cache,
}
}

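To make the review discussion above concrete, here is a minimal standalone sketch (hypothetical helper and enum, not DataFusion's API) of the rule `with_fetch` applies: a sort whose mode is already Unbounded, which per `compute_properties` only happens when the input is unbounded and already satisfies the required ordering, becomes Bounded once a fetch is attached, since it then only needs to emit the first `fetch` rows.

#[derive(Clone, Copy, PartialEq, Debug)]
enum ExecutionMode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

// Hypothetical helper mirroring the rule in `SortExec::with_fetch` above: an
// Unbounded sort (input unbounded and its ordering already satisfied) turns
// into a finite top-K when a fetch is attached, so the node becomes Bounded.
fn mode_with_fetch(current: ExecutionMode, fetch: Option<usize>) -> ExecutionMode {
    match (current, fetch) {
        (ExecutionMode::Unbounded, Some(_)) => ExecutionMode::Bounded,
        (mode, _) => mode,
    }
}

fn main() {
    assert_eq!(mode_with_fetch(ExecutionMode::Unbounded, Some(10)), ExecutionMode::Bounded);
    assert_eq!(mode_with_fetch(ExecutionMode::Unbounded, None), ExecutionMode::Unbounded);
    assert_eq!(mode_with_fetch(ExecutionMode::PipelineBreaking, Some(10)), ExecutionMode::PipelineBreaking);
}
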
/// Input schema
@@ -775,6 +790,16 @@ impl SortExec {
sort_exprs: LexOrdering,
preserve_partitioning: bool,
) -> PlanProperties {
// Determine execution mode:
let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(),
);
let mode = match input.execution_mode() {
ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
ExecutionMode::Bounded => ExecutionMode::Bounded,
_ => ExecutionMode::PipelineBreaking,
};

// Calculate equivalence properties; i.e. reset the ordering equivalence
// class with the new ordering:
let eq_properties = input
Expand All @@ -786,14 +811,6 @@ impl SortExec {
let output_partitioning =
Self::output_partitioning_helper(input, preserve_partitioning);

// Determine execution mode:
let mode = match input.execution_mode() {
ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
ExecutionMode::PipelineBreaking
}
ExecutionMode::Bounded => ExecutionMode::Bounded,
};

PlanProperties::new(eq_properties, output_partitioning, mode)
}
}
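
The new mode computation above hinges on `ordering_satisfy_requirement`. As a rough mental model (only an illustration; DataFusion's real check goes through EquivalenceProperties and also accounts for equivalence classes, constants, and sort options), a requirement is satisfied when it is a prefix of the input's existing ordering, and the sort's mode is then derived as in this standalone sketch:

#[derive(Clone, Copy, PartialEq, Debug)]
enum ExecutionMode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

// Simplified satisfaction check: the required ordering is a prefix of the
// input's existing ordering.
fn ordering_satisfied(input_order: &[&str], required: &[&str]) -> bool {
    input_order.starts_with(required)
}

// Mirrors the match in compute_properties: an unbounded input keeps the sort
// Unbounded only when its ordering is already satisfied; otherwise sorting an
// unbounded input breaks the pipeline.
fn sort_mode(input_mode: ExecutionMode, satisfied: bool) -> ExecutionMode {
    match input_mode {
        ExecutionMode::Unbounded if satisfied => ExecutionMode::Unbounded,
        ExecutionMode::Bounded => ExecutionMode::Bounded,
        _ => ExecutionMode::PipelineBreaking,
    }
}

fn main() {
    // Stream already ordered by (ts, id); the sort asks for (ts): satisfied.
    let satisfied = ordering_satisfied(&["ts", "id"], &["ts"]);
    assert_eq!(sort_mode(ExecutionMode::Unbounded, satisfied), ExecutionMode::Unbounded);
    // Same stream, but the sort asks for (value): not satisfied.
    let satisfied = ordering_satisfied(&["ts", "id"], &["value"]);
    assert_eq!(sort_mode(ExecutionMode::Unbounded, satisfied), ExecutionMode::PipelineBreaking);
}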
@@ -874,53 +891,68 @@ impl ExecutionPlan for SortExec {

trace!("End SortExec's input.execute for partition: {}", partition);

if let Some(fetch) = self.fetch.as_ref() {
let mut topk = TopK::try_new(
partition,
input.schema(),
self.expr.clone(),
*fetch,
context.session_config().batch_size(),
context.runtime_env(),
&self.metrics_set,
partition,
)?;

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
topk.insert_batch(batch)?;
}
topk.emit()
})
.try_flatten(),
)))
} else {
let mut sorter = ExternalSorter::new(
partition,
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
let sort_satisfied = self
.input
.equivalence_properties()
.ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(),
);

Contributor:
This is the same calculation as self.execution_mode(), right? Maybe we could call self.execution_mode() here instead, to be more efficient and to keep the two calculations in sync.

Contributor:
It is similar but not exactly the same (i.e. the execution mode is derived from sort_satisfied, but AFAIK the reverse is not possible). I think @berkaysynnada tried this but it didn't work.

Contributor Author (@berkaysynnada):
I have tried that, but it didn't work. Knowing whether the sort is bounded or unbounded does not tell you whether the sort requirement is satisfied.

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.sort()
})
.try_flatten(),
)))
match (sort_satisfied, self.fetch.as_ref()) {
(true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
input,
0,
Some(*fetch),
BaselineMetrics::new(&self.metrics_set, partition),
))),
(true, None) => Ok(input),
(false, Some(fetch)) => {
let mut topk = TopK::try_new(
partition,
input.schema(),
self.expr.clone(),
*fetch,
context.session_config().batch_size(),
context.runtime_env(),
&self.metrics_set,
partition,
)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
topk.insert_batch(batch)?;
}
topk.emit()
})
.try_flatten(),
)))
}
(false, None) => {
let mut sorter = ExternalSorter::new(
partition,
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.sort()
})
.try_flatten(),
)))
}
}
}

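Summarizing the new dispatch in execute: the pair (sort_satisfied, fetch) selects one of four execution paths. The sketch below (hypothetical names, not DataFusion types) is just a compact restatement of that match; as the review discussion notes, the execution mode alone would not be enough here, because a bounded input says nothing about whether its ordering already satisfies the requirement.

#[derive(Debug, PartialEq)]
enum SortPath {
    LimitOnly,   // ordering already satisfied, fetch set: LimitStream over the input
    PassThrough, // ordering already satisfied, no fetch: return the input stream unchanged
    TopK,        // ordering not satisfied, fetch set: keep only the top `fetch` rows
    FullSort,    // ordering not satisfied, no fetch: ExternalSorter (may spill)
}

// Hypothetical summary of the match in SortExec::execute above.
fn choose_path(sort_satisfied: bool, fetch: Option<usize>) -> SortPath {
    match (sort_satisfied, fetch) {
        (true, Some(_)) => SortPath::LimitOnly,
        (true, None) => SortPath::PassThrough,
        (false, Some(_)) => SortPath::TopK,
        (false, None) => SortPath::FullSort,
    }
}

fn main() {
    assert_eq!(choose_path(true, Some(9)), SortPath::LimitOnly);
    assert_eq!(choose_path(false, None), SortPath::FullSort);
}
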
@@ -933,14 +965,7 @@ impl ExecutionPlan for SortExec {
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
fetch: limit,
cache: self.cache.clone(),
}))
Some(Arc::new(SortExec::with_fetch(self, limit)))
}

fn fetch(&self) -> Option<usize> {
@@ -951,6 +976,8 @@
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};

use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -965,12 +992,124 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{assert_batches_eq, Result, ScalarValue};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::EquivalenceProperties;

use futures::{FutureExt, Stream};

#[derive(Debug, Clone)]
pub struct SortedUnboundedExec {
schema: Schema,
batch_size: u64,
cache: PlanProperties,
}

impl DisplayAs for SortedUnboundedExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "UnboundableExec",).unwrap()
}
}
Ok(())
}
}

impl SortedUnboundedExec {
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let mut eq_properties = EquivalenceProperties::new(schema);
eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)]]);
let mode = ExecutionMode::Unbounded;
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
}
}

impl ExecutionPlan for SortedUnboundedExec {
fn name(&self) -> &'static str {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
&self.cache
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(SortedUnboundedStream {
schema: Arc::new(self.schema.clone()),
batch_size: self.batch_size,
offset: 0,
}))
}
}

#[derive(Debug)]
pub struct SortedUnboundedStream {
schema: SchemaRef,
batch_size: u64,
offset: u64,
}

impl Stream for SortedUnboundedStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let batch = SortedUnboundedStream::create_record_batch(
Arc::clone(&self.schema),
self.offset,
self.batch_size,
);
self.offset += self.batch_size;
Poll::Ready(Some(Ok(batch)))
}
}

impl RecordBatchStream for SortedUnboundedStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}

impl SortedUnboundedStream {
fn create_record_batch(
schema: SchemaRef,
offset: u64,
batch_size: u64,
) -> RecordBatch {
let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
let array = UInt64Array::from(values);
let array_ref: ArrayRef = Arc::new(array);
RecordBatch::try_new(schema, vec![array_ref]).unwrap()
}
}

#[tokio::test]
async fn test_in_mem_sort() -> Result<()> {
@@ -1414,4 +1553,42 @@ mod tests {
let result = sort_batch(&batch, &expressions, None).unwrap();
assert_eq!(result.num_rows(), 1);
}

#[tokio::test]
async fn topk_unbounded_source() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
let source = SortedUnboundedExec {
schema: schema.clone(),
batch_size: 2,
cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
};
let mut plan = SortExec::new(
vec![PhysicalSortExpr::new(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)],
Arc::new(source),
);
plan = plan.with_fetch(Some(9));

let batches = collect(Arc::new(plan), task_ctx).await?;
#[rustfmt::skip]
let expected = [
"+----+",
"| c1 |",
"+----+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 3 |",
"| 4 |",
"| 5 |",
"| 6 |",
"| 7 |",
"| 8 |",
"+----+",];
assert_batches_eq!(expected, &batches);
Ok(())
}
}