Memory scheduling 2 DFS with Priority #3

Merged · 19 commits · Nov 16, 2021
Changes from 1 commit
Avoid modifying leased_workers during iterate, block tasks from being dispatched if priority is too low
stephanie-wang committed Nov 16, 2021

Verified: this commit was signed with the committer’s verified signature (addaleax, Anna Henningsen). The key has expired.
commit c646427d71e6061c1cacb252f5d273bb07370372
3 changes: 0 additions & 3 deletions src/ray/core_worker/reference_count.h
@@ -476,9 +476,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
LOCKS_EXCLUDED(mutex_);

private:
-  // TODO: Add priority to the Reference struct.
-  // TODO: Track how many tasks have been submitted so far that depend on an
-  // object, use for DFS-based priority calculation.
struct Reference {
/// Constructor for a reference whose origin is unknown.
Reference() {}
24 changes: 19 additions & 5 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -78,8 +78,9 @@ bool ClusterTaskManager::SchedulePendingTasks() {
// tasks from being scheduled.
Priority task_priority = work_it->first.first;
if(task_priority >= block_requested_priority_){
    return did_schedule;
}

const std::shared_ptr<Work> &work = work_it->second;
RayTask task = work->task;
RAY_LOG(DEBUG) << "Scheduling pending task "
@@ -294,6 +295,13 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
continue;
}

// Block tasks of a lower priority.
Priority task_priority = work_it->first.first;
if(task_priority >= block_requested_priority_){
break;
}

bool args_missing = false;
bool success = PinTaskArgsIfMemoryAvailable(spec, &args_missing);
// An argument was evicted since this task was added to the dispatch
@@ -1199,17 +1207,23 @@ bool ClusterTaskManager::ReturnCpuResourcesToBlockedWorker(

bool ClusterTaskManager::EvictTasks(Priority base_priority) {
bool should_spill = true;
std::vector<std::shared_ptr<WorkerInterface>> workers_to_kill;
for (auto &entry : leased_workers_) {
std::shared_ptr<WorkerInterface> worker = entry.second;
Priority priority = worker->GetAssignedTask().GetTaskSpecification().GetPriority();
    // Smaller priority values mean higher priority.
    // Note: there is deliberately no less-than check here.
-   if(priority >= base_priority){
-     // Consider using CancelTask instead of DestroyWorker.
-     destroy_worker_(worker, rpc::WorkerExitType::INTENDED_EXIT);
-     should_spill = false;
+   if (priority >= base_priority){
+     workers_to_kill.push_back(worker);
+     should_spill = false;
    }
}

  for (auto &worker : workers_to_kill) {
    // TODO: Consider using CancelTask instead of DestroyWorker.
    destroy_worker_(worker, rpc::WorkerExitType::INTENDED_EXIT);
  }

  // TODO: Check deadlock corner cases.
  // Finer-grained preemption is not considered; kill all lower-priority workers.
  return should_spill;