diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 1ef5d37b8..c9ca75eee 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -25,6 +25,7 @@ #include #include "CforedClient.h" +#include "CgroupManager.h" #include "CtldClient.h" #include "crane/String.h" #include "protos/CraneSubprocess.pb.h" @@ -236,17 +237,25 @@ void TaskManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) { if (instance->task.type() == crane::grpc::Batch || instance->IsCrun()) { // For a Batch task, the end of the process means it is done. if (sigchld_info.is_terminated_by_signal) { - if (instance->cancelled_by_user) + if (instance->termination_event == TerminatedBy::CANCELLED_BY_USER) ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Cancelled, sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); - else if (instance->terminated_by_timeout) + else if (instance->termination_event == + TerminatedBy::TERMINATION_BY_TIMEOUT) ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::ExceedTimeLimit, sigchld_info.value + ExitCode::kTerminationSignalBase, std::nullopt); - else + else if (instance->termination_event == + TerminatedBy::TERMINATION_BY_OOM) { + CRANE_INFO("oom event"); + ActivateTaskStatusChangeAsync_( + task_id, crane::grpc::TaskStatus::Failed, + sigchld_info.value + ExitCode::kTerminationSignalBase, + std::nullopt); + } else ActivateTaskStatusChangeAsync_( task_id, crane::grpc::TaskStatus::Failed, sigchld_info.value + ExitCode::kTerminationSignalBase, @@ -362,6 +371,11 @@ void TaskManager::EvCleanSigchldQueueCb_() { ProcessInstance* proc = proc_iter->second; uint32_t task_id = instance->task.task_id(); + if (g_cg_mgr->GetCgroupVersion() == + CgroupConstant::CgroupVersion::CGROUP_V1) { + instance->monitor_thread_stop = true; + } + // Remove indexes from pid to ProcessInstance* m_pid_proc_map_.erase(proc_iter); m_pid_task_map_.erase(task_iter); @@ -953,6 +967,14 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { instance->cgroup = cg; instance->cgroup_path = cg->GetCgroupString(); + if (g_cg_mgr->GetCgroupVersion() == + CgroupConstant::CgroupVersion::CGROUP_V1) { + SetCgroupV1TerminationOOM_(instance); + } else if (g_cg_mgr->GetCgroupVersion() == + CgroupConstant::CgroupVersion::CGROUP_V2) { + SetCgroupV2TerminationOOM_(instance); + } + // Calloc tasks have no scripts to run. Just return. if (instance->IsCalloc()) return; @@ -1187,7 +1209,7 @@ void TaskManager::EvTaskTimerCb_(task_id_t task_id) { if (task_instance->task.type() == crane::grpc::Batch) { TaskTerminateQueueElem ev_task_terminate{ .task_id = task_id, - .terminated_by_timeout = true, + .terminated_by = TerminatedBy::TERMINATION_BY_TIMEOUT, }; m_task_terminate_queue_.enqueue(ev_task_terminate); m_terminate_task_async_handle_->send(); @@ -1198,6 +1220,51 @@ void TaskManager::EvTaskTimerCb_(task_id_t task_id) { } } +void TaskManager::EvOomCb_(std::string oom_path, TaskManager* task_manager, + task_id_t task_id) { + auto this_ = task_manager; + + std::ifstream events_file(oom_path); + if (events_file.is_open()) { + std::string line; + while (std::getline(events_file, line)) { + if (line.find("oom_kill ") != std::string::npos) { + std::istringstream iss(line); + std::string field; + int value = 0; + + iss >> field >> value; + if (value > 0) { + CRANE_TRACE("Task #{} exceeded its memory limit. Terminating it...", + task_id); + + auto task_it = this_->m_task_map_.find(task_id); + if (task_it == this_->m_task_map_.end()) { + CRANE_TRACE("Task #{} has already been removed.", task_id); + return; + } + TaskInstance* task_instance = task_it->second.get(); + + if (task_instance->task.type() == crane::grpc::Batch) { + TaskTerminateQueueElem ev_task_terminate{ + .task_id = task_id, + .terminated_by = TerminatedBy::TERMINATION_BY_OOM, + }; + this_->m_task_terminate_queue_.enqueue(ev_task_terminate); + this_->m_terminate_task_async_handle_->send(); + } else { + this_->ActivateTaskStatusChangeAsync_( + task_id, crane::grpc::TaskStatus::Failed, + ExitCode::kExitCodeOOMError, std::nullopt); + } + } + break; + } + } + } + events_file.close(); +} + void TaskManager::EvCleanTerminateTaskQueueCb_() { TaskTerminateQueueElem elem; while (m_task_terminate_queue_.try_dequeue(elem)) { @@ -1236,9 +1303,16 @@ void TaskManager::EvCleanTerminateTaskQueueCb_() { TaskInstance* task_instance = iter->second.get(); - if (elem.terminated_by_user) task_instance->cancelled_by_user = true; + if (task_instance->termination_event == TerminatedBy::NONE) { + if (elem.terminated_by == TerminatedBy::CANCELLED_BY_USER) { + task_instance->termination_event = TerminatedBy::CANCELLED_BY_USER; + } else if (elem.terminated_by == TerminatedBy::TERMINATION_BY_TIMEOUT) { + task_instance->termination_event = TerminatedBy::TERMINATION_BY_TIMEOUT; + } else if (elem.terminated_by == TerminatedBy::TERMINATION_BY_OOM) { + task_instance->termination_event = TerminatedBy::TERMINATION_BY_OOM; + } + } if (elem.mark_as_orphaned) task_instance->orphaned = true; - if (elem.terminated_by_timeout) task_instance->terminated_by_timeout = true; int sig = SIGTERM; // For BatchTask if (task_instance->IsCrun()) sig = SIGHUP; @@ -1258,7 +1332,8 @@ void TaskManager::EvCleanTerminateTaskQueueCb_() { } void TaskManager::TerminateTaskAsync(uint32_t task_id) { - TaskTerminateQueueElem elem{.task_id = task_id, .terminated_by_user = true}; + TaskTerminateQueueElem elem{.task_id = task_id, + .terminated_by = TerminatedBy::CANCELLED_BY_USER}; m_task_terminate_queue_.enqueue(elem); m_terminate_task_async_handle_->send(); } @@ -1339,8 +1414,9 @@ void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() { if (now - start_time >= new_time_limit) { // If the task times out, terminate it. - TaskTerminateQueueElem ev_task_terminate{.task_id = elem.task_id, - .terminated_by_timeout = true}; + TaskTerminateQueueElem ev_task_terminate{ + .task_id = elem.task_id, + .terminated_by = TerminatedBy::TERMINATION_BY_TIMEOUT}; m_task_terminate_queue_.enqueue(ev_task_terminate); m_terminate_task_async_handle_->send(); diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index 3ee9449b6..c0118769a 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -22,6 +22,13 @@ // Precompiled header comes first. #include +#include +#include +#include +#include + +#include +#include #include "CgroupManager.h" #include "crane/PasswordEntry.h" @@ -29,6 +36,9 @@ namespace Craned { +inline const char* MemoryEvents = "memory.events"; +inline const char* MemoryOomControl = "memory.oom_control"; + struct BatchMetaInProcessInstance { std::string parsed_output_file_pattern; std::string parsed_error_file_pattern; @@ -132,12 +142,22 @@ struct ProcSigchldInfo { std::shared_ptr resend_timer{nullptr}; }; +enum class TerminatedBy : uint64_t { + NONE = 0, + CANCELLED_BY_USER, + TERMINATION_BY_TIMEOUT, + TERMINATION_BY_OOM +}; + // Todo: Task may consists of multiple subtasks struct TaskInstance { ~TaskInstance() { if (termination_timer) { termination_timer->close(); } + if (termination_oom) { + termination_oom->close(); + } if (this->IsCrun()) { close(dynamic_cast(meta.get())->msg_fd); @@ -156,12 +176,13 @@ struct TaskInstance { std::string cgroup_path; CgroupInterface* cgroup; std::shared_ptr termination_timer{nullptr}; + std::shared_ptr termination_oom{nullptr}; + std::atomic monitor_thread_stop{true}; // Task execution results bool orphaned{false}; CraneErr err_before_exec{CraneErr::kOk}; - bool cancelled_by_user{false}; - bool terminated_by_timeout{false}; + TerminatedBy termination_event{TerminatedBy::NONE}; ProcSigchldInfo sigchld_info{}; absl::flat_hash_map> processes; @@ -232,10 +253,11 @@ class TaskManager { struct TaskTerminateQueueElem { uint32_t task_id{0}; - bool terminated_by_user{false}; // If the task is canceled by user, - // task->status=Cancelled - bool terminated_by_timeout{false}; // If the task is canceled by user, - // task->status=Timeout + TerminatedBy terminated_by{TerminatedBy::NONE}; + // If the task is canceled by user, + // task->status=Cancelled + // If the task is canceled by user, + // task->status=Timeout bool mark_as_orphaned{false}; }; @@ -301,6 +323,109 @@ class TaskManager { instance->termination_timer = termination_handel; } + void SetCgroupV1TerminationOOM_(TaskInstance* instance) { + using namespace CgroupConstant; + std::string slice = "/"; + std::string oom_control_full_path; + + oom_control_full_path = + CgroupConstant::RootCgroupFullPath + slice + + std::string(GetControllerStringView(Controller::MEMORY_CONTROLLER)) + + slice + instance->cgroup_path + "/" + MemoryOomControl; + + int oom_control_fd = open(oom_control_full_path.c_str(), O_RDONLY); + + int efd = eventfd(0, EFD_CLOEXEC); + if (efd == -1) { + CRANE_ERROR("Failed to create event fd"); + return; + } + + int epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + CRANE_ERROR("Failed to create epoll fd"); + } + + struct epoll_event event; + event.events = EPOLLIN | EPOLLERR | EPOLLHUP; + event.data.fd = efd; + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event) == -1) { + CRANE_ERROR("Failed to set epoll_stl ."); + close(oom_control_fd); + close(efd); + close(epfd); + return; + } + + std::string cgroup_event_control = + CgroupConstant::RootCgroupFullPath + slice + + std::string(GetControllerStringView(Controller::MEMORY_CONTROLLER)) + + slice + instance->cgroup_path + "/" + "cgroup.event_control"; + std::ofstream eventControlFile(cgroup_event_control); + if (!eventControlFile.is_open()) { + CRANE_ERROR("Failed to open cgroup.event_control file."); + close(oom_control_fd); + close(efd); + close(epfd); + return; + } + + std::stringstream ss; + ss << efd << " " << oom_control_fd; + eventControlFile << ss.str(); + eventControlFile.close(); + close(oom_control_fd); + instance->monitor_thread_stop = false; + std::thread([this, epfd, efd, oom_control_full_path, instance]() { + struct epoll_event events[32]; + uint64_t buf = 0; + while (!instance->monitor_thread_stop) { + // check monitor_thread_stop every 500 ms + int nfds = epoll_wait(epfd, events, 32, 500); + for (int i = 0; i < nfds; i++) { + if (events[i].events & EPOLLIN) { + ssize_t readBytes = read(events[i].data.fd, &buf, sizeof(buf)); + if (readBytes < 0) { + close(efd); + close(epfd); + return; + } + EvOomCb_(oom_control_full_path, this, instance->task.task_id()); + close(efd); + close(epfd); + return; + } + if (events[i].events & (EPOLLERR | EPOLLHUP)) { + CRANE_ERROR("Error or hangup on eventfd"); + } + } + } + close(efd); + close(epfd); + }).detach(); + } + + void SetCgroupV2TerminationOOM_(TaskInstance* instance) { + std::string slice = "/"; + std::string memory_events_full_path = CgroupConstant::RootCgroupFullPath + + slice + instance->cgroup_path + + slice + MemoryEvents; + + auto ev = m_uvw_loop_->resource(); + + ev->on( + [this, memory_events_full_path, instance]( + uvw::fs_event_event& event, uvw::fs_event_handle& handle) { + EvOomCb_(memory_events_full_path, this, instance->task.task_id()); + }); + + ev->start(memory_events_full_path, + uvw::fs_event_handle::event_flags::RECURSIVE); + + instance->termination_oom = ev; + } + static void DelTerminationTimer_(TaskInstance* instance) { // Close handle before free instance->termination_timer->close(); @@ -375,6 +500,9 @@ class TaskManager { void EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info); + void EvOomCb_(std::string oom_path, TaskManager* task_manager, + task_id_t task_id); + std::shared_ptr m_uvw_loop_; std::shared_ptr m_sigchld_handle_; diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 4cf91633e..0e2ad6bf7 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -112,6 +112,7 @@ enum ExitCodeEnum : uint16_t { kExitCodeExceedTimeLimit, kExitCodeCranedDown, kExitCodeExecutionError, + kExitCodeOOMError, __MAX_EXIT_CODE // NOLINT(bugprone-reserved-identifier) };