Improve SQLMetric APIs, port existing metrics
alamb committed Aug 19, 2021
1 parent 0aa5f27 commit ff41c08
Showing 17 changed files with 1,320 additions and 390 deletions.
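The diffs below port operators from the old SQLMetric type to the new metrics module. As orientation, here is a minimal sketch of the new pattern, assuming only the API surface visible in this commit (ExecutionPlanMetricsSet, MetricBuilder, metrics::Time, metrics::Count, MetricsSet); the function name and row count are illustrative:

use datafusion::physical_plan::metrics::{
    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};

// Sketch: how an operator wires up the new metrics API.
fn metrics_example() -> Option<MetricsSet> {
    // Shared, thread-safe registry, stored once on the ExecutionPlan.
    let metrics = ExecutionPlanMetricsSet::new();
    let partition = 0;

    // Register a named timer and a counter scoped to one partition.
    let fetch_time = MetricBuilder::new(&metrics).subset_time("fetch_time", partition);
    let input_rows = MetricBuilder::new(&metrics).counter("input_rows", partition);

    // Time a region of work; the elapsed time is recorded when done() runs.
    let timer = fetch_time.timer();
    // ... fetch or compute here ...
    timer.done();
    input_rows.add(100);

    // ExecutionPlan::metrics() now returns a snapshot of everything registered.
    Some(metrics.clone_inner())
}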
24 changes: 13 additions & 11 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,9 +28,10 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+use datafusion::physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Metric, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
@@ -47,8 +48,8 @@ pub struct ShuffleReaderExec {
     /// Each partition of a shuffle can read data from multiple locations
     pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
-    /// Time to fetch data from executor
-    fetch_time: Arc<SQLMetric>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }

 impl ShuffleReaderExec {
@@ -60,7 +61,7 @@ impl ShuffleReaderExec {
         Ok(Self {
             partition,
             schema,
-            fetch_time: SQLMetric::time_nanos(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 }
@@ -100,13 +101,16 @@ impl ExecutionPlan for ShuffleReaderExec {
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);

-        let start = Instant::now();
+        let fetch_time =
+            MetricBuilder::new(&self.metrics).subset_time("fetch_time", partition);
+        let timer = fetch_time.timer();
+
         let partition_locations = &self.partition[partition];
         let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
             .into_iter()
             .collect::<Result<Vec<_>>>()?;
-        self.fetch_time.add_elapsed(start);
+        timer.done();

         let result = WrappedStream::new(
             Box::pin(futures::stream::iter(result).flatten()),
@@ -149,10 +153,8 @@ impl ExecutionPlan for ShuffleReaderExec {
         }
     }

-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 }

60 changes: 33 additions & 27 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -45,10 +45,13 @@ use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_utils::create_hashes;
+use datafusion::physical_plan::metrics::{
+    self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::Partitioning::RoundRobinBatch;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
 };
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
-    /// Shuffle write metrics
-    metrics: ShuffleWriteMetrics,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }

 #[derive(Debug, Clone)]
 struct ShuffleWriteMetrics {
     /// Time spend writing batches to shuffle files
-    write_time: Arc<SQLMetric>,
-    input_rows: Arc<SQLMetric>,
-    output_rows: Arc<SQLMetric>,
+    write_time: metrics::Time,
+    input_rows: metrics::Count,
+    output_rows: metrics::Count,
 }

 impl ShuffleWriteMetrics {
-    fn new() -> Self {
+    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
         Self {
-            write_time: SQLMetric::time_nanos(),
-            input_rows: SQLMetric::counter(),
-            output_rows: SQLMetric::counter(),
+            write_time,
+            input_rows,
+            output_rows,
         }
     }
 }
@@ -108,7 +117,7 @@ impl ShuffleWriterExec {
             plan,
             work_dir,
             shuffle_output_partitioning,
-            metrics: ShuffleWriteMetrics::new(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }

@@ -139,9 +148,11 @@ impl ShuffleWriterExec {
         path.push(&self.job_id);
         path.push(&format!("{}", self.stage_id));

+        let write_metrics = ShuffleWriteMetrics::new(input_partition, &self.metrics);
+
         match &self.shuffle_output_partitioning {
             None => {
-                let start = Instant::now();
+                let timer = write_metrics.write_time.timer();
                 path.push(&format!("{}", input_partition));
                 std::fs::create_dir_all(&path)?;
                 path.push("data.arrow");
@@ -152,18 +163,18 @@
                 let stats = utils::write_stream_to_disk(
                     &mut stream,
                     path,
-                    self.metrics.write_time.clone(),
+                    &write_metrics.write_time,
                 )
                 .await
                 .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

-                self.metrics
+                write_metrics
                     .input_rows
                     .add(stats.num_rows.unwrap_or(0) as usize);
-                self.metrics
+                write_metrics
                     .output_rows
                     .add(stats.num_rows.unwrap_or(0) as usize);
-                self.metrics.write_time.add_elapsed(start);
+                timer.done();

                 info!(
                     "Executed partition {} in {} seconds. Statistics: {}",
@@ -197,7 +208,7 @@ impl ShuffleWriterExec {
                 while let Some(result) = stream.next().await {
                     let input_batch = result?;

-                    self.metrics.input_rows.add(input_batch.num_rows());
+                    write_metrics.input_rows.add(input_batch.num_rows());

                     let arrays = exprs
                         .iter()
@@ -239,7 +250,7 @@

                        //TODO optimize so we don't write or fetch empty partitions
                        //if output_batch.num_rows() > 0 {
-                        let start = Instant::now();
+                        let timer = write_metrics.write_time.timer();
                         match &mut writers[output_partition] {
                             Some(w) => {
                                 w.write(&output_batch)?;
@@ -260,9 +271,8 @@
                                 writers[output_partition] = Some(writer);
                             }
                         }
-                        self.metrics.output_rows.add(output_batch.num_rows());
-                        self.metrics.write_time.add_elapsed(start);
-                        //}
+                        write_metrics.output_rows.add(output_batch.num_rows());
+                        timer.done();
                     }
                 }

@@ -388,12 +398,8 @@ impl ExecutionPlan for ShuffleWriterExec {
         Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
     }

-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
-        metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
-        metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }

     fn fmt_as(

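The ShuffleWriteMetrics change above shows the per-partition pattern this commit introduces: the operator owns one ExecutionPlanMetricsSet, and each execute call builds a short-lived struct of handles tagged with its partition. A minimal sketch of that shape, assuming the same MetricBuilder calls as the diff (the struct name WriteMetrics is hypothetical):

use datafusion::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};

// Per-partition handles into an operator's shared metrics set,
// mirroring ShuffleWriteMetrics in the diff above.
struct WriteMetrics {
    write_time: metrics::Time,
    output_rows: metrics::Count,
}

impl WriteMetrics {
    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        Self {
            // Each builder call registers a metric labeled with this partition.
            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
            output_rows: MetricBuilder::new(metrics).output_rows(partition),
        }
    }
}

Because the handles only reference shared state, execute can create them locally and drop them when the partition finishes, while metrics() still reports the accumulated totals from the shared set.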
12 changes: 6 additions & 6 deletions ballista/rust/core/src/utils.rs
@@ -61,7 +61,7 @@ use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
-    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
+    metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
 };
 use futures::{future, Stream, StreamExt};
 use std::time::Instant;
@@ -71,7 +71,7 @@ use std::time::Instant;
 pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
     path: &str,
-    disk_write_metric: Arc<SQLMetric>,
+    disk_write_metric: &metrics::Time,
 ) -> Result<PartitionStats> {
     let file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
@@ -97,13 +97,13 @@ pub async fn write_stream_to_disk(
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;

-        let start = Instant::now();
+        let timer = disk_write_metric.timer();
         writer.write(&batch)?;
-        disk_write_metric.add_elapsed(start);
+        timer.done();
     }
-    let start = Instant::now();
+    let timer = disk_write_metric.timer();
     writer.finish()?;
-    disk_write_metric.add_elapsed(start);
+    timer.done();
     Ok(PartitionStats::new(
         Some(num_rows as u64),
         Some(num_batches),

13 changes: 8 additions & 5 deletions datafusion/src/physical_optimizer/repartition.rs
@@ -110,24 +110,25 @@ mod tests {

     use super::*;
     use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{
-        ParquetExec, ParquetExecMetrics, ParquetPartition,
-    };
+    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
     use crate::physical_plan::projection::ProjectionExec;

     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
         let schema = Arc::new(Schema::empty());
+        let metrics = ExecutionPlanMetricsSet::new();
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
                 vec![ParquetPartition::new(
                     vec!["x".to_string()],
                     Statistics::default(),
+                    metrics.clone(),
                 )],
                 schema,
                 None,
-                ParquetExecMetrics::new(),
+                metrics,
                 None,
                 2048,
                 None,
@@ -154,6 +155,7 @@ mod tests {
     #[test]
     fn repartition_deepest_node() -> Result<()> {
         let schema = Arc::new(Schema::empty());
+        let metrics = ExecutionPlanMetricsSet::new();
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ProjectionExec::try_new(
@@ -162,10 +164,11 @@
                 vec![ParquetPartition::new(
                     vec!["x".to_string()],
                     Statistics::default(),
+                    metrics.clone(),
                 )],
                 schema,
                 None,
-                ParquetExecMetrics::new(),
+                metrics,
                 None,
                 2048,
                 None,

8 changes: 8 additions & 0 deletions datafusion/src/physical_plan/analyze.rs
@@ -164,6 +164,14 @@ impl ExecutionPlan for AnalyzeExec {
         // Verbose output
         // TODO make this more sophisticated
         if verbose {
+            type_builder.append_value("Plan with Full Metrics").unwrap();
+
+            let annotated_plan =
+                DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
+                    .indent()
+                    .to_string();
+            plan_builder.append_value(annotated_plan).unwrap();
+
             type_builder.append_value("Output Rows").unwrap();
             plan_builder.append_value(total_rows.to_string()).unwrap();

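The analyze.rs addition feeds the new metrics into verbose ANALYZE output. Below is the same rendering call in isolation; the import path for DisplayableExecutionPlan and the helper name are assumptions, and only the with_full_metrics(..).indent().to_string() chain is taken from the diff:

use std::sync::Arc;

// Assumed import path; only the call chain below appears in the diff.
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;

// Render an executed plan with full metrics annotated inline,
// as AnalyzeExec now does for verbose output.
fn annotated_plan(plan: &Arc<dyn ExecutionPlan>) -> String {
    DisplayableExecutionPlan::with_full_metrics(plan.as_ref())
        .indent()
        .to_string()
}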
(The remaining 12 of the 17 changed files are not shown.)