Pull-based execution loop improvements #380
Conversation
```diff
@@ -162,7 +165,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
         task_id, job_id, stage_id, stage_attempt_num, partition_id, task_attempt_num
     );
     info!("Received task {}", task_identity);
-    available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
+    let permit = available_task_slots.clone().acquire_owned().await.unwrap();
```
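For context, a minimal sketch of the pattern this line switches to: the old code decremented an `AtomicUsize` slot counter, while the new code awaits an owned permit from a `tokio::sync::Semaphore`, which is released automatically when the permit is dropped. Names and the slot count here are illustrative, not the PR's exact code.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One permit per task slot (count is illustrative).
    let available_task_slots = Arc::new(Semaphore::new(4));

    // Acquiring an owned permit ties the slot to the spawned task's lifetime.
    let permit = available_task_slots.clone().acquire_owned().await.unwrap();
    tokio::spawn(async move {
        // ... run the task ...
        drop(permit); // the slot is released as soon as the permit is dropped
    });
}
```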
I'd be careful acquiring a permit after retrieving a task from the scheduler. I'm imagining a scenario where the executor retrieves a task to execute, but then that task sits waiting for a permit to open up. Could we move this call up to `poll_loop`, before `scheduler.poll_work()` is called?
I can't see that happening, as we also wait until there is a slot available before polling (so at this point there should be at least one available).
But I can see if I can make it not need the first check at all.
Sorry about that, I didn't see the first check above. The problem I described wouldn't happen.
Another option is to pass the permit from the first check into `run_received_tasks()`, and then you wouldn't need this check. I think you would replace the `available_task_slots` argument with an `OwnedSemaphorePermit`, as in the sketch below.
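A rough sketch of that suggestion, assuming a simplified signature (the real `run_received_tasks` takes the task payload and codec generics as well):

```rust
use tokio::sync::OwnedSemaphorePermit;

// Hypothetical simplification: instead of a semaphore argument, the caller
// hands over the permit it already acquired before polling the scheduler.
async fn run_received_tasks(permit: OwnedSemaphorePermit /* , task, codecs, ... */) {
    // ... decode and execute the task ...
    drop(permit); // dropping the permit frees the slot for the next poll
}
```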
Yeah, I did something like that, but after polling.
I would prefer to keep it like this (acquiring and releasing before the poll instead of acquiring directly), as I want to add the possibility of retrieving multiple tasks from the scheduler based on `semaphore.available_permits()` (and then acquiring those permits later, based on the number of tasks the scheduler actually returns).
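A rough sketch of that multi-task idea; the `poll_work` signature and `Task` type here are stand-ins, not the actual scheduler API:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-ins for the real scheduler client and task type.
struct Task;
async fn poll_work(max_tasks: usize) -> Vec<Task> {
    let _ = max_tasks;
    Vec::new()
}

async fn poll_once(slots: Arc<Semaphore>) {
    // Ask for as many tasks as there are free slots right now...
    let free = slots.available_permits();
    let tasks = poll_work(free).await;
    // ...then acquire one permit per task actually returned.
    for task in tasks {
        let permit = slots.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            let _task = task;
            // ... execute the task ...
            drop(permit);
        });
    }
}
```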
Merged it in after some good testing, to reduce the number of open PRs.
Which issue does this PR close?
Closes #383, #388
Rationale for this change
This pattern is applied in other places as well for CPU-bound/blocking tasks, so it seems good to apply it here too.
We can also switch to using semaphores, per @tfeda's recommendation, to avoid the sleeping. This improves performance somewhat (5-15%) when there is only one task slot available, as there is less waiting between tasks.
It will reduce some waiting in other cases too.
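To illustrate why the semaphore helps: a sleep-based loop only notices a freed slot on its next wakeup, while `acquire_owned` resumes the poller as soon as a permit is released. Both functions below are illustrative sketches (including the assumed 100 ms interval), not the PR's actual code.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::Semaphore;

// Before (illustrative): re-check a slot counter on a fixed interval,
// paying up to one full interval of latency between tasks.
async fn wait_for_slot_sleeping(slots: &AtomicUsize) {
    while slots.load(Ordering::SeqCst) == 0 {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

// After (illustrative): the semaphore wakes this future the moment a
// permit is released, so no polling interval is wasted between tasks.
async fn wait_for_slot(slots: Arc<Semaphore>) -> tokio::sync::OwnedSemaphorePermit {
    slots.acquire_owned().await.unwrap()
}
```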
What changes are included in this PR?
Are there any user-facing changes?