diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index bedc0973e6ad..3a89c75a5cd7 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
 ahash = "0.7"
 async-trait = "0.1.36"
 futures = "0.3"
+hashbrown = "0.11"
 log = "0.4"
 prost = "0.7"
 serde = {version = "1", features = ["derive"]}
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 9ab064115ace..db03d3ddf080 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,13 +28,17 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
 use futures::{future, Stream, StreamExt};
+use hashbrown::HashMap;
 use log::info;
+use std::time::Instant;
 
 /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
 /// being executed by an executor
@@ -43,6 +47,8 @@ pub struct ShuffleReaderExec {
     /// Each partition of a shuffle can read data from multiple locations
     pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
+    /// Time to fetch data from executor
+    fetch_time: Arc<SQLMetric>,
 }
 
 impl ShuffleReaderExec {
@@ -51,7 +57,11 @@ impl ShuffleReaderExec {
         partition: Vec<Vec<PartitionLocation>>,
         schema: SchemaRef,
     ) -> Result<Self> {
-        Ok(Self { partition, schema })
+        Ok(Self {
+            partition,
+            schema,
+            fetch_time: SQLMetric::time_nanos(),
+        })
     }
 }
 
@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
 
+        let start = Instant::now();
         let partition_locations = &self.partition[partition];
         let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
             .into_iter()
             .collect::<Result<Vec<_>>>()?;
+        self.fetch_time.add_elapsed(start);
 
         let result = WrappedStream::new(
             Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@
             x.iter()
                 .map(|l| {
                     format!(
-                        "[executor={} part={}:{}:{} stats={:?}]",
+                        "[executor={} part={}:{}:{} stats={}]",
                         l.executor_meta.id,
                         l.partition_id.job_id,
                         l.partition_id.stage_id,
@@ -127,11 +139,17 @@
                             .join(",")
                     })
                     .collect::<Vec<String>>()
-                    .join("\n");
+                    .join(", ");
                 write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
+        metrics
+    }
 }
 
 async fn fetch_partition(
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 7fffaba13217..92b4448a69ec 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,7 @@
 //! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
 //! will use the ShuffleReaderExec to read these results.
 
+use std::fs::File;
 use std::iter::Iterator;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
@@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_join::create_hashes;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
 };
 use futures::StreamExt;
+use hashbrown::HashMap;
 use log::info;
-use std::fs::File;
 use uuid::Uuid;
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
+    /// Shuffle write metrics
+    metrics: ShuffleWriteMetrics,
+}
+
+#[derive(Debug, Clone)]
+struct ShuffleWriteMetrics {
+    /// Time spent writing batches to shuffle files
+    write_time: Arc<SQLMetric>,
+}
+
+impl ShuffleWriteMetrics {
+    fn new() -> Self {
+        Self {
+            write_time: SQLMetric::time_nanos(),
+        }
+    }
 }
 
 impl ShuffleWriterExec {
@@ -83,6 +100,7 @@
             plan,
             work_dir,
             shuffle_output_partitioning,
+            metrics: ShuffleWriteMetrics::new(),
         })
     }
 
@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
         info!("Writing results to {}", path);
 
         // stream results to disk
-        let stats = utils::write_stream_to_disk(&mut stream, path)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        let stats = utils::write_stream_to_disk(
+            &mut stream,
+            path,
+            self.metrics.write_time.clone(),
+        )
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
         info!(
-            "Executed partition {} in {} seconds. Statistics: {:?}",
+            "Executed partition {} in {} seconds. Statistics: {}",
             partition,
             now.elapsed().as_secs(),
             stats
@@ -231,6 +253,7 @@
                             RecordBatch::try_new(input_batch.schema(), columns)?;
 
                         // write batch out
+                        let start = Instant::now();
                         match &mut writers[num_output_partition] {
                             Some(w) => {
                                 w.write(&output_batch)?;
@@ -251,6 +274,7 @@
                                 writers[num_output_partition] = Some(writer);
                             }
                         }
+                        self.metrics.write_time.add_elapsed(start);
                     }
                 }
 
@@ -310,6 +334,12 @@
         }
     }
 
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
+        metrics
+    }
+
     fn fmt_as(
         &self,
         t: DisplayFormatType,
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index f66bb08189d2..cbe1a31227c6 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, fmt, sync::Arc};
 
 use datafusion::arrow::array::{
     ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
@@ -113,6 +113,16 @@ impl Default for PartitionStats {
     }
 }
 
+impl fmt::Display for PartitionStats {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "numBatches={:?}, numRows={:?}, numBytes={:?}",
+            self.num_batches, self.num_rows, self.num_bytes
+        )
+    }
+}
+
 impl PartitionStats {
     pub fn new(
         num_rows: Option<u64>,
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 8a510f480876..f7d884d50298 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -53,15 +53,17 @@
 use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
-    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
+    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
 };
 use futures::{future, Stream, StreamExt};
+use std::time::Instant;
 
 /// Stream data to disk in Arrow IPC format
 pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
     path: &str,
+    disk_write_metric: Arc<SQLMetric>,
 ) -> Result<PartitionStats> {
     let file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
@@ -86,9 +88,14 @@ pub async fn write_stream_to_disk(
         num_batches += 1;
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;
+
+        let start = Instant::now();
         writer.write(&batch)?;
+        disk_write_metric.add_elapsed(start);
     }
+    let start = Instant::now();
     writer.finish()?;
+    disk_write_metric.add_elapsed(start);
     Ok(PartitionStats::new(
         Some(num_rows as u64),
         Some(num_batches),
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index 86aaa7e9f495..4a75448b5f06 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::utils;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::ExecutionPlan;
 
 /// Ballista executor
@@ -60,6 +61,14 @@
         )?;
         let mut stream = exec.execute(part).await?;
         let batches = utils::collect_stream(&mut stream).await?;
+
+        println!(
+            "=== Physical plan with metrics ===\n{}\n",
+            DisplayableExecutionPlan::with_metrics(&exec)
+                .indent()
+                .to_string()
+        );
+
         // the output should be a single batch containing metadata (path and statistics)
         assert!(batches.len() == 1);
         Ok(batches[0].clone())
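
The instrumentation pattern applied above in ShuffleReaderExec, ShuffleWriterExec, and write_stream_to_disk is the same in all three places: hold a shared Arc<SQLMetric>, record elapsed wall-clock time around the instrumented section with Instant::now() and add_elapsed, and expose the accumulated value through a metrics() map. A minimal sketch of that pattern, using only the APIs that appear in this diff; HypotheticalOp is a stand-in for an ExecutionPlan node, not a type from this change:

use std::sync::Arc;
use std::time::Instant;

use datafusion::physical_plan::SQLMetric;
use hashbrown::HashMap;

// Stand-in operator demonstrating the metric pattern used in this diff.
struct HypotheticalOp {
    write_time: Arc<SQLMetric>,
}

impl HypotheticalOp {
    fn new() -> Self {
        Self {
            // Nanosecond-resolution timer metric, held in an Arc so the
            // operator can hand clones to helpers (as ShuffleWriterExec
            // does for write_stream_to_disk).
            write_time: SQLMetric::time_nanos(),
        }
    }

    fn do_work(&self) {
        let start = Instant::now();
        // ... the timed section (e.g. writing a batch to disk) ...
        self.write_time.add_elapsed(start); // accumulate elapsed nanos
    }

    // Mirrors the ExecutionPlan::metrics implementations added above.
    fn metrics(&self) -> HashMap<String, SQLMetric> {
        let mut metrics = HashMap::new();
        metrics.insert("writeTime".to_owned(), (*self.write_time).clone());
        metrics
    }
}

The executor then surfaces these values by rendering the plan with DisplayableExecutionPlan::with_metrics(&exec).indent(), as shown in the executor.rs hunk.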