Skip to content

Commit

Permalink
Merge pull request #85 from UM-Bridge/hpc-multiple-backends
Browse files Browse the repository at this point in the history
HPC: Multiple Backends
  • Loading branch information
linusseelinger authored Oct 12, 2024
2 parents 99424b4 + eb4e1c6 commit 11fb3c9
Show file tree
Hide file tree
Showing 6 changed files with 692 additions and 307 deletions.
191 changes: 73 additions & 118 deletions hpc/LoadBalancer.cpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
#include "LoadBalancer.hpp"
#include <iostream>
#include <string>
#include "../lib/umbridge.h"

#include <algorithm>
#include <chrono>
#include <thread>
#include <filesystem>
#include <algorithm>

#include <unistd.h>
#include <limits.h>

#include "../lib/umbridge.h"
#include <iostream>
#include <string>

void create_directory_if_not_existing(std::string directory) {
if (!std::filesystem::is_directory(directory) || !std::filesystem::exists(directory)) {
std::filesystem::create_directory(directory);
}
}

void clear_url(std::string directory) {
void clear_url(const std::string& directory) {
for (auto& file : std::filesystem::directory_iterator(directory)) {
if (std::regex_match(file.path().filename().string(), std::regex("url-\\d+\\.txt"))) {
std::filesystem::remove(file);
Expand All @@ -29,140 +20,104 @@ void launch_hq_with_alloc_queue() {
std::system("./hq server stop &> /dev/null");

std::system("./hq server start &");
sleep(1); // Workaround: give the HQ server enough time to start.
// Wait for the HQ server to start
std::system("until ./hq server info &> /dev/null; do sleep 1; done");

// Create HQ allocation queue
std::system("hq_scripts/allocation_queue.sh");
}

const std::vector<std::string> get_model_names() {
// Don't start a client, always use the default job submission script.
HyperQueueJob hq_job("", false, true);
std::string get_arg(const std::vector<std::string>& args, const std::string& arg_name) {
// Check if a string matches the format --<arg_name>=...
const std::string search_string = "--" + arg_name + "=";
auto check_format = [&search_string](const std::string& s) {
return (s.length() > search_string.length()) && (s.rfind(search_string, 0) == 0);
};

return umbridge::SupportedModels(hq_job.server_url);
// Return value of the argument or empty string if not found
if (const auto it = std::find_if(args.begin(), args.end(), check_format); it != args.end()) {
return it->substr(search_string.length());
}

return "";
}

void print_model_and_job_script_information(const std::vector<std::string>& model_names) {
// Constants
const std::filesystem::path SUBMISSION_SCRIPT_DIR("./hq_scripts");
const std::filesystem::path SUBMISSION_SCRIPT_GENERIC("job.sh");

const std::string SECTION_START_DELIMITER = "==============================MODEL INFO==============================";
const std::string SECTION_END_DELIMITER = "======================================================================";

// Sort the model names in alphabetical order for cleaner output.
std::vector<std::string> model_names_sorted = model_names;
std::sort(model_names_sorted.begin(), model_names_sorted.end());

std::cout << SECTION_START_DELIMITER << "\n";
// Print list of available models and corresponding job-scripts.
std::cout << "Available models and corresponding job-scripts:\n";
for (const std::string& model_name : model_names_sorted) {
// Determine which job script will be used by checking if a model specific job script exists.
std::string used_job_script;
const std::filesystem::path submission_script_model_specific("job_" + model_name + ".sh");
if (std::filesystem::exists(SUBMISSION_SCRIPT_DIR / submission_script_model_specific)) {
used_job_script = submission_script_model_specific.string();
} else {
used_job_script = SUBMISSION_SCRIPT_GENERIC.string();
}
std::cout << "* Model '" << model_name << "' --> '" << used_job_script << "'\n";
}
std::cout << std::endl;


// Check if there are job scripts that are unused and print a warning.
std::vector<std::string> unused_job_scripts;

// Build a regex to parse job-script filenames and extract the model name.
// Format should be: job_<model_name>.sh
const std::string format_prefix = "^job_"; // Ensures that filename starts with 'job_'.
const std::string format_suffix = "\\.sh$"; // Ensures that filename ends with '.sh'.
const std::string format_model_name = "(.*)"; // Arbitrary sequence of characters as a marked subexpression.
const std::regex format_regex(format_prefix + format_model_name + format_suffix);

for (auto& file : std::filesystem::directory_iterator(SUBMISSION_SCRIPT_DIR)) {
const std::string filename = file.path().filename().string();
// Check if filename matches format of a model specific job script, i.e. 'job_<model_name>.sh'.
std::smatch match_result;
if (std::regex_search(filename, match_result, format_regex)) {
// Extract first matched subexpression, i.e. the model name.
const std::string model_name = match_result[1].str();
// Check if a corresponding model exists. If not, mark job script as unused.
if (!std::binary_search(model_names_sorted.begin(), model_names_sorted.end(), model_name)) {
unused_job_scripts.push_back(filename);
}
}
}
int main(int argc, char* argv[]) {
clear_url("urls");

// Print the warning message.
if(!unused_job_scripts.empty()) {
// Sort unused job scripts alphabetically for cleaner output.
std::sort(unused_job_scripts.begin(), unused_job_scripts.end());
// Process command line args
std::vector<std::string> args(argv + 1, argv + argc);

std::cout << "WARNING: The following model-specific job-scripts are not used by any of the available models:\n";
for (const std::string& job_script : unused_job_scripts) {
std::cout << "* '" << job_script << "'\n";
}
std::cout << std::endl;
// Scheduler used by the load balancer (currently either SLURM or HyperQueue)
std::string scheduler = get_arg(args, "scheduler");
// Specifying a scheduler is mandatory since this should be a conscious choice by the user
if (scheduler.empty()) {
std::cerr << "Missing required argument: --scheduler=[hyperqueue | slurm]" << std::endl;
std::exit(-1);
}

std::cout << "If this behavior is unintentional, then please verify that:\n"
<< "1. The filename of your model-specific job-script follows the format: 'job_<your_model_name>.sh' (e.g. 'job_mymodel.sh')\n"
<< "2. The spelling of your model name matches in the model definition and in the filename of your model-specific job-script.\n";
// Delay for job submissions in milliseconds
std::string delay_str = get_arg(args, "delay-ms");
std::chrono::milliseconds delay = std::chrono::milliseconds::zero();
if (!delay_str.empty()) {
delay = std::chrono::milliseconds(std::stoi(delay_str));
}

std::cout << SECTION_END_DELIMITER << std::endl;
}
// Load balancer port
std::string port_str = get_arg(args, "port");
int port = 4242;
if (port_str.empty()) {
std::cout << "Argument --port not set! Using port 4242 as default." << std::endl;
} else {
port = std::stoi(port_str);
}

std::atomic<int32_t> HyperQueueJob::job_count = 0;

// Assemble job manager
std::unique_ptr<JobSubmitter> job_submitter;
std::filesystem::path script_dir;
if (scheduler == "hyperqueue") {
launch_hq_with_alloc_queue();
job_submitter = std::make_unique<HyperQueueSubmitter>(delay);
script_dir = "hq_scripts";
} else if (scheduler == "slurm") {
job_submitter = std::make_unique<SlurmSubmitter>(delay);
script_dir = "slurm_scripts";
} else {
std::cerr << "Unrecognized value for argument --scheduler: "
<< "Expected hyperqueue or slurm but got " << scheduler << " instead." << std::endl;
std::exit(-1);
}

int main(int argc, char *argv[])
{
create_directory_if_not_existing("urls");
create_directory_if_not_existing("sub-jobs");
clear_url("urls");
// Only filesystem communication is implemented. May implement network-based communication in the future.
// Directory which stores URL files and polling cycle currently hard-coded.
std::unique_ptr<JobCommunicatorFactory> comm_factory
= std::make_unique<FilesystemCommunicatorFactory>("urls", std::chrono::milliseconds(500));

launch_hq_with_alloc_queue();
// Location of job scripts and naming currently hard-corded.
JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"};

// Read environment variables for configuration
char const *port_cstr = std::getenv("PORT");
int port = 0;
if (port_cstr == NULL)
{
std::cout << "Environment variable PORT not set! Using port 4242 as default." << std::endl;
port = 4242;
}
else
{
port = atoi(port_cstr);
}
std::shared_ptr<JobManager> job_manager = std::make_shared<CommandJobManager>(
std::move(job_submitter), std::move(comm_factory), locator);

char const *delay_cstr = std::getenv("HQ_SUBMIT_DELAY_MS");
if (delay_cstr != NULL)
{
hq_submit_delay_ms = atoi(delay_cstr);
}
std::cout << "HQ_SUBMIT_DELAY_MS set to " << hq_submit_delay_ms << std::endl;

// Initialize load balancer for each available model on the model server.
const std::vector<std::string> model_names = get_model_names();
std::vector<std::string> model_names = job_manager->getModelNames();

// Inform the user about the available models and the job scripts that will be used.
// Output a warning for unused model-specific job-scripts to prevent typos.
print_model_and_job_script_information(model_names);
locator.printModelJobScripts(model_names);

// Prepare models and serve via network
std::vector<LoadBalancer> LB_vector;
for (auto model_name : model_names)
{
// Set up and serve model
LB_vector.emplace_back(LoadBalancer{model_name});
for (auto model_name : model_names) {
LB_vector.emplace_back(model_name, job_manager);
}

// umbridge::serveModels currently only accepts raw pointers.
std::vector<umbridge::Model *> LB_ptr_vector(LB_vector.size());
std::transform(LB_vector.begin(), LB_vector.end(), LB_ptr_vector.begin(),
[](LoadBalancer& obj) { return &obj; });

std::cout << "Load balancer running port" << port << std::endl;
umbridge::serveModels(LB_ptr_vector, "0.0.0.0", port, true, false);
}
Loading

0 comments on commit 11fb3c9

Please sign in to comment.