Pull-based execution loop improvements #380

Merged 8 commits on Oct 20, 2022
Changes from 6 commits
38 changes: 21 additions & 17 deletions ballista/executor/src/execution_loop.rs
@@ -21,7 +21,9 @@ use ballista_core::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
TaskDefinition, TaskStatus,
};
use tokio::sync::Semaphore;

use crate::cpu_bound_executor::DedicatedExecutor;
use crate::executor::Executor;
use crate::{as_task_status, TaskExecutionTimes};
use ballista_core::error::BallistaError;
@@ -38,7 +40,6 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
@@ -56,25 +57,25 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.unwrap()
.clone()
.into();
let available_tasks_slots =
Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
let available_task_slots =
Arc::new(Semaphore::new(executor_specification.task_slots as usize));

let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();
info!("Starting poll work loop with scheduler");

let dedicated_executor =
DedicatedExecutor::new("task_runner", executor_specification.task_slots as usize);

loop {
// Wait for task slots to be available before asking for new work
let semaphore = available_task_slots.clone().acquire_owned().await.unwrap();
drop(semaphore);

// Keeps track of whether we received task in last iteration
// to avoid going in sleep mode between polling
let mut active_job = false;

let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0;

// Don't poll for work if we can not accept any tasks
if !can_accept_task {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}

let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;

@@ -84,7 +85,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
> = scheduler
.poll_work(PollWorkParams {
metadata: Some(executor.metadata.clone()),
can_accept_task,
can_accept_task: true,
task_status,
})
.await;
@@ -96,10 +97,11 @@
if let Some(task) = result.into_inner().task {
match run_received_tasks(
executor.clone(),
available_tasks_slots.clone(),
available_task_slots.clone(),
task_status_sender,
task,
&codec,
&dedicated_executor,
)
.await
{
@@ -141,10 +143,11 @@ pub(crate) fn any_to_string(any: &Box<dyn Any + Send>) -> String {

async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
executor: Arc<Executor>,
available_tasks_slots: Arc<AtomicUsize>,
available_task_slots: Arc<Semaphore>,
task_status_sender: Sender<TaskStatus>,
task: TaskDefinition,
codec: &BallistaCodec<T, U>,
dedicated_executor: &DedicatedExecutor,
) -> Result<(), BallistaError> {
let task_id = task.task_id;
let task_attempt_num = task.task_attempt_num;
@@ -162,7 +165,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
task_id, job_id, stage_id, stage_attempt_num, partition_id, task_attempt_num
);
info!("Received task {}", task_identity);
available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
let permit = available_task_slots.clone().acquire_owned().await.unwrap();

Contributor:
I'd be careful acquiring a permit after retrieving a task from the scheduler. I'm imagining a scenario where the executor retrieves a task to execute, but then that task sits waiting for a permit to open up. Could we move this call up to poll_loop, before scheduler.poll_work() is called?
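
A standalone sketch of that ordering, using only tokio primitives; poll_for_task is a placeholder for scheduler.poll_work, not a Ballista API, so this is only an illustration of the suggestion rather than code from this PR.

// Sketch: acquire the slot before polling so a returned task never waits for a permit.
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

async fn poll_for_task() -> Option<u64> {
    Some(42) // pretend the scheduler returned one task
}

#[tokio::main]
async fn main() {
    let available_task_slots = Arc::new(Semaphore::new(4));

    for _ in 0..3 {
        // Hold a permit across the poll, so any task we receive can start immediately.
        let permit = available_task_slots.clone().acquire_owned().await.unwrap();

        match poll_for_task().await {
            Some(task_id) => {
                tokio::spawn(async move {
                    println!("running task {}", task_id);
                    drop(permit); // the slot is released only when the task finishes
                });
            }
            None => drop(permit), // nothing to run: hand the slot back immediately
        }
    }

    tokio::time::sleep(Duration::from_millis(50)).await; // let spawned tasks finish
}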

Contributor Author:
I can't see that happening, as we also wait until there is a slot available before polling (so at this point there should be at least one available).

But I can see if I can make it not need the first check at all.

Contributor (@tfeda), Oct 19, 2022:
Sorry about that, I didn't see the first check above. The problem I described wouldn't happen.

Another option is to pass the permit from the first check into run_received_tasks(), and then you wouldn't need this check. I think you would replace the available_task_slots argument with an OwnedSemaphorePermit.
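
A minimal sketch of that variant, with plan decoding and execution elided; spawn_received_task and its reduced argument list are hypothetical stand-ins for run_received_tasks, not code from this PR.

// Hypothetical signature: the permit acquired in poll_loop is handed over instead of
// the Arc<Semaphore>, and is dropped when the spawned task finishes.
use tokio::sync::OwnedSemaphorePermit;
use tokio::task::JoinHandle;

fn spawn_received_task(permit: OwnedSemaphorePermit, task_id: u64) -> JoinHandle<()> {
    tokio::spawn(async move {
        println!("task {} done", task_id); // real code would run the shuffle writer here
        drop(permit); // the slot frees up only once the task has completed
    })
}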

Contributor Author (@Dandandan), Oct 19, 2022:
Yeah I did something like that, but after polling.

I would prefer to keep it like this (acquire + release before polling instead of acquiring directly), as I want to add the possibility of retrieving multiple tasks from the scheduler based on semaphore.available_permits() (and then acquire those permits later based on the number of tasks returned by the scheduler).
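
A standalone sketch of that multi-task idea, using only tokio primitives; fetch_tasks is a placeholder for a poll_work variant that can return several tasks and is not an existing Ballista API.

// Sketch: report free slots via available_permits(), then acquire one permit per
// task that actually comes back.
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

async fn fetch_tasks(max_tasks: usize) -> Vec<u64> {
    (0..max_tasks as u64).take(2).collect() // pretend the scheduler returned two tasks
}

#[tokio::main]
async fn main() {
    let available_task_slots = Arc::new(Semaphore::new(4));

    // Ask for as many tasks as there are free slots right now...
    let free_slots = available_task_slots.available_permits();
    let tasks = fetch_tasks(free_slots).await;

    // ...then acquire a permit for each returned task before running it.
    for task_id in tasks {
        let permit = available_task_slots.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            println!("running task {}", task_id);
            drop(permit); // the slot frees up when this task completes
        });
    }

    tokio::time::sleep(Duration::from_millis(50)).await; // let spawned tasks finish
}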

let mut task_props = HashMap::new();
for kv_pair in task.props {
@@ -206,7 +209,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution

let shuffle_writer_plan =
executor.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
tokio::spawn(async move {
dedicated_executor.spawn(async move {
use std::panic::AssertUnwindSafe;
let part = PartitionId {
job_id: job_id.clone(),
@@ -232,9 +235,9 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
}
};


info!("Done with task {}", task_identity);
debug!("Statistics: {:?}", execution_result);
available_tasks_slots.fetch_add(1, Ordering::SeqCst);

let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
let operator_metrics = plan_metrics
@@ -263,6 +266,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
operator_metrics,
task_execution_times,
));
drop(permit);
});

Ok(())