Add partition_mode to Ballista serde
andygrove committed Jun 27, 2021
1 parent f69d680 commit 6c9ffcd
Showing 10 changed files with 90 additions and 19 deletions.
7 changes: 6 additions & 1 deletion ballista/rust/core/proto/ballista.proto
@@ -565,12 +565,17 @@ message CsvScanExecNode {
repeated string filename = 8;
}

enum PartitionMode {
COLLECT_LEFT = 0;
PARTITIONED = 1;
}

message HashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
JoinType join_type = 4;

PartitionMode partition_mode = 6;
}

message PhysicalColumn {
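
The new PartitionMode enum mirrors DataFusion's hash_join::PartitionMode, and field 6 of HashJoinExecNode carries it across the wire. For orientation, prost generates roughly the following Rust for the new proto enum (a sketch of generated code, not part of this commit; the exact derive list depends on the prost version):

```rust
// Approximation of the prost-generated enum. Proto3 enums are
// backed by i32 on the wire, with CollectLeft as the default (0).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum PartitionMode {
    CollectLeft = 0,
    Partitioned = 1,
}
```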
11 changes: 10 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -42,7 +42,9 @@ use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
use datafusion::physical_plan::{
displayable, ExecutionPlan, Partitioning, RecordBatchStream,
};
use futures::StreamExt;
use log::info;
use std::fs::File;
@@ -133,6 +135,13 @@ impl ExecutionPlan for ShuffleWriterExec {
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
let now = Instant::now();

let displayable_plan = displayable(self.plan.as_ref());
info!(
"ShuffleWriterExec executing partition {}\n{}",
partition,
displayable_plan.indent()
);

let mut stream = self.plan.execute(partition).await?;

let mut path = PathBuf::from(&self.work_dir);
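
The added logging uses DataFusion's displayable helper, which wraps an ExecutionPlan so that indent() renders the operator tree as an indented, multi-line string; that is why the plan is interpolated after a newline in the log message. A minimal standalone sketch of the same pattern (the function name is illustrative):

```rust
use datafusion::physical_plan::{displayable, ExecutionPlan};

// Log a physical plan as an indented operator tree before executing
// one of its partitions.
fn log_plan(partition: usize, plan: &dyn ExecutionPlan) {
    let displayable_plan = displayable(plan);
    log::info!(
        "executing partition {}\n{}",
        partition,
        displayable_plan.indent()
    );
}
```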
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -30,6 +30,7 @@ use datafusion::logical_plan::{
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
use datafusion::physical_plan::hash_join::PartitionMode;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -356,12 +356,24 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};
let partition_mode =
protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
.ok_or_else(|| {
proto_error(format!(
"Received a HashJoinNode message with unknown PartitionMode {}",
hashjoin.partition_mode
))
})?;
let partition_mode = match partition_mode {
protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
};
Ok(Arc::new(HashJoinExec::try_new(
left,
right,
on,
&join_type,
PartitionMode::CollectLeft,
partition_mode,
)?))
}
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
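
Because proto3 enum fields are plain i32 on the wire, decoding must tolerate values outside the known range; the ok_or_else above maps that case to a serde error instead of panicking. The contract of the prost-generated from_i32 looks roughly like this (a sketch, assuming prost's generated API):

```rust
// Known tags decode to Some(variant); unknown tags decode to None.
assert_eq!(
    protobuf::PartitionMode::from_i32(0),
    Some(protobuf::PartitionMode::CollectLeft)
);
assert_eq!(
    protobuf::PartitionMode::from_i32(1),
    Some(protobuf::PartitionMode::Partitioned)
);
assert_eq!(protobuf::PartitionMode::from_i32(42), None);
```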
30 changes: 23 additions & 7 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -88,13 +88,29 @@ mod roundtrip_tests {
Column::new("col", schema_right.index_of("col")?),
)];

roundtrip_test(Arc::new(HashJoinExec::try_new(
Arc::new(EmptyExec::new(false, Arc::new(schema_left))),
Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
on,
&JoinType::Inner,
PartitionMode::CollectLeft,
)?))
let schema_left = Arc::new(schema_left);
let schema_right = Arc::new(schema_right);
for join_type in vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::Anti,
JoinType::Semi,
] {
for partition_mode in
vec![PartitionMode::Partitioned, PartitionMode::CollectLeft]
{
roundtrip_test(Arc::new(HashJoinExec::try_new(
Arc::new(EmptyExec::new(false, schema_left.clone())),
Arc::new(EmptyExec::new(false, schema_right.clone())),
on.clone(),
&join_type,
partition_mode,
)?))?;
}
}
Ok(())
}

#[test]
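
The roundtrip_test helper itself is not shown in this diff; conceptually it encodes the plan to its protobuf form, decodes it back, and asserts the two plans agree. A hedged sketch of such a helper, assuming the TryInto conversions from to_proto.rs and from_proto.rs in this commit (the real helper in this module may differ in detail):

```rust
// Sketch: serialize a physical plan to protobuf and back, then
// compare the debug renderings of the original and roundtripped plans.
fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
    let result: Arc<dyn ExecutionPlan> = (&proto).try_into()?;
    assert_eq!(format!("{:?}", exec_plan), format!("{:?}", result));
    Ok(())
}
```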
7 changes: 6 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -34,7 +34,7 @@ use datafusion::physical_plan::expressions::{
use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::AggregateMode;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::hash_utils::JoinType;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::parquet::ParquetExec;
@@ -143,13 +143,18 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
JoinType::Semi => protobuf::JoinType::Semi,
JoinType::Anti => protobuf::JoinType::Anti,
};
let partition_mode = match exec.partition_mode() {
PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
};
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
protobuf::HashJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
},
))),
})
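
On the encode side, partition_mode.into() relies on the From<PartitionMode> for i32 conversion that prost generates for proto3 enums, so the message field stores the integer tag:

```rust
// The prost enum converts to its i32 wire representation via From/Into.
let mode = protobuf::PartitionMode::Partitioned;
let tag: i32 = mode.into();
assert_eq!(tag, 1);
```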
6 changes: 5 additions & 1 deletion ballista/rust/core/src/utils.rs
@@ -177,7 +177,11 @@ fn build_exec_plan_diagram(
.is_some()
{
"CoalesceBatchesExec"
} else if plan.as_any().downcast_ref::<CoalescePartitionsExec>().is_some() {
} else if plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
{
"MergeExec"
} else {
println!("Unknown: {:?}", plan);
25 changes: 19 additions & 6 deletions ballista/rust/scheduler/src/planner.rs
@@ -67,7 +67,12 @@ impl DistributedPlanner {
info!("planning query stages");
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
stages.push(create_shuffle_writer(job_id, self.next_stage_id(), new_plan, None)?);
stages.push(create_shuffle_writer(
job_id,
self.next_stage_id(),
new_plan,
None,
)?);
Ok(stages)
}

@@ -96,28 +101,34 @@ impl DistributedPlanner {
if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
let ctx = ExecutionContext::new();
Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
} else if let Some(coalesce) = execution_plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
} else if let Some(coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
coalesce.children()[0].clone(),
None
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
stages.push(query_stage);
Ok((coalesce.with_new_children(vec![unresolved_shuffle])?, stages))
Ok((
coalesce.with_new_children(vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
repart.children()[0].clone(),
Some(repart.partitioning().to_owned())
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
@@ -207,7 +218,9 @@ mod test {
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_plan::{merge::CoalescePartitionsExec, projection::ProjectionExec};
use datafusion::physical_plan::{
merge::CoalescePartitionsExec, projection::ProjectionExec,
};
use std::convert::TryInto;
use std::sync::Arc;
use uuid::Uuid;
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/hash_join.rs
@@ -177,6 +177,11 @@ impl HashJoinExec {
&self.join_type
}

/// The partitioning mode of this hash join
pub fn partition_mode(&self) -> &PartitionMode {
&self.mode
}

/// Calculates column indices and left/right placement on input/output schemas and join type
fn column_indices_from_schema(&self) -> ArrowResult<Vec<ColumnIndex>> {
let (primary_is_left, primary_schema, secondary_schema) = match self.join_type {
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/limit.rs
@@ -319,7 +319,8 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
let limit =
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);

// the result should contain 4 batches (one per input partition)
let iter = limit.execute(0).await?;
