Implement metrics for shuffle read and write (#676)
andygrove authored Jul 5, 2021
1 parent e036a62 commit 58da159
Showing 6 changed files with 87 additions and 12 deletions.
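
The pattern is the same in both operators: own an `Arc<SQLMetric>` created with `SQLMetric::time_nanos()`, bracket the timed section with an `Instant`, and expose a snapshot through a `metrics()` map keyed by name. A minimal self-contained sketch of that pattern, using only calls that appear in the diff below (the `TimedOperator` type itself is hypothetical):

```rust
use std::sync::Arc;
use std::time::Instant;

use datafusion::physical_plan::SQLMetric;
use hashbrown::HashMap;

// Hypothetical operator illustrating the metric pattern this commit applies.
struct TimedOperator {
    write_time: Arc<SQLMetric>,
}

impl TimedOperator {
    fn new() -> Self {
        Self {
            // Nanosecond-granularity timing metric, as in the diff below.
            write_time: SQLMetric::time_nanos(),
        }
    }

    fn do_work(&self) {
        let start = Instant::now();
        // ... the timed work, e.g. writing a batch to disk ...
        self.write_time.add_elapsed(start);
    }

    // Snapshot of the accumulated metrics, keyed by name.
    fn metrics(&self) -> HashMap<String, SQLMetric> {
        let mut metrics = HashMap::new();
        metrics.insert("writeTime".to_owned(), (*self.write_time).clone());
        metrics
    }
}
```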
1 change: 1 addition & 0 deletions ballista/rust/core/Cargo.toml
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
ahash = "0.7"
async-trait = "0.1.36"
futures = "0.3"
+hashbrown = "0.11"
log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
26 changes: 22 additions & 4 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,13 +28,17 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use futures::{future, Stream, StreamExt};
+use hashbrown::HashMap;
use log::info;
+use std::time::Instant;

/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
@@ -43,6 +47,8 @@ pub struct ShuffleReaderExec {
/// Each partition of a shuffle can read data from multiple locations
pub(crate) partition: Vec<Vec<PartitionLocation>>,
pub(crate) schema: SchemaRef,
+/// Time to fetch data from executor
+fetch_time: Arc<SQLMetric>,
}

impl ShuffleReaderExec {
@@ -51,7 +57,11 @@ impl ShuffleReaderExec {
partition: Vec<Vec<PartitionLocation>>,
schema: SchemaRef,
) -> Result<Self> {
-Ok(Self { partition, schema })
+Ok(Self {
+partition,
+schema,
+fetch_time: SQLMetric::time_nanos(),
+})
}
}

@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);

+let start = Instant::now();
let partition_locations = &self.partition[partition];
let result = future::join_all(partition_locations.iter().map(fetch_partition))
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
+self.fetch_time.add_elapsed(start);

let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@ impl ExecutionPlan for ShuffleReaderExec {
x.iter()
.map(|l| {
format!(
-"[executor={} part={}:{}:{} stats={:?}]",
+"[executor={} part={}:{}:{} stats={}]",
l.executor_meta.id,
l.partition_id.job_id,
l.partition_id.stage_id,
@@ -127,11 +139,17 @@
.join(",")
})
.collect::<Vec<String>>()
-.join("\n");
+.join(", ");
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
}
}
}

+fn metrics(&self) -> HashMap<String, SQLMetric> {
+let mut metrics = HashMap::new();
+metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
+metrics
+}
}

async fn fetch_partition(
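Once `execute()` has run, the new `fetchTime` metric can be read back through `ExecutionPlan::metrics()`; a hedged sketch (`SQLMetric::value()` returning the accumulated nanoseconds is assumed, not shown in this diff):

```rust
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper: read back the reader's fetchTime metric after
// execution. SQLMetric::value() is an assumed accessor returning the
// accumulated nanosecond count.
fn log_fetch_time(plan: &dyn ExecutionPlan) {
    if let Some(fetch_time) = plan.metrics().get("fetchTime") {
        println!("shuffle fetch took {} ns", fetch_time.value());
    }
}
```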
42 changes: 36 additions & 6 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,7 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.
+use std::fs::File;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
@@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{
-DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
};
use futures::StreamExt;
+use hashbrown::HashMap;
use log::info;
-use std::fs::File;
use uuid::Uuid;

/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
work_dir: String,
/// Optional shuffle output partitioning
shuffle_output_partitioning: Option<Partitioning>,
+/// Shuffle write metrics
+metrics: ShuffleWriteMetrics,
}

+#[derive(Debug, Clone)]
+struct ShuffleWriteMetrics {
+/// Time spent writing batches to shuffle files
+write_time: Arc<SQLMetric>,
+}

+impl ShuffleWriteMetrics {
+fn new() -> Self {
+Self {
+write_time: SQLMetric::time_nanos(),
+}
+}
+}

impl ShuffleWriterExec {
@@ -83,6 +100,7 @@ impl ShuffleWriterExec {
plan,
work_dir,
shuffle_output_partitioning,
+metrics: ShuffleWriteMetrics::new(),
})
}

@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
info!("Writing results to {}", path);

// stream results to disk
-let stats = utils::write_stream_to_disk(&mut stream, path)
-.await
-.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+let stats = utils::write_stream_to_disk(
+&mut stream,
+path,
+self.metrics.write_time.clone(),
+)
+.await
+.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

info!(
-"Executed partition {} in {} seconds. Statistics: {:?}",
+"Executed partition {} in {} seconds. Statistics: {}",
partition,
now.elapsed().as_secs(),
stats
@@ -231,6 +253,7 @@
RecordBatch::try_new(input_batch.schema(), columns)?;

// write batch out
+let start = Instant::now();
match &mut writers[num_output_partition] {
Some(w) => {
w.write(&output_batch)?;
@@ -251,6 +274,7 @@
writers[num_output_partition] = Some(writer);
}
}
+self.metrics.write_time.add_elapsed(start);
}
}

@@ -310,6 +334,12 @@
}
}

+fn metrics(&self) -> HashMap<String, SQLMetric> {
+let mut metrics = HashMap::new();
+metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
+metrics
+}

fn fmt_as(
&self,
t: DisplayFormatType,
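Because `add_elapsed()` accumulates rather than overwrites, the per-batch timings in the repartition loop above sum into a single `writeTime` counter. A tiny runnable sketch of that behavior (`SQLMetric::value()` is an assumed accessor):

```rust
use std::sync::Arc;
use std::time::Instant;

use datafusion::physical_plan::SQLMetric;

fn main() {
    let write_time: Arc<SQLMetric> = SQLMetric::time_nanos();
    for _ in 0..3 {
        let start = Instant::now();
        std::thread::sleep(std::time::Duration::from_millis(1));
        // Each call adds this iteration's elapsed time to the running total.
        write_time.add_elapsed(start);
    }
    // Roughly 3 ms expressed in nanoseconds (value() is assumed).
    println!("writeTime = {} ns", write_time.value());
}
```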
12 changes: 11 additions & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, fmt, sync::Arc};

use datafusion::arrow::array::{
ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
@@ -113,6 +113,16 @@ impl Default for PartitionStats {
}
}

+impl fmt::Display for PartitionStats {
+fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+write!(
+f,
+"numBatches={:?}, numRows={:?}, numBytes={:?}",
+self.num_batches, self.num_rows, self.num_bytes
+)
+}
+}

impl PartitionStats {
pub fn new(
num_rows: Option<u64>,
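Since the `Display` impl above formats its `Option` fields with `{:?}`, the rendered statistics keep the `Some(...)` wrappers. A small sketch (module path assumed; argument order follows the `PartitionStats::new` call in utils.rs below):

```rust
use ballista_core::serde::scheduler::PartitionStats;

fn main() {
    // Arguments are (num_rows, num_batches, num_bytes).
    let stats = PartitionStats::new(Some(100), Some(1), Some(4096));
    // Prints: numBatches=Some(1), numRows=Some(100), numBytes=Some(4096)
    println!("{}", stats);
}
```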
9 changes: 8 additions & 1 deletion ballista/rust/core/src/utils.rs
@@ -53,15 +53,17 @@ use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
-AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
+AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
};
use futures::{future, Stream, StreamExt};
+use std::time::Instant;

/// Stream data to disk in Arrow IPC format
pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
path: &str,
+disk_write_metric: Arc<SQLMetric>,
) -> Result<PartitionStats> {
let file = File::create(&path).map_err(|e| {
BallistaError::General(format!(
@@ -86,9 +88,14 @@ pub async fn write_stream_to_disk(
num_batches += 1;
num_rows += batch.num_rows();
num_bytes += batch_size_bytes;

+let start = Instant::now();
writer.write(&batch)?;
+disk_write_metric.add_elapsed(start);
}
+let start = Instant::now();
writer.finish()?;
+disk_write_metric.add_elapsed(start);
Ok(PartitionStats::new(
Some(num_rows as u64),
Some(num_batches),
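Callers now share ownership of the metric with the helper through the `Arc`. A hedged call-site sketch mirroring `ShuffleWriterExec` (module paths and the `Result` alias are assumptions about the ballista crate layout):

```rust
use std::pin::Pin;
use std::sync::Arc;

use ballista_core::error::Result;
use ballista_core::serde::scheduler::PartitionStats;
use ballista_core::utils::write_stream_to_disk;
use datafusion::physical_plan::{RecordBatchStream, SQLMetric};

// Hypothetical call site: the caller keeps one handle to the metric and
// passes a clone of the Arc so write and finish() time accumulate into it.
async fn write_partition(
    stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
    path: &str,
) -> Result<PartitionStats> {
    let write_time: Arc<SQLMetric> = SQLMetric::time_nanos();
    let stats = write_stream_to_disk(stream, path, write_time.clone()).await?;
    println!("shuffle write stats: {}", stats);
    Ok(stats)
}
```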
9 changes: 9 additions & 0 deletions ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::utils;
use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;

/// Ballista executor
@@ -60,6 +61,14 @@ impl Executor {
)?;
let mut stream = exec.execute(part).await?;
let batches = utils::collect_stream(&mut stream).await?;

+println!(
+"=== Physical plan with metrics ===\n{}\n",
+DisplayableExecutionPlan::with_metrics(&exec)
+.indent()
+.to_string()
+);

// the output should be a single batch containing metadata (path and statistics)
assert!(batches.len() == 1);
Ok(batches[0].clone())
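The `println!` above can be generalized into a small debug helper, since `with_metrics()` takes a plan reference; a sketch using only the calls visible in this diff:

```rust
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;

// Render any operator tree with its metrics, one indented line per operator.
fn print_plan_with_metrics(plan: &dyn ExecutionPlan) {
    println!(
        "=== Physical plan with metrics ===\n{}\n",
        DisplayableExecutionPlan::with_metrics(plan).indent()
    );
}
```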
