Skip to content

Commit

Permalink
Add metrics for UnnestExec (#8482)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonvandel authored Dec 14, 2023
1 parent 5be8dbe commit 72e39b8
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 30 deletions.
24 changes: 23 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use datafusion::prelude::JoinType;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion::test_util::parquet_test_data;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::{
Expand Down Expand Up @@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
Ok(())
}

/// Unnests the `tags` column, runs the resulting plan through `explain`, and
/// checks that the formatted output contains the metrics reported for the
/// unnest operator.
#[tokio::test]
async fn unnest_analyze_metrics() -> Result<()> {
    const NUM_ROWS: usize = 5;

    let unnested = table_with_nested_types(NUM_ROWS)
        .await?
        .unnest_column("tags")?;
    // explain(verbose = false, analyze = true): per the DataFrame API the plan
    // is executed so that the metrics are populated in the output.
    let batches = unnested.explain(false, true)?.collect().await?;
    let plan_text = arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();

    // Checked in this order; the first missing fragment fails the test.
    let expected_fragments = [
        "elapsed_compute=",
        "input_batches=1",
        "input_rows=5",
        "output_rows=10",
        "output_batches=1",
    ];
    for fragment in expected_fragments {
        assert_contains!(&plan_text, fragment);
    }

    Ok(())
}

async fn create_test_table(name: &str) -> Result<DataFrame> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Expand Down
92 changes: 63 additions & 29 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

//! Defines the unnest column plan for unnesting values in a column that contains a list
//! type; conceptually, this is like joining each row with all the values in the list column.
use std::time::Instant;
use std::{any::Any, sync::Arc};

use super::DisplayAs;
Expand All @@ -44,6 +42,8 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use log::trace;

use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};

/// Unnest the given column by joining the row with each value in the
/// nested type.
///
Expand All @@ -58,6 +58,8 @@ pub struct UnnestExec {
column: Column,
/// Options
options: UnnestOptions,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl UnnestExec {
Expand All @@ -73,6 +75,7 @@ impl UnnestExec {
schema,
column,
options,
metrics: Default::default(),
}
}
}
Expand Down Expand Up @@ -141,19 +144,58 @@ impl ExecutionPlan for UnnestExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
let metrics = UnnestMetrics::new(partition, &self.metrics);

Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
column: self.column.clone(),
options: self.options.clone(),
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
num_output_rows: 0,
unnest_time: 0,
metrics,
}))
}

/// Exposes the execution metrics collected for this operator.
///
/// Always returns `Some`, wrapping the result of `clone_inner()` on the
/// operator's `ExecutionPlanMetricsSet`.
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

/// Metric handles updated by the unnest stream while it processes batches.
///
/// One instance is created per executed partition via `UnnestMetrics::new`.
#[derive(Clone, Debug)]
struct UnnestMetrics {
/// Total time spent on column unnesting
elapsed_compute: metrics::Time,
/// Number of input batches consumed
input_batches: metrics::Count,
/// Number of input rows consumed
input_rows: metrics::Count,
/// Number of output batches produced
output_batches: metrics::Count,
/// Number of output rows produced by this operator
output_rows: metrics::Count,
}

impl UnnestMetrics {
    /// Builds the metric handles for `partition`, each created through a
    /// `MetricBuilder` over the shared `metrics` set.
    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        // Shorthand for the three named counters; every call uses a fresh builder.
        let named_counter =
            |name: &'static str| MetricBuilder::new(metrics).counter(name, partition);

        // Field initializers run in the order written, so the metrics are
        // created as: elapsed_compute, input_batches, input_rows,
        // output_batches, output_rows.
        Self {
            elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
            input_batches: named_counter("input_batches"),
            input_rows: named_counter("input_rows"),
            output_batches: named_counter("output_batches"),
            output_rows: MetricBuilder::new(metrics).output_rows(partition),
        }
    }
}

/// A stream that issues [RecordBatch]es with unnested column data.
Expand All @@ -166,16 +208,8 @@ struct UnnestStream {
column: Column,
/// Options
options: UnnestOptions,
/// number of input batches
num_input_batches: usize,
/// number of input rows
num_input_rows: usize,
/// number of batches produced
num_output_batches: usize,
/// number of rows produced
num_output_rows: usize,
/// total time for column unnesting, in ms
unnest_time: usize,
/// Metrics
metrics: UnnestMetrics,
}

impl RecordBatchStream for UnnestStream {
Expand Down Expand Up @@ -207,28 +241,28 @@ impl UnnestStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
let start = Instant::now();
let timer = self.metrics.elapsed_compute.timer();
let result =
build_batch(&batch, &self.schema, &self.column, &self.options);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
self.metrics.input_batches.add(1);
self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
self.unnest_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
timer.done();
self.metrics.output_batches.add(1);
self.metrics.output_rows.add(batch.num_rows());
}

Some(result)
}
other => {
trace!(
"Processed {} probe-side input batches containing {} rows and \
produced {} output batches containing {} rows in {} ms",
self.num_input_batches,
self.num_input_rows,
self.num_output_batches,
self.num_output_rows,
self.unnest_time,
produced {} output batches containing {} rows in {}",
self.metrics.input_batches,
self.metrics.input_rows,
self.metrics.output_batches,
self.metrics.output_rows,
self.metrics.elapsed_compute,
);
other
}
Expand Down

0 comments on commit 72e39b8

Please sign in to comment.