From 569f5ead9600a6a017eb9e1a7b65f1b422d8886a Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sun, 21 Jul 2024 18:58:53 +0200 Subject: [PATCH 01/15] WIP: Thinking of a good class hierarchy --- hpc/LoadBalancer.hpp | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 8a4e433..15d2d4d 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -65,15 +65,33 @@ std::string readUrl(const std::string &filename) return url; } +class JobManager +{ +public: + virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; + virtual std::vector getModelNames() = 0; + virtual ~JobManager() {}; +}; + +class HyperQueueJobManager : public JobManager +{ +public: + virtual std::unique_ptr requestModelAccess(const std::string& model_name) override + { + return std::make_unique(model_name); + } +}; + std::mutex job_submission_mutex; int hq_submit_delay_ms = 0; -class HyperQueueJob +class HyperQueueJob : public umbridge::Model { public: static std::atomic job_count; HyperQueueJob(std::string model_name, bool start_client=true, bool force_default_submission_script=false) + : Model(model_name) { job_id = submitHQJob(model_name, force_default_submission_script); From 9ee1822e5bb4f60754d1d53400989b9429635a32 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Mon, 29 Jul 2024 11:05:57 +0200 Subject: [PATCH 02/15] WIP: Writing the generic FileBasedJobManager --- hpc/LoadBalancer.hpp | 181 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 147 insertions(+), 34 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 15d2d4d..0ca376f 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -42,15 +42,15 @@ bool waitForFile(const std::string &filename) return true; } -std::string readUrl(const std::string &filename) +std::string readLineFromFile(const std::string &filename) { std::ifstream file(filename); - std::string url; + std::string line; if (file.is_open()) { std::string file_contents((std::istreambuf_iterator(file)), (std::istreambuf_iterator())); - url = file_contents; + line = file_contents; file.close(); } else @@ -59,31 +59,140 @@ std::string readUrl(const std::string &filename) } // delete the line break - if (!url.empty()) - url.pop_back(); + if (!line.empty()) + line.pop_back(); - return url; + return line; } class JobManager { public: - virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; + // Grant exclusive ownership of a model (with a given name) to a caller. + // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. + // This can be achieved by returning a unique pointer with an appropriate deleter. + // This method may return a nullptr to deny a request. + virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; + + // To initialize the load balancer we first need a list of model names that are available on a server. + // Typically, this can be achieved by simply running the model code and requesting the model names from the server. + // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access, + // which is why this method was placed in this class to avoid code duplication. 
virtual std::vector getModelNames() = 0; + virtual ~JobManager() {}; }; -class HyperQueueJobManager : public JobManager +class FileBasedModelDeleter { public: - virtual std::unique_ptr requestModelAccess(const std::string& model_name) override - { - return std::make_unique(model_name); + FileBasedModelDeleter(std::string cancelation_command, std::string file_to_delete) + : cancelation_command(cancelation_command), file_to_delete(file_to_delete) {} + + void operator()(umbridge::Model* model) { + delete model; + std::filesystem::remove(file_to_delete); + std::system(cancelation_command.c_str()); } + +protected: + std::string cancelation_command; + std::string file_to_delete; }; -std::mutex job_submission_mutex; -int hq_submit_delay_ms = 0; +class FileBasedJobManager : public JobManager +{ +public: + virtual std::unique_ptr requestModelAccess(const std::string& model_name) override + { + + } +protected: + virtual std::string getSubmissionCommand() = 0; + + std::unique_ptr setDeleter(std::unique_ptr client) + { + FileBasedModelDeleter deleter("",""); + return {client, deleter}; + } + std::unique_ptr submitJobAndStartClient(const std::string& model_name) { + std::string submission_command = getSubmissionCommand(); + std::string job_id = submitJob(submission_command); + std::string server_url = readURL(job_id); + auto client = connectToServer(server_url, model_name); + return client; + } + + std::unique_ptr connectToServer(const std::string& server_url, const std::string& model_name) + { + return std::make_unique(server_url, model_name); + } + + std::string readURL(const std::string& job_id) + { + std::filesystem::path url_file(url_file_prefix + job_id + url_file_suffix); + return readLineFromFile(url_file.string()); + } + + std::string submitJob(const std::string& command) + { + // Add optional delay to job submissions to prevent issues in some cases. + if (submission_delay_ms) { + std::lock_guard lock(submission_mutex); + std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms)); + } + // Submit job and increase job count + std::string command_output = getCommandOutput(command); + job_count++; + + // Extract the actual job id from the command output + return parseJobID(command_output); + } + + virtual std::string parseJobID(const std::string& unparsed_job_id) { + return unparsed_job_id; + } + + std::string selectJobScript(const std::string& model_name, bool force_default_submission_script = false) + { + namespace fs = std::filesystem; + + const fs::path submission_script_model_specific( + submission_script_model_specific_prefix + model_name + submission_script_model_specific_suffix); + std::string job_script = ""; + + // Use model specific job script if available, default otherwise. 
+ if (fs::exists(submission_script_dir / submission_script_model_specific) && !force_default_submission_script) + { + std::string job_script = (submission_script_dir / submission_script_model_specific).string(); + } + else if (fs::exists(submission_script_dir / submission_script_default)) + { + std::string job_script = (submission_script_dir / submission_script_default).string(); + } + else + { + const std::string error_msg = "Job submission script not found: Check that file '" + + (submission_script_dir / submission_script_default).string() + "' exists."; + throw std::runtime_error(error_msg); + } + return job_script; + } + + const std::filesystem::path submission_script_dir; + const std::filesystem::path submission_script_default; + const std::string submission_script_model_specific_prefix; + const std::string submission_script_model_specific_suffix; + + const std::filesystem::path url_dir; + const std::string url_file_prefix; + const std::string url_file_suffix; + + int submission_delay_ms = 0; + std::mutex submission_mutex; + + std::atomic job_count = 0; +}; class HyperQueueJob : public umbridge::Model { @@ -204,24 +313,25 @@ class HyperQueueJob : public umbridge::Model class LoadBalancer : public umbridge::Model { public: - LoadBalancer(std::string name) : umbridge::Model(name) {} + LoadBalancer(std::string name, std::shared_ptr job_manager) + : umbridge::Model(name), job_manager(std::move(job_manager)) {} std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->GetInputSizes(config_json); + auto model = job_manager->requestModelAccess(name); + return model->GetInputSizes(config_json); } std::vector GetOutputSizes(const json &config_json = json::parse("{}")) const override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->GetOutputSizes(config_json); + auto model = job_manager->requestModelAccess(name); + return model->GetOutputSizes(config_json); } std::vector> Evaluate(const std::vector> &inputs, json config_json = json::parse("{}")) override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->Evaluate(inputs, config_json); + auto model = job_manager->requestModelAccess(name); + return model->Evaluate(inputs, config_json); } std::vector Gradient(unsigned int outWrt, @@ -230,8 +340,8 @@ class LoadBalancer : public umbridge::Model const std::vector &sens, json config_json = json::parse("{}")) override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->Gradient(outWrt, inWrt, inputs, sens, config_json); + auto model = job_manager->requestModelAccess(name); + return model->Gradient(outWrt, inWrt, inputs, sens, config_json); } std::vector ApplyJacobian(unsigned int outWrt, @@ -240,8 +350,8 @@ class LoadBalancer : public umbridge::Model const std::vector &vec, json config_json = json::parse("{}")) override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->ApplyJacobian(outWrt, inWrt, inputs, vec, config_json); + auto model = job_manager->requestModelAccess(name); + return model->ApplyJacobian(outWrt, inWrt, inputs, vec, config_json); } std::vector ApplyHessian(unsigned int outWrt, @@ -252,28 +362,31 @@ class LoadBalancer : public umbridge::Model const std::vector &vec, json config_json = json::parse("{}")) { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json); + auto model = job_manager->requestModelAccess(name); + return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, 
config_json); } bool SupportsEvaluate() override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->SupportsEvaluate(); + auto model = job_manager->requestModelAccess(name); + return model->SupportsEvaluate(); } bool SupportsGradient() override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->SupportsGradient(); + auto model = job_manager->requestModelAccess(name); + return model->SupportsGradient(); } bool SupportsApplyJacobian() override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->SupportsApplyJacobian(); + auto model = job_manager->requestModelAccess(name); + return model->SupportsApplyJacobian(); } bool SupportsApplyHessian() override { - HyperQueueJob hq_job(name); - return hq_job.client_ptr->SupportsApplyHessian(); + auto model = job_manager->requestModelAccess(name); + return model->SupportsApplyHessian(); } + +private: + std::shared_ptr job_manager; }; From 40b6b2f4f79f1eaccbc1d5401e33ac7bbb861fc6 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sat, 3 Aug 2024 13:34:52 +0200 Subject: [PATCH 03/15] WIP: Figuring out how to add custom deleter to unique_ptr --- hpc/LoadBalancer.hpp | 50 +++++++++++++++++++++++--------------------- lib/umbridge.h | 1 + 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 0ca376f..e8d6158 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -42,26 +42,20 @@ bool waitForFile(const std::string &filename) return true; } -std::string readLineFromFile(const std::string &filename) +std::string readLineFromFile(const std::string& filename) { std::ifstream file(filename); - std::string line; + std::string line = ""; + if (file.is_open()) { - std::string file_contents((std::istreambuf_iterator(file)), - (std::istreambuf_iterator())); - line = file_contents; - file.close(); + std::getline(file, line); } else { - std::cerr << "Unable to open file " << filename << " ." << std::endl; + std::cerr << "Unable to open file: " << filename << std::endl; } - // delete the line break - if (!line.empty()) - line.pop_back(); - return line; } @@ -72,7 +66,7 @@ class JobManager // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. // This can be achieved by returning a unique pointer with an appropriate deleter. // This method may return a nullptr to deny a request. - virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; + virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; // To initialize the load balancer we first need a list of model names that are available on a server. // Typically, this can be achieved by simply running the model code and requesting the model names from the server. 
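The access contract sketched in the comments above is easiest to see from the caller's side: the caller treats the returned pointer like any other umbridge::Model and never sees the deleter that the job manager attached. A minimal caller-side sketch, assuming a JobManager instance named manager; the nullptr check and the helper name are illustrative additions, not code from this patch:

    std::vector<std::size_t> queryInputSizes(JobManager& manager, const std::string& model_name)
    {
        auto model = manager.requestModelAccess(model_name);   // exclusive access to one running model instance
        if (!model)                                            // the manager may deny the request
            throw std::runtime_error("no model instance available for " + model_name);
        return model->GetInputSizes();
    }   // `model` goes out of scope here; the attached deleter cancels the job and removes the URL file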
@@ -99,28 +93,30 @@ class FileBasedModelDeleter std::string cancelation_command; std::string file_to_delete; }; +using unique_file_based_model_ptr = std::unique_ptr; class FileBasedJobManager : public JobManager { public: - virtual std::unique_ptr requestModelAccess(const std::string& model_name) override + virtual std::unique_ptr requestModelAccess(const std::string& model_name) override { - + std::string submission_command = getSubmissionCommand(); + std::string job_id = submitJob(submission_command); + std::string server_url = readURL(job_id); + FileBasedModelDeleter deleter(getCancelationCommand(job_id), getURLFileName(job_id)); + unique_file_based_model_ptr client(new umbridge::HTTPModel(server_url, model_name), deleter); + return client; } protected: virtual std::string getSubmissionCommand() = 0; + virtual std::string getCancelationCommand(const std::string& job_id) = 0; std::unique_ptr setDeleter(std::unique_ptr client) { - FileBasedModelDeleter deleter("",""); - return {client, deleter}; + } std::unique_ptr submitJobAndStartClient(const std::string& model_name) { - std::string submission_command = getSubmissionCommand(); - std::string job_id = submitJob(submission_command); - std::string server_url = readURL(job_id); - auto client = connectToServer(server_url, model_name); - return client; + } std::unique_ptr connectToServer(const std::string& server_url, const std::string& model_name) @@ -128,10 +124,14 @@ class FileBasedJobManager : public JobManager return std::make_unique(server_url, model_name); } + std::string getURLFileName(const std::string& job_id) + { + return url_file_prefix + job_id + url_file_suffix; + } + std::string readURL(const std::string& job_id) { - std::filesystem::path url_file(url_file_prefix + job_id + url_file_suffix); - return readLineFromFile(url_file.string()); + return readLineFromFile(getURLFileName(job_id)); } std::string submitJob(const std::string& command) @@ -181,9 +181,11 @@ class FileBasedJobManager : public JobManager const std::filesystem::path submission_script_dir; const std::filesystem::path submission_script_default; + // Model-specifc job-script format: const std::string submission_script_model_specific_prefix; const std::string submission_script_model_specific_suffix; + // URL file format: const std::filesystem::path url_dir; const std::string url_file_prefix; const std::string url_file_suffix; @@ -314,7 +316,7 @@ class LoadBalancer : public umbridge::Model { public: LoadBalancer(std::string name, std::shared_ptr job_manager) - : umbridge::Model(name), job_manager(std::move(job_manager)) {} + : umbridge::Model(name), job_manager(job_manager) {} std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override { diff --git a/lib/umbridge.h b/lib/umbridge.h index 290ef4f..a779112 100644 --- a/lib/umbridge.h +++ b/lib/umbridge.h @@ -21,6 +21,7 @@ namespace umbridge { class Model { public: Model(std::string name) : name(name) {} + virtual ~Model() {} virtual std::vector GetInputSizes(const json& config_json = json::parse("{}")) const = 0; virtual std::vector GetOutputSizes(const json& config_json = json::parse("{}")) const = 0; From 28aec78bbf897e1eede0ffc3f14c09ebade02713 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sun, 4 Aug 2024 19:00:39 +0200 Subject: [PATCH 04/15] WIP: Still need to handle possible leak in submitJob --- hpc/LoadBalancer.hpp | 73 ++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 46 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 
e8d6158..7e5f36a 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -59,6 +59,8 @@ std::string readLineFromFile(const std::string& filename) return line; } +using SafeUniqueModelPointer = std::unique_ptr>; + class JobManager { public: @@ -66,67 +68,46 @@ class JobManager // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. // This can be achieved by returning a unique pointer with an appropriate deleter. // This method may return a nullptr to deny a request. - virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; + virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) = 0; // To initialize the load balancer we first need a list of model names that are available on a server. // Typically, this can be achieved by simply running the model code and requesting the model names from the server. - // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access, - // which is why this method was placed in this class to avoid code duplication. + // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access. virtual std::vector getModelNames() = 0; virtual ~JobManager() {}; }; -class FileBasedModelDeleter -{ -public: - FileBasedModelDeleter(std::string cancelation_command, std::string file_to_delete) - : cancelation_command(cancelation_command), file_to_delete(file_to_delete) {} - - void operator()(umbridge::Model* model) { - delete model; - std::filesystem::remove(file_to_delete); - std::system(cancelation_command.c_str()); - } - -protected: - std::string cancelation_command; - std::string file_to_delete; -}; -using unique_file_based_model_ptr = std::unique_ptr; - class FileBasedJobManager : public JobManager { public: - virtual std::unique_ptr requestModelAccess(const std::string& model_name) override + virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) override { - std::string submission_command = getSubmissionCommand(); - std::string job_id = submitJob(submission_command); + std::string job_id = submitJob(); std::string server_url = readURL(job_id); - FileBasedModelDeleter deleter(getCancelationCommand(job_id), getURLFileName(job_id)); - unique_file_based_model_ptr client(new umbridge::HTTPModel(server_url, model_name), deleter); + + SafeUniqueModelPointer client(new umbridge::HTTPModel(server_url, model_name), createModelDeleter(job_id)); return client; } protected: virtual std::string getSubmissionCommand() = 0; virtual std::string getCancelationCommand(const std::string& job_id) = 0; - std::unique_ptr setDeleter(std::unique_ptr client) - { - - } - std::unique_ptr submitJobAndStartClient(const std::string& model_name) { - - } - - std::unique_ptr connectToServer(const std::string& server_url, const std::string& model_name) + std::function createModelDeleter(const std::string& job_id) { - return std::make_unique(server_url, model_name); + std::string file_to_delete = getURLFileName(job_id); + std::string cancelation_command = getCancelationCommand(job_id); + return [file_to_delete, cancelation_command](umbridge::Model* model) { + delete model; + std::filesystem::remove(file_to_delete); + std::system(cancelation_command.c_str()); + }; } std::string getURLFileName(const std::string& job_id) { - return url_file_prefix + job_id + url_file_suffix; + std::filesystem::path url_file_name(url_file_prefix + job_id + url_file_suffix); + return 
(url_dir / url_file_name).string(); } std::string readURL(const std::string& job_id) @@ -134,7 +115,7 @@ class FileBasedJobManager : public JobManager return readLineFromFile(getURLFileName(job_id)); } - std::string submitJob(const std::string& command) + std::string submitJob() { // Add optional delay to job submissions to prevent issues in some cases. if (submission_delay_ms) { @@ -142,7 +123,7 @@ class FileBasedJobManager : public JobManager std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms)); } // Submit job and increase job count - std::string command_output = getCommandOutput(command); + std::string command_output = getCommandOutput(getSubmissionCommand()); job_count++; // Extract the actual job id from the command output @@ -179,16 +160,16 @@ class FileBasedJobManager : public JobManager return job_script; } - const std::filesystem::path submission_script_dir; - const std::filesystem::path submission_script_default; + std::filesystem::path submission_script_dir; + std::filesystem::path submission_script_default; // Model-specifc job-script format: - const std::string submission_script_model_specific_prefix; - const std::string submission_script_model_specific_suffix; + std::string submission_script_model_specific_prefix; + std::string submission_script_model_specific_suffix; // URL file format: - const std::filesystem::path url_dir; - const std::string url_file_prefix; - const std::string url_file_suffix; + std::filesystem::path url_dir; + std::string url_file_prefix; + std::string url_file_suffix; int submission_delay_ms = 0; std::mutex submission_mutex; From 7a0d085e8c19ce171476dc9888de2f2bdd579961 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Tue, 6 Aug 2024 20:45:25 +0200 Subject: [PATCH 05/15] WIP: Trying to fix possible leak --- hpc/LoadBalancer.hpp | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 7e5f36a..c67b8cc 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -75,9 +75,40 @@ class JobManager // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access. virtual std::vector getModelNames() = 0; - virtual ~JobManager() {}; + virtual ~JobManager() = default; }; +class FileBasedJob +{ +public: + FileBasedJob() + { + + } + + FileBaseJob~() + { + + } + + FileBasedJob(const FileBasedJob& other) = delete; + + std::string getJobID() const + { + return job_id; + } +private: + std::string job_id; +}; + +template +void deleteFileBased(T* t, std::string file_to_delete, std::string cancellation_command) +{ + delete t; + std::filesystem::remove(file_to_delete); + std::system(cancelation_command.c_str()); +} + class FileBasedJobManager : public JobManager { public: @@ -118,7 +149,7 @@ class FileBasedJobManager : public JobManager std::string submitJob() { // Add optional delay to job submissions to prevent issues in some cases. 
- if (submission_delay_ms) { + if (submission_delay_ms > 0) { std::lock_guard lock(submission_mutex); std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms)); } From a347ce4a3402643600a45e250f001f5bfc8b2b93 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Mon, 12 Aug 2024 10:06:05 +0200 Subject: [PATCH 06/15] WIP: Introducing a RAII wrapper for jobs --- hpc/LoadBalancer.hpp | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index c67b8cc..59fb3d9 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -78,37 +78,45 @@ class JobManager virtual ~JobManager() = default; }; -class FileBasedJob +class Job { public: - FileBasedJob() - { + virtual ~Job() = default; - } + virtual std::string getJobId() const = 0; +}; - FileBaseJob~() - { +class FileBasedJob : public Job +{ +public: + FileBasedJob(const std::string& command, std::function extract_job_id) + { + std::string command_output = getCommandOutput(command); + id = extract_job_id(command_output); } - FileBasedJob(const FileBasedJob& other) = delete; - - std::string getJobID() const + ~FileBasedJob() { - return job_id; + } -private: - std::string job_id; +protected: + std::string id; }; template -void deleteFileBased(T* t, std::string file_to_delete, std::string cancellation_command) +void deleteFileBased(T* t, std::string file_to_delete, std::string cancel_command) { delete t; std::filesystem::remove(file_to_delete); - std::system(cancelation_command.c_str()); + std::system(cancel_command.c_str()); } +// Basic idea: +// 1. Run some command to request a resource allocation on the HPC cluster. +// 2. Launch a model server in the resource allocation. +// 3. Retrieve the URL of the model server. +// 4. Connect to the model server using the URL. class FileBasedJobManager : public JobManager { public: From 1d96b2721d207b926ad1c0083e1d2c50b3cade2b Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Mon, 26 Aug 2024 00:33:47 +0200 Subject: [PATCH 07/15] WIP: Deciding between templates and inheritance --- hpc/LoadBalancer.hpp | 157 ++++++++++++++++++++++++++++++------------- 1 file changed, 112 insertions(+), 45 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 59fb3d9..47e83c5 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -10,7 +10,7 @@ #include "../lib/umbridge.h" // run and get the result of command -std::string getCommandOutput(const std::string command) +std::string getCommandOutput(const std::string& command) { FILE *pipe = popen(command.c_str(), "r"); // execute the command and return the output as stream if (!pipe) @@ -59,8 +59,6 @@ std::string readLineFromFile(const std::string& filename) return line; } -using SafeUniqueModelPointer = std::unique_ptr>; - class JobManager { public: @@ -68,7 +66,7 @@ class JobManager // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. // This can be achieved by returning a unique pointer with an appropriate deleter. // This method may return a nullptr to deny a request. - virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) = 0; + virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; // To initialize the load balancer we first need a list of model names that are available on a server. // Typically, this can be achieved by simply running the model code and requesting the model names from the server. 
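Requesting the model names can indeed ride on the same mechanism: start one server with the default job script, read its URL, and ask it which models it serves via the UM-Bridge protocol. A sketch of such an implementation; submitDefaultJob and readServerUrl are hypothetical helper names standing in for whatever submit-and-read-URL path the concrete job manager already has:

    std::vector<std::string> getModelNames() override
    {
        std::string job_id = submitDefaultJob();        // launch a server using the default job script
        std::string server_url = readServerUrl(job_id); // wait for the job to publish its URL
        return umbridge::SupportedModels(server_url);   // query the server's model list
    }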
@@ -78,72 +76,148 @@ class JobManager virtual ~JobManager() = default; }; +void remove_trailing_newline(std::string& s) +{ + if (!s.empty() && s.back() == '\n') + { + s.pop_back(); + } +} + class Job { public: + Job() = default; + Job(Job &other) = delete; + Job(Job &&other) = delete; + Job &operator=(Job &other) = delete; + Job &operator=(Job &&other) = delete; virtual ~Job() = default; virtual std::string getJobId() const = 0; }; +class HyperQueueJob : public Job +{ +public: + explicit HyperQueueJob(const std::string& command) + { + std::string output = getCommandOutput(command); + remove_trailing_newline(output); + id = output; + } + + ~HyperQueueJob() override + { + std::system(("./hq job cancel " + id).c_str()); + } + + std::string getJobId() const override + { + return id; + } + +private: + std::string id; +}; -class FileBasedJob : public Job +class SlurmJob : public Job { public: - FileBasedJob(const std::string& command, std::function extract_job_id) + explicit SlurmJob(const std::string& command) { - std::string command_output = getCommandOutput(command); - id = extract_job_id(command_output); + std::string output = getCommandOutput(command); + id = output.substr(0, output.find(';')); } - ~FileBasedJob() + ~SlurmJob() override { + std::system(("scancel " + id).c_str()); + } + std::string getJobId() const override + { + return id; } -protected: + +private: std::string id; }; -template -void deleteFileBased(T* t, std::string file_to_delete, std::string cancel_command) +/* +std::string submit_hyperqueue_job(const std::string& command) { - delete t; - std::filesystem::remove(file_to_delete); - std::system(cancel_command.c_str()); + std::string id = getCommandOutput(command); + remove_trailing_newline(id); + return id; } +std::string submit_slurm_job(const std::string& command) +{ + std::string id = getCommandOutput(command); + return id.substr(0, id.find(';')); +} + +void cancel_hyperqueue_job(const std::string& id) +{ + std::system(("./hq job cancel " + id).c_str()); +} + +void cancel_slurm_job(const std::string& id) +{ + std::system(("scancel " + id).c_str()); +} + +template +class Job +{ +public: + Job(std::string submit_command, SubmitFunction submit, CancelFunction cancel) : id(submit(submit_command)), cancel(cancel) {} + Job(Job &other) = delete; + Job(Job &&other) = delete; + Job &operator=(Job &other) = delete; + Job &operator=(Job &&other) = delete; + ~Job() + { + cancel(id); + } + +private: + std::string id; + CancelFunction cancel; +}; + +class HyperQueueJob : public Job +{ + HyperQueueJob() +}; + + +using SlurmJob = Job; +using HyperQueueJob = Job; +*/ + // Basic idea: // 1. Run some command to request a resource allocation on the HPC cluster. // 2. Launch a model server in the resource allocation. // 3. Retrieve the URL of the model server. // 4. Connect to the model server using the URL. 
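The block commented out above is the template alternative that this commit weighs against inheritance; the difference is mostly visible at the point of use. An illustrative comparison, not taken from the patch:

    // Inheritance: the scheduler is a runtime detail behind the Job interface, so one
    // non-template job manager can own any kind of job.
    std::unique_ptr<Job> job = std::make_unique<SlurmJob>("sbatch --parsable slurm_scripts/job.sh");

    // Template variant: the submit/cancel functions become part of the type, so every class
    // that stores a job has to be templated on them as well, e.g. (hypothetical spelling):
    // Job<decltype(&submit_slurm_job), decltype(&cancel_slurm_job)> job{"sbatch ...", submit_slurm_job, cancel_slurm_job};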
-class FileBasedJobManager : public JobManager +template +class CommandJobManager : public JobManager { public: - virtual SafeUniqueModelPointer requestModelAccess(const std::string& model_name) override + std::unique_ptr requestModelAccess(const std::string& model_name) override { std::string job_id = submitJob(); std::string server_url = readURL(job_id); - SafeUniqueModelPointer client(new umbridge::HTTPModel(server_url, model_name), createModelDeleter(job_id)); - return client; + std::unique_ptr model = std::make_unique(model_name, server_url); + return model; } -protected: +private: virtual std::string getSubmissionCommand() = 0; - virtual std::string getCancelationCommand(const std::string& job_id) = 0; - std::function createModelDeleter(const std::string& job_id) - { - std::string file_to_delete = getURLFileName(job_id); - std::string cancelation_command = getCancelationCommand(job_id); - return [file_to_delete, cancelation_command](umbridge::Model* model) { - delete model; - std::filesystem::remove(file_to_delete); - std::system(cancelation_command.c_str()); - }; - } - - std::string getURLFileName(const std::string& job_id) + std::string getURLFileName(const std::string& job_id) const { std::filesystem::path url_file_name(url_file_prefix + job_id + url_file_suffix); return (url_dir / url_file_name).string(); @@ -158,19 +232,12 @@ class FileBasedJobManager : public JobManager { // Add optional delay to job submissions to prevent issues in some cases. if (submission_delay_ms > 0) { - std::lock_guard lock(submission_mutex); + std::lock_guard lock(submission_mutex); std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms)); } // Submit job and increase job count - std::string command_output = getCommandOutput(getSubmissionCommand()); + Job job(getSubmissionCommand()); // getSubmissionCommand may depend on job_count. Possible race condition! 
job_count++; - - // Extract the actual job id from the command output - return parseJobID(command_output); - } - - virtual std::string parseJobID(const std::string& unparsed_job_id) { - return unparsed_job_id; } std::string selectJobScript(const std::string& model_name, bool force_default_submission_script = false) @@ -201,7 +268,7 @@ class FileBasedJobManager : public JobManager std::filesystem::path submission_script_dir; std::filesystem::path submission_script_default; - // Model-specifc job-script format: + // Model-specific job-script format: std::string submission_script_model_specific_prefix; std::string submission_script_model_specific_suffix; @@ -216,7 +283,7 @@ class FileBasedJobManager : public JobManager std::atomic job_count = 0; }; -class HyperQueueJob : public umbridge::Model +class HyperQueueJob { public: static std::atomic job_count; @@ -382,7 +449,7 @@ class LoadBalancer : public umbridge::Model const std::vector> &inputs, const std::vector &sens, const std::vector &vec, - json config_json = json::parse("{}")) + json config_json = json::parse("{}")) override { auto model = job_manager->requestModelAccess(name); return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json); From f7a284bd112308146b1ed217b93bfc1e7e5ef4ce Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Thu, 29 Aug 2024 21:32:55 +0200 Subject: [PATCH 08/15] WIP: Added Command class --- hpc/LoadBalancer.hpp | 101 ++++++++++++++++++++----------------------- 1 file changed, 46 insertions(+), 55 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 47e83c5..5b786bd 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -59,6 +59,32 @@ std::string readLineFromFile(const std::string& filename) return line; } +struct Command +{ + std::string exec; + std::vector options; + std::string target; + + void addOption(const std::string& option) + { + options.push_back(option); + } + + std::string toString() const + { + std::string result = exec; + for (const std::string& s : options) + { + result += " " + s; + } + result += " " + target; + + return result; + } +}; + + + class JobManager { public: @@ -84,6 +110,9 @@ void remove_trailing_newline(std::string& s) } } +// A Job instance escaping its scope would cause the destructor of the temporary to prematurely cancel the system resource allocation. +// Therefore, copy/move-constructor/assignment are marked as deleted. +// Instead, use explicit ownership mechanisms like std::unique_ptr. 
class Job { public: @@ -100,11 +129,15 @@ class Job class HyperQueueJob : public Job { public: - explicit HyperQueueJob(const std::string& command) + explicit HyperQueueJob(const std::vector& options) { - std::string output = getCommandOutput(command); - remove_trailing_newline(output); - id = output; + Command command {"./hq", options, "hq_scripts/job.sh"}; + + // Makes HQ output "\n" + command.addOption("--output-mode=quiet"); + id = getCommandOutput(command.toString()); + + remove_trailing_newline(id); } ~HyperQueueJob() override @@ -124,10 +157,16 @@ class HyperQueueJob : public Job class SlurmJob : public Job { public: - explicit SlurmJob(const std::string& command) + explicit SlurmJob(const std::vector& options) { - std::string output = getCommandOutput(command); + Command command {"sbatch", options, "slurm_scripts/job.sh"}; + + // Makes SLURM output "[;]\n" + command.addOption("--parsable"); + std::string output = getCommandOutput(command.toString()); + id = output.substr(0, output.find(';')); + remove_trailing_newline(id); } ~SlurmJob() override @@ -144,59 +183,11 @@ class SlurmJob : public Job std::string id; }; -/* -std::string submit_hyperqueue_job(const std::string& command) -{ - std::string id = getCommandOutput(command); - remove_trailing_newline(id); - return id; -} - -std::string submit_slurm_job(const std::string& command) -{ - std::string id = getCommandOutput(command); - return id.substr(0, id.find(';')); -} - -void cancel_hyperqueue_job(const std::string& id) -{ - std::system(("./hq job cancel " + id).c_str()); -} - -void cancel_slurm_job(const std::string& id) +class JobCommunicator { - std::system(("scancel " + id).c_str()); -} -template -class Job -{ -public: - Job(std::string submit_command, SubmitFunction submit, CancelFunction cancel) : id(submit(submit_command)), cancel(cancel) {} - Job(Job &other) = delete; - Job(Job &&other) = delete; - Job &operator=(Job &other) = delete; - Job &operator=(Job &&other) = delete; - ~Job() - { - cancel(id); - } - -private: - std::string id; - CancelFunction cancel; }; -class HyperQueueJob : public Job -{ - HyperQueueJob() -}; - - -using SlurmJob = Job; -using HyperQueueJob = Job; -*/ - // Basic idea: // 1. Run some command to request a resource allocation on the HPC cluster. // 2. Launch a model server in the resource allocation. 
From a4e34411ef86d6be9d9eb4aefb7df98f035a77c4 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sun, 15 Sep 2024 17:12:40 +0200 Subject: [PATCH 09/15] WIP: Implementing job communication --- hpc/LoadBalancer.hpp | 321 ++++++++++++++++++++++--------------------- 1 file changed, 164 insertions(+), 157 deletions(-) diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 5b786bd..1665ecb 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -6,9 +6,16 @@ #include #include #include +#include #include #include "../lib/umbridge.h" +void create_directory_if_not_existing(const std::filesystem::path& directory) { + if (!std::filesystem::is_directory(directory) || !std::filesystem::exists(directory)) { + std::filesystem::create_directory(directory); + } +} + // run and get the result of command std::string getCommandOutput(const std::string& command) { @@ -31,20 +38,20 @@ std::string getCommandOutput(const std::string& command) } // wait until file is created -bool waitForFile(const std::string &filename) +bool wait_for_file(const std::filesystem::path& file_path, std::chrono::milliseconds polling_cycle) { // Check if the file exists - while (!std::filesystem::exists(filename)) { + while (!std::filesystem::exists(file_path)) { // If the file doesn't exist, wait for a certain period - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(polling_cycle); } return true; } -std::string readLineFromFile(const std::string& filename) +std::string read_line_from_file(const std::filesystem::path& file_path) { - std::ifstream file(filename); + std::ifstream file(file_path); std::string line = ""; if (file.is_open()) @@ -53,7 +60,7 @@ std::string readLineFromFile(const std::string& filename) } else { - std::cerr << "Unable to open file: " << filename << std::endl; + std::cerr << "Unable to open file: " << file_path.string() << std::endl; } return line; @@ -84,10 +91,11 @@ struct Command }; - class JobManager { public: + virtual ~JobManager() = default; + // Grant exclusive ownership of a model (with a given name) to a caller. // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. // This can be achieved by returning a unique pointer with an appropriate deleter. @@ -98,8 +106,6 @@ class JobManager // Typically, this can be achieved by simply running the model code and requesting the model names from the server. // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access. virtual std::vector getModelNames() = 0; - - virtual ~JobManager() = default; }; void remove_trailing_newline(std::string& s) @@ -110,28 +116,28 @@ void remove_trailing_newline(std::string& s) } } -// A Job instance escaping its scope would cause the destructor of the temporary to prematurely cancel the system resource allocation. +// A Job instance escaping its scope would cause the destructor to prematurely cancel the system resource allocation. // Therefore, copy/move-constructor/assignment are marked as deleted. // Instead, use explicit ownership mechanisms like std::unique_ptr. 
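Deleting copy and move means a given allocation can only ever be cancelled by one destructor, so sharing a job accidentally (and cancelling it twice, or too early) cannot compile; ownership is handed over explicitly instead. A small illustrative sketch, assuming the HyperQueueJob defined below and a default job script under hq_scripts/ (the option value is a placeholder):

    std::vector<std::string> options = {"--env MODEL_NAME=forward"};            // placeholder environment option
    auto job = std::make_unique<HyperQueueJob>(options, "hq_scripts/job.sh");   // the allocation is submitted here
    std::unique_ptr<Job> owner = std::move(job);   // ownership is transferred explicitly, never copied
    // Destroying `owner` runs ~HyperQueueJob, which cancels the allocation exactly once.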
class Job { public: Job() = default; - Job(Job &other) = delete; - Job(Job &&other) = delete; - Job &operator=(Job &other) = delete; - Job &operator=(Job &&other) = delete; + Job(Job& other) = delete; + Job(Job&& other) = delete; + Job& operator=(Job& other) = delete; + Job& operator=(Job&& other) = delete; virtual ~Job() = default; virtual std::string getJobId() const = 0; }; - +// Environment vars: --env KEY1=VAL1 --env KEY2=VAL2 class HyperQueueJob : public Job { public: - explicit HyperQueueJob(const std::vector& options) + explicit HyperQueueJob(const std::vector& options, const std::string& target) { - Command command {"./hq", options, "hq_scripts/job.sh"}; + Command command {"./hq", options, target}; // Makes HQ output "\n" command.addOption("--output-mode=quiet"); @@ -153,13 +159,13 @@ class HyperQueueJob : public Job private: std::string id; }; - +// Environment vars: --export=KEY1=VAL1,KEY2=VAL2 class SlurmJob : public Job { public: - explicit SlurmJob(const std::vector& options) + explicit SlurmJob(const std::vector& options, const std::string& target) { - Command command {"sbatch", options, "slurm_scripts/job.sh"}; + Command command {"sbatch", options, target}; // Makes SLURM output "[;]\n" command.addOption("--parsable"); @@ -183,9 +189,146 @@ class SlurmJob : public Job std::string id; }; +class JobSubmitter +{ +public: + virtual ~JobSubmitter() = default; + + virtual std::unique_ptr submit(const std::string& model_name, const std::map& env) = 0; +}; + +class HyperQueueSubmitter : public JobSubmitter +{ +public: + HyperQueueSubmitter(std::filesystem::path submission_script_dir, std::chrono::milliseconds submission_delay) + : submission_delay(submission_delay) + { + + } + + std::unique_ptr submit(const std::string& model_name, const std::map& env) override + { + // Add optional delay to job submissions to prevent issues in some cases. 
+ if (submission_delay > std::chrono::milliseconds::zero()) { + std::lock_guard lock(submission_mutex); + std::this_thread::sleep_for(submission_delay); + } + + // Submit job and increase job count + std::vector options = env_to_options(env); + options.push_back("--priority=-" + job_count); + std::unique_ptr job = std::make_unique(options, target); + job_count++; + return job; + } +private: + std::vector env_to_options(const std::map& env) const + { + std::vector options; + options.reserve(env.size()); + + for (const auto& [key, val] : env) + { + options.push_back("--env " + key + "=" + val); + } + return options; + } + + std::chrono::milliseconds submission_delay = std::chrono::milliseconds::zero(); + std::mutex submission_mutex; + + std::atomic job_count = 0; + + std::filesystem::path submission_script_dir; + std::filesystem::path submission_script_default; + // Model-specific job-script format: + std::string submission_script_model_specific_prefix; + std::string submission_script_model_specific_suffix; +}; + +class SlurmSubmitter : public JobSubmitter +{ + +}; + class JobCommunicator { +public: + virtual ~JobCommunicator() = default; + + virtual std::map getInitMessage() = 0; + + virtual std::string getModelUrl(const std::string& job_id) = 0; +}; + +class JobCommunicatorFactory +{ +public: + virtual ~JobCommunicatorFactory() = default; + + virtual std::unique_ptr create() = 0; +}; + +class FilesystemCommunicatorFactory : public JobCommunicatorFactory +{ + FilesystemCommunicatorFactory(std::filesystem::path file_dir, std::chrono::milliseconds polling_cycle) + : file_dir(file_dir), polling_cycle(polling_cycle) + { + create_directory_if_not_existing(file_dir); + } + std::unique_ptr create() override + { + return std::make_unique(file_dir, polling_cycle); + } + +private: + std::filesystem::path file_dir; + + std::chrono::milliseconds polling_cycle; +}; + +class FilesystemCommunicator : public JobCommunicator +{ +public: + FilesystemCommunicator(std::filesystem::path file_dir, std::chrono::milliseconds polling_cycle) + : file_dir(file_dir), polling_cycle(polling_cycle) {} + + ~FilesystemCommunicator() override + { + if(!file_path.empty()) + { + std::filesystem::remove(file_path); + } + } + + std::map getInitMessage() override + { + std::map msg {{"UMBRIDGE_LOADBALANCER_COMM_FILEDIR", file_dir.string()}}; + return msg; + } + + std::string getModelUrl(const std::string& job_id) override + { + file_path = file_dir / getUrlFileName(job_id); + + std::cout << "Waiting for URL file: " << file_path.string() << std::endl; + wait_for_file(file_path, polling_cycle); + + // TODO: What if opening the file fails? + std::string url = read_line_from_file(file_path); + return url; + } +private: + std::string getUrlFileName(const std::string& job_id) const + { + return "url-" + job_id + ".txt"; + } + + std::filesystem::path file_dir; + std::filesystem::path file_path; + + std::chrono::milliseconds polling_cycle; }; // Basic idea: @@ -193,10 +336,10 @@ class JobCommunicator // 2. Launch a model server in the resource allocation. // 3. Retrieve the URL of the model server. // 4. Connect to the model server using the URL. 
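With the submitter and communicator split out, the four steps above become one short sequence per request. A minimal end-to-end sketch for a single evaluation, assuming a JobSubmitter and a JobCommunicatorFactory like the ones defined above (the function itself is illustrative, not part of the patch):

    std::vector<std::vector<double>> evaluateOnce(JobSubmitter& submitter,
                                                  JobCommunicatorFactory& comm_factory,
                                                  const std::string& model_name,
                                                  const std::vector<std::vector<double>>& inputs)
    {
        auto comm = comm_factory.create();                                // where the job will publish its URL
        auto job = submitter.submit(model_name, comm->getInitMessage());  // steps 1 and 2: allocate and launch a server
        std::string url = comm->getModelUrl(job->getJobId());            // step 3: wait for and read the URL
        umbridge::HTTPModel model(url, model_name);                       // step 4: connect a client
        return model.Evaluate(inputs);   // `job` outlives the client; leaving scope cancels the allocation
    }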
-template class CommandJobManager : public JobManager { public: + CommandJobManager(std::shared_ptr job_submitter, std::shared_ptr job_comm_factory) {} std::unique_ptr requestModelAccess(const std::string& model_name) override { std::string job_id = submitJob(); @@ -208,17 +351,6 @@ class CommandJobManager : public JobManager private: virtual std::string getSubmissionCommand() = 0; - std::string getURLFileName(const std::string& job_id) const - { - std::filesystem::path url_file_name(url_file_prefix + job_id + url_file_suffix); - return (url_dir / url_file_name).string(); - } - - std::string readURL(const std::string& job_id) - { - return readLineFromFile(getURLFileName(job_id)); - } - std::string submitJob() { // Add optional delay to job submissions to prevent issues in some cases. @@ -262,131 +394,6 @@ class CommandJobManager : public JobManager // Model-specific job-script format: std::string submission_script_model_specific_prefix; std::string submission_script_model_specific_suffix; - - // URL file format: - std::filesystem::path url_dir; - std::string url_file_prefix; - std::string url_file_suffix; - - int submission_delay_ms = 0; - std::mutex submission_mutex; - - std::atomic job_count = 0; -}; - -class HyperQueueJob -{ -public: - static std::atomic job_count; - HyperQueueJob(std::string model_name, bool start_client=true, - bool force_default_submission_script=false) - : Model(model_name) - { - job_id = submitHQJob(model_name, force_default_submission_script); - - // Get the server URL - server_url = readUrl("./urls/url-" + job_id + ".txt"); - - // Start a client, using unique pointer - if(start_client) - { - client_ptr = std::make_unique(server_url, model_name); - } - } - - ~HyperQueueJob() - { - // Cancel the SLURM job - std::system(("./hq job cancel " + job_id).c_str()); - - // Delete the url text file - std::system(("rm ./urls/url-" + job_id + ".txt").c_str()); - } - - std::string server_url; - std::unique_ptr client_ptr; - -private: - std::string submitHQJob(const std::string &model_name, bool force_default_submission_script=false) - { - // Add optional delay to job submissions to prevent issues in some cases. - if (hq_submit_delay_ms) { - std::lock_guard lock(job_submission_mutex); - std::this_thread::sleep_for(std::chrono::milliseconds(hq_submit_delay_ms)); - } - - // Use model specific job script if available, default otherwise. - const std::filesystem::path submission_script_dir("./hq_scripts"); - const std::filesystem::path submission_script_generic("job.sh"); - const std::filesystem::path submission_script_model_specific("job_" + model_name + ".sh"); - - std::string hq_command = "./hq submit --output-mode=quiet "; - hq_command += "--priority=" + std::to_string(job_count) + " "; - if (std::filesystem::exists(submission_script_dir / submission_script_model_specific) && !force_default_submission_script) - { - hq_command += (submission_script_dir / submission_script_model_specific).string(); - } - else if (std::filesystem::exists(submission_script_dir / submission_script_generic)) - { - hq_command += (submission_script_dir / submission_script_generic).string(); - } - else - { - throw std::runtime_error("Job submission script not found: Check that file 'hq_script/job.sh' exists."); - } - - // Submit the HQ job and retrieve the HQ job ID. - std::string job_id = getCommandOutput(hq_command); - job_count--; - - // Delete the line break. - if (!job_id.empty()) - { - job_id.pop_back(); - } - - std::cout << "Waiting for job " << job_id << " to start." 
<< std::endl; - - // Wait for the HQ Job to start - waitForHQJobState(job_id, "RUNNING"); - - // Also wait until job is running and url file is written - waitForFile("./urls/url-" + job_id + ".txt"); - - std::cout << "Job " << job_id << " started." << std::endl; - - return job_id; - } - - // state = ["WAITING", "RUNNING", "FINISHED", "CANCELED"] - bool waitForHQJobState(const std::string &job_id, const std::string &state) - { - const std::string command = "./hq job info " + job_id + " | grep State | awk '{print $4}'"; - // std::cout << "Checking runtime: " << command << std::endl; - std::string job_status; - - do - { - job_status = getCommandOutput(command); - - // Delete the line break - if (!job_status.empty()) - job_status.pop_back(); - - // Don't wait if there is an error or the job is ended - if (job_status == "" || (state != "FINISHED" && job_status == "FINISHED") || job_status == "FAILED" || job_status == "CANCELED") - { - std::cerr << "Wait for job status failure, status : " << job_status << std::endl; - return false; - } - - sleep(1); - } while (job_status != state); - - return true; - } - - std::string job_id; }; From 41394e780a37247184f32633c4786ff8fd869e47 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sun, 6 Oct 2024 16:40:14 +0200 Subject: [PATCH 10/15] WIP: Testing first compilable version --- hpc/LoadBalancer.cpp | 110 ++-------------- hpc/LoadBalancer.hpp | 296 ++++++++++++++++++++++++++++--------------- 2 files changed, 203 insertions(+), 203 deletions(-) diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index 4b51fb2..eca75b3 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -11,12 +11,6 @@ #include "../lib/umbridge.h" -void create_directory_if_not_existing(std::string directory) { - if (!std::filesystem::is_directory(directory) || !std::filesystem::exists(directory)) { - std::filesystem::create_directory(directory); - } -} - void clear_url(std::string directory) { for (auto& file : std::filesystem::directory_iterator(directory)) { if (std::regex_match(file.path().filename().string(), std::regex("url-\\d+\\.txt"))) { @@ -28,98 +22,15 @@ void clear_url(std::string directory) { void launch_hq_with_alloc_queue() { std::system("./hq server stop &> /dev/null"); - std::system("./hq server start &"); - sleep(1); // Workaround: give the HQ server enough time to start. + std::system("until ./hq server info &> /dev/null; do sleep 1; done"); // Create HQ allocation queue std::system("hq_scripts/allocation_queue.sh"); } -const std::vector get_model_names() { - // Don't start a client, always use the default job submission script. - HyperQueueJob hq_job("", false, true); - - return umbridge::SupportedModels(hq_job.server_url); -} - -void print_model_and_job_script_information(const std::vector& model_names) { - // Constants - const std::filesystem::path SUBMISSION_SCRIPT_DIR("./hq_scripts"); - const std::filesystem::path SUBMISSION_SCRIPT_GENERIC("job.sh"); - - const std::string SECTION_START_DELIMITER = "==============================MODEL INFO=============================="; - const std::string SECTION_END_DELIMITER = "======================================================================"; - - // Sort the model names in alphabetical order for cleaner output. - std::vector model_names_sorted = model_names; - std::sort(model_names_sorted.begin(), model_names_sorted.end()); - - std::cout << SECTION_START_DELIMITER << "\n"; - // Print list of available models and corresponding job-scripts. 
- std::cout << "Available models and corresponding job-scripts:\n"; - for (const std::string& model_name : model_names_sorted) { - // Determine which job script will be used by checking if a model specific job script exists. - std::string used_job_script; - const std::filesystem::path submission_script_model_specific("job_" + model_name + ".sh"); - if (std::filesystem::exists(SUBMISSION_SCRIPT_DIR / submission_script_model_specific)) { - used_job_script = submission_script_model_specific.string(); - } else { - used_job_script = SUBMISSION_SCRIPT_GENERIC.string(); - } - std::cout << "* Model '" << model_name << "' --> '" << used_job_script << "'\n"; - } - std::cout << std::endl; - - - // Check if there are job scripts that are unused and print a warning. - std::vector unused_job_scripts; - - // Build a regex to parse job-script filenames and extract the model name. - // Format should be: job_.sh - const std::string format_prefix = "^job_"; // Ensures that filename starts with 'job_'. - const std::string format_suffix = "\\.sh$"; // Ensures that filename ends with '.sh'. - const std::string format_model_name = "(.*)"; // Arbitrary sequence of characters as a marked subexpression. - const std::regex format_regex(format_prefix + format_model_name + format_suffix); - - for (auto& file : std::filesystem::directory_iterator(SUBMISSION_SCRIPT_DIR)) { - const std::string filename = file.path().filename().string(); - // Check if filename matches format of a model specific job script, i.e. 'job_.sh'. - std::smatch match_result; - if (std::regex_search(filename, match_result, format_regex)) { - // Extract first matched subexpression, i.e. the model name. - const std::string model_name = match_result[1].str(); - // Check if a corresponding model exists. If not, mark job script as unused. - if (!std::binary_search(model_names_sorted.begin(), model_names_sorted.end(), model_name)) { - unused_job_scripts.push_back(filename); - } - } - } - - // Print the warning message. - if(!unused_job_scripts.empty()) { - // Sort unused job scripts alphabetically for cleaner output. - std::sort(unused_job_scripts.begin(), unused_job_scripts.end()); - - std::cout << "WARNING: The following model-specific job-scripts are not used by any of the available models:\n"; - for (const std::string& job_script : unused_job_scripts) { - std::cout << "* '" << job_script << "'\n"; - } - std::cout << std::endl; - - std::cout << "If this behavior is unintentional, then please verify that:\n" - << "1. The filename of your model-specific job-script follows the format: 'job_.sh' (e.g. 'job_mymodel.sh')\n" - << "2. 
The spelling of your model name matches in the model definition and in the filename of your model-specific job-script.\n"; - } - - std::cout << SECTION_END_DELIMITER << std::endl; -} - -std::atomic HyperQueueJob::job_count = 0; int main(int argc, char *argv[]) { - create_directory_if_not_existing("urls"); - create_directory_if_not_existing("sub-jobs"); clear_url("urls"); launch_hq_with_alloc_queue(); @@ -137,25 +48,24 @@ int main(int argc, char *argv[]) port = atoi(port_cstr); } - char const *delay_cstr = std::getenv("HQ_SUBMIT_DELAY_MS"); - if (delay_cstr != NULL) - { - hq_submit_delay_ms = atoi(delay_cstr); - } - std::cout << "HQ_SUBMIT_DELAY_MS set to " << hq_submit_delay_ms << std::endl; + JobScriptLocator locator {"hq_scripts", "job.sh", "job_", ".sh"}; + std::shared_ptr job_manager = std::make_shared( + std::make_unique(std::chrono::milliseconds(100)), + std::make_unique("urls", std::chrono::milliseconds(100)), + locator + ); // Initialize load balancer for each available model on the model server. - const std::vector model_names = get_model_names(); + std::vector model_names = job_manager->getModelNames(); // Inform the user about the available models and the job scripts that will be used. - // Output a warning for unused model-specific job-scripts to prevent typos. - print_model_and_job_script_information(model_names); + locator.printModelJobScripts(model_names); std::vector LB_vector; for (auto model_name : model_names) { // Set up and serve model - LB_vector.emplace_back(LoadBalancer{model_name}); + LB_vector.emplace_back(model_name, job_manager); } // umbridge::serveModels currently only accepts raw pointers. diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 1665ecb..8364d2f 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -10,14 +10,8 @@ #include #include "../lib/umbridge.h" -void create_directory_if_not_existing(const std::filesystem::path& directory) { - if (!std::filesystem::is_directory(directory) || !std::filesystem::exists(directory)) { - std::filesystem::create_directory(directory); - } -} - // run and get the result of command -std::string getCommandOutput(const std::string& command) +std::string get_command_output(const std::string& command) { FILE *pipe = popen(command.c_str(), "r"); // execute the command and return the output as stream if (!pipe) @@ -90,24 +84,6 @@ struct Command } }; - -class JobManager -{ -public: - virtual ~JobManager() = default; - - // Grant exclusive ownership of a model (with a given name) to a caller. - // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. - // This can be achieved by returning a unique pointer with an appropriate deleter. - // This method may return a nullptr to deny a request. - virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; - - // To initialize the load balancer we first need a list of model names that are available on a server. - // Typically, this can be achieved by simply running the model code and requesting the model names from the server. - // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access. 
- virtual std::vector getModelNames() = 0; -}; - void remove_trailing_newline(std::string& s) { if (!s.empty() && s.back() == '\n') @@ -141,7 +117,7 @@ class HyperQueueJob : public Job // Makes HQ output "\n" command.addOption("--output-mode=quiet"); - id = getCommandOutput(command.toString()); + id = get_command_output(command.toString()); remove_trailing_newline(id); } @@ -169,7 +145,7 @@ class SlurmJob : public Job // Makes SLURM output "[;]\n" command.addOption("--parsable"); - std::string output = getCommandOutput(command.toString()); + std::string output = get_command_output(command.toString()); id = output.substr(0, output.find(';')); remove_trailing_newline(id); @@ -194,19 +170,16 @@ class JobSubmitter public: virtual ~JobSubmitter() = default; - virtual std::unique_ptr submit(const std::string& model_name, const std::map& env) = 0; + virtual std::unique_ptr submit(const std::string& job_script, const std::map& env) = 0; }; class HyperQueueSubmitter : public JobSubmitter { public: - HyperQueueSubmitter(std::filesystem::path submission_script_dir, std::chrono::milliseconds submission_delay) - : submission_delay(submission_delay) - { - - } + HyperQueueSubmitter(std::chrono::milliseconds submission_delay) + : submission_delay(submission_delay) {} - std::unique_ptr submit(const std::string& model_name, const std::map& env) override + std::unique_ptr submit(const std::string& job_script, const std::map& env) override { // Add optional delay to job submissions to prevent issues in some cases. if (submission_delay > std::chrono::milliseconds::zero()) { @@ -216,8 +189,8 @@ class HyperQueueSubmitter : public JobSubmitter // Submit job and increase job count std::vector options = env_to_options(env); - options.push_back("--priority=-" + job_count); - std::unique_ptr job = std::make_unique(options, target); + options.emplace_back("--priority=-" + job_count); + std::unique_ptr job = std::make_unique(options, job_script); job_count++; return job; } @@ -238,12 +211,6 @@ class HyperQueueSubmitter : public JobSubmitter std::mutex submission_mutex; std::atomic job_count = 0; - - std::filesystem::path submission_script_dir; - std::filesystem::path submission_script_default; - // Model-specific job-script format: - std::string submission_script_model_specific_prefix; - std::string submission_script_model_specific_suffix; }; class SlurmSubmitter : public JobSubmitter @@ -269,24 +236,6 @@ class JobCommunicatorFactory virtual std::unique_ptr create() = 0; }; -class FilesystemCommunicatorFactory : public JobCommunicatorFactory -{ - FilesystemCommunicatorFactory(std::filesystem::path file_dir, std::chrono::milliseconds polling_cycle) - : file_dir(file_dir), polling_cycle(polling_cycle) - { - create_directory_if_not_existing(file_dir); - } - std::unique_ptr create() override - { - return std::make_unique(file_dir, polling_cycle); - } - -private: - std::filesystem::path file_dir; - - std::chrono::milliseconds polling_cycle; -}; - class FilesystemCommunicator : public JobCommunicator { public: @@ -331,72 +280,213 @@ class FilesystemCommunicator : public JobCommunicator std::chrono::milliseconds polling_cycle; }; -// Basic idea: -// 1. Run some command to request a resource allocation on the HPC cluster. -// 2. Launch a model server in the resource allocation. -// 3. Retrieve the URL of the model server. -// 4. Connect to the model server using the URL. 
-class CommandJobManager : public JobManager +class FilesystemCommunicatorFactory : public JobCommunicatorFactory { public: - CommandJobManager(std::shared_ptr job_submitter, std::shared_ptr job_comm_factory) {} - std::unique_ptr requestModelAccess(const std::string& model_name) override + FilesystemCommunicatorFactory(std::filesystem::path file_dir, std::chrono::milliseconds polling_cycle) + : file_dir(file_dir), polling_cycle(polling_cycle) { - std::string job_id = submitJob(); - std::string server_url = readURL(job_id); - - std::unique_ptr model = std::make_unique(model_name, server_url); - return model; + std::filesystem::create_directory(file_dir); } -private: - virtual std::string getSubmissionCommand() = 0; - - std::string submitJob() + std::unique_ptr create() override { - // Add optional delay to job submissions to prevent issues in some cases. - if (submission_delay_ms > 0) { - std::lock_guard lock(submission_mutex); - std::this_thread::sleep_for(std::chrono::milliseconds(submission_delay_ms)); - } - // Submit job and increase job count - Job job(getSubmissionCommand()); // getSubmissionCommand may depend on job_count. Possible race condition! - job_count++; + return std::make_unique(file_dir, polling_cycle); } - std::string selectJobScript(const std::string& model_name, bool force_default_submission_script = false) - { - namespace fs = std::filesystem; +private: + std::filesystem::path file_dir; - const fs::path submission_script_model_specific( - submission_script_model_specific_prefix + model_name + submission_script_model_specific_suffix); - std::string job_script = ""; + std::chrono::milliseconds polling_cycle; +}; + + +struct JobScriptLocator +{ + std::filesystem::path selectJobScript(const std::string& model_name) + { + std::filesystem::path script_default = script_dir / script_default_name; + std::filesystem::path script_model_specific = script_dir / (model_prefix + model_name + model_suffix); // Use model specific job script if available, default otherwise. - if (fs::exists(submission_script_dir / submission_script_model_specific) && !force_default_submission_script) + if (std::filesystem::exists(script_model_specific)) { - std::string job_script = (submission_script_dir / submission_script_model_specific).string(); + return script_model_specific; } - else if (fs::exists(submission_script_dir / submission_script_default)) + else if (std::filesystem::exists(script_default) ) { - std::string job_script = (submission_script_dir / submission_script_default).string(); + return script_default; } else { - const std::string error_msg = "Job submission script not found: Check that file '" - + (submission_script_dir / submission_script_default).string() + "' exists."; + std::string error_msg = "Job script not found: Check that file '" + script_default.string() + "' exists."; throw std::runtime_error(error_msg); } - return job_script; } - - std::filesystem::path submission_script_dir; - std::filesystem::path submission_script_default; + + std::filesystem::path getDefaultJobScript() + { + return script_dir / script_default_name; + } + + void printModelJobScripts(std::vector model_names) { + const std::string section_start_delimiter = "==============================MODEL INFO=============================="; + const std::string section_end_delimiter = "======================================================================"; + + // Sort the model names in alphabetical order for cleaner output. 
+ std::sort(model_names.begin(), model_names.end()); + + std::cout << section_start_delimiter << std::endl; + + std::cout << "Available models and corresponding job-scripts:\n"; + for (const std::string& model_name : model_names) { + std::filesystem::path used_job_script = selectJobScript(model_name); + std::cout << "* Model '" << model_name << "' --> '" << used_job_script << std::endl; + } + std::cout << std::endl; + + std::cout << section_end_delimiter << std::endl; + } + + + std::filesystem::path script_dir; + + std::string script_default_name; + // Model-specific job-script format: - std::string submission_script_model_specific_prefix; - std::string submission_script_model_specific_suffix; + std::string model_prefix; + std::string model_suffix; +}; + +class JobManager +{ +public: + virtual ~JobManager() = default; + + // Grant exclusive ownership of a model (with a given name) to a caller. + // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. + // This can be achieved by returning a unique pointer with an appropriate deleter. + // This method may return a nullptr to deny a request. + virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; + + // To initialize the load balancer we first need a list of model names that are available on a server. + // Typically, this can be achieved by simply running the model code and requesting the model names from the server. + // Therefore, the implementation can most likely use the same mechanism that is also used for granting model access. + virtual std::vector getModelNames() = 0; }; +// TODO: Ugly repetition, maybe there is a better way to wrap a job and a model? +class JobModel : public umbridge::Model +{ +public: + JobModel(std::unique_ptr job, std::unique_ptr model) + : umbridge::Model(model->GetName()), job(std::move(job)), model(std::move(model)) {} + + std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override + { + return model->GetInputSizes(config_json); + } + + std::vector GetOutputSizes(const json &config_json = json::parse("{}")) const override + { + return model->GetOutputSizes(config_json); + } + + std::vector> Evaluate(const std::vector> &inputs, json config_json = json::parse("{}")) override + { + return model->Evaluate(inputs, config_json); + } + + std::vector Gradient(unsigned int outWrt, + unsigned int inWrt, + const std::vector> &inputs, + const std::vector &sens, + json config_json = json::parse("{}")) override + { + return model->Gradient(outWrt, inWrt, inputs, sens, config_json); + } + + std::vector ApplyJacobian(unsigned int outWrt, + unsigned int inWrt, + const std::vector> &inputs, + const std::vector &vec, + json config_json = json::parse("{}")) override + { + return model->ApplyJacobian(outWrt, inWrt, inputs, vec, config_json); + } + + std::vector ApplyHessian(unsigned int outWrt, + unsigned int inWrt1, + unsigned int inWrt2, + const std::vector> &inputs, + const std::vector &sens, + const std::vector &vec, + json config_json = json::parse("{}")) override + { + return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json); + } + + bool SupportsEvaluate() override + { + return model->SupportsEvaluate(); + } + bool SupportsGradient() override + { + return model->SupportsGradient(); + } + bool SupportsApplyJacobian() override + { + return model->SupportsApplyJacobian(); + } + bool SupportsApplyHessian() override + { + return model->SupportsApplyHessian(); + } + +private: + std::unique_ptr 
job; + std::unique_ptr model; +}; + +// Basic idea: +// 1. Run some command to request a resource allocation on the HPC cluster. +// 2. Launch a model server in the resource allocation. +// 3. Retrieve the URL of the model server. +// 4. Connect to the model server using the URL. +class CommandJobManager : public JobManager +{ +public: + CommandJobManager( + std::unique_ptr job_submitter, + std::unique_ptr job_comm_factory, + JobScriptLocator locator) + : job_submitter(std::move(job_submitter)), job_comm_factory(std::move(job_comm_factory)), locator(std::move(locator)) {} + + std::unique_ptr requestModelAccess(const std::string& model_name) override + { + std::filesystem::path job_script = locator.selectJobScript(model_name); + std::unique_ptr comm = job_comm_factory->create(); + std::unique_ptr job = job_submitter->submit(job_script, comm->getInitMessage()); + std::string url = comm->getModelUrl(job->getJobId()); + auto model = std::make_unique(url, model_name); + return std::make_unique(std::move(job), std::move(model)); + } + + std::vector getModelNames() override + { + std::filesystem::path job_script = locator.getDefaultJobScript(); + std::unique_ptr comm = job_comm_factory->create(); + std::unique_ptr job = job_submitter->submit(job_script, comm->getInitMessage()); + std::string url = comm->getModelUrl(job->getJobId()); + return umbridge::SupportedModels(url); + } + +private: + std::unique_ptr job_submitter; + std::unique_ptr job_comm_factory; + JobScriptLocator locator; +}; + class LoadBalancer : public umbridge::Model { public: From ed198a512d79ff885fbd5d94723dae9b198ff5d6 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Mon, 7 Oct 2024 11:14:02 +0200 Subject: [PATCH 11/15] WIP: Fix HQ server start and job submission --- hpc/LoadBalancer.cpp | 2 ++ hpc/LoadBalancer.hpp | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index eca75b3..391380c 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -22,6 +22,8 @@ void clear_url(std::string directory) { void launch_hq_with_alloc_queue() { std::system("./hq server stop &> /dev/null"); + std::system("./hq server start &"); + std::system("until ./hq server info &> /dev/null; do sleep 1; done"); // Create HQ allocation queue diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 8364d2f..0169280 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -113,10 +113,11 @@ class HyperQueueJob : public Job public: explicit HyperQueueJob(const std::vector& options, const std::string& target) { - Command command {"./hq", options, target}; + Command command {"./hq submit", options, target}; // Makes HQ output "\n" command.addOption("--output-mode=quiet"); + std::cout << "Running command: " << command.toString() << std::endl; id = get_command_output(command.toString()); remove_trailing_newline(id); @@ -189,7 +190,7 @@ class HyperQueueSubmitter : public JobSubmitter // Submit job and increase job count std::vector options = env_to_options(env); - options.emplace_back("--priority=-" + job_count); + options.emplace_back("--priority=-" + std::to_string(job_count)); std::unique_ptr job = std::make_unique(options, job_script); job_count++; return job; From 16ff5f9b2bc6ce84e25af0e4d237b65b49265cc7 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Fri, 11 Oct 2024 01:32:38 +0200 Subject: [PATCH 12/15] WIP: Add SLURM scheduler and CLI --- hpc/LoadBalancer.cpp | 102 ++++++++++++++++++++++++++++----------- hpc/LoadBalancer.hpp | 36 +++++++++++++- hpc/hq_scripts/job.sh | 9 
++-- hpc/slurm_scripts/job.sh | 48 ++++++++++++++++++ 4 files changed, 159 insertions(+), 36 deletions(-) create mode 100755 hpc/slurm_scripts/job.sh diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index 391380c..c75ee4f 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -1,17 +1,14 @@ #include "LoadBalancer.hpp" -#include -#include +#include "../lib/umbridge.h" + +#include #include -#include #include -#include - -#include -#include +#include +#include -#include "../lib/umbridge.h" -void clear_url(std::string directory) { +void clear_url(const std::string& directory) { for (auto& file : std::filesystem::directory_iterator(directory)) { if (std::regex_match(file.path().filename().string(), std::regex("url-\\d+\\.txt"))) { std::filesystem::remove(file); @@ -30,32 +27,80 @@ void launch_hq_with_alloc_queue() { std::system("hq_scripts/allocation_queue.sh"); } +std::string get_arg(const std::vector& args, const std::string& arg_name) { + // Check if a string matches the format --=... + const std::string search_string = "--" + arg_name + "="; + auto check_format = [&search_string](const std::string& s) { + return (s.length() > search_string.length()) && (s.rfind(search_string, 0) == 0); + }; -int main(int argc, char *argv[]) -{ + // Return value of the argument or empty string if not found + if (const auto it = std::find_if(args.begin(), args.end(), check_format); it != args.end()) { + return it->substr(search_string.length()); + } + + return ""; +} + + +int main(int argc, char* argv[]) { clear_url("urls"); - launch_hq_with_alloc_queue(); + // Process command line args + std::vector args(argv + 1, argv + argc); - // Read environment variables for configuration - char const *port_cstr = std::getenv("PORT"); - int port = 0; - if (port_cstr == NULL) - { - std::cout << "Environment variable PORT not set! Using port 4242 as default." << std::endl; - port = 4242; + // Scheduler used by the load balancer (currently either SLURM or HyperQueue) + std::string scheduler = get_arg(args, "scheduler"); + // Specifying a scheduler is mandatory since this should be a conscious choice by the user + if (scheduler.empty()) { + std::cerr << "Missing required argument: --scheduler=[hyperqueue | slurm]" << std::endl; + std::exit(-1); } - else - { - port = atoi(port_cstr); + + // Delay for job submissions in milliseconds + std::string delay_str = get_arg(args, "delay-ms"); + std::chrono::milliseconds delay = std::chrono::milliseconds::zero(); + if (!delay_str.empty()) { + delay = std::chrono::milliseconds(std::stoi(delay_str)); + } + + // Load balancer port + std::string port_str = get_arg(args, "port"); + int port = 4242; + if (port_str.empty()) { + std::cout << "Argument --port not set! Using port 4242 as default." << std::endl; + } else { + port = std::stoi(port_str); + } + + + // Assemble job manager + std::unique_ptr job_submitter; + std::string script_dir; + if (scheduler == "hyperqueue") { + launch_hq_with_alloc_queue(); + job_submitter = std::make_unique(delay); + script_dir = "hq_scripts"; + } else if (scheduler == "slurm") { + job_submitter = std::make_unique(delay); + script_dir = "slurm_scripts"; + } else { + std::cerr << "Unrecognized value for argument --scheduler: " + << "Expected hyperqueue or slurm but got " << scheduler << " instead." << std::endl; + std::exit(-1); } - JobScriptLocator locator {"hq_scripts", "job.sh", "job_", ".sh"}; + // Only filesystem communication is implemented. May implement network-based communication in the future. 
+ // Directory which stores URL files and polling cycle currently hard-coded. + std::unique_ptr comm_factory + = std::make_unique("urls", std::chrono::milliseconds(500)); + + // Location of job scripts and naming currently hard-corded. + JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"}; + std::shared_ptr job_manager = std::make_shared( - std::make_unique(std::chrono::milliseconds(100)), - std::make_unique("urls", std::chrono::milliseconds(100)), - locator - ); + std::move(job_submitter), std::move(comm_factory), locator); + // Initialize load balancer for each available model on the model server. std::vector model_names = job_manager->getModelNames(); @@ -63,10 +108,10 @@ int main(int argc, char *argv[]) // Inform the user about the available models and the job scripts that will be used. locator.printModelJobScripts(model_names); + // Prepare models and serve via network std::vector LB_vector; for (auto model_name : model_names) { - // Set up and serve model LB_vector.emplace_back(model_name, job_manager); } @@ -75,6 +120,5 @@ int main(int argc, char *argv[]) std::transform(LB_vector.begin(), LB_vector.end(), LB_ptr_vector.begin(), [](LoadBalancer& obj) { return &obj; }); - std::cout << "Load balancer running port" << port << std::endl; umbridge::serveModels(LB_ptr_vector, "0.0.0.0", port, true, false); } diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 0169280..861d221 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -117,7 +117,6 @@ class HyperQueueJob : public Job // Makes HQ output "\n" command.addOption("--output-mode=quiet"); - std::cout << "Running command: " << command.toString() << std::endl; id = get_command_output(command.toString()); remove_trailing_newline(id); @@ -196,6 +195,7 @@ class HyperQueueSubmitter : public JobSubmitter return job; } private: + // HyperQueue environment variables: --env=KEY1=VAL1 --env=KEY2=VAL2 ... std::vector env_to_options(const std::map& env) const { std::vector options; @@ -216,7 +216,39 @@ class HyperQueueSubmitter : public JobSubmitter class SlurmSubmitter : public JobSubmitter { +public: + SlurmSubmitter(std::chrono::milliseconds submission_delay) + : submission_delay(submission_delay) {} + + std::unique_ptr submit(const std::string& job_script, const std::map& env) override + { + // Add optional delay to job submissions to prevent issues in some cases. + if (submission_delay > std::chrono::milliseconds::zero()) { + std::lock_guard lock(submission_mutex); + std::this_thread::sleep_for(submission_delay); + } + // Submit job + std::vector options = env_to_options(env); + std::unique_ptr job = std::make_unique(options, job_script); + return job; + } +private: + // SLURM environment variables: --export=KEY1=VAL1,KEY2=VAL2,... + std::vector env_to_options(const std::map& env) const + { + // By default include all SLURM_* and SPANK option environment variables. 
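+        // For example, with the filesystem-based communicator the assembled option ends up looking like
+        //   --export=ALL,UMBRIDGE_LOADBALANCER_COMM_FILEDIR=urls
+        // (illustrative; the actual key-value pairs come from the JobCommunicator's init message).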
+ std::string env_option = "--export=ALL"; + + for (const auto& [key, val] : env) { + env_option += "," + key + "=" + val; + } + + return {env_option}; + } + + std::chrono::milliseconds submission_delay = std::chrono::milliseconds::zero(); + std::mutex submission_mutex; }; class JobCommunicator @@ -341,7 +373,7 @@ struct JobScriptLocator std::cout << "Available models and corresponding job-scripts:\n"; for (const std::string& model_name : model_names) { std::filesystem::path used_job_script = selectJobScript(model_name); - std::cout << "* Model '" << model_name << "' --> '" << used_job_script << std::endl; + std::cout << "* Model '" << model_name << "' --> '" << used_job_script.string() << "'" << std::endl; } std::cout << std::endl; diff --git a/hpc/hq_scripts/job.sh b/hpc/hq_scripts/job.sh index 03cb381..fcbb91a 100755 --- a/hpc/hq_scripts/job.sh +++ b/hpc/hq_scripts/job.sh @@ -9,7 +9,7 @@ # Launch model server, send back server URL # and wait to ensure that HQ won't schedule any more jobs to this allocation. -function get_avaliable_port { +function get_available_port { # Define the range of ports to select from MIN_PORT=1024 MAX_PORT=65535 @@ -26,7 +26,7 @@ function get_avaliable_port { echo $port } -port=$(get_avaliable_port) +port=$(get_available_port) export PORT=$port # Assume that server sets the port according to the environment variable 'PORT'. @@ -43,8 +43,7 @@ done echo "Model server responded" # Write server URL to file identified by HQ job ID. -load_balancer_dir="." -mkdir -p "$load_balancer_dir/urls" -echo "http://$host:$port" > "$load_balancer_dir/urls/url-$HQ_JOB_ID.txt" +mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR +echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt" sleep infinity # keep the job occupied diff --git a/hpc/slurm_scripts/job.sh b/hpc/slurm_scripts/job.sh new file mode 100755 index 0000000..449d5c4 --- /dev/null +++ b/hpc/slurm_scripts/job.sh @@ -0,0 +1,48 @@ +#! /bin/bash + +#SBATCH --partition=devel +#SBATCH --ntasks=1 +#SBATCH --time=00:05:00 + + +# Launch model server, send back server URL and wait so that SLURM does not cancel the allocation. + +function get_available_port { + # Define the range of ports to select from + MIN_PORT=1024 + MAX_PORT=65535 + + # Generate a random port number + port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1) + + # Check if the port is in use + while lsof -Pi :$port -sTCP:LISTEN -t >/dev/null; do + # If the port is in use, generate a new random port number + port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1) + done + + echo $port +} + +port=$(get_available_port) +export PORT=$port + +# Assume that server sets the port according to the environment variable 'PORT'. +# Otherwise the job script will be stuck waiting for model server's response. +./testmodel & # CHANGE ME! + + +host=$(hostname -I | awk '{print $1}') + +echo "Waiting for model server to respond at $host:$port..." +while ! curl -s "http://$host:$port/Info" > /dev/null; do + sleep 1 +done +echo "Model server responded" + +# Write server URL to file identified by HQ job ID. +load_balancer_dir="." 
+mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR +echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$SLURM_JOB_ID.txt" + +sleep infinity # keep the job occupied From af8a3a44e2bfc86dd27cd883d945e83c28327068 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Fri, 11 Oct 2024 01:35:24 +0200 Subject: [PATCH 13/15] WIP: Minor cleanup --- hpc/LoadBalancer.cpp | 2 +- hpc/slurm_scripts/job.sh | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index c75ee4f..1b20041 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -76,7 +76,7 @@ int main(int argc, char* argv[]) { // Assemble job manager std::unique_ptr job_submitter; - std::string script_dir; + std::filesystem::path script_dir; if (scheduler == "hyperqueue") { launch_hq_with_alloc_queue(); job_submitter = std::make_unique(delay); diff --git a/hpc/slurm_scripts/job.sh b/hpc/slurm_scripts/job.sh index 449d5c4..a843db5 100755 --- a/hpc/slurm_scripts/job.sh +++ b/hpc/slurm_scripts/job.sh @@ -41,7 +41,6 @@ done echo "Model server responded" # Write server URL to file identified by HQ job ID. -load_balancer_dir="." mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$SLURM_JOB_ID.txt" From 096baa52e9f6f680ac815bae83bdb549327c5588 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Fri, 11 Oct 2024 14:59:02 +0200 Subject: [PATCH 14/15] WIP: Add comments and fix formatting --- hpc/LoadBalancer.cpp | 5 +- hpc/LoadBalancer.hpp | 314 +++++++++++++++++++------------------------ 2 files changed, 138 insertions(+), 181 deletions(-) diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index 1b20041..b494d6e 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -20,7 +20,7 @@ void launch_hq_with_alloc_queue() { std::system("./hq server stop &> /dev/null"); std::system("./hq server start &"); - + // Wait for the HQ server to start std::system("until ./hq server info &> /dev/null; do sleep 1; done"); // Create HQ allocation queue @@ -110,8 +110,7 @@ int main(int argc, char* argv[]) { // Prepare models and serve via network std::vector LB_vector; - for (auto model_name : model_names) - { + for (auto model_name : model_names) { LB_vector.emplace_back(model_name, job_manager); } diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 861d221..bf1fa48 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -1,78 +1,76 @@ +#include "../lib/umbridge.h" + #include +#include +#include +#include #include +#include +#include #include #include -#include -#include -#include -#include -#include -#include -#include "../lib/umbridge.h" -// run and get the result of command -std::string get_command_output(const std::string& command) -{ - FILE *pipe = popen(command.c_str(), "r"); // execute the command and return the output as stream - if (!pipe) - { - std::cerr << "Failed to execute the command: " + command << std::endl; - return ""; + +// Run a shell command and get the result. +// Warning: Prone to injection, do not call with user-supplied arguments. +// Note: POSIX specific and may not run on other platforms (e.g. Windows), but most HPC systems are POSIX-compliant. +// Using an external library (e.g. Boost) would be cleaner, but not worth the effort of managing another dependency. 
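+// Illustrative example: get_command_output("sbatch --parsable slurm_scripts/job.sh") returns the
+// scheduler's stdout, e.g. "1234;cluster\n", which the caller then trims and parses into a job ID.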
+std::string get_command_output(const std::string& command) { + std::unique_ptr pipe(popen(command.c_str(), "r"), &pclose); + + if (!pipe) { + std::string error_msg = "Failed to run command: " + command + "\n" + + "popen failed with error: " + std::strerror(errno) + "\n"; + throw std::runtime_error(error_msg); } - char buffer[128]; + // Buffer size can be small and is largely unimportant since most commands we use only return a single line. + std::array buffer; std::string output; - while (fgets(buffer, 128, pipe)) - { - output += buffer; + while (fgets(buffer.data(), static_cast(buffer.size()), pipe.get())) { + output += buffer.data(); } - pclose(pipe); return output; } -// wait until file is created -bool wait_for_file(const std::filesystem::path& file_path, std::chrono::milliseconds polling_cycle) -{ - // Check if the file exists +// Wait until a file exists using polling. +void wait_for_file(const std::filesystem::path& file_path, std::chrono::milliseconds polling_cycle) { while (!std::filesystem::exists(file_path)) { - // If the file doesn't exist, wait for a certain period std::this_thread::sleep_for(polling_cycle); } - - return true; } -std::string read_line_from_file(const std::filesystem::path& file_path) -{ +std::string read_line_from_file(const std::filesystem::path& file_path) { std::ifstream file(file_path); - std::string line = ""; - if (file.is_open()) - { - std::getline(file, line); - } - else - { - std::cerr << "Unable to open file: " << file_path.string() << std::endl; + if (!file.is_open()) { + std::string error_msg = "Unable to open file: '" + file_path.string() + "'\n"; + throw std::runtime_error(error_msg); } + std::string line; + std::getline(file, line); + return line; } -struct Command -{ +void remove_trailing_newline(std::string& s) { + if (!s.empty() && s.back() == '\n') { + s.pop_back(); + } +} + +struct Command { std::string exec; std::vector options; std::string target; - void addOption(const std::string& option) - { + void addOption(const std::string& option) { options.push_back(option); } - std::string toString() const - { + std::string toString() const { std::string result = exec; for (const std::string& s : options) { @@ -84,19 +82,12 @@ struct Command } }; -void remove_trailing_newline(std::string& s) -{ - if (!s.empty() && s.back() == '\n') - { - s.pop_back(); - } -} -// A Job instance escaping its scope would cause the destructor to prematurely cancel the system resource allocation. +// A Job represents a resource allocation on an HPC system and has a unique string ID. +// Note: A Job instance escaping its scope would cause the destructor to prematurely cancel the system resource allocation. // Therefore, copy/move-constructor/assignment are marked as deleted. // Instead, use explicit ownership mechanisms like std::unique_ptr. -class Job -{ +class Job { public: Job() = default; Job(Job& other) = delete; @@ -107,12 +98,12 @@ class Job virtual std::string getJobId() const = 0; }; -// Environment vars: --env KEY1=VAL1 --env KEY2=VAL2 -class HyperQueueJob : public Job -{ + +// Note: The location of the HyperQueue binary is currently hard-coded. +// If required, this can be changed easily by accepting a new parameter in the HyperQueueJob and HyperQueueSubmitter classes. 
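+// For reference, the submission command assembled below looks roughly like
+//   ./hq submit --env UMBRIDGE_LOADBALANCER_COMM_FILEDIR=urls --priority=-0 --output-mode=quiet hq_scripts/job.sh
+// (illustrative; the options depend on the JobCommunicator's init message and the current job count).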
+class HyperQueueJob : public Job { public: - explicit HyperQueueJob(const std::vector& options, const std::string& target) - { + HyperQueueJob(const std::vector& options, const std::string& target) { Command command {"./hq submit", options, target}; // Makes HQ output "\n" @@ -122,25 +113,21 @@ class HyperQueueJob : public Job remove_trailing_newline(id); } - ~HyperQueueJob() override - { + ~HyperQueueJob() override { std::system(("./hq job cancel " + id).c_str()); } - std::string getJobId() const override - { + std::string getJobId() const override { return id; } private: std::string id; }; -// Environment vars: --export=KEY1=VAL1,KEY2=VAL2 -class SlurmJob : public Job -{ + +class SlurmJob : public Job { public: - explicit SlurmJob(const std::vector& options, const std::string& target) - { + SlurmJob(const std::vector& options, const std::string& target) { Command command {"sbatch", options, target}; // Makes SLURM output "[;]\n" @@ -151,13 +138,11 @@ class SlurmJob : public Job remove_trailing_newline(id); } - ~SlurmJob() override - { + ~SlurmJob() override { std::system(("scancel " + id).c_str()); } - std::string getJobId() const override - { + std::string getJobId() const override { return id; } @@ -165,22 +150,22 @@ class SlurmJob : public Job std::string id; }; -class JobSubmitter -{ + +// Factory class meant to provide a more high-level interface for job submission. +// In particular, makes it possible to pass environment variables to a job as key-value pairs. +class JobSubmitter { public: virtual ~JobSubmitter() = default; virtual std::unique_ptr submit(const std::string& job_script, const std::map& env) = 0; }; -class HyperQueueSubmitter : public JobSubmitter -{ +class HyperQueueSubmitter : public JobSubmitter { public: - HyperQueueSubmitter(std::chrono::milliseconds submission_delay) + explicit HyperQueueSubmitter(std::chrono::milliseconds submission_delay) : submission_delay(submission_delay) {} - std::unique_ptr submit(const std::string& job_script, const std::map& env) override - { + std::unique_ptr submit(const std::string& job_script, const std::map& env) override { // Add optional delay to job submissions to prevent issues in some cases. if (submission_delay > std::chrono::milliseconds::zero()) { std::lock_guard lock(submission_mutex); @@ -190,19 +175,18 @@ class HyperQueueSubmitter : public JobSubmitter // Submit job and increase job count std::vector options = env_to_options(env); options.emplace_back("--priority=-" + std::to_string(job_count)); + std::unique_ptr job = std::make_unique(options, job_script); job_count++; return job; } private: // HyperQueue environment variables: --env=KEY1=VAL1 --env=KEY2=VAL2 ... - std::vector env_to_options(const std::map& env) const - { + std::vector env_to_options(const std::map& env) const { std::vector options; options.reserve(env.size()); - for (const auto& [key, val] : env) - { + for (const auto& [key, val] : env) { options.push_back("--env " + key + "=" + val); } return options; @@ -214,14 +198,12 @@ class HyperQueueSubmitter : public JobSubmitter std::atomic job_count = 0; }; -class SlurmSubmitter : public JobSubmitter -{ +class SlurmSubmitter : public JobSubmitter { public: SlurmSubmitter(std::chrono::milliseconds submission_delay) : submission_delay(submission_delay) {} - std::unique_ptr submit(const std::string& job_script, const std::map& env) override - { + std::unique_ptr submit(const std::string& job_script, const std::map& env) override { // Add optional delay to job submissions to prevent issues in some cases. 
if (submission_delay > std::chrono::milliseconds::zero()) { std::lock_guard lock(submission_mutex); @@ -235,8 +217,7 @@ class SlurmSubmitter : public JobSubmitter } private: // SLURM environment variables: --export=KEY1=VAL1,KEY2=VAL2,... - std::vector env_to_options(const std::map& env) const - { + std::vector env_to_options(const std::map& env) const { // By default include all SLURM_* and SPANK option environment variables. std::string env_option = "--export=ALL"; @@ -251,9 +232,19 @@ class SlurmSubmitter : public JobSubmitter std::mutex submission_mutex; }; -class JobCommunicator -{ + +// A JobCommunicator is used to establish communication between the load balancer and a submitted job script. +// The JobCommunicator first generates an initial message of key-value pairs +// which are then passed to the job script via environment variables. +// This message should allow the job script to send back the URL of the hosted model to the load balancer. +// Note: Like a Job, a JobCommunicator shall not be copied or moved. +class JobCommunicator { public: + JobCommunicator() = default; + JobCommunicator(JobCommunicator& other) = delete; + JobCommunicator(JobCommunicator&& other) = delete; + JobCommunicator& operator=(JobCommunicator& other) = delete; + JobCommunicator& operator=(JobCommunicator&& other) = delete; virtual ~JobCommunicator() = default; virtual std::map getInitMessage() = 0; @@ -261,36 +252,31 @@ class JobCommunicator virtual std::string getModelUrl(const std::string& job_id) = 0; }; -class JobCommunicatorFactory -{ +class JobCommunicatorFactory { public: virtual ~JobCommunicatorFactory() = default; virtual std::unique_ptr create() = 0; }; -class FilesystemCommunicator : public JobCommunicator -{ +class FilesystemCommunicator : public JobCommunicator { public: FilesystemCommunicator(std::filesystem::path file_dir, std::chrono::milliseconds polling_cycle) - : file_dir(file_dir), polling_cycle(polling_cycle) {} + : file_dir(std::move(file_dir)), polling_cycle(polling_cycle) {} - ~FilesystemCommunicator() override - { - if(!file_path.empty()) - { + ~FilesystemCommunicator() override { + if(!file_path.empty()) { std::filesystem::remove(file_path); } } - std::map getInitMessage() override - { + // Tell the job script which directory the URL file should be written to. + std::map getInitMessage() override { std::map msg {{"UMBRIDGE_LOADBALANCER_COMM_FILEDIR", file_dir.string()}}; return msg; } - std::string getModelUrl(const std::string& job_id) override - { + std::string getModelUrl(const std::string& job_id) override { file_path = file_dir / getUrlFileName(job_id); std::cout << "Waiting for URL file: " << file_path.string() << std::endl; @@ -302,8 +288,9 @@ class FilesystemCommunicator : public JobCommunicator } private: - std::string getUrlFileName(const std::string& job_id) const - { + // Currently, the naming of the URL file is hard-code. + // In the future, it might be better to have the communicator itself generate the filename and then send it to the job script. 
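+    // For example, a job with ID 42 is expected to write its server URL to <file_dir>/url-42.txt,
+    // which is what the bundled job scripts do via the UMBRIDGE_LOADBALANCER_COMM_FILEDIR variable.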
+ std::string getUrlFileName(const std::string& job_id) const { return "url-" + job_id + ".txt"; } @@ -313,16 +300,14 @@ class FilesystemCommunicator : public JobCommunicator std::chrono::milliseconds polling_cycle; }; -class FilesystemCommunicatorFactory : public JobCommunicatorFactory -{ +class FilesystemCommunicatorFactory : public JobCommunicatorFactory { public: FilesystemCommunicatorFactory(std::filesystem::path file_dir, std::chrono::milliseconds polling_cycle) - : file_dir(file_dir), polling_cycle(polling_cycle) - { + : file_dir(file_dir), polling_cycle(polling_cycle) { std::filesystem::create_directory(file_dir); } - std::unique_ptr create() override - { + + std::unique_ptr create() override { return std::make_unique(file_dir, polling_cycle); } @@ -333,31 +318,24 @@ class FilesystemCommunicatorFactory : public JobCommunicatorFactory }; -struct JobScriptLocator -{ - std::filesystem::path selectJobScript(const std::string& model_name) - { +// A JobScriptLocator specifies where the job script for a particular model is located. +struct JobScriptLocator { + std::filesystem::path selectJobScript(const std::string& model_name) { std::filesystem::path script_default = script_dir / script_default_name; std::filesystem::path script_model_specific = script_dir / (model_prefix + model_name + model_suffix); // Use model specific job script if available, default otherwise. - if (std::filesystem::exists(script_model_specific)) - { + if (std::filesystem::exists(script_model_specific)) { return script_model_specific; - } - else if (std::filesystem::exists(script_default) ) - { + } else if (std::filesystem::exists(script_default)) { return script_default; - } - else - { - std::string error_msg = "Job script not found: Check that file '" + script_default.string() + "' exists."; + } else { + std::string error_msg = "Job script not found: Check that file '" + script_default.string() + "' exists.\n"; throw std::runtime_error(error_msg); } } - std::filesystem::path getDefaultJobScript() - { + std::filesystem::path getDefaultJobScript() { return script_dir / script_default_name; } @@ -390,15 +368,14 @@ struct JobScriptLocator std::string model_suffix; }; -class JobManager -{ + +// A Job manager provides access to an UM-Bridge model on an HPC system. +class JobManager { public: virtual ~JobManager() = default; // Grant exclusive ownership of a model (with a given name) to a caller. // The returned object MUST release any resources that it holds once it goes out of scope in the code of the caller. - // This can be achieved by returning a unique pointer with an appropriate deleter. - // This method may return a nullptr to deny a request. virtual std::unique_ptr requestModelAccess(const std::string& model_name) = 0; // To initialize the load balancer we first need a list of model names that are available on a server. @@ -409,24 +386,21 @@ class JobManager // TODO: Ugly repetition, maybe there is a better way to wrap a job and a model? 
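+// JobModel forwards every umbridge::Model call to the wrapped model while holding on to the Job,
+// so the underlying HPC allocation stays alive (and is cancelled by the Job's destructor) exactly
+// as long as the caller keeps the returned model around.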
-class JobModel : public umbridge::Model -{ +class JobModel : public umbridge::Model { public: JobModel(std::unique_ptr job, std::unique_ptr model) : umbridge::Model(model->GetName()), job(std::move(job)), model(std::move(model)) {} - std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override - { + std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override { return model->GetInputSizes(config_json); } - std::vector GetOutputSizes(const json &config_json = json::parse("{}")) const override - { + std::vector GetOutputSizes(const json &config_json = json::parse("{}")) const override { return model->GetOutputSizes(config_json); } - std::vector> Evaluate(const std::vector> &inputs, json config_json = json::parse("{}")) override - { + std::vector> Evaluate(const std::vector> &inputs, + json config_json = json::parse("{}")) override { return model->Evaluate(inputs, config_json); } @@ -434,8 +408,7 @@ class JobModel : public umbridge::Model unsigned int inWrt, const std::vector> &inputs, const std::vector &sens, - json config_json = json::parse("{}")) override - { + json config_json = json::parse("{}")) override { return model->Gradient(outWrt, inWrt, inputs, sens, config_json); } @@ -443,8 +416,7 @@ class JobModel : public umbridge::Model unsigned int inWrt, const std::vector> &inputs, const std::vector &vec, - json config_json = json::parse("{}")) override - { + json config_json = json::parse("{}")) override { return model->ApplyJacobian(outWrt, inWrt, inputs, vec, config_json); } @@ -454,25 +426,20 @@ class JobModel : public umbridge::Model const std::vector> &inputs, const std::vector &sens, const std::vector &vec, - json config_json = json::parse("{}")) override - { + json config_json = json::parse("{}")) override { return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json); } - bool SupportsEvaluate() override - { + bool SupportsEvaluate() override { return model->SupportsEvaluate(); } - bool SupportsGradient() override - { + bool SupportsGradient() override { return model->SupportsGradient(); } - bool SupportsApplyJacobian() override - { + bool SupportsApplyJacobian() override { return model->SupportsApplyJacobian(); } - bool SupportsApplyHessian() override - { + bool SupportsApplyHessian() override { return model->SupportsApplyHessian(); } @@ -486,8 +453,7 @@ class JobModel : public umbridge::Model // 2. Launch a model server in the resource allocation. // 3. Retrieve the URL of the model server. // 4. Connect to the model server using the URL. 
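+// requestModelAccess() performs steps 1-4 for a single model instance and wraps the result in a
+// JobModel, so the allocation is released again as soon as the caller drops the returned pointer.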
-class CommandJobManager : public JobManager -{ +class CommandJobManager : public JobManager { public: CommandJobManager( std::unique_ptr job_submitter, @@ -495,8 +461,7 @@ class CommandJobManager : public JobManager JobScriptLocator locator) : job_submitter(std::move(job_submitter)), job_comm_factory(std::move(job_comm_factory)), locator(std::move(locator)) {} - std::unique_ptr requestModelAccess(const std::string& model_name) override - { + std::unique_ptr requestModelAccess(const std::string& model_name) override { std::filesystem::path job_script = locator.selectJobScript(model_name); std::unique_ptr comm = job_comm_factory->create(); std::unique_ptr job = job_submitter->submit(job_script, comm->getInitMessage()); @@ -520,26 +485,26 @@ class CommandJobManager : public JobManager JobScriptLocator locator; }; -class LoadBalancer : public umbridge::Model -{ + +// A LoadBalancer acts like a regular UM-Bridge model with the key difference, that incoming requests are +// redirected to models running in a job allocation of an HPC system. +class LoadBalancer : public umbridge::Model { public: LoadBalancer(std::string name, std::shared_ptr job_manager) : umbridge::Model(name), job_manager(job_manager) {} - std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override - { + std::vector GetInputSizes(const json &config_json = json::parse("{}")) const override { auto model = job_manager->requestModelAccess(name); return model->GetInputSizes(config_json); } - std::vector GetOutputSizes(const json &config_json = json::parse("{}")) const override - { + std::vector GetOutputSizes(const json &config_json = json::parse("{}")) const override { auto model = job_manager->requestModelAccess(name); return model->GetOutputSizes(config_json); } - std::vector> Evaluate(const std::vector> &inputs, json config_json = json::parse("{}")) override - { + std::vector> Evaluate(const std::vector> &inputs, + json config_json = json::parse("{}")) override { auto model = job_manager->requestModelAccess(name); return model->Evaluate(inputs, config_json); } @@ -548,8 +513,7 @@ class LoadBalancer : public umbridge::Model unsigned int inWrt, const std::vector> &inputs, const std::vector &sens, - json config_json = json::parse("{}")) override - { + json config_json = json::parse("{}")) override { auto model = job_manager->requestModelAccess(name); return model->Gradient(outWrt, inWrt, inputs, sens, config_json); } @@ -558,8 +522,7 @@ class LoadBalancer : public umbridge::Model unsigned int inWrt, const std::vector> &inputs, const std::vector &vec, - json config_json = json::parse("{}")) override - { + json config_json = json::parse("{}")) override { auto model = job_manager->requestModelAccess(name); return model->ApplyJacobian(outWrt, inWrt, inputs, vec, config_json); } @@ -570,29 +533,24 @@ class LoadBalancer : public umbridge::Model const std::vector> &inputs, const std::vector &sens, const std::vector &vec, - json config_json = json::parse("{}")) override - { + json config_json = json::parse("{}")) override { auto model = job_manager->requestModelAccess(name); return model->ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json); } - bool SupportsEvaluate() override - { + bool SupportsEvaluate() override { auto model = job_manager->requestModelAccess(name); return model->SupportsEvaluate(); } - bool SupportsGradient() override - { + bool SupportsGradient() override { auto model = job_manager->requestModelAccess(name); return model->SupportsGradient(); } - bool SupportsApplyJacobian() 
override - { + bool SupportsApplyJacobian() override { auto model = job_manager->requestModelAccess(name); return model->SupportsApplyJacobian(); } - bool SupportsApplyHessian() override - { + bool SupportsApplyHessian() override { auto model = job_manager->requestModelAccess(name); return model->SupportsApplyHessian(); } From eb4e1c6fa04abf00ad98734f9de59b754141e143 Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sat, 12 Oct 2024 17:15:35 +0200 Subject: [PATCH 15/15] WIP: Add docs for HyperQueue and SLURM schedulers --- hpc/README.md | 116 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 100 insertions(+), 16 deletions(-) diff --git a/hpc/README.md b/hpc/README.md index b54ed05..817fbb0 100644 --- a/hpc/README.md +++ b/hpc/README.md @@ -1,6 +1,6 @@ # HPC -This load balancer allows any scaling up UM-Bridge applications to HPC systems. To the client, it behaves like a regular UM-Bridge server, except that i can process concurrent model evaluation requests. When it receives requests, it will adaptively spawn model server instances on the HPC system, and forward evaluation requests to them. To each model server instance, the load balancer in turn appears as a regular UM-Bridge client. +This load balancer allows scaling up any UM-Bridge application to HPC systems. To the client, it behaves like a regular UM-Bridge server, except that it can process concurrent model evaluation requests. When it receives requests, it will adaptively spawn model server instances on the HPC system, and forward evaluation requests to them. To each model server instance, the load balancer in turn appears as a regular UM-Bridge client. ## Installation @@ -8,35 +8,59 @@ This load balancer allows any scaling up UM-Bridge applications to HPC systems. Clone the UM-Bridge repository. - ``` + ```shell git clone https://github.com/UM-Bridge/umbridge.git ``` Then navigate to the `hpc` directory. - ``` + ```shell cd umbridge/hpc ``` - Finally, compile the load balancer. Depending on your HPC system, you likely have to load a module providing a recent c++ compiler. + Finally, compile the load balancer. Depending on your HPC system, you likely have to load a module providing a recent C++ compiler. - ``` + ```shell make ``` -2. **Download HyperQueue** + Once compilation is finished, you should have a `load-balancer` binary in your `umbridge/hpc` directory. It provides a simple CLI for launching and configuring the load balancer. +2. **(Optional) Download HyperQueue** + + If you wish to use the HyperQueue scheduler, download HyperQueue from the most recent release at [https://github.com/It4innovations/hyperqueue/releases](https://github.com/It4innovations/hyperqueue/releases) and place the `hq` binary in the `hpc` directory next to the load balancer. - Download HyperQueue from the most recent release at [https://github.com/It4innovations/hyperqueue/releases](https://github.com/It4innovations/hyperqueue/releases) and place the `hq` binary in the `hpc` directory next to the load balancer. ## Usage -The load balancer is primarily intended to run on a login node. +### Choosing a suitable scheduler + +You must first decide which scheduling strategy the load balancer should use. 
Here is a summary of the available options: + +* **SLURM** - Easy to setup, but slow for very quick models +* **HyperQueue** - Can be trickier to setup, but much more powerful + +We recommend trying out the simple SLURM scheduler first, before switching to the HyperQueue scheduler, if the performance is not good enough. + +### (Option 1) SLURM scheduler + +The load balancer will submit a new SLURM job for each incoming model request. Note that this can incur a sizable overhead, especially if your cluster is busy, so consider switching to the HyperQueue scheduler if your individual model runs are very short. + +To setup the SLURM scheduler, simply adjust the SLURM job script in `hpc/slurm_scripts/job.sh` to your needs: + +* Specify what UM-Bridge model server to run, +* and set `#SBATCH` variables at the top to specify what resources each instance should receive. + +Importantly, the UM-Bridge model server must serve its models at the port specified by the environment variable `PORT`. The value of `PORT` is automatically determined by `job.sh`, avoiding potential conflicts if multiple servers run on the same compute node. + +Also, make sure that the job script complies with the requirements of your cluster (e.g. setting credentials, partition, etc.). You can test if your job script works correctly by submitting it as usual using `sbatch`. + +### (Option 2) HyperQueue scheduler 1. **Configure resource allocation** The load balancer instructs HyperQueue to allocate batches of resources on the HPC system, depending on demand for model evaluations. HyperQueue will submit SLURM or PBS jobs on the HPC system when needed, scheduling requested model runs within those jobs. When demand decreases, HyperQueue will cancel some of those jobs again. - Adapt the configuration in ``hpc/hq_scripts/allocation_queue.sh`` to your needs. + Adapt the configuration in ``hpc/hq_scripts/allocation_queue.sh`` to your needs by setting options in the `hq alloc add` command (see the section on [resource management](#resource-management-with-hyperqueue) down below for more detailed instructions). For example, when running a very fast UM-Bridge model on an HPC cluster, it is advisable to choose medium-sized jobs for resource allocation. That will avoid submitting large numbers of jobs to the HPC system's scheduler, while HyperQueue itself will handle large numbers of small model runs within those allocated jobs. @@ -48,29 +72,89 @@ The load balancer is primarily intended to run on a login node. Importantly, the UM-Bridge model server must serve its models at the port specified by the environment variable `PORT`. The value of `PORT` is automatically determined by `job.sh`, avoiding potential conflicts if multiple servers run on the same compute node. - If your job is supposed to span multiple compute nodes via MPI, make sure that you forward the nodes HyperQueue allocates to you in `HQ_NODE_FILE` to MPI. See [https://it4innovations.github.io/hyperqueue/stable/jobs/multinode/](https://it4innovations.github.io/hyperqueue/stable/jobs/multinode/#running-mpi-tasks) for instructions. + If your job is supposed to span multiple compute nodes via MPI, make sure that you forward the nodes HyperQueue allocates to you in `HQ_NODE_FILE` to MPI. See the [HyperQueue documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/multinode/#running-mpi-tasks) for further instructions. 
Also check out the section on [MPI tasks with HyperQueue under SLURM](#optional-launching-mpi-tasks-with-hyperqueue-under-slurm) to learn about some common pitfalls if you intend to run your model on a system with SLURM. + +### Running the load balancer -4. **Run load balancer** +1. **Launch the load balancer on the login node** - Navigate to the `hpc` directory and execute the load balancer. + Navigate to the `hpc` directory and execute the load balancer with your desired arguments. Note that specifying a scheduler is mandatory. + ```shell + ./load-balancer --scheduler=slurm # Use the SLURM scheduler + ./load-balancer --scheduler=hyperqueue # Use the HyperQueue scheduler ``` - ./load-balancer + The other optional arguments the load balancer accepts are: + ```shell + --port=1234 # Run load balancer on the specified port instead of the default 4242 + --delay-ms=100 # Set a delay (in milliseconds) for job submissions. Useful if too many rapid job submissions cause stability issues. ``` -5. **Connect from client** +4. **Connect from client** Once running, you can connect to the load balancer from any UM-Bridge client on the login node via `http://localhost:4242`. To the client, it will appear like any other UM-Bridge server, except that it can process concurrent evaluation requests. +## Resource management with HyperQueue + +### Specifying HyperQueue worker resources + +Each HyperQueue worker created by an allocation queue gets assigned a set of resources. A HyperQueue job may request resources and will only be scheduled to workers which have enough resources to fulfill the request. These resources are **purely logical** and are only used for scheduling in HyperQueue. **There is no inherent connection to actual hardware resources!** + +HyperQueue can automatically detect certain generic resources like CPUs, GPUs and memory. However, this mechanism may not work on all types of hardware. We recommend the much more robust approach of manually specifying resources for HyperQueue workers in `hpc/hq_scripts/allocation_queue.sh`. Please refer to the [HyperQueue documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/resources/) to find out how to manually specify resources. + +### Requesting hardware resources with a job manager (SLURM/PBS) + +As stated in the previous section, resources assigned to workers do not correspond to actual hardware. If you want to run your model on a system that uses a job manager (e.g. SLURM or PBS), then you also need to request hardware resources from your job manager and ensure that these match with the logical resources assigned to each worker. + +The following section assumes that your system is using SLURM as a job manager, but a similar approach should also apply to other job managers like PBS: + +An allocation queue will automatically submit and cancel SLURM job allocations depending on the number of incoming requests received by the load balancer. HyperQueue will spawn exactly one worker per node inside each SLURM job allocation. Therefore, you should **always specify hardware resources on a per node basis** to ensure that each HyperQueue worker has the same logical and hardware resources. + +You can configure hardware resources in `hpc/hq_scripts/allocation_queue.sh` by passing options to the `hq alloc add` command or directly to SLURM (everything after the `--` at the end). 
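For orientation, a queue definition in `allocation_queue.sh` might look roughly like the following sketch. The wall time, number of workers per allocation, partition name, CPU count and memory are placeholders that you need to adjust to your cluster; flags for logical worker resources are omitted here, see the HyperQueue documentation linked above.

```shell
./hq alloc add slurm --time-limit=30m --workers-per-alloc=1 \
    -- --partition=devel --cpus-per-task=4 --mem=16G
```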
Here is a summary of the options you should and should not use to properly specify hardware resources: + +* Always use the `--time-limit` parameter from HyperQueue instead of directly passing `--time` to SLURM. +* Always use the `--workers-per-alloc` parameter from HyperQueue instead of directly passing `--nodes` to SLURM. +* Never use SLURM's `--ntasks` or `--ntasks-per-node` since this will prevent HyperQueue from correctly spawning exactly one worker per node. +* Instead, use SLURM's `--cpus-per-task` to specify the number of CPUs each worker should have. +* Use SLURM's `--mem` to specify the memory each worker should have. + +## (Optional) Launching MPI tasks with HyperQueue under SLURM + +### Support for multi-node tasks in HyperQueue + +**Disclaimer:** Support for multi-node tasks is still in an experimental stage in HyperQueue. These instructions were tested on a specific system with HyperQueue v0.19.0. They may not work for your individual setup and are subject to change. + +A HyperQueue job may request multiple nodes using the `--nodes` option. Nodes assigned to this job are reserved exclusively and may not be used by other HyperQueue jobs until the job is finished. + +HyperQueue will set the environment variables `HQ_NODE_FILE` and `HQ_HOST_FILE` which contain a path to a file where each line is respectively the shortened or full host name of a node that was assigned to the job. For most systems using the shortened names in the `HQ_NODE_FILE` should be good enough. + +For convenience HyperQueue also provides the environment variable `HQ_NUM_NODES` which contains the number of nodes assigned to the job. + +### Launching MPI tasks in a HyperQueue job under SLURM + +The recommended way to launch MPI tasks under SLURM is using `srun`. Make sure to pass the parameters `--nodes=$HQ_NUM_NODES` and `--nodefile=$HQ_NODE_FILE` to ensure that your MPI tasks will be executed only on the nodes that were assigned to the HyperQueue job. + +As a reference, here is an example `srun` command which will launch two MPI tasks each on two nodes for a total of four MPI tasks. Note that each node has two CPUs available in this scenario. Read the following sections for further explanations of the options that were used: + +```shell +srun --overlap --mem=0 --ntasks=4 --ntasks-per-node=2 --cpus-per-task=1 --nodes=$HQ_NUM_NODES --nodefile=$HQ_NODE_FILE +``` + +A common problem which you may encounter is that the `srun` command simply gets stuck forever because it is unable to acquire the requested resources. This can happen due to HyperQueue also using `srun` to spawn HyperQueue workers on each node in an allocation. Since the HyperQueue workers are running at all times, SLURM thinks that the resources in the job allocation are occupied and therefore refuses to execute your `srun` command. You can fix this issue by adding the `--overlap` and `--mem=0` options, which will tell `srun` to share it's resources with other job steps and to use as much memory as available on each node. + +You also need to ensure that the total number of MPI tasks requested in `srun` does not exceed the total number of CPUs available in the nodes your job was assigned to. Otherwise you will encounter the same issue where `srun` gets stuck forever. + +Lastly, always specify the `--ntasks`, `--ntasks-per-node` and `--cpus-per-task` options in your `srun` command. 
This is required because otherwise `srun` inherits the SLURM options that HyperQueue set when spawning the workers in the SLURM job allocation, which leads to undesired behavior.

## (Optional) Varying resource requirements per model (e.g. for multilevel / multifidelity)

If your UM-Bridge server provides multiple models, you can specify different resource requirements for each of them. Define a separate job script ``job_<model_name>.sh`` (in `hpc/slurm_scripts` or `hpc/hq_scripts` for SLURM and HyperQueue respectively) for each model that needs different resources than what you defined in the default ``job.sh``.

## (Optional) Running clients on your own machine while offloading runs to HPC

Alternatively, a client may run on your own device. In order to connect UM-Bridge clients on your machine to the login node, you can create an SSH tunnel to the HPC system.

```shell
ssh @hpc.cluster.address -N -f -L 4242::4242 # start ssh tunnel
# -N : do not execute remote command