Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cargo clippy #571

Merged
merged 1 commit into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ async fn fetch_partition_remote(
// TODO for shuffle client connections, we should avoid creating new connections again and again.
// And we should also avoid to keep alive too many connections for long time.
let host = metadata.host.as_str();
let port = metadata.port as u16;
let port = metadata.port;
let mut ballista_client =
BallistaClient::try_new(host, port)
.await
Expand Down
2 changes: 1 addition & 1 deletion ballista/core/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl PluginEnum {
/// new a struct which impl the PluginRegistrar trait
pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
match self {
PluginEnum::UDF => Box::new(UDFPluginManager::default()),
PluginEnum::UDF => Box::<UDFPluginManager>::default(),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,20 +342,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let partition_id = task.partition_id;
let shuffle_writer_plan =
self.executor
.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
.new_shuffle_writer(job_id.clone(), stage_id, plan)?;

let part = PartitionId {
job_id: job_id.clone(),
stage_id: stage_id as usize,
partition_id: partition_id as usize,
stage_id,
partition_id,
};

info!("Start to execute shuffle write for task {}", task_identity);

let execution_result = self
.executor
.execute_shuffle_write(
task_id as usize,
task_id,
part.clone(),
shuffle_writer_plan.clone(),
task_context,
Expand Down
10 changes: 4 additions & 6 deletions ballista/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl ExecutionGraph {
let mut locations = vec![];
for task_status in stage_task_statuses.into_iter() {
{
let stage_id = stage_id as usize;
let task_stage_attempt_num =
task_status.stage_attempt_num as usize;
if task_stage_attempt_num < running_stage.stage_attempt_num {
Expand Down Expand Up @@ -481,7 +480,6 @@ impl ExecutionGraph {
);
} else if let ExecutionStage::UnResolved(unsolved_stage) = stage {
for task_status in stage_task_statuses.into_iter() {
let stage_id = stage_id as usize;
let task_stage_attempt_num =
task_status.stage_attempt_num as usize;
let partition_id = task_status.clone().partition_id as usize;
Expand Down Expand Up @@ -815,8 +813,8 @@ impl ExecutionGraph {
/// Total number of tasks in this plan that are ready for scheduling
pub fn available_tasks(&self) -> usize {
self.stages
.iter()
.map(|(_, stage)| {
.values()
.map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage.available_tasks()
} else {
Expand Down Expand Up @@ -1412,8 +1410,8 @@ impl Debug for ExecutionGraph {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let stages = self
.stages
.iter()
.map(|(_, stage)| format!("{:?}", stage))
.values()
.map(|stage| format!("{:?}", stage))
.collect::<Vec<String>>()
.join("");
write!(f, "ExecutionGraph[job_id={}, session_id={}, available_tasks={}, is_successful={}]\n{}",
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
result = execute_query(&ctx, &plan, opt.debug).await?;
}
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
millis.push(elapsed);
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
Expand Down Expand Up @@ -405,7 +405,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.unwrap();
}
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
millis.push(elapsed);
let row_count = batches.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
Expand Down Expand Up @@ -556,7 +556,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
}
if query > 0 && query < 23 {
let filename = format!("{}/q{}.sql", sql_path, query);
Ok(fs::read_to_string(&filename).expect("failed to read query"))
Ok(fs::read_to_string(filename).expect("failed to read query"))
} else {
Err(DataFusionError::Plan(
"invalid query. Expected value between 1 and 22".to_owned(),
Expand Down