[Ballista] Fix scheduler state mod bug (#1655)
* Fix the bug of a task getting stuck when its referenced task fails
EricJoy2048 authored Jan 24, 2022
1 parent 618c1e8 commit 2a9df64
Showing 2 changed files with 131 additions and 46 deletions.
69 changes: 69 additions & 0 deletions ballista/rust/client/src/context.rs
@@ -353,6 +353,7 @@ impl BallistaContext {

#[cfg(test)]
mod tests {

    #[tokio::test]
    #[cfg(feature = "standalone")]
    async fn test_standalone_mode() {
@@ -452,4 +453,72 @@ mod tests {
        let df = context.sql("show tables;").await;
        assert!(df.is_ok());
    }

    // Regression test: a query whose shuffle input task fails should return
    // instead of hanging (see the scheduler/src/state/mod.rs change below).
    #[tokio::test]
    #[cfg(feature = "standalone")]
    async fn test_task_stuck_when_referenced_task_failed() {
        use super::*;
        use datafusion::arrow::datatypes::Schema;
        use datafusion::arrow::util::pretty;
        use datafusion::datasource::file_format::csv::CsvFormat;
        use datafusion::datasource::listing::{ListingOptions, ListingTable};

        use ballista_core::config::{
            BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
        };

        let config = BallistaConfigBuilder::default()
            .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
            .build()
            .unwrap();
        let context = BallistaContext::standalone(&config, 1).await.unwrap();

        let testdata = datafusion::test_util::parquet_test_data();
        context
            .register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
            .await
            .unwrap();

        {
            let mut guard = context.state.lock().unwrap();
            let csv_table = guard.tables.get("single_nan");

            if let Some(table_provider) = csv_table {
                if let Some(listing_table) = table_provider
                    .clone()
                    .as_any()
                    .downcast_ref::<ListingTable>()
                {
                    // Build listing options that deliberately mismatch the data:
                    // a CSV format over a Parquet file, with an empty schema.
                    let x = listing_table.options();
                    let error_options = ListingOptions {
                        file_extension: x.file_extension.clone(),
                        format: Arc::new(CsvFormat::default()),
                        table_partition_cols: x.table_partition_cols.clone(),
                        collect_stat: x.collect_stat,
                        target_partitions: x.target_partitions,
                    };
                    let error_table = ListingTable::new(
                        listing_table.object_store().clone(),
                        listing_table.table_path().to_string(),
                        Arc::new(Schema::new(vec![])),
                        error_options,
                    );
                    // change the table to an error table
                    guard
                        .tables
                        .insert("single_nan".to_string(), Arc::new(error_table));
                }
            }
        }

        let df = context
            .sql("select count(1) from single_nan;")
            .await
            .unwrap();
        let results = df.collect().await.unwrap();
        pretty::print_batches(&results).unwrap();
    }
}
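Before the scheduler fix, this test only failed by hanging, since it waits on collect() with no bound. A minimal sketch of one way such a regression test could be bounded, assuming a tokio runtime with the time feature enabled; the helper name and timeout value are illustrative and not part of this commit:

use std::future::Future;
use std::time::Duration;

// Illustrative helper: run an async operation under a deadline so a regression
// back to the stuck-task behaviour fails fast instead of hanging the test run.
//
// Example use in the test above:
//     let results = collect_with_deadline(df.collect(), 60).await.unwrap();
async fn collect_with_deadline<T>(fut: impl Future<Output = T>, secs: u64) -> T {
    tokio::time::timeout(Duration::from_secs(secs), fut)
        .await
        .expect("operation did not finish in time; the task may be stuck")
}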
108 changes: 62 additions & 46 deletions ballista/rust/scheduler/src/state/mod.rs
@@ -429,64 +429,80 @@ impl SchedulerState {
             .await?;
         if task_is_dead {
             continue 'tasks;
-        } else if let Some(task_status::Status::Completed(
-            CompletedTask {
-                executor_id,
-                partitions,
-            },
-        )) = &referenced_task.status
-        {
-            debug!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{}",
-                shuffle_input_partition_id,
-                partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::<Vec<_>>().join("\n\t")
-            );
-            let stage_shuffle_partition_locations = partition_locations
-                .entry(unresolved_shuffle.stage_id)
-                .or_insert_with(HashMap::new);
-            let executor_meta = executors
-                .iter()
-                .find(|exec| exec.id == *executor_id)
-                .unwrap()
-                .clone();
-
-            for shuffle_write_partition in partitions {
-                let temp = stage_shuffle_partition_locations
-                    .entry(shuffle_write_partition.partition_id as usize)
-                    .or_insert_with(Vec::new);
-                let executor_meta = executor_meta.clone();
-                let partition_location =
-                    ballista_core::serde::scheduler::PartitionLocation {
-                        partition_id:
-                            ballista_core::serde::scheduler::PartitionId {
-                                job_id: partition.job_id.clone(),
-                                stage_id: unresolved_shuffle.stage_id,
-                                partition_id: shuffle_write_partition
-                                    .partition_id
-                                    as usize,
-                            },
-                        executor_meta,
-                        partition_stats: PartitionStats::new(
-                            Some(shuffle_write_partition.num_rows),
-                            Some(shuffle_write_partition.num_batches),
-                            Some(shuffle_write_partition.num_bytes),
-                        ),
-                        path: shuffle_write_partition.path.clone(),
-                    };
-                debug!(
-                    "Scheduler storing stage {} output partition {} path: {}",
-                    unresolved_shuffle.stage_id,
-                    partition_location.partition_id.partition_id,
-                    partition_location.path
-                );
-                temp.push(partition_location);
-            }
-        } else {
-            debug!(
-                "Stage {} input partition {} has not completed yet",
-                unresolved_shuffle.stage_id, shuffle_input_partition_id,
-            );
-            continue 'tasks;
-        }
+        }
+        match &referenced_task.status {
+            Some(task_status::Status::Completed(CompletedTask {
+                executor_id,
+                partitions,
+            })) => {
+                debug!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{}",
+                    shuffle_input_partition_id,
+                    partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::<Vec<_>>().join("\n\t")
+                );
+                let stage_shuffle_partition_locations =
+                    partition_locations
+                        .entry(unresolved_shuffle.stage_id)
+                        .or_insert_with(HashMap::new);
+                let executor_meta = executors
+                    .iter()
+                    .find(|exec| exec.id == *executor_id)
+                    .unwrap()
+                    .clone();
+
+                for shuffle_write_partition in partitions {
+                    let temp = stage_shuffle_partition_locations
+                        .entry(
+                            shuffle_write_partition.partition_id as usize,
+                        )
+                        .or_insert_with(Vec::new);
+                    let executor_meta = executor_meta.clone();
+                    let partition_location =
+                        ballista_core::serde::scheduler::PartitionLocation {
+                            partition_id:
+                                ballista_core::serde::scheduler::PartitionId {
+                                    job_id: partition.job_id.clone(),
+                                    stage_id: unresolved_shuffle.stage_id,
+                                    partition_id: shuffle_write_partition
+                                        .partition_id
+                                        as usize,
+                                },
+                            executor_meta,
+                            partition_stats: PartitionStats::new(
+                                Some(shuffle_write_partition.num_rows),
+                                Some(shuffle_write_partition.num_batches),
+                                Some(shuffle_write_partition.num_bytes),
+                            ),
+                            path: shuffle_write_partition.path.clone(),
+                        };
+                    debug!(
+                        "Scheduler storing stage {} output partition {} path: {}",
+                        unresolved_shuffle.stage_id,
+                        partition_location.partition_id.partition_id,
+                        partition_location.path
+                    );
+                    temp.push(partition_location);
+                }
+            }
+            Some(task_status::Status::Failed(FailedTask { error })) => {
+                // A task should fail when its referenced_task fails
+                let mut status = status.clone();
+                let err_msg = error.to_string();
+                status.status =
+                    Some(task_status::Status::Failed(FailedTask {
+                        error: err_msg,
+                    }));
+                self.save_task_status(&status).await?;
+                continue 'tasks;
+            }
+            _ => {
+                debug!(
+                    "Stage {} input partition {} has not completed yet",
+                    unresolved_shuffle.stage_id,
+                    shuffle_input_partition_id,
+                );
+                continue 'tasks;
+            }
+        };
     }
 }
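The core of the fix is the new Failed arm: when the referenced task has failed, the dependent task is marked failed and persisted instead of waiting forever for a completion that will never arrive; the Completed and catch-all arms keep the behaviour of the previous if/else. A minimal sketch of that control flow with simplified stand-in types (Status, TaskStatus, and save_task_status below are illustrative, not Ballista's actual definitions):

// Simplified stand-in types; not Ballista's actual definitions.
#[derive(Clone, Debug)]
enum Status {
    Running,
    Completed { partitions: Vec<String> },
    Failed { error: String },
}

#[derive(Clone, Debug)]
struct TaskStatus {
    status: Option<Status>,
}

// Stand-in for persisting a task's status in the scheduler state store.
fn save_task_status(status: &TaskStatus) {
    println!("persisting {:?}", status);
}

// Returns true when the caller should skip this task for now
// (the equivalent of `continue 'tasks` in the scheduler loop).
fn resolve_shuffle_input(task: &mut TaskStatus, referenced: &TaskStatus) -> bool {
    match &referenced.status {
        Some(Status::Completed { partitions }) => {
            // Input is ready: record where the shuffle partitions live.
            println!("input partitions ready: {:?}", partitions);
            false
        }
        Some(Status::Failed { error }) => {
            // The fix: a task should fail when its referenced task fails,
            // rather than waiting forever for a completion that never comes.
            task.status = Some(Status::Failed {
                error: error.clone(),
            });
            save_task_status(task);
            true
        }
        _ => {
            // Still running (or status unknown): try this task again later.
            true
        }
    }
}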

