From 72e39b8bce867d1f141356a918400b185e6efe74 Mon Sep 17 00:00:00 2001
From: Simon Vandel Sillesen
Date: Thu, 14 Dec 2023 23:28:25 +0300
Subject: [PATCH] Add metrics for UnnestExec (#8482)

---
 datafusion/core/tests/dataframe/mod.rs | 24 ++++++-
 datafusion/physical-plan/src/unnest.rs | 92 ++++++++++++++++++--------
 2 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index c6b8e0e01b4f..ba661aa2445c 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -39,7 +39,7 @@ use datafusion::prelude::JoinType;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
 use datafusion::test_util::parquet_test_data;
 use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
-use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
+use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOptions};
 use datafusion_execution::config::SessionConfig;
 use datafusion_expr::expr::{GroupingSet, Sort};
 use datafusion_expr::{
@@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn unnest_analyze_metrics() -> Result<()> {
+    const NUM_ROWS: usize = 5;
+
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let results = df
+        .unnest_column("tags")?
+        .explain(false, true)?
+        .collect()
+        .await?;
+    let formatted = arrow::util::pretty::pretty_format_batches(&results)
+        .unwrap()
+        .to_string();
+    assert_contains!(&formatted, "elapsed_compute=");
+    assert_contains!(&formatted, "input_batches=1");
+    assert_contains!(&formatted, "input_rows=5");
+    assert_contains!(&formatted, "output_rows=10");
+    assert_contains!(&formatted, "output_batches=1");
+
+    Ok(())
+}
+
 async fn create_test_table(name: &str) -> Result<DataFrame> {
     let schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index af4a81626cd7..b9e732c317af 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -17,8 +17,6 @@
 
 //! Defines the unnest column plan for unnesting values in a column that contains a list
 //! type, conceptually is like joining each row with all the values in the list column.
-
-use std::time::Instant;
 use std::{any::Any, sync::Arc};
 
 use super::DisplayAs;
@@ -44,6 +42,8 @@ use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 use log::trace;
 
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+
 /// Unnest the given column by joining the row with each value in the
 /// nested type.
 ///
@@ -58,6 +58,8 @@ pub struct UnnestExec {
     column: Column,
     /// Options
     options: UnnestOptions,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl UnnestExec {
@@ -73,6 +75,7 @@ impl UnnestExec {
             schema,
             column,
             options,
+            metrics: Default::default(),
         }
     }
 }
@@ -141,19 +144,58 @@ impl ExecutionPlan for UnnestExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let input = self.input.execute(partition, context)?;
+        let metrics = UnnestMetrics::new(partition, &self.metrics);
 
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
             column: self.column.clone(),
             options: self.options.clone(),
-            num_input_batches: 0,
-            num_input_rows: 0,
-            num_output_batches: 0,
-            num_output_rows: 0,
-            unnest_time: 0,
+            metrics,
         }))
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+#[derive(Clone, Debug)]
+struct UnnestMetrics {
+    /// total time for column unnesting
+    elapsed_compute: metrics::Time,
+    /// Number of batches consumed
+    input_batches: metrics::Count,
+    /// Number of rows consumed
+    input_rows: metrics::Count,
+    /// Number of batches produced
+    output_batches: metrics::Count,
+    /// Number of rows produced by this operator
+    output_rows: metrics::Count,
+}
+
+impl UnnestMetrics {
+    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);
+
+        let input_batches =
+            MetricBuilder::new(metrics).counter("input_batches", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+        let output_batches =
+            MetricBuilder::new(metrics).counter("output_batches", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+        Self {
+            input_batches,
+            input_rows,
+            output_batches,
+            output_rows,
+            elapsed_compute,
+        }
+    }
 }
 
 /// A stream that issues [RecordBatch]es with unnested column data.
@@ -166,16 +208,8 @@ struct UnnestStream {
     column: Column,
     /// Options
     options: UnnestOptions,
-    /// number of input batches
-    num_input_batches: usize,
-    /// number of input rows
-    num_input_rows: usize,
-    /// number of batches produced
-    num_output_batches: usize,
-    /// number of rows produced
-    num_output_rows: usize,
-    /// total time for column unnesting, in ms
-    unnest_time: usize,
+    /// Metrics
+    metrics: UnnestMetrics,
 }
 
 impl RecordBatchStream for UnnestStream {
@@ -207,15 +241,15 @@ impl UnnestStream {
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
-                    let start = Instant::now();
+                    let timer = self.metrics.elapsed_compute.timer();
                     let result =
                         build_batch(&batch, &self.schema, &self.column, &self.options);
-                    self.num_input_batches += 1;
-                    self.num_input_rows += batch.num_rows();
+                    self.metrics.input_batches.add(1);
+                    self.metrics.input_rows.add(batch.num_rows());
                     if let Ok(ref batch) = result {
-                        self.unnest_time += start.elapsed().as_millis() as usize;
-                        self.num_output_batches += 1;
-                        self.num_output_rows += batch.num_rows();
+                        timer.done();
+                        self.metrics.output_batches.add(1);
+                        self.metrics.output_rows.add(batch.num_rows());
                     }
 
                     Some(result)
@@ -223,12 +257,12 @@ impl UnnestStream {
                 other => {
                     trace!(
                         "Processed {} probe-side input batches containing {} rows and \
-                        produced {} output batches containing {} rows in {} ms",
-                        self.num_input_batches,
-                        self.num_input_rows,
-                        self.num_output_batches,
-                        self.num_output_rows,
-                        self.unnest_time,
+                        produced {} output batches containing {} rows in {}",
+                        self.metrics.input_batches,
+                        self.metrics.input_rows,
+                        self.metrics.output_batches,
+                        self.metrics.output_rows,
+                        self.metrics.elapsed_compute,
                     );
                     other
                 }