From c6f86fd49c2264b47436da9a588f2591292171d7 Mon Sep 17 00:00:00 2001 From: vansangpfiev Date: Fri, 17 Jan 2025 12:31:21 +0700 Subject: [PATCH] chore: cleanup (#1849) * chore: cleanup * chore: spawn process --------- Co-authored-by: vansangpfiev --- engine/CMakeLists.txt | 1 + engine/cli/CMakeLists.txt | 1 + engine/cli/commands/server_start_cmd.cc | 30 +-- .../extensions/python-engine/python_engine.cc | 214 +++++++----------- .../extensions/python-engine/python_engine.h | 62 +---- engine/services/hardware_service.cc | 48 ++-- engine/utils/process/utils.cc | 106 +++++++++ engine/utils/process/utils.h | 25 ++ 8 files changed, 257 insertions(+), 230 deletions(-) create mode 100644 engine/utils/process/utils.cc create mode 100644 engine/utils/process/utils.h diff --git a/engine/CMakeLists.txt b/engine/CMakeLists.txt index ae354500b..6a8d441a0 100644 --- a/engine/CMakeLists.txt +++ b/engine/CMakeLists.txt @@ -149,6 +149,7 @@ add_executable(${TARGET_NAME} main.cc ${CMAKE_CURRENT_SOURCE_DIR}/extensions/python-engine/python_engine.cc ${CMAKE_CURRENT_SOURCE_DIR}/utils/dylib_path_manager.cc + ${CMAKE_CURRENT_SOURCE_DIR}/utils/process/utils.cc ${CMAKE_CURRENT_SOURCE_DIR}/extensions/remote-engine/remote_engine.cc diff --git a/engine/cli/CMakeLists.txt b/engine/cli/CMakeLists.txt index f9e8b673f..0f0b2b48d 100644 --- a/engine/cli/CMakeLists.txt +++ b/engine/cli/CMakeLists.txt @@ -95,6 +95,7 @@ add_executable(${TARGET_NAME} main.cc ${CMAKE_CURRENT_SOURCE_DIR}/../utils/file_manager_utils.cc ${CMAKE_CURRENT_SOURCE_DIR}/../utils/curl_utils.cc ${CMAKE_CURRENT_SOURCE_DIR}/../utils/system_info_utils.cc + ${CMAKE_CURRENT_SOURCE_DIR}/../utils/process/utils.cc ) target_link_libraries(${TARGET_NAME} PRIVATE CLI11::CLI11) diff --git a/engine/cli/commands/server_start_cmd.cc b/engine/cli/commands/server_start_cmd.cc index 4268f6362..fdc33e50a 100644 --- a/engine/cli/commands/server_start_cmd.cc +++ b/engine/cli/commands/server_start_cmd.cc @@ -3,6 +3,7 @@ #include "services/engine_service.h" #include "utils/cortex_utils.h" #include "utils/file_manager_utils.h" +#include "utils/process/utils.h" #if defined(_WIN32) || defined(_WIN64) #include "utils/widechar_conv.h" @@ -103,25 +104,26 @@ bool ServerStartCmd::Exec(const std::string& host, int port, } #else - // Unix-like system-specific code to fork a child process - pid_t pid = fork(); + std::vector commands; + // Some engines requires to add lib search path before process being created + auto download_srv = std::make_shared(); + auto dylib_path_mng = std::make_shared(); + auto db_srv = std::make_shared(); + EngineService(download_srv, dylib_path_mng, db_srv).RegisterEngineLibPath(); + std::string p = cortex_utils::GetCurrentPath() + "/" + exe; + commands.push_back(p); + commands.push_back("--config_file_path"); + commands.push_back(get_config_file_path()); + commands.push_back("--data_folder_path"); + commands.push_back(get_data_folder_path()); + commands.push_back("--loglevel"); + commands.push_back(log_level_); + auto pid = cortex::process::SpawnProcess(commands); if (pid < 0) { // Fork failed std::cerr << "Could not start server: " << std::endl; return false; - } else if (pid == 0) { - // Some engines requires to add lib search path before process being created - auto download_srv = std::make_shared(); - auto dylib_path_mng = std::make_shared(); - auto db_srv = std::make_shared(); - EngineService(download_srv, dylib_path_mng, db_srv).RegisterEngineLibPath(); - - std::string p = cortex_utils::GetCurrentPath() + "/" + exe; - execl(p.c_str(), exe.c_str(), "--start-server", "--config_file_path", - get_config_file_path().c_str(), "--data_folder_path", - get_data_folder_path().c_str(), "--loglevel", log_level_.c_str(), - (char*)0); } else { // Parent process if (!TryConnectToServer(host, port)) { diff --git a/engine/extensions/python-engine/python_engine.cc b/engine/extensions/python-engine/python_engine.cc index 13e31195d..d34f75c08 100644 --- a/engine/extensions/python-engine/python_engine.cc +++ b/engine/extensions/python-engine/python_engine.cc @@ -3,19 +3,67 @@ #include #include #include + namespace python_engine { +namespace { constexpr const int k200OK = 200; constexpr const int k400BadRequest = 400; constexpr const int k409Conflict = 409; constexpr const int k500InternalServerError = 500; constexpr const int kFileLoggerOption = 0; +size_t StreamWriteCallback(char* ptr, size_t size, size_t nmemb, + void* userdata) { + auto* context = static_cast(userdata); + std::string chunk(ptr, size * nmemb); + + context->buffer += chunk; + + // Process complete lines + size_t pos; + while ((pos = context->buffer.find('\n')) != std::string::npos) { + std::string line = context->buffer.substr(0, pos); + context->buffer = context->buffer.substr(pos + 1); + LOG_DEBUG << "line: " << line; + + // Skip empty lines + if (line.empty() || line == "\r") + continue; + + if (line == "data: [DONE]") { + Json::Value status; + status["is_done"] = true; + status["has_error"] = false; + status["is_stream"] = true; + status["status_code"] = 200; + (*context->callback)(std::move(status), Json::Value()); + break; + } + + // Parse the JSON + Json::Value chunk_json; + chunk_json["data"] = line + "\n\n"; + Json::Reader reader; + + Json::Value status; + status["is_done"] = false; + status["has_error"] = false; + status["is_stream"] = true; + status["status_code"] = 200; + (*context->callback)(std::move(status), std::move(chunk_json)); + } + + return size * nmemb; +} + static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, std::string* data) { data->append(ptr, size * nmemb); return size * nmemb; } +} // namespace + PythonEngine::PythonEngine() : q_(4 /*n_parallel*/, "python_engine") {} PythonEngine::~PythonEngine() { @@ -31,118 +79,17 @@ config::PythonModelConfig* PythonEngine::GetModelConfig( } return nullptr; } -std::string constructWindowsCommandLine(const std::vector& args) { - std::string cmdLine; - for (const auto& arg : args) { - // Simple escaping for Windows command line - std::string escapedArg = arg; - if (escapedArg.find(' ') != std::string::npos) { - // Wrap in quotes and escape existing quotes - for (char& c : escapedArg) { - if (c == '"') - c = '\\'; - } - escapedArg = "\"" + escapedArg + "\""; - } - cmdLine += escapedArg + " "; - } - return cmdLine; -} - -std::vector convertToArgv(const std::vector& args) { - std::vector argv; - for (const auto& arg : args) { - argv.push_back(const_cast(arg.c_str())); - } - argv.push_back(nullptr); - return argv; -} - -pid_t PythonEngine::SpawnProcess(const std::string& model, - const std::vector& command) { - try { -#ifdef _WIN32 - // Windows process creation - STARTUPINFOA si = {0}; - PROCESS_INFORMATION pi = {0}; - si.cb = sizeof(si); - - // Construct command line - std::string cmdLine = constructWindowsCommandLine(command); - - // Convert string to char* for Windows API - char commandBuffer[4096]; - strncpy_s(commandBuffer, cmdLine.c_str(), sizeof(commandBuffer)); - - if (!CreateProcessA(NULL, // lpApplicationName - commandBuffer, // lpCommandLine - NULL, // lpProcessAttributes - NULL, // lpThreadAttributes - FALSE, // bInheritHandles - 0, // dwCreationFlags - NULL, // lpEnvironment - NULL, // lpCurrentDirectory - &si, // lpStartupInfo - &pi // lpProcessInformation - )) { - throw std::runtime_error("Failed to create process on Windows"); - } - - // Store the process ID - pid_t pid = pi.dwProcessId; - processMap[model] = pid; - - // Close handles to avoid resource leaks - CloseHandle(pi.hProcess); - CloseHandle(pi.hThread); - - return pid; - -#elif __APPLE__ || __linux__ - // POSIX process creation - pid_t pid; - - // Convert command vector to char*[] - std::vector argv = convertToArgv(command); - // for (auto c : command) { - // std::cout << c << " " << std::endl; - // } - - // Use posix_spawn for cross-platform compatibility - int spawn_result = posix_spawn(&pid, // pid output - command[0].c_str(), // executable path - NULL, // file actions - NULL, // spawn attributes - argv.data(), // argument vector - NULL // environment (inherit) - ); - - if (spawn_result != 0) { - throw std::runtime_error("Failed to spawn process"); - } - - // Store the process ID - processMap[model] = pid; - return pid; -#else -#error Unsupported platform -#endif - } catch (const std::exception& e) { - LOG_ERROR << "Process spawning error: " << e.what(); - return -1; - } -} bool PythonEngine::TerminateModelProcess(const std::string& model) { - auto it = processMap.find(model); - if (it == processMap.end()) { + auto it = process_map_.find(model); + if (it == process_map_.end()) { LOG_ERROR << "No process found for model: " << model << ", removing from list running models."; models_.erase(model); return false; } -#ifdef _WIN32 +#if defined(_WIN32) HANDLE hProcess = OpenProcess(PROCESS_TERMINATE, FALSE, it->second); if (hProcess == NULL) { LOG_ERROR << "Failed to open process"; @@ -153,20 +100,21 @@ bool PythonEngine::TerminateModelProcess(const std::string& model) { CloseHandle(hProcess); if (terminated) { - processMap.erase(it); + process_map_.erase(it); return true; } -#elif __APPLE__ || __linux__ +#elif defined(__APPLE__) || defined(__linux__) int result = kill(it->second, SIGTERM); if (result == 0) { - processMap.erase(it); + process_map_.erase(it); return true; } #endif return false; } + CurlResponse PythonEngine::MakeGetRequest(const std::string& model, const std::string& path) { auto const& config = models_[model]; @@ -182,6 +130,7 @@ CurlResponse PythonEngine::MakeGetRequest(const std::string& model, } return response; } + CurlResponse PythonEngine::MakeDeleteRequest(const std::string& model, const std::string& path) { auto const& config = models_[model]; @@ -304,7 +253,7 @@ void PythonEngine::LoadModel( auto data_folder_path = std::filesystem::path(model_folder_path) / std::filesystem::path("venv"); try { -#ifdef _WIN32 +#if defined(_WIN32) auto executable = std::filesystem::path(data_folder_path) / std::filesystem::path("Scripts"); #else @@ -356,7 +305,8 @@ void PythonEngine::LoadModel( // Add the parsed arguments to the command command.insert(command.end(), args.begin(), args.end()); - pid = SpawnProcess(model, command); + pid = cortex::process::SpawnProcess(command); + process_map_[model] = pid; if (pid == -1) { std::unique_lock lock(models_mutex_); if (models_.find(model) != models_.end()) { @@ -416,16 +366,16 @@ void PythonEngine::UnloadModel( return; } - const std::string& model = (*json_body)["model"].asString(); + auto model = (*json_body)["model"].asString(); { - std::unique_lock lock(models_mutex_); if (TerminateModelProcess(model)) { + std::unique_lock lock(models_mutex_); models_.erase(model); } else { Json::Value error; error["error"] = "Fail to terminate process with id: " + - std::to_string(processMap[model]); + std::to_string(process_map_[model]); Json::Value status; status["is_done"] = true; status["has_error"] = true; @@ -448,7 +398,9 @@ void PythonEngine::UnloadModel( void PythonEngine::HandleChatCompletion( std::shared_ptr json_body, - std::function&& callback) {} + std::function&& callback) { + LOG_WARN << "Does not support yet!"; +} CurlResponse PythonEngine::MakeStreamPostRequest( const std::string& model, const std::string& path, const std::string& body, @@ -509,7 +461,7 @@ CurlResponse PythonEngine::MakeStreamPostRequest( void PythonEngine::HandleInference( std::shared_ptr json_body, std::function&& callback) { - if (!json_body->isMember("model")) { + if (json_body && !json_body->isMember("model")) { Json::Value error; error["error"] = "Missing required field: model is required!"; Json::Value status; @@ -520,14 +472,14 @@ void PythonEngine::HandleInference( callback(std::move(status), std::move(error)); return; } + std::string method = "post"; std::string path = "/inference"; - std::string transform_request = - (*json_body).get("transform_request", "").asString(); - std::string transform_response = + auto transform_request = (*json_body).get("transform_request", "").asString(); + auto transform_response = (*json_body).get("transform_response", "").asString(); - std::string model = (*json_body)["model"].asString(); - Json::Value body = (*json_body)["body"]; + auto model = (*json_body)["model"].asString(); + auto& body = (*json_body)["body"]; if (models_.find(model) == models_.end()) { Json::Value error; @@ -680,10 +632,13 @@ void PythonEngine::HandleInference( callback(std::move(status), std::move(response_json)); } } + Json::Value PythonEngine::GetRemoteModels() { return Json::Value(); } + void PythonEngine::StopInferencing(const std::string& model_id) {} + void PythonEngine::HandleRouteRequest( std::shared_ptr json_body, std::function&& callback) { @@ -700,14 +655,13 @@ void PythonEngine::HandleRouteRequest( callback(std::move(status), std::move(error)); return; } - std::string method = (*json_body)["method"].asString(); - std::string path = (*json_body)["path"].asString(); - std::string transform_request = - (*json_body).get("transform_request", "").asString(); - std::string transform_response = + auto method = (*json_body)["method"].asString(); + auto path = (*json_body)["path"].asString(); + auto transform_request = (*json_body).get("transform_request", "").asString(); + auto transform_response = (*json_body).get("transform_response", "").asString(); - std::string model = (*json_body)["model"].asString(); - Json::Value body = (*json_body)["body"]; + auto model = (*json_body)["model"].asString(); + auto& body = (*json_body)["body"]; if (models_.find(model) == models_.end()) { Json::Value error; @@ -864,10 +818,11 @@ void PythonEngine::GetModelStatus( callback(std::move(status), std::move(error)); return; } + auto model = json_body->get("model", "").asString(); auto model_config = models_[model]; auto health_endpoint = model_config.heath_check; - auto pid = processMap[model]; + auto pid = process_map_[model]; auto is_process_live = process_status_utils::IsProcessRunning(pid); auto response_health = MakeGetRequest(model, health_endpoint.path); @@ -960,9 +915,4 @@ void PythonEngine::Unload(EngineUnloadOption opts) { } }; -// extern "C" { -// EngineI* get_engine() { -// return new PythonEngine(); -// } -// } } // namespace python_engine \ No newline at end of file diff --git a/engine/extensions/python-engine/python_engine.h b/engine/extensions/python-engine/python_engine.h index 4e4203c52..70a9b9829 100644 --- a/engine/extensions/python-engine/python_engine.h +++ b/engine/extensions/python-engine/python_engine.h @@ -16,17 +16,8 @@ #include "utils/file_manager_utils.h" #include "utils/process_status_utils.h" #include "utils/curl_utils.h" -#ifdef _WIN32 -#include -#include -using pid_t = DWORD; -#elif __APPLE__ || __linux__ -#include -#include -#include -#include -#include -#endif +#include "utils/process/utils.h" + // Helper for CURL response namespace python_engine { struct StreamContext { @@ -34,50 +25,6 @@ struct StreamContext { std::string buffer; }; -static size_t StreamWriteCallback(char* ptr, size_t size, size_t nmemb, - void* userdata) { - auto* context = static_cast(userdata); - std::string chunk(ptr, size * nmemb); - - context->buffer += chunk; - - // Process complete lines - size_t pos; - while ((pos = context->buffer.find('\n')) != std::string::npos) { - std::string line = context->buffer.substr(0, pos); - context->buffer = context->buffer.substr(pos + 1); - LOG_DEBUG << "line: "<callback)(std::move(status), Json::Value()); - break; - } - - // Parse the JSON - Json::Value chunk_json; - chunk_json["data"] = line + "\n\n"; - Json::Reader reader; - - Json::Value status; - status["is_done"] = false; - status["has_error"] = false; - status["is_stream"] = true; - status["status_code"] = 200; - (*context->callback)(std::move(status), std::move(chunk_json)); - } - - return size * nmemb; -} - struct CurlResponse { std::string body; bool error{false}; @@ -93,10 +40,9 @@ class PythonEngine : public EngineI { std::unordered_map models_; extensions::TemplateRenderer renderer_; std::unique_ptr async_file_logger_; - std::unordered_map processMap; + std::unordered_map process_map_; trantor::ConcurrentTaskQueue q_; - // Helper functions CurlResponse MakePostRequest(const std::string& model, const std::string& path, @@ -111,8 +57,6 @@ class PythonEngine : public EngineI { const std::function& callback); // Process manager functions - pid_t SpawnProcess(const std::string& model, - const std::vector& command); bool TerminateModelProcess(const std::string& model); // Internal model management diff --git a/engine/services/hardware_service.cc b/engine/services/hardware_service.cc index a0fb93878..00e48d27a 100644 --- a/engine/services/hardware_service.cc +++ b/engine/services/hardware_service.cc @@ -9,7 +9,10 @@ #endif #include "cli/commands/cortex_upd_cmd.h" #include "database/hardware.h" +#include "services/engine_service.h" #include "utils/cortex_utils.h" +#include "utils/dylib_path_manager.h" +#include "utils/process/utils.h" #if defined(__linux__) #include "services/download_service.h" #endif @@ -176,41 +179,36 @@ bool HardwareService::Restart(const std::string& host, int port) { } #else - // Unix-like system-specific code to fork a child process - pid_t pid = fork(); - + std::vector commands; + // Some engines requires to add lib search path before process being created + auto download_srv = std::make_shared(); + auto dylib_path_mng = std::make_shared(); + auto db_srv = std::make_shared(); + EngineService(download_srv, dylib_path_mng, db_srv).RegisterEngineLibPath(); + std::string p = cortex_utils::GetCurrentPath() + "/" + exe; + commands.push_back(p); + commands.push_back("--ignore_cout"); + commands.push_back("--config_file_path"); + commands.push_back(get_config_file_path()); + commands.push_back("--data_folder_path"); + commands.push_back(get_data_folder_path()); + commands.push_back("--loglevel"); + commands.push_back(luh::LogLevelStr(luh::global_log_level)); + auto pid = cortex::process::SpawnProcess(commands); if (pid < 0) { // Fork failed std::cerr << "Could not start server: " << std::endl; return false; - } else if (pid == 0) { - // No need to configure LD_LIBRARY_PATH for macOS -#if !defined(__APPLE__) || !defined(__MACH__) - const char* name = "LD_LIBRARY_PATH"; - auto data = getenv(name); - std::string v; - if (auto g = getenv(name); g) { - v += g; - } - CTL_INF("LD_LIBRARY_PATH: " << v); - auto llamacpp_path = file_manager_utils::GetCudaToolkitPath(kLlamaRepo); - auto trt_path = file_manager_utils::GetCudaToolkitPath(kTrtLlmRepo); - - auto new_v = trt_path.string() + ":" + llamacpp_path.string() + ":" + v; - setenv(name, new_v.c_str(), true); - CTL_INF("LD_LIBRARY_PATH: " << getenv(name)); -#endif - std::string p = cortex_utils::GetCurrentPath() + "/" + exe; - execl(p.c_str(), exe.c_str(), "--ignore_cout", "--config_file_path", - get_config_file_path().c_str(), "--data_folder_path", - get_data_folder_path().c_str(), "--loglevel", - luh::LogLevelStr(luh::global_log_level).c_str(), (char*)0); } else { // Parent process if (!TryConnectToServer(host, port)) { return false; } + std::cout << "Server started" << std::endl; + std::cout << "API Documentation available at: http://" << host << ":" + << port << std::endl; } + #endif return true; } diff --git a/engine/utils/process/utils.cc b/engine/utils/process/utils.cc new file mode 100644 index 000000000..c89dc7371 --- /dev/null +++ b/engine/utils/process/utils.cc @@ -0,0 +1,106 @@ +#include "utils/process/utils.h" +#include "utils/logging_utils.h" + +namespace cortex::process { + +std::string ConstructWindowsCommandLine(const std::vector& args) { + std::string cmd_line; + for (const auto& arg : args) { + // Simple escaping for Windows command line + std::string escaped_arg = arg; + if (escaped_arg.find(' ') != std::string::npos) { + // Wrap in quotes and escape existing quotes + for (char& c : escaped_arg) { + if (c == '"') + c = '\\'; + } + escaped_arg = "\"" + escaped_arg + "\""; + } + cmd_line += escaped_arg + " "; + } + return cmd_line; +} + +std::vector ConvertToArgv(const std::vector& args) { + std::vector argv; + for (const auto& arg : args) { + argv.push_back(const_cast(arg.c_str())); + } + argv.push_back(nullptr); + return argv; +} + +pid_t SpawnProcess(const std::vector& command) { + try { +#if defined(_WIN32) + // Windows process creation + STARTUPINFOA si = {0}; + PROCESS_INFORMATION pi = {0}; + si.cb = sizeof(si); + + // Construct command line + std::string cmd_line = ConstructWindowsCommandLine(command); + + // Convert string to char* for Windows API + char command_buffer[4096]; + strncpy_s(command_buffer, cmd_line.c_str(), sizeof(command_buffer)); + + if (!CreateProcessA(NULL, // lpApplicationName + command_buffer, // lpCommandLine + NULL, // lpProcessAttributes + NULL, // lpThreadAttributes + FALSE, // bInheritHandles + 0, // dwCreationFlags + NULL, // lpEnvironment + NULL, // lpCurrentDirectory + &si, // lpStartupInfo + &pi // lpProcessInformation + )) { + throw std::runtime_error("Failed to create process on Windows"); + } + + // Store the process ID + pid_t pid = pi.dwProcessId; + + // Close handles to avoid resource leaks + CloseHandle(pi.hProcess); + CloseHandle(pi.hThread); + + return pid; + +#elif defined(__APPLE__) || defined(__linux__) + // POSIX process creation + pid_t pid; + + // Convert command vector to char*[] + auto argv = ConvertToArgv(command); + + // Use posix_spawn for cross-platform compatibility + auto spawn_result = posix_spawn(&pid, // pid output + command[0].c_str(), // executable path + NULL, // file actions + NULL, // spawn attributes + argv.data(), // argument vector +#if defined(__linux__) + environ // environment (inherit) +#else + NULL +#endif + ); + + if (spawn_result != 0) { + throw std::runtime_error("Failed to spawn process"); + } + + return pid; + +#else +#error Unsupported platform +#endif + } catch (const std::exception& e) { + LOG_ERROR << "Process spawning error: " << e.what(); + return -1; + } +} + +} // namespace cortex::process \ No newline at end of file diff --git a/engine/utils/process/utils.h b/engine/utils/process/utils.h new file mode 100644 index 000000000..9332607e9 --- /dev/null +++ b/engine/utils/process/utils.h @@ -0,0 +1,25 @@ +#pragma once + +#if defined(_WIN32) +#include +#include +using pid_t = DWORD; +#elif defined(__APPLE__) || defined(__linux__) +#include +#include +#include +#include +#include +#endif + +#include +#include + +namespace cortex::process { +std::string ConstructWindowsCommandLine(const std::vector& args); + +std::vector ConvertToArgv(const std::vector& args); + +pid_t SpawnProcess(const std::vector& command); + +} \ No newline at end of file