From 5e54ce522243f53c7288af92dce689ed7f5c840e Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:39:17 +0800 Subject: [PATCH] fix node inconsistency --- src/CraneCtld/TaskScheduler.cpp | 56 ++++++--------- src/CraneCtld/TaskScheduler.h | 70 +++++++++++++++++-- .../PublicHeader/include/crane/PublicHeader.h | 2 - 3 files changed, 88 insertions(+), 40 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index a76a8231..061f2488 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2083,53 +2083,43 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( for (CranedId craned_id : craned_indexes_) { auto& time_avail_res_map = node_selection_info.node_time_avail_res_map.at(craned_id); - auto it = time_avail_res_map.begin(); - trackers.emplace_back(craned_id, it, time_avail_res_map.end(), + trackers.emplace_back(craned_id, time_avail_res_map.begin(), + time_avail_res_map.end(), &task->Resources().at(craned_id)); pq.emplace(&trackers.back()); } - int satisfied_count = 0; - absl::Time last_time = absl::InfinitePast(); + TrackerList satisfied_trackers(task->node_num); + while (!pq.empty()) { absl::Time time = pq.top()->it->first; if (time - now > kAlgoMaxTimeWindow) { return false; } while (!pq.empty() && pq.top()->it->first == time) { - auto tmp = pq.top(); + auto tracker = pq.top(); pq.pop(); - satisfied_count -= tmp->satisfied; - if (tmp->genNext()) pq.emplace(tmp); - satisfied_count += tmp->satisfied; - } - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("At time now+{}s, {} nodes are satisfied.", - (time - now) / absl::Seconds(1), satisfied_count); - for (auto& tracker : trackers) { - if (tracker.satisfied) { - CRANE_TRACE("Craned {} is satisfied.", tracker.craned_id); - } + if (tracker->satisfied()) { + satisfied_trackers.try_push_back(tracker, time); + } else { + satisfied_trackers.try_erase(tracker); } - } - if (satisfied_count < task->node_num) { - last_time = absl::InfinitePast(); - } else { - if (last_time == absl::InfinitePast()) { - last_time = time; + if (tracker->genNext()) { + pq.emplace(tracker); } - if (time - last_time >= task->time_limit || pq.empty()) { - *start_time = last_time; - craned_ids->clear(); - for (auto& tracker : trackers) { - if (tracker.satisfied) { - craned_ids->emplace_back(tracker.craned_id); - if (craned_ids->size() >= task->node_num) break; - } - } - CRANE_ASSERT(craned_ids->size() == task->node_num); - return true; + } + if (pq.empty() || satisfied_trackers.kth_time() + task->time_limit <= + pq.top()->it->first) { + *start_time = satisfied_trackers.kth_time(); + craned_ids->clear(); + auto it = satisfied_trackers.kth_elem; + while (it != nullptr) { + craned_ids->emplace_back(it->tracker_ptr->craned_id); + it = it->prev; } + CRANE_ASSERT(*start_time != absl::InfiniteFuture()); + CRANE_ASSERT(craned_ids->size() == task->node_num); + return true; } } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 3cd0fd77..34234349 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -166,7 +166,7 @@ class MinLoadFirst : public INodeSelectionAlgo { TimeAvailResMap::const_iterator it; const TimeAvailResMap::const_iterator end; const ResourceInNode* task_res; - bool satisfied; + void* tracker_list_elem; TimeAvailResTracker(const CranedId& craned_id, const TimeAvailResMap::const_iterator& begin, @@ -176,11 +176,71 @@ class MinLoadFirst : public INodeSelectionAlgo { it(begin), end(end), task_res(task_res), - satisfied(false) {} + tracker_list_elem(nullptr) {} - bool genNext() { - satisfied = *task_res <= it->second; - return ++it != end; + bool satisfied() const { return *task_res <= it->second; } + + bool genNext() { return ++it != end; } + }; + + struct TrackerList { + struct TrackerListElem { + TimeAvailResTracker* tracker_ptr; + absl::Time time; + TrackerListElem* prev; + TrackerListElem* next; + bool first_k; + }; + + size_t size; + int node_num; + TrackerListElem* tail; + TrackerListElem* kth_elem; + + TrackerList(int node_num) + : size(0), node_num(node_num), tail(nullptr), kth_elem(nullptr) {} + + void try_push_back(TimeAvailResTracker* it, absl::Time time) { + if (it->tracker_list_elem) return; + TrackerListElem* elem = + new TrackerListElem{it, time, nullptr, nullptr, ++size <= node_num}; + if (tail) { + elem->prev = tail; + tail->next = elem; + } + if (size == node_num) { + kth_elem = elem; + } + tail = elem; + it->tracker_list_elem = elem; + } + + void try_erase(TimeAvailResTracker* it) { + TrackerListElem* elem = + static_cast(it->tracker_list_elem); + if (!elem) return; + if (elem->first_k && kth_elem) { + kth_elem = kth_elem->next; + if (kth_elem) { + kth_elem->first_k = true; + } + } + if (elem->prev) { + elem->prev->next = elem->next; + } + if (elem->next) { + elem->next->prev = elem->prev; + } + if (elem == tail) { + tail = elem->prev; + } + elem->tracker_ptr->tracker_list_elem = nullptr; + delete elem; + --size; + } + + absl::Time kth_time() const { + return kth_elem ? kth_elem->time : absl::InfiniteFuture(); } }; diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 83cabe7c..4cf91633 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -165,8 +165,6 @@ using SlotId = std::string; // Model the allocatable resources on a craned node. // It contains CPU and memory by now. -// Delta of resources is used, so it can be negative. -// Using unsigned type do not affect the correctness. struct AllocatableResource { cpu_t cpu_count{0};