Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add baseline execution stats to WindowAggExec and UnionExec, and fixup CoalescePartitionsExec #1018

Merged
merged 4 commits into from
Sep 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions datafusion/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

// CoalescePartitionsExec produces a single partition
if 0 != partition {
return Err(DataFusionError::Internal(format!(
Expand All @@ -113,10 +111,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
"CoalescePartitionsExec requires at least one input partition".to_owned(),
)),
1 => {
// bypass any threading if there is a single partition
// bypass any threading / metrics if there is a single partition
self.input.execute(0).await
}
_ => {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the (very) minimal work done so that
// elapsed_compute is not reported as 0
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();

// use a stream that allows each sender to put in at
// least one result in an attempt to maximize
// parallelism.
Expand Down
65 changes: 60 additions & 5 deletions datafusion/src/physical_plan/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,36 @@

use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::{
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
metrics::{ExecutionPlanMetricsSet, MetricsSet},
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::{error::Result, physical_plan::expressions};
use crate::{
error::Result,
physical_plan::{expressions, metrics::BaselineMetrics},
};
use async_trait::async_trait;

/// UNION ALL execution plan
///
/// Concatenates the output partitions of all inputs, in order.
#[derive(Debug)]
pub struct UnionExec {
    /// Input execution plans; their output partitions are exposed
    /// back-to-back as this node's partitions
    inputs: Vec<Arc<dyn ExecutionPlan>>,
    /// Execution metrics, one baseline set per executed partition
    metrics: ExecutionPlanMetricsSet,
}

impl UnionExec {
/// Create a new UnionExec
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
UnionExec { inputs }
UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}

Expand Down Expand Up @@ -82,11 +92,18 @@ impl ExecutionPlan for UnionExec {
}

async fn execute(&self, mut partition: usize) -> Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the tiny amount of work done in this function so
// elapsed_compute is reported as non zero
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer(); // record on drop

// find partition to execute
for input in self.inputs.iter() {
// Determine whether the requested partition belongs to the current input
if partition < input.output_partitioning().partition_count() {
return input.execute(partition).await;
let stream = input.execute(partition).await?;
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
} else {
partition -= input.output_partitioning().partition_count();
}
Expand All @@ -110,6 +127,10 @@ impl ExecutionPlan for UnionExec {
}
}

/// Return a snapshot of the metrics recorded while executing this plan.
fn metrics(&self) -> Option<MetricsSet> {
    Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
self.inputs
.iter()
Expand All @@ -119,6 +140,40 @@ impl ExecutionPlan for UnionExec {
}
}

/// Stream wrapper that records `BaselineMetrics` for a particular
/// partition
struct ObservedStream {
    /// The wrapped input stream; its batches are passed through unchanged
    inner: SendableRecordBatchStream,
    /// Metrics updated on each poll of `inner`
    baseline_metrics: BaselineMetrics,
}

impl ObservedStream {
    /// Wrap `stream` so that every poll result is recorded in `metrics`.
    fn new(stream: SendableRecordBatchStream, metrics: BaselineMetrics) -> Self {
        Self {
            inner: stream,
            baseline_metrics: metrics,
        }
    }
}

impl RecordBatchStream for ObservedStream {
    /// Delegate to the wrapped stream: observing a stream does not
    /// change the schema of the batches it yields.
    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.inner.schema()
    }
}

impl futures::Stream for ObservedStream {
    type Item = arrow::error::Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Poll the inner stream and route the result through
        // `record_poll`, which returns the poll unchanged after updating
        // the metrics -- presumably output row count and completion state;
        // confirm against `BaselineMetrics::record_poll`.
        let poll = self.inner.poll_next_unpin(cx);
        self.baseline_metrics.record_poll(poll)
    }
}

fn col_stats_union(
mut left: ColumnStatistics,
right: ColumnStatistics,
Expand Down
51 changes: 43 additions & 8 deletions datafusion/src/physical_plan/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
//! Stream and channel implementations for window function expressions.

use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::{
common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
Expand All @@ -30,7 +33,7 @@ use arrow::{
};
use async_trait::async_trait;
use futures::stream::Stream;
use futures::Future;
use futures::FutureExt;
use pin_project_lite::pin_project;
use std::any::Any;
use std::pin::Pin;
Expand All @@ -48,6 +51,8 @@ pub struct WindowAggExec {
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl WindowAggExec {
Expand All @@ -59,11 +64,12 @@ impl WindowAggExec {
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
Ok(WindowAggExec {
Ok(Self {
input,
window_expr,
schema,
input_schema,
metrics: ExecutionPlanMetricsSet::new(),
})
}

Expand Down Expand Up @@ -140,6 +146,7 @@ impl ExecutionPlan for WindowAggExec {
self.schema.clone(),
self.window_expr.clone(),
input,
BaselineMetrics::new(&self.metrics, partition),
));
Ok(stream)
}
Expand All @@ -163,6 +170,10 @@ impl ExecutionPlan for WindowAggExec {
Ok(())
}

/// Return a snapshot of the metrics recorded while executing this plan.
fn metrics(&self) -> Option<MetricsSet> {
    Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
Expand Down Expand Up @@ -214,6 +225,7 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
baseline_metrics: BaselineMetrics,
}
}

Expand All @@ -223,31 +235,41 @@ impl WindowAggStream {
schema: SchemaRef,
window_expr: Vec<Arc<dyn WindowExpr>>,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
tokio::spawn(async move {
let schema = schema_clone.clone();
let result = WindowAggStream::process(input, window_expr, schema).await;
let result =
WindowAggStream::process(input, window_expr, schema, elapsed_compute)
.await;
tx.send(result)
});

Self {
schema,
output: rx,
finished: false,
schema,
baseline_metrics,
}
}

async fn process(
input: SendableRecordBatchStream,
window_expr: Vec<Arc<dyn WindowExpr>>,
schema: SchemaRef,
elapsed_compute: crate::physical_plan::metrics::Time,
) -> ArrowResult<RecordBatch> {
let input_schema = input.schema();
let batches = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)?;

// record compute time on drop
let _timer = elapsed_compute.timer();

let batch = common::combine_batches(&batches, input_schema.clone())?;
if let Some(batch) = batch {
// calculate window cols
Expand All @@ -267,18 +289,31 @@ impl WindowAggStream {
impl Stream for WindowAggStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.poll_next_inner(cx);
self.baseline_metrics.record_poll(poll)
}
}

impl WindowAggStream {
#[inline]
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
if self.finished {
return Poll::Ready(None);
}

// is the output ready?
let this = self.project();
let output_poll = this.output.poll(cx);
let output_poll = self.output.poll_unpin(cx);

match output_poll {
Poll::Ready(result) => {
*this.finished = true;
self.finished = true;
// check for error in receiving channel and unwrap actual result
let result = match result {
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
Expand Down
Loading