Skip to content

Commit

Permalink
add oom event notify
Browse files Browse the repository at this point in the history
  • Loading branch information
wtr0504 committed Dec 18, 2024
1 parent 815c797 commit 74b5fe0
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 15 deletions.
94 changes: 85 additions & 9 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <sys/wait.h>

#include "CforedClient.h"
#include "CgroupManager.h"
#include "CtldClient.h"
#include "crane/String.h"
#include "protos/CraneSubprocess.pb.h"
Expand Down Expand Up @@ -236,17 +237,25 @@ void TaskManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) {
if (instance->task.type() == crane::grpc::Batch || instance->IsCrun()) {
// For a Batch task, the end of the process means it is done.
if (sigchld_info.is_terminated_by_signal) {
if (instance->cancelled_by_user)
if (instance->termination_event == TerminatedBy::CANCELLED_BY_USER)
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Cancelled,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
else if (instance->terminated_by_timeout)
else if (instance->termination_event ==
TerminatedBy::TERMINATION_BY_TIMEOUT)
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::ExceedTimeLimit,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
else
else if (instance->termination_event ==
TerminatedBy::TERMINATION_BY_OOM) {
CRANE_INFO("oom event");
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Failed,
sigchld_info.value + ExitCode::kTerminationSignalBase,
std::nullopt);
} else
ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Failed,
sigchld_info.value + ExitCode::kTerminationSignalBase,
Expand Down Expand Up @@ -362,6 +371,11 @@ void TaskManager::EvCleanSigchldQueueCb_() {
ProcessInstance* proc = proc_iter->second;
uint32_t task_id = instance->task.task_id();

if (g_cg_mgr->GetCgroupVersion() ==
CgroupConstant::CgroupVersion::CGROUP_V1) {
instance->monitor_thread_stop = true;
}

// Remove indexes from pid to ProcessInstance*
m_pid_proc_map_.erase(proc_iter);
m_pid_task_map_.erase(task_iter);
Expand Down Expand Up @@ -953,6 +967,14 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) {
instance->cgroup = cg;
instance->cgroup_path = cg->GetCgroupString();

if (g_cg_mgr->GetCgroupVersion() ==
CgroupConstant::CgroupVersion::CGROUP_V1) {
SetCgroupV1TerminationOOM_(instance);
} else if (g_cg_mgr->GetCgroupVersion() ==
CgroupConstant::CgroupVersion::CGROUP_V2) {
SetCgroupV2TerminationOOM_(instance);
}

// Calloc tasks have no scripts to run. Just return.
if (instance->IsCalloc()) return;

Expand Down Expand Up @@ -1187,7 +1209,7 @@ void TaskManager::EvTaskTimerCb_(task_id_t task_id) {
if (task_instance->task.type() == crane::grpc::Batch) {
TaskTerminateQueueElem ev_task_terminate{
.task_id = task_id,
.terminated_by_timeout = true,
.terminated_by = TerminatedBy::TERMINATION_BY_TIMEOUT,
};
m_task_terminate_queue_.enqueue(ev_task_terminate);
m_terminate_task_async_handle_->send();
Expand All @@ -1198,6 +1220,51 @@ void TaskManager::EvTaskTimerCb_(task_id_t task_id) {
}
}

void TaskManager::EvOomCb_(std::string oom_path, TaskManager* task_manager,
task_id_t task_id) {
auto this_ = task_manager;

std::ifstream events_file(oom_path);
if (events_file.is_open()) {
std::string line;
while (std::getline(events_file, line)) {
if (line.find("oom_kill ") != std::string::npos) {
std::istringstream iss(line);
std::string field;
int value = 0;

iss >> field >> value;
if (value > 0) {
CRANE_TRACE("Task #{} exceeded its memory limit. Terminating it...",
task_id);

auto task_it = this_->m_task_map_.find(task_id);
if (task_it == this_->m_task_map_.end()) {
CRANE_TRACE("Task #{} has already been removed.", task_id);
return;
}
TaskInstance* task_instance = task_it->second.get();

if (task_instance->task.type() == crane::grpc::Batch) {
TaskTerminateQueueElem ev_task_terminate{
.task_id = task_id,
.terminated_by = TerminatedBy::TERMINATION_BY_OOM,
};
this_->m_task_terminate_queue_.enqueue(ev_task_terminate);
this_->m_terminate_task_async_handle_->send();
} else {
this_->ActivateTaskStatusChangeAsync_(
task_id, crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeOOMError, std::nullopt);
}
}
break;
}
}
}
events_file.close();
}

void TaskManager::EvCleanTerminateTaskQueueCb_() {
TaskTerminateQueueElem elem;
while (m_task_terminate_queue_.try_dequeue(elem)) {
Expand Down Expand Up @@ -1236,9 +1303,16 @@ void TaskManager::EvCleanTerminateTaskQueueCb_() {

TaskInstance* task_instance = iter->second.get();

if (elem.terminated_by_user) task_instance->cancelled_by_user = true;
if (task_instance->termination_event == TerminatedBy::NONE) {
if (elem.terminated_by == TerminatedBy::CANCELLED_BY_USER) {
task_instance->termination_event = TerminatedBy::CANCELLED_BY_USER;
} else if (elem.terminated_by == TerminatedBy::TERMINATION_BY_TIMEOUT) {
task_instance->termination_event = TerminatedBy::TERMINATION_BY_TIMEOUT;
} else if (elem.terminated_by == TerminatedBy::TERMINATION_BY_OOM) {
task_instance->termination_event = TerminatedBy::TERMINATION_BY_OOM;
}
}
if (elem.mark_as_orphaned) task_instance->orphaned = true;
if (elem.terminated_by_timeout) task_instance->terminated_by_timeout = true;

int sig = SIGTERM; // For BatchTask
if (task_instance->IsCrun()) sig = SIGHUP;
Expand All @@ -1258,7 +1332,8 @@ void TaskManager::EvCleanTerminateTaskQueueCb_() {
}

void TaskManager::TerminateTaskAsync(uint32_t task_id) {
TaskTerminateQueueElem elem{.task_id = task_id, .terminated_by_user = true};
TaskTerminateQueueElem elem{.task_id = task_id,
.terminated_by = TerminatedBy::CANCELLED_BY_USER};
m_task_terminate_queue_.enqueue(elem);
m_terminate_task_async_handle_->send();
}
Expand Down Expand Up @@ -1339,8 +1414,9 @@ void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() {

if (now - start_time >= new_time_limit) {
// If the task times out, terminate it.
TaskTerminateQueueElem ev_task_terminate{.task_id = elem.task_id,
.terminated_by_timeout = true};
TaskTerminateQueueElem ev_task_terminate{
.task_id = elem.task_id,
.terminated_by = TerminatedBy::TERMINATION_BY_TIMEOUT};
m_task_terminate_queue_.enqueue(ev_task_terminate);
m_terminate_task_async_handle_->send();

Expand Down
140 changes: 134 additions & 6 deletions src/Craned/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,23 @@
// Precompiled header comes first.

#include <grp.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/inotify.h>
#include <unistd.h>

#include <atomic>
#include <memory>

#include "CgroupManager.h"
#include "crane/PasswordEntry.h"
#include "protos/Crane.grpc.pb.h"

namespace Craned {

inline const char* MemoryEvents = "memory.events";
inline const char* MemoryOomControl = "memory.oom_control";

struct BatchMetaInProcessInstance {
std::string parsed_output_file_pattern;
std::string parsed_error_file_pattern;
Expand Down Expand Up @@ -132,12 +142,22 @@ struct ProcSigchldInfo {
std::shared_ptr<uvw::timer_handle> resend_timer{nullptr};
};

enum class TerminatedBy : uint64_t {
NONE = 0,
CANCELLED_BY_USER,
TERMINATION_BY_TIMEOUT,
TERMINATION_BY_OOM
};

// Todo: Task may consists of multiple subtasks
struct TaskInstance {
~TaskInstance() {
if (termination_timer) {
termination_timer->close();
}
if (termination_oom) {
termination_oom->close();
}

if (this->IsCrun()) {
close(dynamic_cast<CrunMetaInTaskInstance*>(meta.get())->msg_fd);
Expand All @@ -156,12 +176,13 @@ struct TaskInstance {
std::string cgroup_path;
CgroupInterface* cgroup;
std::shared_ptr<uvw::timer_handle> termination_timer{nullptr};
std::shared_ptr<uvw::fs_event_handle> termination_oom{nullptr};
std::atomic<bool> monitor_thread_stop{true};

// Task execution results
bool orphaned{false};
CraneErr err_before_exec{CraneErr::kOk};
bool cancelled_by_user{false};
bool terminated_by_timeout{false};
TerminatedBy termination_event{TerminatedBy::NONE};
ProcSigchldInfo sigchld_info{};

absl::flat_hash_map<pid_t, std::unique_ptr<ProcessInstance>> processes;
Expand Down Expand Up @@ -232,10 +253,11 @@ class TaskManager {

struct TaskTerminateQueueElem {
uint32_t task_id{0};
bool terminated_by_user{false}; // If the task is canceled by user,
// task->status=Cancelled
bool terminated_by_timeout{false}; // If the task is canceled by user,
// task->status=Timeout
TerminatedBy terminated_by{TerminatedBy::NONE};
// If the task is canceled by user,
// task->status=Cancelled
// If the task is canceled by user,
// task->status=Timeout
bool mark_as_orphaned{false};
};

Expand Down Expand Up @@ -301,6 +323,109 @@ class TaskManager {
instance->termination_timer = termination_handel;
}

void SetCgroupV1TerminationOOM_(TaskInstance* instance) {
using namespace CgroupConstant;
std::string slice = "/";
std::string oom_control_full_path;

oom_control_full_path =
CgroupConstant::RootCgroupFullPath + slice +
std::string(GetControllerStringView(Controller::MEMORY_CONTROLLER)) +
slice + instance->cgroup_path + "/" + MemoryOomControl;

int oom_control_fd = open(oom_control_full_path.c_str(), O_RDONLY);

int efd = eventfd(0, EFD_CLOEXEC);
if (efd == -1) {
CRANE_ERROR("Failed to create event fd");
return;
}

int epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd == -1) {
CRANE_ERROR("Failed to create epoll fd");
}

struct epoll_event event;
event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
event.data.fd = efd;

if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event) == -1) {
CRANE_ERROR("Failed to set epoll_stl .");
close(oom_control_fd);
close(efd);
close(epfd);
return;
}

std::string cgroup_event_control =
CgroupConstant::RootCgroupFullPath + slice +
std::string(GetControllerStringView(Controller::MEMORY_CONTROLLER)) +
slice + instance->cgroup_path + "/" + "cgroup.event_control";
std::ofstream eventControlFile(cgroup_event_control);
if (!eventControlFile.is_open()) {
CRANE_ERROR("Failed to open cgroup.event_control file.");
close(oom_control_fd);
close(efd);
close(epfd);
return;
}

std::stringstream ss;
ss << efd << " " << oom_control_fd;
eventControlFile << ss.str();
eventControlFile.close();
close(oom_control_fd);
instance->monitor_thread_stop = false;
std::thread([this, epfd, efd, oom_control_full_path, instance]() {
struct epoll_event events[32];
uint64_t buf = 0;
while (!instance->monitor_thread_stop) {
// check monitor_thread_stop every 500 ms
int nfds = epoll_wait(epfd, events, 32, 500);
for (int i = 0; i < nfds; i++) {
if (events[i].events & EPOLLIN) {
ssize_t readBytes = read(events[i].data.fd, &buf, sizeof(buf));
if (readBytes < 0) {
close(efd);
close(epfd);
return;
}
EvOomCb_(oom_control_full_path, this, instance->task.task_id());
close(efd);
close(epfd);
return;
}
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
CRANE_ERROR("Error or hangup on eventfd");
}
}
}
close(efd);
close(epfd);
}).detach();
}

void SetCgroupV2TerminationOOM_(TaskInstance* instance) {
std::string slice = "/";
std::string memory_events_full_path = CgroupConstant::RootCgroupFullPath +
slice + instance->cgroup_path +
slice + MemoryEvents;

auto ev = m_uvw_loop_->resource<uvw::fs_event_handle>();

ev->on<uvw::fs_event_event>(
[this, memory_events_full_path, instance](
uvw::fs_event_event& event, uvw::fs_event_handle& handle) {
EvOomCb_(memory_events_full_path, this, instance->task.task_id());
});

ev->start(memory_events_full_path,
uvw::fs_event_handle::event_flags::RECURSIVE);

instance->termination_oom = ev;
}

static void DelTerminationTimer_(TaskInstance* instance) {
// Close handle before free
instance->termination_timer->close();
Expand Down Expand Up @@ -375,6 +500,9 @@ class TaskManager {

void EvSigchldTimerCb_(ProcSigchldInfo* sigchld_info);

void EvOomCb_(std::string oom_path, TaskManager* task_manager,
task_id_t task_id);

std::shared_ptr<uvw::loop> m_uvw_loop_;

std::shared_ptr<uvw::signal_handle> m_sigchld_handle_;
Expand Down
1 change: 1 addition & 0 deletions src/Utilities/PublicHeader/include/crane/PublicHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ enum ExitCodeEnum : uint16_t {
kExitCodeExceedTimeLimit,
kExitCodeCranedDown,
kExitCodeExecutionError,
kExitCodeOOMError,

__MAX_EXIT_CODE // NOLINT(bugprone-reserved-identifier)
};
Expand Down

0 comments on commit 74b5fe0

Please sign in to comment.