Skip to content

Commit

Permalink
recover TimeAvailResMap to avoid resource addition
Browse files Browse the repository at this point in the history
  • Loading branch information
NamelessOIer committed Nov 28, 2024
1 parent cdc94b9 commit b4bde90
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 85 deletions.
262 changes: 204 additions & 58 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1878,9 +1878,11 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_(
running_task_ids_str.emplace_back(std::to_string(task_id));
}

std::sort(
end_time_task_id_vec.begin(), end_time_task_id_vec.end(),
[](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });

if constexpr (kAlgoTraceOutput) {
CRANE_TRACE("Craned node {} has running tasks: {}", craned_id,
absl::StrJoin(running_task_ids_str, ", "));
if (!end_time_task_id_vec.empty()) {
std::string str;
str.append(
Expand All @@ -1895,33 +1897,89 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_(

// Calculate delta resources at [now, first task end,
// second task end, ...] in this node.
auto& time_delta_res_map =
node_selection_info_ref.node_time_delta_res_map[craned_id];
auto& time_avail_res_map =
node_selection_info_ref.node_time_avail_res_map[craned_id];
node_selection_info_ref.node_res_total_map[craned_id] =
craned_meta->res_total;
node_selection_info_ref.setCost(craned_id, 0);

time_delta_res_map[now] = craned_meta->res_avail;
// Insert the [now, inf) interval first, which guarantees that
// time_avail_res_map is never empty.
time_avail_res_map[now] = craned_meta->res_avail;

for (auto& [end_time, task_id] : end_time_task_id_vec) {
time_delta_res_map[end_time] +=
running_tasks.at(task_id)->Resources().at(craned_id);
node_selection_info_ref.updateCost(
craned_id, now, end_time,
running_tasks.at(task_id)->Resources().at(craned_id));
if constexpr (kAlgoTraceOutput) {
CRANE_TRACE("Craned {} initial res_avail now: cpu: {}, mem: {}, gres: {}",
craned_id, craned_meta->res_avail.allocatable_res.cpu_count,
craned_meta->res_avail.allocatable_res.memory_bytes,
util::ReadableDresInNode(craned_meta->res_avail));
}

if constexpr (kAlgoTraceOutput) {
std::string str;
ResourceInNode cur_res;
for (auto& [time, res] : time_delta_res_map) {
cur_res += res;
str.append(fmt::format(
"Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id,
absl::ToInt64Seconds(time - now), cur_res.allocatable_res.cpu_count,
cur_res.allocatable_res.memory_bytes));
{ // Limit the scope of `iter`
auto cur_time_iter = time_avail_res_map.begin();
bool ok;
for (auto& [end_time, task_id] : end_time_task_id_vec) {
const auto& running_task = running_tasks.at(task_id);
ResourceInNode const& running_task_res =
running_task->Resources().at(craned_id);
node_selection_info_ref.updateCost(craned_id, now, end_time,
running_task_res);
if (cur_time_iter->first != end_time) {
/**
* If no time point exists yet at `end_time`, insert an interval
* [end_time, inf) that starts with the resources of the previous
* interval, so that the freed resources can be added to it
* right below.
* Note: two adjacent intervals such as [5,6) and [6,inf) do not
* overlap with each other.
*/
std::tie(cur_time_iter, ok) =
time_avail_res_map.emplace(end_time, cur_time_iter->second);
}

/**
* Multiple tasks may end at the same time, for example:
* end_time_task_id_vec: [{now+1, 1}, {now+1, 2}, ...]
* In that case we want only one time point in time_avail_res_map:
* {{now+1: available_res(now) + res_of_task(1) +
* res_of_task(2)}, ...}
* so the resources freed by every task ending at that time are
* accumulated into the same entry.
*/
cur_time_iter->second += running_task_res;

if constexpr (kAlgoTraceOutput) {
CRANE_TRACE(
"Craned {} res_avail at now + {}s: cpu: {}, mem: {}, gres: {}; ",
craned_id, absl::ToInt64Seconds(cur_time_iter->first - now),
cur_time_iter->second.allocatable_res.cpu_count,
cur_time_iter->second.allocatable_res.memory_bytes,
util::ReadableDresInNode(cur_time_iter->second));
}
}

if constexpr (kAlgoTraceOutput) {
std::string str;
str.append(fmt::format("Node ({}, {}): ", partition_id, craned_id));
auto prev_iter = time_avail_res_map.begin();
auto iter = std::next(prev_iter);
for (; iter != time_avail_res_map.end(); prev_iter++, iter++) {
str.append(
fmt::format("[ now+{}s , now+{}s ) Available allocatable "
"res: cpu core {}, mem {}, gres {}",
absl::ToInt64Seconds(prev_iter->first - now),
absl::ToInt64Seconds(iter->first - now),
prev_iter->second.allocatable_res.cpu_count,
prev_iter->second.allocatable_res.memory_bytes,
util::ReadableDresInNode(prev_iter->second)));
}
str.append(
fmt::format("[ now+{}s , inf ) Available allocatable "
"res: cpu core {}, mem {}, gres {}",
absl::ToInt64Seconds(prev_iter->first - now),
prev_iter->second.allocatable_res.cpu_count,
prev_iter->second.allocatable_res.memory_bytes,
util::ReadableDresInNode(prev_iter->second)));
CRANE_TRACE("{}", str);
}
CRANE_TRACE("{}", str);
}
}
}
Expand All @@ -1947,8 +2005,8 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
// for the task when checking task validity in TaskScheduler.
continue;
}
auto& time_delta_res_map =
node_selection_info.node_time_delta_res_map.at(craned_index);
auto& time_avail_res_map =
node_selection_info.node_time_avail_res_map.at(craned_index);
auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr();

// If any of the follow `if` is true, skip this node.
Expand Down Expand Up @@ -2004,20 +2062,20 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(

task->SetResources(std::move(allocated_res));

std::vector<TimeDeltaResTracker> trackers;
std::priority_queue<TimeDeltaResTracker*, std::vector<TimeDeltaResTracker*>,
std::function<bool(const TimeDeltaResTracker*,
const TimeDeltaResTracker*)>>
pq([](const TimeDeltaResTracker* lhs, const TimeDeltaResTracker* rhs) {
std::vector<TimeAvailResTracker> trackers;
std::priority_queue<TimeAvailResTracker*, std::vector<TimeAvailResTracker*>,
std::function<bool(const TimeAvailResTracker*,
const TimeAvailResTracker*)>>
pq([](const TimeAvailResTracker* lhs, const TimeAvailResTracker* rhs) {
return lhs->it->first > rhs->it->first;
});

trackers.reserve(craned_indexes_.size());
for (CranedId craned_id : craned_indexes_) {
auto& time_delta_res_map =
node_selection_info.node_time_delta_res_map.at(craned_id);
auto it = time_delta_res_map.begin();
trackers.emplace_back(craned_id, it, time_delta_res_map.end(),
auto& time_avail_res_map =
node_selection_info.node_time_avail_res_map.at(craned_id);
auto it = time_avail_res_map.begin();
trackers.emplace_back(craned_id, it, time_avail_res_map.end(),
&task->Resources().at(craned_id));
pq.emplace(&trackers.back());
}
Expand All @@ -2029,8 +2087,9 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
while (!pq.empty() && pq.top()->it->first == time) {
auto tmp = pq.top();
pq.pop();
satisfied_count += tmp->count;
satisfied_count -= tmp->satisfied;
if (tmp->genNext()) pq.emplace(tmp);
satisfied_count += tmp->satisfied;
}
if constexpr (kAlgoTraceOutput) {
CRANE_TRACE("At time now+{}s, {} nodes are satisfied.",
Expand Down Expand Up @@ -2204,6 +2263,7 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_(
const ResourceV2& resources, std::list<CranedId> const& craned_ids,
MinLoadFirst::NodeSelectionInfo* node_selection_info) {
NodeSelectionInfo& node_info = *node_selection_info;
bool ok;

absl::Time task_end_time = expected_start_time + duration;

Expand All @@ -2212,34 +2272,120 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_(
ResourceInNode const& task_res_in_node = resources.at(craned_id);
node_info.updateCost(craned_id, expected_start_time, task_end_time,
task_res_in_node);
TimeDeltaResMap& time_delta_res_map =
node_info.node_time_delta_res_map[craned_id];
TimeAvailResMap& time_avail_res_map =
node_info.node_time_avail_res_map[craned_id];

auto task_duration_begin_it =
time_avail_res_map.upper_bound(expected_start_time);
if (task_duration_begin_it == time_avail_res_map.end()) {
--task_duration_begin_it;
// Situation #1
// task duration
// |<-------------->|
// *-----------------*----------------------> inf
// ^
// task_duration_begin_it
//
// *-----------------*----------------|-----> inf
// ^ ^
// | insert here
// subtract resource here
//
// OR Situation #2
// task duration
// |<-------------->|
// *-----------------*----------------------> inf
// ^
// task_duration_begin_it
//
// *-----------------*--|----------------|--> inf
// ^ ^ ^
// insert here | insert here
// subtract resource here

TimeAvailResMap::iterator inserted_it;
std::tie(inserted_it, ok) = time_avail_res_map.emplace(
task_end_time, task_duration_begin_it->second);
CRANE_ASSERT_MSG(ok == true, "Insertion must be successful.");

if (task_duration_begin_it->first == expected_start_time) {
// Situation #1
CRANE_ASSERT(task_res_in_node <= task_duration_begin_it->second);
task_duration_begin_it->second -= task_res_in_node;
} else {
// Situation #2
std::tie(inserted_it, ok) = time_avail_res_map.emplace(
expected_start_time, task_duration_begin_it->second);
CRANE_ASSERT_MSG(ok == true, "Insertion must be successful.");

time_delta_res_map[expected_start_time] -= task_res_in_node;
time_delta_res_map[task_end_time] += task_res_in_node;
CRANE_ASSERT(task_res_in_node <= inserted_it->second);
inserted_it->second -= task_res_in_node;
}
} else {
--task_duration_begin_it;
// Situation #3
// task duration
// |<-------------->|
// *-------*----------*---------*------------
// ^ ^
// task_duration_begin_it task_duration_end_it
// *-------*------|---*---------*--|---------
// ^ ^ ^ ^ ^
// insert here | | | insert here
// subtract at these points
//
// Or Situation #4
// task duration
// |<----------------->|
// *-------*----------*--------*------------
// ^ ^
// task_duration_begin_it task_duration_end_it

// std::prev can be used without any check here.
// There will always be one time point (now) before task_end_time.

if (task_duration_begin_it->first != expected_start_time) {
// Situation #3 (begin)
TimeAvailResMap::iterator inserted_it;
std::tie(inserted_it, ok) = time_avail_res_map.emplace(
expected_start_time, task_duration_begin_it->second);
CRANE_ASSERT_MSG(ok == true, "Insertion must be successful.");

task_duration_begin_it = inserted_it;
}

if constexpr (kAlgoTraceOutput) {
std::string str;
str.append(
fmt::format("Subtracted resource from Craned {} at now+{}s to "
"now+{}s: cpu: {}, mem: {}\n",
craned_id,
absl::ToInt64Seconds(expected_start_time -
time_delta_res_map.begin()->first),
absl::ToInt64Seconds(task_end_time -
time_delta_res_map.begin()->first),
task_res_in_node.allocatable_res.cpu_count,
task_res_in_node.allocatable_res.memory_bytes));
ResourceInNode cur_res;
for (auto& [time, res] : time_delta_res_map) {
cur_res += res;
str.append(fmt::format(
"Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id,
absl::ToInt64Seconds(time - time_delta_res_map.begin()->first),
cur_res.allocatable_res.cpu_count,
cur_res.allocatable_res.memory_bytes));
auto task_duration_end_it =
std::prev(time_avail_res_map.upper_bound(task_end_time));

// Subtract the required resources within the interval.
for (auto in_duration_it = task_duration_begin_it;
in_duration_it != task_duration_end_it; in_duration_it++) {
CRANE_ASSERT(task_res_in_node <= in_duration_it->second);
in_duration_it->second -= task_res_in_node;
}

// Check whether a time point must be inserted at `task_end_time`.
// Detailed reasoning: assume the task ends at time t, and t lies
// strictly inside an interval [x, y) of time_avail_res_map whose
// available resource amount is `a`. That interval has to be divided
// into two intervals:
//   [x, t): a-k, where k is the resource amount that the task requires,
//   [t, y): a
// Therefore, a key-value pair is inserted at t to preserve the
// original amount `a` for [t, y).
// However, if the time point t already exists in the map (t == x),
// the interval [x, y) is not occupied by the task at all, so there is
// no need to split it and nothing is inserted.
if (task_duration_end_it->first != task_end_time) {
// Situation #3 (end)
TimeAvailResMap::iterator inserted_it;
std::tie(inserted_it, ok) = time_avail_res_map.emplace(
task_end_time, task_duration_end_it->second);
CRANE_ASSERT_MSG(ok == true, "Insertion must be successful.");

CRANE_ASSERT(task_res_in_node <= task_duration_end_it->second);
task_duration_end_it->second -= task_res_in_node;
}
CRANE_TRACE("{}", str);
}
}
}
Expand Down
Loading

0 comments on commit b4bde90

Please sign in to comment.