Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Ballista to use new physical plan formatter utility #344

Merged
merged 2 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 0 additions & 95 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,101 +102,6 @@ pub async fn collect_stream(
Ok(batches)
}

pub fn format_plan(plan: &dyn ExecutionPlan, indent: usize) -> Result<String> {
let operator_str =
if let Some(exec) = plan.as_any().downcast_ref::<HashAggregateExec>() {
format!(
"HashAggregateExec: groupBy={:?}, aggrExpr={:?}",
exec.group_expr()
.iter()
.map(|e| format_expr(e.0.as_ref()))
.collect::<Vec<String>>(),
exec.aggr_expr()
.iter()
.map(|e| format_agg_expr(e.as_ref()))
.collect::<Result<Vec<String>>>()?
)
} else if let Some(exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
format!(
"HashJoinExec: joinType={:?}, on={:?}",
exec.join_type(),
exec.on()
)
} else if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
let mut num_files = 0;
for part in exec.partitions() {
num_files += part.filenames().len();
}
format!(
"ParquetExec: partitions={}, files={}",
exec.partitions().len(),
num_files
)
} else if let Some(exec) = plan.as_any().downcast_ref::<CsvExec>() {
format!(
"CsvExec: {}; partitions={}",
&exec.path(),
exec.output_partitioning().partition_count()
)
} else if let Some(exec) = plan.as_any().downcast_ref::<FilterExec>() {
format!("FilterExec: {}", format_expr(exec.predicate().as_ref()))
} else if let Some(exec) = plan.as_any().downcast_ref::<QueryStageExec>() {
format!(
"QueryStageExec: job={}, stage={}",
exec.job_id, exec.stage_id
)
} else if let Some(exec) = plan.as_any().downcast_ref::<UnresolvedShuffleExec>() {
format!("UnresolvedShuffleExec: stages={:?}", exec.query_stage_ids)
} else if let Some(exec) = plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
format!(
"CoalesceBatchesExec: batchSize={}",
exec.target_batch_size()
)
} else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
"MergeExec".to_string()
} else {
let str = format!("{:?}", plan);
String::from(&str[0..120])
};

let children_str = plan
.children()
.iter()
.map(|c| format_plan(c.as_ref(), indent + 1))
.collect::<Result<Vec<String>>>()?
.join("\n");

let indent_str = " ".repeat(indent);
if plan.children().is_empty() {
Ok(format!("{}{}{}", indent_str, &operator_str, children_str))
} else {
Ok(format!("{}{}\n{}", indent_str, &operator_str, children_str))
}
}

pub fn format_agg_expr(expr: &dyn AggregateExpr) -> Result<String> {
Ok(format!(
"{} {:?}",
expr.field()?.name(),
expr.expressions()
.iter()
.map(|e| format_expr(e.as_ref()))
.collect::<Vec<String>>()
))
}

pub fn format_expr(expr: &dyn PhysicalExpr) -> String {
if let Some(e) = expr.as_any().downcast_ref::<Column>() {
e.name().to_string()
} else if let Some(e) = expr.as_any().downcast_ref::<Literal>() {
e.to_string()
} else if let Some(e) = expr.as_any().downcast_ref::<BinaryExpr>() {
format!("{} {} {}", e.left(), e.op(), e.right())
} else {
format!("{}", expr)
}
}

pub fn produce_diagram(filename: &str, stages: &[Arc<QueryStageExec>]) -> Result<()> {
let write_file = File::create(filename)?;
let mut w = BufWriter::new(&write_file);
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/executor/src/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::time::Instant;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
use ballista_core::utils::{self, format_plan};
use ballista_core::utils;

use arrow::array::{ArrayRef, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
Expand All @@ -40,6 +40,7 @@ use arrow_flight::{
PutResult, SchemaResult, Ticket,
};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::displayable;
use futures::{Stream, StreamExt};
use log::{info, warn};
use std::io::{Read, Seek};
Expand Down Expand Up @@ -97,8 +98,7 @@ impl FlightService for BallistaFlightService {
partition.job_id,
partition.stage_id,
partition.partition_id,
format_plan(partition.plan.as_ref(), 0)
.map_err(|e| from_ballista_err(&e))?
displayable(partition.plan.as_ref()).indent().to_string()
);

let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
Expand Down
5 changes: 2 additions & 3 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,9 @@ mod test {
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::serde::protobuf;
use ballista_core::utils::format_plan;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
use std::convert::TryInto;
use std::sync::Arc;
Expand Down Expand Up @@ -270,7 +269,7 @@ mod test {
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
println!("{}", format_plan(stage.as_ref(), 0)?);
println!("{}", displayable(stage.as_ref()).indent().to_string());
}

/* Expected result:
Expand Down