diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 919940a4..a258ce25 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1878,9 +1878,11 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( running_task_ids_str.emplace_back(std::to_string(task_id)); } + std::sort( + end_time_task_id_vec.begin(), end_time_task_id_vec.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("Craned node {} has running tasks: {}", craned_id, - absl::StrJoin(running_task_ids_str, ", ")); if (!end_time_task_id_vec.empty()) { std::string str; str.append( @@ -1895,33 +1897,89 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // Calculate delta resources at [now, first task end, // second task end, ...] in this node. - auto& time_delta_res_map = - node_selection_info_ref.node_time_delta_res_map[craned_id]; + auto& time_avail_res_map = + node_selection_info_ref.node_time_avail_res_map[craned_id]; node_selection_info_ref.node_res_total_map[craned_id] = craned_meta->res_total; node_selection_info_ref.setCost(craned_id, 0); - time_delta_res_map[now] = craned_meta->res_avail; + // Insert [now, inf) interval and thus guarantee time_avail_res_map is not + // null. + time_avail_res_map[now] = craned_meta->res_avail; - for (auto& [end_time, task_id] : end_time_task_id_vec) { - time_delta_res_map[end_time] += - running_tasks.at(task_id)->Resources().at(craned_id); - node_selection_info_ref.updateCost( - craned_id, now, end_time, - running_tasks.at(task_id)->Resources().at(craned_id)); + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE("Craned {} initial res_avail now: cpu: {}, mem: {}, gres: {}", + craned_id, craned_meta->res_avail.allocatable_res.cpu_count, + craned_meta->res_avail.allocatable_res.memory_bytes, + util::ReadableDresInNode(craned_meta->res_avail)); } - if constexpr (kAlgoTraceOutput) { - std::string str; - ResourceInNode cur_res; - for (auto& [time, res] : time_delta_res_map) { - cur_res += res; - str.append(fmt::format( - "Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id, - absl::ToInt64Seconds(time - now), cur_res.allocatable_res.cpu_count, - cur_res.allocatable_res.memory_bytes)); + { // Limit the scope of `iter` + auto cur_time_iter = time_avail_res_map.begin(); + bool ok; + for (auto& [end_time, task_id] : end_time_task_id_vec) { + const auto& running_task = running_tasks.at(task_id); + ResourceInNode const& running_task_res = + running_task->Resources().at(craned_id); + node_selection_info_ref.updateCost(craned_id, now, end_time, + running_task_res); + if (cur_time_iter->first != end_time) { + /** + * If there isn't any task that ends at the `end_time`, + * insert an interval [end_time, inf) with the resource of + * the previous interval for the following addition of + * freed resources. + * Note: Such two intervals [5,6), [6,inf) do not overlap with + * each other. + */ + std::tie(cur_time_iter, ok) = + time_avail_res_map.emplace(end_time, cur_time_iter->second); + } + + /** + * For the situation in which multiple tasks may end at the same + * time: + * end_time__task_id_vec: [{now+1, 1}, {now+1, 2}, ...] + * But we want only 1 time point in time__avail_res__map: + * {{now+1+1: available_res(now) + available_res(1) + + * available_res(2)}, ...} + */ + cur_time_iter->second += running_task_res; + + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE( + "Craned {} res_avail at now + {}s: cpu: {}, mem: {}, gres: {}; ", + craned_id, absl::ToInt64Seconds(cur_time_iter->first - now), + cur_time_iter->second.allocatable_res.cpu_count, + cur_time_iter->second.allocatable_res.memory_bytes, + util::ReadableDresInNode(cur_time_iter->second)); + } + } + + if constexpr (kAlgoTraceOutput) { + std::string str; + str.append(fmt::format("Node ({}, {}): ", partition_id, craned_id)); + auto prev_iter = time_avail_res_map.begin(); + auto iter = std::next(prev_iter); + for (; iter != time_avail_res_map.end(); prev_iter++, iter++) { + str.append( + fmt::format("[ now+{}s , now+{}s ) Available allocatable " + "res: cpu core {}, mem {}, gres {}", + absl::ToInt64Seconds(prev_iter->first - now), + absl::ToInt64Seconds(iter->first - now), + prev_iter->second.allocatable_res.cpu_count, + prev_iter->second.allocatable_res.memory_bytes, + util::ReadableDresInNode(prev_iter->second))); + } + str.append( + fmt::format("[ now+{}s , inf ) Available allocatable " + "res: cpu core {}, mem {}, gres {}", + absl::ToInt64Seconds(prev_iter->first - now), + prev_iter->second.allocatable_res.cpu_count, + prev_iter->second.allocatable_res.memory_bytes, + util::ReadableDresInNode(prev_iter->second))); + CRANE_TRACE("{}", str); } - CRANE_TRACE("{}", str); } } } @@ -1947,8 +2005,8 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( // for the task when checking task validity in TaskScheduler. continue; } - auto& time_delta_res_map = - node_selection_info.node_time_delta_res_map.at(craned_index); + auto& time_avail_res_map = + node_selection_info.node_time_avail_res_map.at(craned_index); auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr(); // If any of the follow `if` is true, skip this node. @@ -2004,20 +2062,20 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( task->SetResources(std::move(allocated_res)); - std::vector trackers; - std::priority_queue, - std::function> - pq([](const TimeDeltaResTracker* lhs, const TimeDeltaResTracker* rhs) { + std::vector trackers; + std::priority_queue, + std::function> + pq([](const TimeAvailResTracker* lhs, const TimeAvailResTracker* rhs) { return lhs->it->first > rhs->it->first; }); trackers.reserve(craned_indexes_.size()); for (CranedId craned_id : craned_indexes_) { - auto& time_delta_res_map = - node_selection_info.node_time_delta_res_map.at(craned_id); - auto it = time_delta_res_map.begin(); - trackers.emplace_back(craned_id, it, time_delta_res_map.end(), + auto& time_avail_res_map = + node_selection_info.node_time_avail_res_map.at(craned_id); + auto it = time_avail_res_map.begin(); + trackers.emplace_back(craned_id, it, time_avail_res_map.end(), &task->Resources().at(craned_id)); pq.emplace(&trackers.back()); } @@ -2029,8 +2087,9 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( while (!pq.empty() && pq.top()->it->first == time) { auto tmp = pq.top(); pq.pop(); - satisfied_count += tmp->count; + satisfied_count -= tmp->satisfied; if (tmp->genNext()) pq.emplace(tmp); + satisfied_count += tmp->satisfied; } if constexpr (kAlgoTraceOutput) { CRANE_TRACE("At time now+{}s, {} nodes are satisfied.", @@ -2204,6 +2263,7 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( const ResourceV2& resources, std::list const& craned_ids, MinLoadFirst::NodeSelectionInfo* node_selection_info) { NodeSelectionInfo& node_info = *node_selection_info; + bool ok; absl::Time task_end_time = expected_start_time + duration; @@ -2212,34 +2272,120 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( ResourceInNode const& task_res_in_node = resources.at(craned_id); node_info.updateCost(craned_id, expected_start_time, task_end_time, task_res_in_node); - TimeDeltaResMap& time_delta_res_map = - node_info.node_time_delta_res_map[craned_id]; + TimeAvailResMap& time_avail_res_map = + node_info.node_time_avail_res_map[craned_id]; + + auto task_duration_begin_it = + time_avail_res_map.upper_bound(expected_start_time); + if (task_duration_begin_it == time_avail_res_map.end()) { + --task_duration_begin_it; + // Situation #1 + // task duration + // |<-------------->| + // *-----------------*----------------------> inf + // ^ + // task_duration_begin_it + // + // *-----------------*----------------|-----> inf + // ^ ^ + // | insert here + // subtract resource here + // + // OR Situation #2 + // task duration + // |<-------------->| + // *-----------------*----------------------> inf + // ^ + // task_duration_begin_it + // + // *-----------------*--|----------------|--> inf + // ^ ^ ^ + // insert here | insert here + // subtract resource here + + TimeAvailResMap::iterator inserted_it; + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + task_end_time, task_duration_begin_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); + + if (task_duration_begin_it->first == expected_start_time) { + // Situation #1 + CRANE_ASSERT(task_res_in_node <= task_duration_begin_it->second); + task_duration_begin_it->second -= task_res_in_node; + } else { + // Situation #2 + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + expected_start_time, task_duration_begin_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); - time_delta_res_map[expected_start_time] -= task_res_in_node; - time_delta_res_map[task_end_time] += task_res_in_node; + CRANE_ASSERT(task_res_in_node <= inserted_it->second); + inserted_it->second -= task_res_in_node; + } + } else { + --task_duration_begin_it; + // Situation #3 + // task duration + // |<-------------->| + // *-------*----------*---------*------------ + // ^ ^ + // task_duration_begin_it task_duration_end_it + // *-------*------|---*---------*--|--------- + // ^ ^ ^ ^ ^ + // insert here | | | insert here + // subtract at these points + // + // Or Situation #4 + // task duration + // |<----------------->| + // *-------*----------*--------*------------ + // ^ ^ + // task_duration_begin_it task_duration_end_it + + // std::prev can be used without any check here. + // There will always be one time point (now) before task_end_time. + + if (task_duration_begin_it->first != expected_start_time) { + // Situation #3 (begin) + TimeAvailResMap::iterator inserted_it; + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + expected_start_time, task_duration_begin_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); + + task_duration_begin_it = inserted_it; + } - if constexpr (kAlgoTraceOutput) { - std::string str; - str.append( - fmt::format("Subtracted resource from Craned {} at now+{}s to " - "now+{}s: cpu: {}, mem: {}\n", - craned_id, - absl::ToInt64Seconds(expected_start_time - - time_delta_res_map.begin()->first), - absl::ToInt64Seconds(task_end_time - - time_delta_res_map.begin()->first), - task_res_in_node.allocatable_res.cpu_count, - task_res_in_node.allocatable_res.memory_bytes)); - ResourceInNode cur_res; - for (auto& [time, res] : time_delta_res_map) { - cur_res += res; - str.append(fmt::format( - "Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id, - absl::ToInt64Seconds(time - time_delta_res_map.begin()->first), - cur_res.allocatable_res.cpu_count, - cur_res.allocatable_res.memory_bytes)); + auto task_duration_end_it = + std::prev(time_avail_res_map.upper_bound(task_end_time)); + + // Subtract the required resources within the interval. + for (auto in_duration_it = task_duration_begin_it; + in_duration_it != task_duration_end_it; in_duration_it++) { + CRANE_ASSERT(task_res_in_node <= in_duration_it->second); + in_duration_it->second -= task_res_in_node; + } + + // Check if we need to insert a time point at + // `task_end_time_plus_1s` Detailed version of why: Assume one task + // end at time x-2, If "x+2" lies in the interval [x, y-1) in + // time__avail_res__map, + // for example, x+2 in [x, y-1) with the available resources amount + // `a`, we need to divide this interval into to two intervals: [x, + // x+2]: a-k, where k is the resource amount that task requires, + // [x+3, y-1]: a + // Therefore, we need to insert a key-value at x+3 to preserve this. + // However, if the length of [x+3, y-1] is 0, or more simply, the + // point x+3 exists, there's no need to save the interval [x+3, + // y-1]. + if (task_duration_end_it->first != task_end_time) { + // Situation #3 (end) + TimeAvailResMap::iterator inserted_it; + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + task_end_time, task_duration_end_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); + + CRANE_ASSERT(task_res_in_node <= task_duration_end_it->second); + task_duration_end_it->second -= task_res_in_node; } - CRANE_TRACE("{}", str); } } } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 54f9d017..4c1c2e91 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -146,43 +146,37 @@ class MinLoadFirst : public INodeSelectionAlgo { static constexpr bool kAlgoRedundantNode = true; /** - * This map stores how much available resource changes over time on each - * Craned node. + * This map stores how much resource is available + * over time on each Craned node. * * In this map, the time is discretized by 1s and starts from absl::Now(). * {x: a, y: b, z: c, ...} means that - * At time x, the amount of available resources is a. - * At time y, the amount of available resources is a + b. - * At time z, the amount of available resources is a + b + c. + * In time interval [x, y-1], the amount of available resources is a. + * In time interval [y, z-1], the amount of available resources is b. + * In time interval [z, ...], the amount of available resources is c. */ - using TimeDeltaResMap = std::map; - struct TimeDeltaResTracker { + using TimeAvailResMap = std::map; + struct TimeAvailResTracker { const CranedId craned_id; - TimeDeltaResMap::const_iterator it; - const TimeDeltaResMap::const_iterator end; + TimeAvailResMap::const_iterator it; + const TimeAvailResMap::const_iterator end; const ResourceInNode* task_res; - ResourceInNode avail_res; - int count; - bool satisfied{false}; + bool satisfied; - TimeDeltaResTracker(const CranedId& craned_id, - const TimeDeltaResMap::const_iterator& it, - const TimeDeltaResMap::const_iterator& end, + TimeAvailResTracker(const CranedId& craned_id, + const TimeAvailResMap::const_iterator& begin, + const TimeAvailResMap::const_iterator& end, const ResourceInNode* task_res) - : craned_id(craned_id), it(it), end(end), task_res(task_res) { - avail_res = it->second; - count = (it != end && *task_res <= avail_res) ? 1 : 0; - } + : craned_id(craned_id), + it(begin), + end(end), + task_res(task_res), + satisfied(false) {} bool genNext() { - if (count != 0) satisfied = !satisfied; - count = 0; - if (++it == end) return false; - if (*task_res <= avail_res) count -= 1; - avail_res += it->second; - if (*task_res <= avail_res) count += 1; - return true; + satisfied = *task_res <= it->second; + return ++it != end; } }; @@ -190,7 +184,7 @@ class MinLoadFirst : public INodeSelectionAlgo { // Craned_ids are sorted by cost. std::set> cost_node_id_set; std::unordered_map node_cost_map; - std::unordered_map node_time_delta_res_map; + std::unordered_map node_time_avail_res_map; std::unordered_map node_res_total_map; void setCost(const CranedId& craned_id, uint64_t cost) {