Ballista: Implement scalable distributed joins #634

Merged · 2 commits · Jul 4, 2021
20 changes: 19 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -42,7 +42,9 @@ use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
};
use futures::StreamExt;
use log::info;
use std::fs::File;
@@ -307,6 +309,22 @@ impl ExecutionPlan for ShuffleWriterExec {
)),
}
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"ShuffleWriterExec: {:?}",
self.shuffle_output_partitioning
)
}
}
}
}

fn result_schema() -> SchemaRef {
15 changes: 14 additions & 1 deletion ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,12 +23,13 @@ use crate::serde::scheduler::PartitionLocation;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use log::info;
use std::fmt::Formatter;

/// UnresolvedShuffleExec represents a dependency on the results of several ShuffleWriterExec nodes which haven't been computed yet.
///
@@ -97,4 +98,16 @@ impl ExecutionPlan for UnresolvedShuffleExec {
"Ballista UnresolvedShuffleExec does not support execution".to_owned(),
))
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(f, "UnresolvedShuffleExec")
}
}
}
}
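
As background for the `UnresolvedShuffleExec` doc comment above: the planner changes later in this diff split a child into its own `ShuffleWriterExec` stage and substitute a placeholder that records the stage id, schema, and partition count. The sketch below condenses that wiring; `plan_shuffle_boundary` is a hypothetical helper name introduced here for illustration only, and the constructor arguments mirror the calls shown in planner.rs in this PR.

```rust
use std::sync::Arc;

use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper (illustration only): given a shuffle-writer stage that
/// was split off for a child plan, build the placeholder node that the parent
/// plan keeps until the stage's partition locations are known.
fn plan_shuffle_boundary(child_stage: &Arc<ShuffleWriterExec>) -> Arc<UnresolvedShuffleExec> {
    Arc::new(UnresolvedShuffleExec::new(
        // which stage(s) must run before this placeholder can be resolved
        vec![child_stage.stage_id()],
        // schema and partition count describe the eventual shuffle reader
        child_stage.schema(),
        child_stage.output_partitioning().partition_count(),
    ))
}
```
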
11 changes: 1 addition & 10 deletions ballista/rust/core/src/utils.rs
@@ -227,16 +227,7 @@ fn build_exec_plan_diagram(

/// Create a DataFusion context that is compatible with Ballista
pub fn create_datafusion_context() -> ExecutionContext {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new()
.with_concurrency(1)
.with_repartition_joins(false)
.with_repartition_aggregations(false)
.with_physical_optimizer_rules(rules);
let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
Review comment (Contributor):

What is the idea here for later? I guess the repartitioning needs to be applied with concurrency=1 too to avoid inefficient plans?

(One possible follow-up configuration is sketched below this function's diff.)

ExecutionContext::with_config(config)
}
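
To make the review question above concrete, here is a minimal sketch (not part of this PR) of what `create_datafusion_context` might grow into once the partition count is driven by configuration rather than the hard-coded `2`. It only uses the `ExecutionConfig` builder methods visible in the removed lines above; the `concurrency` parameter and function name are illustrative.

```rust
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

/// Hypothetical follow-up: let the caller (e.g. the scheduler, based on
/// cluster resources) choose the partition count, while keeping the
/// repartition-based join and aggregation strategies enabled.
fn create_datafusion_context_with_concurrency(concurrency: usize) -> ExecutionContext {
    let config = ExecutionConfig::new()
        .with_concurrency(concurrency)
        .with_repartition_joins(true)
        .with_repartition_aggregations(true);
    ExecutionContext::with_config(config)
}
```
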

189 changes: 95 additions & 94 deletions ballista/rust/scheduler/src/planner.rs
@@ -28,15 +28,11 @@ use ballista_core::{
execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
serde::scheduler::PartitionLocation,
};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use log::info;

type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -71,13 +67,18 @@ impl DistributedPlanner {
info!("planning query stages");
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
stages.push(create_query_stage(job_id, self.next_stage_id(), new_plan)?);
stages.push(create_shuffle_writer(
job_id,
self.next_stage_id(),
new_plan,
None,
)?);
Ok(stages)
}

/// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
/// This function is needed because the input execution_plan might need to be modified, but it might not hold a
/// compelte query stage (its parent might also belong to the same stage)
/// complete query stage (its parent might also belong to the same stage)
fn plan_query_stages_internal(
&mut self,
job_id: &str,
@@ -98,58 +99,44 @@ impl DistributedPlanner {
}

if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
let ctx = ExecutionContext::with_config(config);
let ctx = ExecutionContext::new();
Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
} else if let Some(merge) = execution_plan
} else if let Some(coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let query_stage = create_query_stage(
let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
merge.children()[0].clone(),
coalesce.children()[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
stages.push(query_stage);
Ok((merge.with_new_children(vec![unresolved_shuffle])?, stages))
} else if let Some(agg) =
execution_plan.as_any().downcast_ref::<HashAggregateExec>()
Ok((
coalesce.with_new_children(vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
//TODO should insert query stages in more generic way based on partitioning metadata
// and not specifically for this operator
match agg.mode() {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in &children {
let new_stage = create_query_stage(
job_id,
self.next_stage_id(),
child.clone(),
)?;
new_children.push(Arc::new(UnresolvedShuffleExec::new(
vec![new_stage.stage_id()],
new_stage.schema().clone(),
new_stage.output_partitioning().partition_count(),
)));
stages.push(new_stage);
}
Ok((agg.with_new_children(new_children)?, stages))
}
AggregateMode::Partial => Ok((agg.with_new_children(children)?, stages)),
}
} else if let Some(join) = execution_plan.as_any().downcast_ref::<HashJoinExec>()
{
Ok((join.with_new_children(children)?, stages))
let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
repart.children()[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
stages.push(query_stage);
Ok((unresolved_shuffle, stages))
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
@@ -158,25 +145,7 @@ impl DistributedPlanner {
window
)))
} else {
// TODO check for compatible partitioning schema, not just count
if execution_plan.output_partitioning().partition_count()
!= children[0].output_partitioning().partition_count()
{
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in &children {
let new_stage =
create_query_stage(job_id, self.next_stage_id(), child.clone())?;
new_children.push(Arc::new(UnresolvedShuffleExec::new(
vec![new_stage.stage_id()],
new_stage.schema().clone(),
new_stage.output_partitioning().partition_count(),
)));
stages.push(new_stage);
}
Ok((execution_plan.with_new_children(new_children)?, stages))
} else {
Ok((execution_plan.with_new_children(children)?, stages))
}
Ok((execution_plan.with_new_children(children)?, stages))
}
}

@@ -224,17 +193,18 @@ pub fn remove_unresolved_shuffles(
Ok(stage.with_new_children(new_children)?)
}

fn create_query_stage(
fn create_shuffle_writer(
job_id: &str,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
partitioning: Option<Partitioning>,
) -> Result<Arc<ShuffleWriterExec>> {
Ok(Arc::new(ShuffleWriterExec::try_new(
job_id.to_owned(),
stage_id,
plan,
"".to_owned(), // executor will decide on the work_dir path
None,
partitioning,
)?))
}
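
For readers unfamiliar with the new `partitioning` argument: the planner passes `None` for a job's final stage and `Some(...)` with the partitioning taken from a `RepartitionExec` when cutting a stage at a repartition boundary (see the hunks above). The sketch below shows, purely for illustration, how such a hash partitioning value is shaped; it assumes DataFusion's `Column::new(name, index)` physical expression constructor.

```rust
use std::sync::Arc;

use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::{Partitioning, PhysicalExpr};

/// Illustration only: a hash partitioning over `l_returnflag` into two
/// partitions, matching the shape shown in the expected plan in the test
/// below (`Hash([Column { name: "l_returnflag", index: 0 }], 2)`).
fn example_hash_partitioning() -> Partitioning {
    let keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("l_returnflag", 0))];
    Partitioning::Hash(keys, 2)
}
```
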

@@ -245,7 +215,7 @@ mod test {
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::serde::protobuf;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
@@ -262,7 +232,7 @@ mod test {
}

#[test]
fn test() -> Result<(), BallistaError> {
fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;

// simplified form of TPC-H query 1
@@ -285,41 +255,72 @@
}

/* Expected result:
ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
CsvExec: testdata/lineitem; partitions=2
ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
CoalescePartitionsExec
UnresolvedShuffleExec: stages=[1]
ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
UnresolvedShuffleExec: stages=[2]

ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }], 2))
HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false

ShuffleWriterExec: None
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }], 2)
HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false

ShuffleWriterExec: None
SortExec: [l_returnflag@0 ASC]
CoalescePartitionsExec
UnresolvedShuffleExec
*/

let sort = stages[2].children()[0].clone();
let sort = downcast_exec!(sort, SortExec);
assert_eq!(3, stages.len());

let projection = sort.children()[0].clone();
println!("{:?}", projection);
let projection = downcast_exec!(projection, ProjectionExec);
// verify stage 0
let stage0 = stages[0].children()[0].clone();
let partial_hash = downcast_exec!(stage0, HashAggregateExec);
assert!(*partial_hash.mode() == AggregateMode::Partial);

// verify stage 1
let stage1 = stages[1].children()[0].clone();
let projection = downcast_exec!(stage1, ProjectionExec);
let final_hash = projection.children()[0].clone();
let final_hash = downcast_exec!(final_hash, HashAggregateExec);

let unresolved_shuffle = final_hash.children()[0].clone();
assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);

// verify stage 2
let stage2 = stages[2].children()[0].clone();
let sort = downcast_exec!(stage2, SortExec);
let coalesce_partitions = sort.children()[0].clone();
let coalesce_partitions =
downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
let unresolved_shuffle = coalesce_partitions.children()[0].clone();
let unresolved_shuffle =
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);

let merge_exec = stages[1].children()[0].clone();
let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);
Ok(())
}

let unresolved_shuffle = merge_exec.children()[0].clone();
let unresolved_shuffle =
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
assert_eq!(unresolved_shuffle.query_stage_ids, vec![1]);
#[test]
fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;

// simplified form of TPC-H query 1
let df = ctx.sql(
"select l_returnflag, sum(l_extendedprice * 1) as sum_disc_price
from lineitem
group by l_returnflag
order by l_returnflag",
)?;

let plan = df.to_logical_plan();
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan)?;

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;

let partial_hash = stages[0].children()[0].clone();
let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;