Skip to content

Commit

Permalink
fix node inconsistency
Browse files Browse the repository at this point in the history
  • Loading branch information
NamelessOIer committed Nov 28, 2024
1 parent 6de6b95 commit 5e54ce5
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 40 deletions.
56 changes: 23 additions & 33 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2083,53 +2083,43 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
for (CranedId craned_id : craned_indexes_) {
auto& time_avail_res_map =
node_selection_info.node_time_avail_res_map.at(craned_id);
auto it = time_avail_res_map.begin();
trackers.emplace_back(craned_id, it, time_avail_res_map.end(),
trackers.emplace_back(craned_id, time_avail_res_map.begin(),
time_avail_res_map.end(),
&task->Resources().at(craned_id));
pq.emplace(&trackers.back());
}

int satisfied_count = 0;
absl::Time last_time = absl::InfinitePast();
TrackerList satisfied_trackers(task->node_num);

while (!pq.empty()) {
absl::Time time = pq.top()->it->first;
if (time - now > kAlgoMaxTimeWindow) {
return false;
}
while (!pq.empty() && pq.top()->it->first == time) {
auto tmp = pq.top();
auto tracker = pq.top();
pq.pop();
satisfied_count -= tmp->satisfied;
if (tmp->genNext()) pq.emplace(tmp);
satisfied_count += tmp->satisfied;
}
if constexpr (kAlgoTraceOutput) {
CRANE_TRACE("At time now+{}s, {} nodes are satisfied.",
(time - now) / absl::Seconds(1), satisfied_count);
for (auto& tracker : trackers) {
if (tracker.satisfied) {
CRANE_TRACE("Craned {} is satisfied.", tracker.craned_id);
}
if (tracker->satisfied()) {
satisfied_trackers.try_push_back(tracker, time);
} else {
satisfied_trackers.try_erase(tracker);
}
}
if (satisfied_count < task->node_num) {
last_time = absl::InfinitePast();
} else {
if (last_time == absl::InfinitePast()) {
last_time = time;
if (tracker->genNext()) {
pq.emplace(tracker);
}
if (time - last_time >= task->time_limit || pq.empty()) {
*start_time = last_time;
craned_ids->clear();
for (auto& tracker : trackers) {
if (tracker.satisfied) {
craned_ids->emplace_back(tracker.craned_id);
if (craned_ids->size() >= task->node_num) break;
}
}
CRANE_ASSERT(craned_ids->size() == task->node_num);
return true;
}
if (pq.empty() || satisfied_trackers.kth_time() + task->time_limit <=
pq.top()->it->first) {
*start_time = satisfied_trackers.kth_time();
craned_ids->clear();
auto it = satisfied_trackers.kth_elem;
while (it != nullptr) {
craned_ids->emplace_back(it->tracker_ptr->craned_id);
it = it->prev;
}
CRANE_ASSERT(*start_time != absl::InfiniteFuture());
CRANE_ASSERT(craned_ids->size() == task->node_num);
return true;
}
}

Expand Down
70 changes: 65 additions & 5 deletions src/CraneCtld/TaskScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class MinLoadFirst : public INodeSelectionAlgo {
TimeAvailResMap::const_iterator it;
const TimeAvailResMap::const_iterator end;
const ResourceInNode* task_res;
bool satisfied;
void* tracker_list_elem;

TimeAvailResTracker(const CranedId& craned_id,
const TimeAvailResMap::const_iterator& begin,
Expand All @@ -176,11 +176,71 @@ class MinLoadFirst : public INodeSelectionAlgo {
it(begin),
end(end),
task_res(task_res),
satisfied(false) {}
tracker_list_elem(nullptr) {}

bool genNext() {
satisfied = *task_res <= it->second;
return ++it != end;
bool satisfied() const { return *task_res <= it->second; }

bool genNext() { return ++it != end; }
};

struct TrackerList {
struct TrackerListElem {
TimeAvailResTracker* tracker_ptr;
absl::Time time;
TrackerListElem* prev;
TrackerListElem* next;
bool first_k;
};

size_t size;
int node_num;
TrackerListElem* tail;
TrackerListElem* kth_elem;

TrackerList(int node_num)
: size(0), node_num(node_num), tail(nullptr), kth_elem(nullptr) {}

void try_push_back(TimeAvailResTracker* it, absl::Time time) {
if (it->tracker_list_elem) return;
TrackerListElem* elem =
new TrackerListElem{it, time, nullptr, nullptr, ++size <= node_num};
if (tail) {
elem->prev = tail;
tail->next = elem;
}
if (size == node_num) {
kth_elem = elem;
}
tail = elem;
it->tracker_list_elem = elem;
}

void try_erase(TimeAvailResTracker* it) {
TrackerListElem* elem =
static_cast<TrackerListElem*>(it->tracker_list_elem);
if (!elem) return;
if (elem->first_k && kth_elem) {
kth_elem = kth_elem->next;
if (kth_elem) {
kth_elem->first_k = true;
}
}
if (elem->prev) {
elem->prev->next = elem->next;
}
if (elem->next) {
elem->next->prev = elem->prev;
}
if (elem == tail) {
tail = elem->prev;
}
elem->tracker_ptr->tracker_list_elem = nullptr;
delete elem;
--size;
}

absl::Time kth_time() const {
return kth_elem ? kth_elem->time : absl::InfiniteFuture();
}
};

Expand Down
2 changes: 0 additions & 2 deletions src/Utilities/PublicHeader/include/crane/PublicHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ using SlotId = std::string;

// Model the allocatable resources on a craned node.
// It contains CPU and memory by now.
// Delta of resources is used, so it can be negative.
// Using unsigned type do not affect the correctness.
struct AllocatableResource {
cpu_t cpu_count{0};

Expand Down

0 comments on commit 5e54ce5

Please sign in to comment.