Skip to content

Commit

Permalink
Only decode plan in LaunchMultiTaskParams once (apache#743)
Browse files Browse the repository at this point in the history
* Only decode plan once

* WIP

* WIP

* Clippy

* Refactor

* Refactor

* Refactor

* Refactor

* Fmt

* clippy

* clippy

* Cleanup

* Reuse querystage exec

* Fmt

* Cleanup

---------

Co-authored-by: Daniël Heres <[email protected]>
  • Loading branch information
Dandandan and Daniël Heres authored Apr 13, 2023
1 parent 48c4c2d commit 4e4842c
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 93 deletions.
72 changes: 39 additions & 33 deletions ballista/core/src/serde/scheduler/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,35 +269,38 @@ impl Into<ExecutorData> for protobuf::ExecutorData {
}
}

impl TryInto<TaskDefinition> for protobuf::TaskDefinition {
impl TryInto<(TaskDefinition, Vec<u8>)> for protobuf::TaskDefinition {
type Error = BallistaError;

fn try_into(self) -> Result<TaskDefinition, Self::Error> {
fn try_into(self) -> Result<(TaskDefinition, Vec<u8>), Self::Error> {
let mut props = HashMap::new();
for kv_pair in self.props {
props.insert(kv_pair.key, kv_pair.value);
}

Ok(TaskDefinition {
task_id: self.task_id as usize,
task_attempt_num: self.task_attempt_num as usize,
job_id: self.job_id,
stage_id: self.stage_id as usize,
stage_attempt_num: self.stage_attempt_num as usize,
partition_id: self.partition_id as usize,
plan: self.plan,
output_partitioning: self.output_partitioning,
session_id: self.session_id,
launch_time: self.launch_time,
props,
})
Ok((
TaskDefinition {
task_id: self.task_id as usize,
task_attempt_num: self.task_attempt_num as usize,
job_id: self.job_id,
stage_id: self.stage_id as usize,
stage_attempt_num: self.stage_attempt_num as usize,
partition_id: self.partition_id as usize,
plan: vec![],
output_partitioning: self.output_partitioning,
session_id: self.session_id,
launch_time: self.launch_time,
props,
},
self.plan,
))
}
}

impl TryInto<Vec<TaskDefinition>> for protobuf::MultiTaskDefinition {
impl TryInto<(Vec<TaskDefinition>, Vec<u8>)> for protobuf::MultiTaskDefinition {
type Error = BallistaError;

fn try_into(self) -> Result<Vec<TaskDefinition>, Self::Error> {
fn try_into(self) -> Result<(Vec<TaskDefinition>, Vec<u8>), Self::Error> {
let mut props = HashMap::new();
for kv_pair in self.props {
props.insert(kv_pair.key, kv_pair.value);
Expand All @@ -312,21 +315,24 @@ impl TryInto<Vec<TaskDefinition>> for protobuf::MultiTaskDefinition {
let launch_time = self.launch_time;
let task_ids = self.task_ids;

Ok(task_ids
.iter()
.map(|task_id| TaskDefinition {
task_id: task_id.task_id as usize,
task_attempt_num: task_id.task_attempt_num as usize,
job_id: job_id.clone(),
stage_id,
stage_attempt_num,
partition_id: task_id.partition_id as usize,
plan: plan.clone(),
output_partitioning: output_partitioning.clone(),
session_id: session_id.clone(),
launch_time,
props: props.clone(),
})
.collect())
Ok((
task_ids
.iter()
.map(|task_id| TaskDefinition {
task_id: task_id.task_id as usize,
task_attempt_num: task_id.task_attempt_num as usize,
job_id: job_id.clone(),
stage_id,
stage_attempt_num,
partition_id: task_id.partition_id as usize,
plan: vec![],
output_partitioning: output_partitioning.clone(),
session_id: session_id.clone(),
launch_time,
props: props.clone(),
})
.collect(),
plan,
))
}
}
7 changes: 7 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
Expand Down Expand Up @@ -51,6 +52,8 @@ pub trait QueryStageExecutor: Sync + Send + Debug {
) -> Result<Vec<ShuffleWritePartition>>;

fn collect_plan_metrics(&self) -> Vec<MetricsSet>;

fn schema(&self) -> SchemaRef;
}

pub struct DefaultExecutionEngine {}
Expand Down Expand Up @@ -108,6 +111,10 @@ impl QueryStageExecutor for DefaultQueryStageExec {
.await
}

fn schema(&self) -> SchemaRef {
self.shuffle_writer.schema()
}

fn collect_plan_metrics(&self) -> Vec<MetricsSet> {
utils::collect_plan_metrics(self.shuffle_writer.children()[0].as_ref())
}
Expand Down
Loading

0 comments on commit 4e4842c

Please sign in to comment.