From 7a0a3052a8d5346b2f6c25a3652c5005a722cbb5 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 15 Dec 2020 13:07:24 +0000 Subject: [PATCH 01/27] adding helloworld example code --- examples/mpi_helloworld.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 examples/mpi_helloworld.cpp diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp new file mode 100644 index 000000000..efe8a4717 --- /dev/null +++ b/examples/mpi_helloworld.cpp @@ -0,0 +1,19 @@ +#include +#include + +int main() +{ + auto logger = faabric::util::getLogger(); + + MPI_Init(NULL, NULL); + + int rank, worldSize; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + + logger->info("Hello world from rank %i of %i", rank, worldSize); + + MPI_Finalize(); + + return 0; +} From cd2c581736d1ddbe0bd719623933789c6b794233 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 17 Dec 2020 06:20:13 +0000 Subject: [PATCH 02/27] skeleton of native mpi compilation --- CMakeLists.txt | 2 ++ examples/CMakeLists.txt | 1 + src/native-mpi/CMakeLists.txt | 8 ++++++++ src/native-mpi/mpi.cpp | 38 +++++++++++++++++++++++++++++++++++ 4 files changed, 49 insertions(+) create mode 100644 src/native-mpi/CMakeLists.txt create mode 100644 src/native-mpi/mpi.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 53bd8c71b..010c1cd64 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,6 +56,7 @@ endfunction() add_subdirectory(src/endpoint) add_subdirectory(src/executor) add_subdirectory(src/mpi) +add_subdirectory(src/native-mpi) add_subdirectory(src/proto) add_subdirectory(src/redis) add_subdirectory(src/scheduler) @@ -75,6 +76,7 @@ add_library(faabric faabric.cpp $ $ + $ $ $ $ diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index cc4f4e08a..d994ce9b1 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -31,6 +31,7 @@ function(add_example example_name) endfunction() add_example(check) +add_example(mpi_helloworld) add_example(server) add_custom_target(all_examples DEPENDS ${ALL_EXAMPLES}) diff --git a/src/native-mpi/CMakeLists.txt b/src/native-mpi/CMakeLists.txt new file mode 100644 index 000000000..31a421719 --- /dev/null +++ b/src/native-mpi/CMakeLists.txt @@ -0,0 +1,8 @@ +set(LIB_FILES + mpi.cpp + ${FAABRIC_INCLUDE_DIR}/faabric/mpi/mpi.h + ) + +faabric_lib(mpi "${LIB_FILES}") + +#target_link_libraries(mpi) diff --git a/src/native-mpi/mpi.cpp b/src/native-mpi/mpi.cpp new file mode 100644 index 000000000..1e96fc6c8 --- /dev/null +++ b/src/native-mpi/mpi.cpp @@ -0,0 +1,38 @@ +#include +#include + +int MPI_Init(int* argc, char*** argv) +{ + auto logger = faabric::util::getLogger(); + logger->debug("MPI_Init"); + + return MPI_SUCCESS; +} + +int MPI_Comm_rank(MPI_Comm comm, int* rank) +{ + auto logger = faabric::util::getLogger(); + logger->debug("MPI_Comm_rank"); + + *rank = 1337; + + return MPI_SUCCESS; +} + +int MPI_Comm_size(MPI_Comm comm, int* size) +{ + auto logger = faabric::util::getLogger(); + logger->debug("MPI_Comm_size"); + + *size = 9337; + + return MPI_SUCCESS; +} + +int MPI_Finalize() +{ + auto logger = faabric::util::getLogger(); + logger->debug("MPI_Finalize"); + + return MPI_SUCCESS; +} From a4f6867b3b775f98e2d4ffb90102857548086f24 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 17 Dec 2020 08:02:26 +0000 Subject: [PATCH 03/27] adding a task to run mpi-specific applications --- examples/mpi_helloworld.cpp | 5 ++++- tasks/examples.py | 32 +++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 2 
deletions(-) diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index efe8a4717..5c5a0256b 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -1,6 +1,8 @@ #include #include +#include + int main() { auto logger = faabric::util::getLogger(); @@ -11,7 +13,8 @@ int main() MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - logger->info("Hello world from rank %i of %i", rank, worldSize); + sleep(2); + logger->info("Hello world from rank {} of {}", rank, worldSize); MPI_Finalize(); diff --git a/tasks/examples.py b/tasks/examples.py index 999b070d9..6f5548c40 100644 --- a/tasks/examples.py +++ b/tasks/examples.py @@ -2,7 +2,7 @@ from shutil import rmtree from os.path import join, exists from copy import copy -from subprocess import run +from subprocess import run, Popen from tasks.util.env import PROJ_ROOT, FAABRIC_INSTALL_PREFIX @@ -14,6 +14,8 @@ INCLUDE_DIR = "{}/include".format(FAABRIC_INSTALL_PREFIX) LIB_DIR = "{}/lib".format(FAABRIC_INSTALL_PREFIX) +MPI_DEFAULT_WORLD_SIZE = 5 + @task(default=True) def build(ctx, clean=False): @@ -74,3 +76,31 @@ def execute(ctx, example): ) run(exe_path, env=shell_env, shell=True, check=True) + + +@task +def execute_mpi(ctx, example): + """ + Runs an MPI example + """ + exe_path = join(BUILD_DIR, example) + + if not exists(exe_path): + raise RuntimeError("Did not find {} as expected".format(exe_path)) + + shell_env = copy(environ) + shell_env.update( + { + "LD_LIBRARY_PATH": LIB_DIR, + } + ) + if "MPI_WORLD_SIZE" not in environ: + shell_env.update({"MPI_WORLD_SIZE": str(MPI_DEFAULT_WORLD_SIZE)}) + + # run(exe_path, env=shell_env, shell=True, check=True) + procs = [ + Popen(exe_path, env=shell_env, shell=True) + for _ in range(int(environ["MPI_WORLD_SIZE"])) + ] + for p in procs: + p.wait() From feb7e8eb05af71d664a5493f6e90aa0a4c926ffa Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 18 Dec 2020 12:49:59 +0000 Subject: [PATCH 04/27] adding executor script --- examples/mpi_helloworld.cpp | 1 + examples/mpi_runner.cpp | 39 +++++++++++++++++++++++++++++++++++++ src/native-mpi/mpi.cpp | 25 +++++++++++++++++++++++- tasks/examples.py | 8 ++------ 4 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 examples/mpi_runner.cpp diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index 5c5a0256b..e2d61e426 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -5,6 +5,7 @@ int main() { + faabric::util::initLogging(); auto logger = faabric::util::getLogger(); MPI_Init(NULL, NULL); diff --git a/examples/mpi_runner.cpp b/examples/mpi_runner.cpp new file mode 100644 index 000000000..65cc82926 --- /dev/null +++ b/examples/mpi_runner.cpp @@ -0,0 +1,39 @@ +#include + +#include +#include + +#define EXAMPLE_BUILD_PATH "/code/faabric/examples/build" + +// Macro defined in include/faabric/executor/FaabricPool.h +// bool _execFunc(faabric::Message& msg) +FAABRIC_EXECUTOR +{ + // logger->info("Executor {} running function {}" + std::string function = msg.function(); + std::string function_path = + std::string(EXAMPLE_BUILD_PATH) + "/" + function; + return true; +} + +int main(int argc, char** argv) +{ + // Process input parameters + + faabric::util::initLogging(); + auto logger = faabric::util::getLogger(); + + logger->info("Starting faaslet pool in the background"); + _Pool p(5); + FaabricMain w(p); + w.startBackground(); + + logger->info("Running mpi function {}", function); + const char* funcName = "hellompi"; + faabric::Message msg = 
faabric::util::messageFactory("mpi", funcName); + msg.set_mpiworldsize(5); + auto sch = faabric::scheduler::getScheduler(); + sch.callFunction(msg); + + return 0; +} diff --git a/src/native-mpi/mpi.cpp b/src/native-mpi/mpi.cpp index 1e96fc6c8..278a042a9 100644 --- a/src/native-mpi/mpi.cpp +++ b/src/native-mpi/mpi.cpp @@ -1,10 +1,33 @@ #include +#include #include +static thread_local faabric::scheduler::MpiContext executingContext; + +faabric::scheduler::MpiWorld& getExecutingWorld() +{ + int worldId = executingContext.getWorldId(); + auto reg = faabric::scheduler::getMpiWorldRegistry(); + return reg.getOrInitialiseWorld(*getExecutingCall(), worldId); +} + int MPI_Init(int* argc, char*** argv) { auto logger = faabric::util::getLogger(); - logger->debug("MPI_Init"); + + faabric::Message* call = getExecutingCall(); + + if (call->mpirank() <= 0) { + logger->debug("S - MPI_Init (create) {} {}", a, b); + executingContext.createWorld(*call); + } else { + logger->debug("S - MPI_Init (join) {} {}", a, b); + executingContext.joinWorld(*call); + } + + int thisRank = executingContext.getRank(); + auto world = getExecutingWorld(); + world.barrier(thisRank); return MPI_SUCCESS; } diff --git a/tasks/examples.py b/tasks/examples.py index 6f5548c40..8431db819 100644 --- a/tasks/examples.py +++ b/tasks/examples.py @@ -79,7 +79,7 @@ def execute(ctx, example): @task -def execute_mpi(ctx, example): +def execute_mpi(ctx, example, np=MPI_DEFAULT_WORLD_SIZE): """ Runs an MPI example """ @@ -94,13 +94,9 @@ def execute_mpi(ctx, example): "LD_LIBRARY_PATH": LIB_DIR, } ) - if "MPI_WORLD_SIZE" not in environ: - shell_env.update({"MPI_WORLD_SIZE": str(MPI_DEFAULT_WORLD_SIZE)}) - # run(exe_path, env=shell_env, shell=True, check=True) procs = [ - Popen(exe_path, env=shell_env, shell=True) - for _ in range(int(environ["MPI_WORLD_SIZE"])) + Popen(exe_path, env=shell_env, shell=True) for _ in range(np) ] for p in procs: p.wait() From 6e97ea2b185d4980d6437f24bfbd971be8b14b02 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 4 Jan 2021 08:21:21 +0000 Subject: [PATCH 05/27] adding new faabric mpi main macro --- examples/mpi_helloworld.cpp | 79 +++++++++++++++++++++++++++++++++++-- examples/mpi_runner.cpp | 39 ------------------ src/native-mpi/mpi.cpp | 4 +- tasks/examples.py | 18 +++++++-- 4 files changed, 92 insertions(+), 48 deletions(-) delete mode 100644 examples/mpi_runner.cpp diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index e2d61e426..2a74d4685 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -1,12 +1,84 @@ -#include +#include +#include +#include +#include #include +#include + #include -int main() +#define FAABRIC_MPI_MAIN() + using namespace faabric::executor; + + bool _execMpiFunc(const faabric::Message* msg); + + class _Executor final : public FaabricExecutor + { + public: + explicit _Executor() + : FaabricExecutor(0) + {} + + void setExecutingCall(faabric::Message* msg) { + this->m_executingCall = msg; + } + + bool doExecute(faabric::Message& msg) override { + setExecutingCall(&msg); + return _execMpiFunc(&msg); + } + + faabric::Message* getExecutingCall() { return m_executingCall; } + + private: + faabric::Message* m_executingCall; + }; + class _SingletonPool : public FaabricPool + { + public: + explicit _SingletonPool() + : FaabricPool(1) + {} + + std::unique_ptr createExecutor(int threadIdx) override + { + return std::make_unique<_Executor>(); + } + }; + int main(int argc, char** argv) + { + auto logger = faabric::util::getLogger(); + 
faabric::scheduler::Scheduler& scheduler = faabric::scheduler::getScheduler(); + auto conf = faabric::util::getSystemConfig(); + faabric::endpoint::FaabricEndpoint endpoint; + + // Add host to the global set + scheduler.addHostToGlobalSet(); + + // Print current configuration + conf.print(); + + // Start the thread pool (nThreads == 1) and the state and function + // call servers. + _SingletonPool p; + p.startThreadPool(); + p.startStateServer(); + p.startFunctionCallServer(); + endpoint.start(); + + // Shutdown + scheduler.clear(); + p.shutdown(); + } + + bool _execMpiFunc(const faabric::Message* msg) + +FAABRIC_MPI_MAIN() { - faabric::util::initLogging(); auto logger = faabric::util::getLogger(); + logger->info("Hello world from Faabric MPI Main!"); + logger->info("this is our executing call {}", msg->user()); MPI_Init(NULL, NULL); @@ -15,7 +87,6 @@ int main() MPI_Comm_size(MPI_COMM_WORLD, &worldSize); sleep(2); - logger->info("Hello world from rank {} of {}", rank, worldSize); MPI_Finalize(); diff --git a/examples/mpi_runner.cpp b/examples/mpi_runner.cpp deleted file mode 100644 index 65cc82926..000000000 --- a/examples/mpi_runner.cpp +++ /dev/null @@ -1,39 +0,0 @@ -#include - -#include -#include - -#define EXAMPLE_BUILD_PATH "/code/faabric/examples/build" - -// Macro defined in include/faabric/executor/FaabricPool.h -// bool _execFunc(faabric::Message& msg) -FAABRIC_EXECUTOR -{ - // logger->info("Executor {} running function {}" - std::string function = msg.function(); - std::string function_path = - std::string(EXAMPLE_BUILD_PATH) + "/" + function; - return true; -} - -int main(int argc, char** argv) -{ - // Process input parameters - - faabric::util::initLogging(); - auto logger = faabric::util::getLogger(); - - logger->info("Starting faaslet pool in the background"); - _Pool p(5); - FaabricMain w(p); - w.startBackground(); - - logger->info("Running mpi function {}", function); - const char* funcName = "hellompi"; - faabric::Message msg = faabric::util::messageFactory("mpi", funcName); - msg.set_mpiworldsize(5); - auto sch = faabric::scheduler::getScheduler(); - sch.callFunction(msg); - - return 0; -} diff --git a/src/native-mpi/mpi.cpp b/src/native-mpi/mpi.cpp index 278a042a9..46fb33b76 100644 --- a/src/native-mpi/mpi.cpp +++ b/src/native-mpi/mpi.cpp @@ -7,7 +7,7 @@ static thread_local faabric::scheduler::MpiContext executingContext; faabric::scheduler::MpiWorld& getExecutingWorld() { int worldId = executingContext.getWorldId(); - auto reg = faabric::scheduler::getMpiWorldRegistry(); + faabric::scheduler::MpiWorldRegistry& reg = faabric::scheduler::getMpiWorldRegistry(); return reg.getOrInitialiseWorld(*getExecutingCall(), worldId); } @@ -26,7 +26,7 @@ int MPI_Init(int* argc, char*** argv) } int thisRank = executingContext.getRank(); - auto world = getExecutingWorld(); + faabric::scheduler::MpiWorld& world = getExecutingWorld(); world.barrier(thisRank); return MPI_SUCCESS; diff --git a/tasks/examples.py b/tasks/examples.py index 8431db819..5a410e3eb 100644 --- a/tasks/examples.py +++ b/tasks/examples.py @@ -8,6 +8,8 @@ from invoke import task +import requests + EXAMPLES_DIR = join(PROJ_ROOT, "examples") BUILD_DIR = join(EXAMPLES_DIR, "build") @@ -95,8 +97,18 @@ def execute_mpi(ctx, example, np=MPI_DEFAULT_WORLD_SIZE): } ) - procs = [ - Popen(exe_path, env=shell_env, shell=True) for _ in range(np) - ] + procs = [Popen(exe_path, env=shell_env, shell=True) for _ in range(np)] for p in procs: p.wait() + + +@task +def invoke_mpi(ctx, host="0.0.0.0", port="8080"): + """ + Invoke MPI 
function through HTTP handler + """ + # The host:port address must match that of the HTTP Endpoint + url = "http://{}:{}".format(host, port) + msg = {"user": "mpi", "function": "faabric", "mpi_world_size": 1} + response = requests.post(url, json=msg, headers=None) + print(response.text) From 0d17cd69cfa171c775f772ccae69282f1fdca64c Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 8 Jan 2021 06:38:16 +0000 Subject: [PATCH 06/27] adding scale-up and scale-down options through docker-compose and exposing the binary as a mount volume --- docker-compose.yml | 1 + tasks/examples.py | 27 +++++++++++++++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index bb1dbba16..0ebb817e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,7 @@ services: volumes: - /var/run/docker.sock:/var/run/docker.sock - /usr/bin/docker:/usr/bin/docker + - /usr/local/bin/docker-compose:/usr/bin/docker-compose - .:/code/faabric - ./build:/build/faabric working_dir: /code/faabric diff --git a/tasks/examples.py b/tasks/examples.py index 5a410e3eb..9098a58aa 100644 --- a/tasks/examples.py +++ b/tasks/examples.py @@ -16,7 +16,8 @@ INCLUDE_DIR = "{}/include".format(FAABRIC_INSTALL_PREFIX) LIB_DIR = "{}/lib".format(FAABRIC_INSTALL_PREFIX) -MPI_DEFAULT_WORLD_SIZE = 5 +FAABRIC_SERVICE_NAME = "cli" +MPI_DEFAULT_WORLD_SIZE = 1 @task(default=True) @@ -97,9 +98,16 @@ def execute_mpi(ctx, example, np=MPI_DEFAULT_WORLD_SIZE): } ) - procs = [Popen(exe_path, env=shell_env, shell=True) for _ in range(np)] - for p in procs: - p.wait() + # Start up np - 1 faabric_processes. Note that if there are any other + # scaled out processes they will be stopped and removed. Additionally, + # running /bin/cli.sh whilst having a scaled out client will stop & remove + # them as well. + scale_up_cmd = "docker-compose up -d --scale {}={} --no-recreate".format( + FAABRIC_SERVICE_NAME, np - 1 + ) + run(scale_up_cmd, shell=True, check=True) + + run(exe_path, env=shell_env, shell=True, check=True) @task @@ -112,3 +120,14 @@ def invoke_mpi(ctx, host="0.0.0.0", port="8080"): msg = {"user": "mpi", "function": "faabric", "mpi_world_size": 1} response = requests.post(url, json=msg, headers=None) print(response.text) + + +@task +def terminate_mpi(ctx): + """ + Terminate an MPI execution + """ + # This will stop and remove all containers scaled out (i.e. invoked using + # the --scale flag) and leave those specified in the docker-compose file. 
+ scale_down_cmd = "docker-compose up -d --no-recreate" + run(scale_down_cmd, shell=True, check=True) From 732afe449f314b5a6ea3e5aa42a2e9b30deeaa00 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 11 Jan 2021 07:18:10 +0000 Subject: [PATCH 07/27] adding native mpi_executor in lieu of the overly complex macro --- examples/mpi_helloworld.cpp | 10 +++++ include/faabric/mpi/MpiExecutor.h | 53 +++++++++++++++++++++++++ src/mpi/MpiExecutor.cpp | 64 +++++++++++++++++++++++++++++++ src/native-mpi/mpi.cpp | 4 +- 4 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 include/faabric/mpi/MpiExecutor.h create mode 100644 src/mpi/MpiExecutor.cpp diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index 2a74d4685..13ec94070 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -5,9 +5,11 @@ #include #include +#include #include +#if 0 #define FAABRIC_MPI_MAIN() using namespace faabric::executor; @@ -29,6 +31,11 @@ return _execMpiFunc(&msg); } + bool postFinishCall() override { + auto logger = faabric::util::getLogger(); + logger->debug("Finished MPI execution."); + } + faabric::Message* getExecutingCall() { return m_executingCall; } private: @@ -73,6 +80,9 @@ } bool _execMpiFunc(const faabric::Message* msg) +#endif + +using namespace faabric::executor; FAABRIC_MPI_MAIN() { diff --git a/include/faabric/mpi/MpiExecutor.h b/include/faabric/mpi/MpiExecutor.h new file mode 100644 index 000000000..c6a61f53b --- /dev/null +++ b/include/faabric/mpi/MpiExecutor.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace faabric::executor { +class MpiExecutor final : public faabric::executor::FaabricExecutor +{ + public: + explicit MpiExecutor(); + + bool doExecute(faabric::Message& msg) override; + + void postFinishCall() override; + + faabric::Message* getExecutingCall(); + + private: + faabric::Message* m_executingCall; +}; + +class SingletonPool : public faabric::executor::FaabricPool +{ + public: + explicit SingletonPool(); + + void startPool(); + + protected: + std::unique_ptr createExecutor(int threadIdx) override + { + return std::make_unique(); + } + + private: + faabric::scheduler::Scheduler& scheduler; + faabric::endpoint::FaabricEndpoint endpoint; +}; +} + +bool _execMpiFunc(const faabric::Message* msg); + +#define FAABRIC_MPI_MAIN() +int main() +{ + faabric::executor::SingletonPool p; + p.startPool(); +} + +bool _execMpiFunc(faabric::Message& msg) diff --git a/src/mpi/MpiExecutor.cpp b/src/mpi/MpiExecutor.cpp new file mode 100644 index 000000000..a812ad653 --- /dev/null +++ b/src/mpi/MpiExecutor.cpp @@ -0,0 +1,64 @@ +#include + +namespace faabric::executor { +MpiExecutor::MpiExecutor() + : FaabricExecutor(0){}; + +bool MpiExecutor::doExecute(faabric::Message& msg) +{ + auto logger = faabric::util::getLogger(); + // TODO sanity check mpi message + this->m_executingCall = &msg; + + // Execute MPI code + bool success = _execMpiFunc(&msg); + return success; +} + +void MpiExecutor::postFinishCall() +{ + auto logger = faabric::util::getLogger(); + logger->debug("Finished MPI execution."); + // TODO close everything +} + +faabric::Message* MpiExecutor::getExecutingCall() +{ + return this->m_executingCall; +} + +SingletonPool() + : FaabricPool(1) + , scheduler(faabric::scheduler::getScheduler()) +{ + auto logger = faabric::util::getLogger(); + auto conf = faabric::util::getSystemConfig(); + + // Ensure we can ping both redis instances + faabric::redis::Redis::getQueue().ping(); + 
faabric::redis::Redis::getState().ping(); + + // Add host to the list of global sets and print configuration + this->scheduler.addHostToGlobalSet(); + conf.print(); +} + +void SingletonPool::startPool() +{ + // Start singleton thread pool + this->startThreadPool(); + this->startStateServer(); + this->startFunctionCallServer(); + this->endpoint.start(); +} + +~SingletonPool() +{ + auto logger = faabric::util::getLogger(); + + logger->debug("Destructor for singleton pool."); + // scheduler.clear(); + this->shutdown(); + // TODO finish endpoint +} +} diff --git a/src/native-mpi/mpi.cpp b/src/native-mpi/mpi.cpp index 46fb33b76..ae0acd5ed 100644 --- a/src/native-mpi/mpi.cpp +++ b/src/native-mpi/mpi.cpp @@ -18,10 +18,10 @@ int MPI_Init(int* argc, char*** argv) faabric::Message* call = getExecutingCall(); if (call->mpirank() <= 0) { - logger->debug("S - MPI_Init (create) {} {}", a, b); + logger->debug("S - MPI_Init (create) {} {}"); executingContext.createWorld(*call); } else { - logger->debug("S - MPI_Init (join) {} {}", a, b); + logger->debug("S - MPI_Init (join) {} {}"); executingContext.joinWorld(*call); } From 5ae234da2e8dabe020e250f9ef9aa590d092bf73 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 11 Jan 2021 10:07:26 +0000 Subject: [PATCH 08/27] adding a different target for the native mpi build, and renamed for directory and in-file coherence --- CMakeLists.txt | 3 +- examples/CMakeLists.txt | 1 + include/faabric/mpi-native/MpiExecutor.h | 44 ++++++++++++++++++++++++ src/mpi-native/CMakeLists.txt | 40 +++++++++++++++++++++ src/{mpi => mpi-native}/MpiExecutor.cpp | 2 +- src/{native-mpi => mpi-native}/mpi.cpp | 6 ++++ src/native-mpi/CMakeLists.txt | 8 ----- 7 files changed, 93 insertions(+), 11 deletions(-) create mode 100644 include/faabric/mpi-native/MpiExecutor.h create mode 100644 src/mpi-native/CMakeLists.txt rename src/{mpi => mpi-native}/MpiExecutor.cpp (97%) rename src/{native-mpi => mpi-native}/mpi.cpp (92%) delete mode 100644 src/native-mpi/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index 010c1cd64..c8141456f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,7 +56,7 @@ endfunction() add_subdirectory(src/endpoint) add_subdirectory(src/executor) add_subdirectory(src/mpi) -add_subdirectory(src/native-mpi) +add_subdirectory(src/mpi-native) add_subdirectory(src/proto) add_subdirectory(src/redis) add_subdirectory(src/scheduler) @@ -76,7 +76,6 @@ add_library(faabric faabric.cpp $ $ - $ $ $ $ diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d994ce9b1..20060dc88 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -17,6 +17,7 @@ function(add_example example_name) target_link_libraries(${example_name} faabric faabricmpi + faabricmpi_native protobuf pthread pistache diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h new file mode 100644 index 000000000..9662aa0a9 --- /dev/null +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace faabric::executor { +class MpiExecutor final : public faabric::executor::FaabricExecutor +{ + public: + explicit MpiExecutor(); + + bool doExecute(faabric::Message& msg) override; + + void postFinishCall() override; + + private: + faabric::Message* m_executingCall; +}; + +class SingletonPool : public faabric::executor::FaabricPool +{ + public: + explicit SingletonPool(); + + void startPool(); + + protected: + std::unique_ptr createExecutor(int threadIdx) override + { 
+ return std::make_unique(); + } + + private: + faabric::scheduler::Scheduler& scheduler; + faabric::endpoint::FaabricEndpoint endpoint; +}; + +faabric::Message* getExecutingCall(); +} + +bool _execMpiFunc(const faabric::Message* msg); diff --git a/src/mpi-native/CMakeLists.txt b/src/mpi-native/CMakeLists.txt new file mode 100644 index 000000000..a255a69c6 --- /dev/null +++ b/src/mpi-native/CMakeLists.txt @@ -0,0 +1,40 @@ +cmake_minimum_required(VERSION 3.13.0) +project(faabricmpi_native) + +# ----------------------------------------------- +# This library must support being compiled on its own +# ----------------------------------------------- + +message(STATUS "Faabric native MPI implementation") + +set(FAABRIC_INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/../../include) + +include_directories(${FAABRIC_INCLUDE_DIR}) + +file(GLOB MPI_NATIVE_HEADERS "${FAABRIC_INCLUDE_DIR}/faabric/mpi-native/*.h") + +set(LIB_FILES + mpi_native.cpp + MpiExecutor.cpp + ${MPI_NATIVE_HEADERS} + ) + +if(BUILD_SHARED_LIBS) + add_library(faabricmpi_native SHARED ${LIB_FILES}) +else() + add_library(faabricmpi_native STATIC ${LIB_FILES}) +endif() + +set_target_properties(faabricmpi_native + PROPERTIES PUBLIC_HEADER "${MPI_NATIVE_HEADERS}" +) + +target_link_libraries(faabricmpi_native + faabric + faabricmpi + /build/faabric/install/lib/libpistache.so +) + +install(TARGETS faabricmpi_native + PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_PREFIX}/include/faabric/mpi-native +) diff --git a/src/mpi/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp similarity index 97% rename from src/mpi/MpiExecutor.cpp rename to src/mpi-native/MpiExecutor.cpp index a812ad653..00ee3adc1 100644 --- a/src/mpi/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -1,4 +1,4 @@ -#include +#include namespace faabric::executor { MpiExecutor::MpiExecutor() diff --git a/src/native-mpi/mpi.cpp b/src/mpi-native/mpi.cpp similarity index 92% rename from src/native-mpi/mpi.cpp rename to src/mpi-native/mpi.cpp index ae0acd5ed..7699b6a1a 100644 --- a/src/native-mpi/mpi.cpp +++ b/src/mpi-native/mpi.cpp @@ -1,7 +1,13 @@ #include +#include + #include +#include + #include +using namespace faabric::executor; + static thread_local faabric::scheduler::MpiContext executingContext; faabric::scheduler::MpiWorld& getExecutingWorld() diff --git a/src/native-mpi/CMakeLists.txt b/src/native-mpi/CMakeLists.txt deleted file mode 100644 index 31a421719..000000000 --- a/src/native-mpi/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -set(LIB_FILES - mpi.cpp - ${FAABRIC_INCLUDE_DIR}/faabric/mpi/mpi.h - ) - -faabric_lib(mpi "${LIB_FILES}") - -#target_link_libraries(mpi) From 0e3bd0825541748a87d7d08b4296e286346d722e Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 13 Jan 2021 06:25:23 +0000 Subject: [PATCH 09/27] updates to the executor to support being linked as a dynamic library --- include/faabric/executor/FaabricExecutor.h | 2 + include/faabric/mpi/MpiExecutor.h | 53 ---------------------- src/executor/FaabricExecutor.cpp | 5 ++ src/mpi-native/MpiExecutor.cpp | 34 ++++++-------- src/mpi-native/{mpi.cpp => mpi_native.cpp} | 7 ++- 5 files changed, 28 insertions(+), 73 deletions(-) delete mode 100644 include/faabric/mpi/MpiExecutor.h rename src/mpi-native/{mpi.cpp => mpi_native.cpp} (92%) diff --git a/include/faabric/executor/FaabricExecutor.h b/include/faabric/executor/FaabricExecutor.h index 31f3c0244..75b9f51f2 100644 --- a/include/faabric/executor/FaabricExecutor.h +++ b/include/faabric/executor/FaabricExecutor.h @@ -40,6 +40,8 @@ class FaabricExecutor bool success, const 
std::string& errorMsg); + virtual void postFinishCall(); + virtual void postFinish(); bool _isBound = false; diff --git a/include/faabric/mpi/MpiExecutor.h b/include/faabric/mpi/MpiExecutor.h deleted file mode 100644 index c6a61f53b..000000000 --- a/include/faabric/mpi/MpiExecutor.h +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -namespace faabric::executor { -class MpiExecutor final : public faabric::executor::FaabricExecutor -{ - public: - explicit MpiExecutor(); - - bool doExecute(faabric::Message& msg) override; - - void postFinishCall() override; - - faabric::Message* getExecutingCall(); - - private: - faabric::Message* m_executingCall; -}; - -class SingletonPool : public faabric::executor::FaabricPool -{ - public: - explicit SingletonPool(); - - void startPool(); - - protected: - std::unique_ptr createExecutor(int threadIdx) override - { - return std::make_unique(); - } - - private: - faabric::scheduler::Scheduler& scheduler; - faabric::endpoint::FaabricEndpoint endpoint; -}; -} - -bool _execMpiFunc(const faabric::Message* msg); - -#define FAABRIC_MPI_MAIN() -int main() -{ - faabric::executor::SingletonPool p; - p.startPool(); -} - -bool _execMpiFunc(faabric::Message& msg) diff --git a/src/executor/FaabricExecutor.cpp b/src/executor/FaabricExecutor.cpp index ea56346d9..ec9829681 100644 --- a/src/executor/FaabricExecutor.cpp +++ b/src/executor/FaabricExecutor.cpp @@ -91,6 +91,9 @@ void FaabricExecutor::finishCall(faabric::Message& msg, // Increment the execution counter executionCount++; + + // Hook + this->postFinishCall(); } void FaabricExecutor::run() @@ -202,6 +205,8 @@ void FaabricExecutor::preFinishCall(faabric::Message& call, const std::string& errorMsg) {} +void FaabricExecutor::postFinishCall() {} + void FaabricExecutor::postFinish() {} void FaabricExecutor::flush() {} diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index 00ee3adc1..08b2146c2 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -1,4 +1,4 @@ -#include +#include namespace faabric::executor { MpiExecutor::MpiExecutor() @@ -8,7 +8,8 @@ bool MpiExecutor::doExecute(faabric::Message& msg) { auto logger = faabric::util::getLogger(); // TODO sanity check mpi message - this->m_executingCall = &msg; + //faabric::Message& faabric::executor::executingCall = &msg; + faabric::executor::executingCall = &msg; // Execute MPI code bool success = _execMpiFunc(&msg); @@ -19,15 +20,10 @@ void MpiExecutor::postFinishCall() { auto logger = faabric::util::getLogger(); logger->debug("Finished MPI execution."); - // TODO close everything + // TODO shutdown everything } -faabric::Message* MpiExecutor::getExecutingCall() -{ - return this->m_executingCall; -} - -SingletonPool() +SingletonPool::SingletonPool() : FaabricPool(1) , scheduler(faabric::scheduler::getScheduler()) { @@ -43,16 +39,7 @@ SingletonPool() conf.print(); } -void SingletonPool::startPool() -{ - // Start singleton thread pool - this->startThreadPool(); - this->startStateServer(); - this->startFunctionCallServer(); - this->endpoint.start(); -} - -~SingletonPool() +SingletonPool::~SingletonPool() { auto logger = faabric::util::getLogger(); @@ -61,4 +48,13 @@ void SingletonPool::startPool() this->shutdown(); // TODO finish endpoint } + +void SingletonPool::startPool() +{ + // Start singleton thread pool + this->startThreadPool(); + this->startStateServer(); + this->startFunctionCallServer(); + this->endpoint.start(); +} } diff --git a/src/mpi-native/mpi.cpp 
b/src/mpi-native/mpi_native.cpp similarity index 92% rename from src/mpi-native/mpi.cpp rename to src/mpi-native/mpi_native.cpp index 7699b6a1a..ef1cf4e95 100644 --- a/src/mpi-native/mpi.cpp +++ b/src/mpi-native/mpi_native.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include @@ -10,6 +10,11 @@ using namespace faabric::executor; static thread_local faabric::scheduler::MpiContext executingContext; +faabric::Message* getExecutingCall() +{ + return faabric::executor::executingCall; +} + faabric::scheduler::MpiWorld& getExecutingWorld() { int worldId = executingContext.getWorldId(); From b70c9d8e3c1ddc21b1b4ea99c45e08cb7d7a71c4 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 13 Jan 2021 14:55:11 +0000 Subject: [PATCH 10/27] added option to run endpoint thread in background --- examples/mpi_helloworld.cpp | 99 +++++------------------------ include/faabric/endpoint/Endpoint.h | 7 +- src/endpoint/Endpoint.cpp | 17 ++++- 3 files changed, 36 insertions(+), 87 deletions(-) diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index 13ec94070..e71659903 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -1,94 +1,27 @@ -#include -#include -#include -#include #include #include -#include +#include #include -#if 0 -#define FAABRIC_MPI_MAIN() - using namespace faabric::executor; - - bool _execMpiFunc(const faabric::Message* msg); - - class _Executor final : public FaabricExecutor - { - public: - explicit _Executor() - : FaabricExecutor(0) - {} - - void setExecutingCall(faabric::Message* msg) { - this->m_executingCall = msg; - } - - bool doExecute(faabric::Message& msg) override { - setExecutingCall(&msg); - return _execMpiFunc(&msg); - } - - bool postFinishCall() override { - auto logger = faabric::util::getLogger(); - logger->debug("Finished MPI execution."); - } - - faabric::Message* getExecutingCall() { return m_executingCall; } - - private: - faabric::Message* m_executingCall; - }; - class _SingletonPool : public FaabricPool - { - public: - explicit _SingletonPool() - : FaabricPool(1) - {} - - std::unique_ptr createExecutor(int threadIdx) override - { - return std::make_unique<_Executor>(); - } - }; - int main(int argc, char** argv) - { - auto logger = faabric::util::getLogger(); - faabric::scheduler::Scheduler& scheduler = faabric::scheduler::getScheduler(); - auto conf = faabric::util::getSystemConfig(); - faabric::endpoint::FaabricEndpoint endpoint; - - // Add host to the global set - scheduler.addHostToGlobalSet(); - - // Print current configuration - conf.print(); - - // Start the thread pool (nThreads == 1) and the state and function - // call servers. 
- _SingletonPool p; - p.startThreadPool(); - p.startStateServer(); - p.startFunctionCallServer(); - endpoint.start(); - - // Shutdown - scheduler.clear(); - p.shutdown(); - } - - bool _execMpiFunc(const faabric::Message* msg) -#endif - -using namespace faabric::executor; - -FAABRIC_MPI_MAIN() +int main() { auto logger = faabric::util::getLogger(); + auto& scheduler = faabric::scheduler::getScheduler(); + + //auto mpiFunc = _execMpiFunc; + faabric::executor::SingletonPool p; + p.startPool(); + + // Send message to bootstrap execution + faabric::Message msg = faabric::util::messageFactory("mpi", "exec"); + msg.set_mpiworldsize(1); + logger->debug("Sending msg: {}/{}", msg.user(), msg.function()); + scheduler.callFunction(msg); + sleep(3); + logger->info("Hello world from Faabric MPI Main!"); - logger->info("this is our executing call {}", msg->user()); MPI_Init(NULL, NULL); @@ -96,7 +29,7 @@ FAABRIC_MPI_MAIN() MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - sleep(2); + logger->info("Hello faabric from process {} of {}", rank + 1, worldSize); MPI_Finalize(); diff --git a/include/faabric/endpoint/Endpoint.h b/include/faabric/endpoint/Endpoint.h index 62448b13b..a7e809294 100644 --- a/include/faabric/endpoint/Endpoint.h +++ b/include/faabric/endpoint/Endpoint.h @@ -5,6 +5,8 @@ #include #include +#include + namespace faabric::endpoint { class Endpoint { @@ -13,12 +15,15 @@ class Endpoint Endpoint(int port, int threadCount); - void start(); + void start(bool background = false); + + void doStart(); virtual std::shared_ptr getHandler() = 0; private: int port = faabric::util::getSystemConfig().endpointPort; int threadCount = faabric::util::getSystemConfig().endpointNumThreads; + std::thread servingThread; }; } diff --git a/src/endpoint/Endpoint.cpp b/src/endpoint/Endpoint.cpp index b007287f2..6d74b0eda 100644 --- a/src/endpoint/Endpoint.cpp +++ b/src/endpoint/Endpoint.cpp @@ -11,11 +11,22 @@ Endpoint::Endpoint(int portIn, int threadCountIn) , threadCount(threadCountIn) {} -void Endpoint::start() +void Endpoint::start(bool background) { - const std::shared_ptr& logger = faabric::util::getLogger(); + auto logger = faabric::util::getLogger(); - logger->info("Starting HTTP endpoint"); + if (background) { + logger->debug("Starting HTTP endpoint in background thread"); + servingThread = std::thread([this] { doStart(); }); + } else { + logger->debug("Starting HTTP endpoint in this thread"); + doStart(); + } +} + +void Endpoint::doStart() +{ + auto logger = faabric::util::getLogger(); // Set up signal handler sigset_t signals; From 37919c0759efc3f51662c2c402934648c50d1a4c Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 13 Jan 2021 14:56:21 +0000 Subject: [PATCH 11/27] helloworld working fine with one process --- include/faabric/mpi-native/MpiExecutor.h | 11 ++++------- src/mpi-native/MpiExecutor.cpp | 17 +++++++++++------ src/mpi-native/mpi_native.cpp | 12 +++++++----- src/scheduler/MpiContext.cpp | 1 - tasks/examples.py | 2 +- 5 files changed, 23 insertions(+), 20 deletions(-) diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h index 9662aa0a9..7ef1d1480 100644 --- a/include/faabric/mpi-native/MpiExecutor.h +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -15,15 +15,14 @@ class MpiExecutor final : public faabric::executor::FaabricExecutor bool doExecute(faabric::Message& msg) override; void postFinishCall() override; - - private: - faabric::Message* m_executingCall; }; class SingletonPool : public 
faabric::executor::FaabricPool { public: - explicit SingletonPool(); + SingletonPool(); + + ~SingletonPool(); void startPool(); @@ -38,7 +37,5 @@ class SingletonPool : public faabric::executor::FaabricPool faabric::endpoint::FaabricEndpoint endpoint; }; -faabric::Message* getExecutingCall(); +extern faabric::Message* executingCall; } - -bool _execMpiFunc(const faabric::Message* msg); diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index 08b2146c2..e35ff4148 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -1,19 +1,24 @@ #include +#include + namespace faabric::executor { +faabric::Message* executingCall; +thread_local std::function mpiFunc; + MpiExecutor::MpiExecutor() : FaabricExecutor(0){}; bool MpiExecutor::doExecute(faabric::Message& msg) { auto logger = faabric::util::getLogger(); - // TODO sanity check mpi message - //faabric::Message& faabric::executor::executingCall = &msg; + faabric::executor::executingCall = &msg; - // Execute MPI code - bool success = _execMpiFunc(&msg); - return success; + // TODO delete + sleep(10); + + return true; } void MpiExecutor::postFinishCall() @@ -55,6 +60,6 @@ void SingletonPool::startPool() this->startThreadPool(); this->startStateServer(); this->startFunctionCallServer(); - this->endpoint.start(); + this->endpoint.start(true); } } diff --git a/src/mpi-native/mpi_native.cpp b/src/mpi-native/mpi_native.cpp index ef1cf4e95..3b8359ab5 100644 --- a/src/mpi-native/mpi_native.cpp +++ b/src/mpi-native/mpi_native.cpp @@ -8,7 +8,7 @@ using namespace faabric::executor; -static thread_local faabric::scheduler::MpiContext executingContext; +static faabric::scheduler::MpiContext executingContext; faabric::Message* getExecutingCall() { @@ -29,10 +29,10 @@ int MPI_Init(int* argc, char*** argv) faabric::Message* call = getExecutingCall(); if (call->mpirank() <= 0) { - logger->debug("S - MPI_Init (create) {} {}"); + logger->debug("S - MPI_Init (create)"); executingContext.createWorld(*call); } else { - logger->debug("S - MPI_Init (join) {} {}"); + logger->debug("S - MPI_Init (join)"); executingContext.joinWorld(*call); } @@ -48,7 +48,8 @@ int MPI_Comm_rank(MPI_Comm comm, int* rank) auto logger = faabric::util::getLogger(); logger->debug("MPI_Comm_rank"); - *rank = 1337; + faabric::Message* call = getExecutingCall(); + *rank = call->mpirank(); return MPI_SUCCESS; } @@ -58,7 +59,8 @@ int MPI_Comm_size(MPI_Comm comm, int* size) auto logger = faabric::util::getLogger(); logger->debug("MPI_Comm_size"); - *size = 9337; + faabric::Message* call = getExecutingCall(); + *size = call->mpiworldsize(); return MPI_SUCCESS; } diff --git a/src/scheduler/MpiContext.cpp b/src/scheduler/MpiContext.cpp index 130bcc60a..f786fcdf8 100644 --- a/src/scheduler/MpiContext.cpp +++ b/src/scheduler/MpiContext.cpp @@ -23,7 +23,6 @@ void MpiContext::createWorld(const faabric::Message& msg) } worldId = (int)faabric::util::generateGid(); - logger->debug("Initialising world {}", worldId); // Create the MPI world scheduler::MpiWorldRegistry& reg = scheduler::getMpiWorldRegistry(); diff --git a/tasks/examples.py b/tasks/examples.py index 9098a58aa..33f904a1e 100644 --- a/tasks/examples.py +++ b/tasks/examples.py @@ -103,7 +103,7 @@ def execute_mpi(ctx, example, np=MPI_DEFAULT_WORLD_SIZE): # running /bin/cli.sh whilst having a scaled out client will stop & remove # them as well. 
scale_up_cmd = "docker-compose up -d --scale {}={} --no-recreate".format( - FAABRIC_SERVICE_NAME, np - 1 + FAABRIC_SERVICE_NAME, np ) run(scale_up_cmd, shell=True, check=True) From b2e2b7153396316a0fcdc0171a83effb69a853f0 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 14 Jan 2021 11:55:00 +0000 Subject: [PATCH 12/27] helloworld working with >1 processes --- examples/mpi_helloworld.cpp | 39 +++++++++++++++++------- include/faabric/mpi-native/MpiExecutor.h | 3 +- src/mpi-native/MpiExecutor.cpp | 13 +++++--- src/mpi-native/mpi_native.cpp | 4 +-- 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index e71659903..0bb189352 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -1,26 +1,43 @@ #include -#include #include +#include #include -int main() +int main(int argc, char** argv) { auto logger = faabric::util::getLogger(); auto& scheduler = faabric::scheduler::getScheduler(); + auto& conf = faabric::util::getSystemConfig(); + + // Global configuration + conf.maxNodes = 1; + conf.maxNodesPerFunction = 1; + + bool __isRoot; + if (argc < 2) { + logger->debug("Non-root process started"); + __isRoot = false; + } else { + logger->debug("Root process started"); + __isRoot = true; + } + + // Pre-load message to bootstrap execution + if (__isRoot) { + faabric::Message msg = faabric::util::messageFactory("mpi", "exec"); + msg.set_mpiworldsize(2); + scheduler.callFunction(msg); + } - //auto mpiFunc = _execMpiFunc; faabric::executor::SingletonPool p; - p.startPool(); - - // Send message to bootstrap execution - faabric::Message msg = faabric::util::messageFactory("mpi", "exec"); - msg.set_mpiworldsize(1); - logger->debug("Sending msg: {}/{}", msg.user(), msg.function()); - scheduler.callFunction(msg); - sleep(3); + p.startPool(false); +} +bool faabric::executor::mpiFunc() +{ + auto logger = faabric::util::getLogger(); logger->info("Hello world from Faabric MPI Main!"); MPI_Init(NULL, NULL); diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h index 7ef1d1480..28032deb5 100644 --- a/include/faabric/mpi-native/MpiExecutor.h +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -24,7 +24,7 @@ class SingletonPool : public faabric::executor::FaabricPool ~SingletonPool(); - void startPool(); + void startPool(bool background); protected: std::unique_ptr createExecutor(int threadIdx) override @@ -38,4 +38,5 @@ class SingletonPool : public faabric::executor::FaabricPool }; extern faabric::Message* executingCall; +extern bool __attribute__((weak)) mpiFunc(); } diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index e35ff4148..3006ebdd0 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -4,7 +4,7 @@ namespace faabric::executor { faabric::Message* executingCall; -thread_local std::function mpiFunc; +bool mpiFunc(); MpiExecutor::MpiExecutor() : FaabricExecutor(0){}; @@ -15,8 +15,11 @@ bool MpiExecutor::doExecute(faabric::Message& msg) faabric::executor::executingCall = &msg; - // TODO delete - sleep(10); + logger->debug("executor binding to call"); + logger->debug("msg: {}/{}", msg.user(), msg.function()); + logger->debug("rank: {}", msg.mpirank()); + + bool success = mpiFunc(); return true; } @@ -54,12 +57,12 @@ SingletonPool::~SingletonPool() // TODO finish endpoint } -void SingletonPool::startPool() +void SingletonPool::startPool(bool background) { // Start singleton thread pool this->startThreadPool(); 
this->startStateServer(); this->startFunctionCallServer(); - this->endpoint.start(true); + this->endpoint.start(background); } } diff --git a/src/mpi-native/mpi_native.cpp b/src/mpi-native/mpi_native.cpp index 3b8359ab5..8176ddab4 100644 --- a/src/mpi-native/mpi_native.cpp +++ b/src/mpi-native/mpi_native.cpp @@ -59,8 +59,8 @@ int MPI_Comm_size(MPI_Comm comm, int* size) auto logger = faabric::util::getLogger(); logger->debug("MPI_Comm_size"); - faabric::Message* call = getExecutingCall(); - *size = call->mpiworldsize(); + auto& world = getExecutingWorld(); + *size = world.getSize(); return MPI_SUCCESS; } From c22aa51d1e71173642d290edf73be47235965928 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 06:09:17 +0000 Subject: [PATCH 13/27] finished automation for deployment and cleanup of MPI tasks --- examples/mpi_helloworld.cpp | 10 ++- include/faabric/mpi-native/MpiExecutor.h | 2 +- src/mpi-native/MpiExecutor.cpp | 19 +++-- src/mpi-native/mpi_native.cpp | 3 +- src/scheduler/MpiContext.cpp | 1 + tasks/__init__.py | 2 + tasks/examples.py | 59 +------------ tasks/mpi.py | 102 +++++++++++++++++++++++ 8 files changed, 124 insertions(+), 74 deletions(-) create mode 100644 tasks/mpi.py diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index 0bb189352..f5ae361c2 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -3,8 +3,6 @@ #include #include -#include - int main(int argc, char** argv) { auto logger = faabric::util::getLogger(); @@ -16,12 +14,16 @@ int main(int argc, char** argv) conf.maxNodesPerFunction = 1; bool __isRoot; + int __worldSize; if (argc < 2) { logger->debug("Non-root process started"); __isRoot = false; + } else if (argc < 3) { + logger->debug("Root process started without specifying world size!"); } else { - logger->debug("Root process started"); + __worldSize = std::stoi(argv[2]); __isRoot = true; + logger->debug("MPI World Size: {}", __worldSize); } // Pre-load message to bootstrap execution @@ -35,7 +37,7 @@ int main(int argc, char** argv) p.startPool(false); } -bool faabric::executor::mpiFunc() +int faabric::executor::mpiFunc() { auto logger = faabric::util::getLogger(); logger->info("Hello world from Faabric MPI Main!"); diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h index 28032deb5..be4819f5e 100644 --- a/include/faabric/mpi-native/MpiExecutor.h +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -38,5 +38,5 @@ class SingletonPool : public faabric::executor::FaabricPool }; extern faabric::Message* executingCall; -extern bool __attribute__((weak)) mpiFunc(); +extern int __attribute__((weak)) mpiFunc(); } diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index 3006ebdd0..770c4d9fc 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -1,10 +1,8 @@ #include -#include - namespace faabric::executor { faabric::Message* executingCall; -bool mpiFunc(); +int mpiFunc(); MpiExecutor::MpiExecutor() : FaabricExecutor(0){}; @@ -15,13 +13,16 @@ bool MpiExecutor::doExecute(faabric::Message& msg) faabric::executor::executingCall = &msg; - logger->debug("executor binding to call"); - logger->debug("msg: {}/{}", msg.user(), msg.function()); - logger->debug("rank: {}", msg.mpirank()); - - bool success = mpiFunc(); + bool success; + int error = mpiFunc(); + if (error) { + logger->error("There was an error running the MPI function"); + success = false; + } else { + success = true; + } - return true; + return success; } void 
MpiExecutor::postFinishCall() diff --git a/src/mpi-native/mpi_native.cpp b/src/mpi-native/mpi_native.cpp index 8176ddab4..1a9d987f7 100644 --- a/src/mpi-native/mpi_native.cpp +++ b/src/mpi-native/mpi_native.cpp @@ -48,8 +48,7 @@ int MPI_Comm_rank(MPI_Comm comm, int* rank) auto logger = faabric::util::getLogger(); logger->debug("MPI_Comm_rank"); - faabric::Message* call = getExecutingCall(); - *rank = call->mpirank(); + *rank = executingContext.getRank(); return MPI_SUCCESS; } diff --git a/src/scheduler/MpiContext.cpp b/src/scheduler/MpiContext.cpp index f786fcdf8..130bcc60a 100644 --- a/src/scheduler/MpiContext.cpp +++ b/src/scheduler/MpiContext.cpp @@ -23,6 +23,7 @@ void MpiContext::createWorld(const faabric::Message& msg) } worldId = (int)faabric::util::generateGid(); + logger->debug("Initialising world {}", worldId); // Create the MPI world scheduler::MpiWorldRegistry& reg = scheduler::getMpiWorldRegistry(); diff --git a/tasks/__init__.py b/tasks/__init__.py index 6637db9c5..a033b4979 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -5,6 +5,7 @@ from . import dev from . import examples from . import git +from . import mpi ns = Collection( call, @@ -12,4 +13,5 @@ dev, examples, git, + mpi, ) diff --git a/tasks/examples.py b/tasks/examples.py index 33f904a1e..999b070d9 100644 --- a/tasks/examples.py +++ b/tasks/examples.py @@ -2,23 +2,18 @@ from shutil import rmtree from os.path import join, exists from copy import copy -from subprocess import run, Popen +from subprocess import run from tasks.util.env import PROJ_ROOT, FAABRIC_INSTALL_PREFIX from invoke import task -import requests - EXAMPLES_DIR = join(PROJ_ROOT, "examples") BUILD_DIR = join(EXAMPLES_DIR, "build") INCLUDE_DIR = "{}/include".format(FAABRIC_INSTALL_PREFIX) LIB_DIR = "{}/lib".format(FAABRIC_INSTALL_PREFIX) -FAABRIC_SERVICE_NAME = "cli" -MPI_DEFAULT_WORLD_SIZE = 1 - @task(default=True) def build(ctx, clean=False): @@ -79,55 +74,3 @@ def execute(ctx, example): ) run(exe_path, env=shell_env, shell=True, check=True) - - -@task -def execute_mpi(ctx, example, np=MPI_DEFAULT_WORLD_SIZE): - """ - Runs an MPI example - """ - exe_path = join(BUILD_DIR, example) - - if not exists(exe_path): - raise RuntimeError("Did not find {} as expected".format(exe_path)) - - shell_env = copy(environ) - shell_env.update( - { - "LD_LIBRARY_PATH": LIB_DIR, - } - ) - - # Start up np - 1 faabric_processes. Note that if there are any other - # scaled out processes they will be stopped and removed. Additionally, - # running /bin/cli.sh whilst having a scaled out client will stop & remove - # them as well. - scale_up_cmd = "docker-compose up -d --scale {}={} --no-recreate".format( - FAABRIC_SERVICE_NAME, np - ) - run(scale_up_cmd, shell=True, check=True) - - run(exe_path, env=shell_env, shell=True, check=True) - - -@task -def invoke_mpi(ctx, host="0.0.0.0", port="8080"): - """ - Invoke MPI function through HTTP handler - """ - # The host:port address must match that of the HTTP Endpoint - url = "http://{}:{}".format(host, port) - msg = {"user": "mpi", "function": "faabric", "mpi_world_size": 1} - response = requests.post(url, json=msg, headers=None) - print(response.text) - - -@task -def terminate_mpi(ctx): - """ - Terminate an MPI execution - """ - # This will stop and remove all containers scaled out (i.e. invoked using - # the --scale flag) and leave those specified in the docker-compose file. 
- scale_down_cmd = "docker-compose up -d --no-recreate" - run(scale_down_cmd, shell=True, check=True) diff --git a/tasks/mpi.py b/tasks/mpi.py new file mode 100644 index 000000000..75794bd28 --- /dev/null +++ b/tasks/mpi.py @@ -0,0 +1,102 @@ +from copy import copy +from os import environ +from os.path import exists, join +from subprocess import run + +from tasks.util.env import PROJ_ROOT, FAABRIC_INSTALL_PREFIX + +from invoke import task + +EXAMPLES_DIR = join(PROJ_ROOT, "examples") +BUILD_DIR = join(EXAMPLES_DIR, "build") + +INCLUDE_DIR = "{}/include".format(FAABRIC_INSTALL_PREFIX) +LIB_DIR = "{}/lib".format(FAABRIC_INSTALL_PREFIX) + +# As it appears in the docker-compose file +FAABRIC_WORKER_NAME = "cli" +MPI_DEFAULT_WORLD_SIZE = 1 + + +def _find_mpi_hosts(): + """ + Return all deployed MPI hosts. This is, active containers based on the + client image. This won't work if we change the container name used + to deploying replicas. + """ + docker_cmd = "docker ps --format '{{.Names}}'" + + active_containers = run( + docker_cmd, shell=True, check=True, capture_output=True + ) + + # Format output and filter only 'client' containers + mpi_hosts = [ + c + for c in active_containers.stdout.decode("utf-8").split("\n") + if FAABRIC_WORKER_NAME in c + ] + + return mpi_hosts + + +# TODO: eventually move all MPI examples to ./examples/mpi +@task +def execute(ctx, example, clean=False, np=MPI_DEFAULT_WORLD_SIZE): + """ + Runs an MPI example + """ + exe_path = join("./examples/build/", example) + + if not exists(exe_path): + raise RuntimeError("Did not find {} as expected".format(exe_path)) + + shell_env = copy(environ) + shell_env.update( + { + "LD_LIBRARY_PATH": LIB_DIR, + } + ) + + # Start up `np` faabric instances. Note that if there are any other + # scaled out processes they will be stopped and removed. Additionally, + # running /bin/cli.sh whilst having a scaled out cluster will stop & remove + # them as well. 
+ scale_up_cmd = "docker-compose up -d --scale {}={} {}".format( + FAABRIC_WORKER_NAME, + np, + "--force-recreate" if clean else "--no-recreate", + ) + print(scale_up_cmd) + run(scale_up_cmd, shell=True, check=True) + + # Get a list of all hosts + mpi_hosts = _find_mpi_hosts() + + # Run the binary in each host + exe_cmd_fmt = ( + "docker exec -{}t {} bash -c 'export LD_LIBRARY_PATH={}; {} {} {}'" + ) + # Start first all non-root instances in detached mode (-dt) + for host in mpi_hosts[1:]: + exe_cmd = exe_cmd_fmt.format("d", host, LIB_DIR, exe_path, "", "") + print(exe_cmd) + run(exe_cmd, shell=True) + # Start the root process (this will bootstrap the execution) + exe_cmd = ( + exe_cmd_fmt.format("", mpi_hosts[0], LIB_DIR, exe_path, "root", np), + ) + print(exe_cmd) + run(exe_cmd, shell=True) + + +@task +def clean(ctx, force=False): + """ + Clean environment from failed deployments + """ + docker_cmd = "docker-compose up -d {}".format( + "--force-recreate" if force else "--no-recreate" + ) + print(docker_cmd) + run(docker_cmd, shell=True, check=True) From 6f965c9f0660a341ed1d7ce82f957f9be6ad5463 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 06:39:12 +0000 Subject: [PATCH 14/27] updating the docs --- README.md | 6 ++++++ docs/native_mpi.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 docs/native_mpi.md diff --git a/README.md b/README.md index 6be73ded1..b86114f81 100644 --- a/README.md +++ b/README.md @@ -110,3 +110,9 @@ inv container.push inv container.build --push ``` +## Additional documentation + +More detail on some key features and implementations can be found below: + +- [Native MPI builds](docs/native_mpi.md) - run native applications against +Faabric's MPI library. diff --git a/docs/native_mpi.md b/docs/native_mpi.md new file mode 100644 index 000000000..26d171632 --- /dev/null +++ b/docs/native_mpi.md @@ -0,0 +1,43 @@ +# Native MPI execution in Faabric + +Faabric supports linking MPI binaries against our custom MPI implementation +used in [Faasm](https://github.com/faasm/faasm). This way, you can test the +compliance of your MPI application with our API (a subset of the standard) +without the burden of cross-compiling to WebAssembly. + +To run native MPI applications you need to: compile the dynamic library, and +slightly modify the original soource code. + +## Compiling the library + +Compilation should be smooth if you are running our recommended containerised +[development environment](../README.md). You may access the container running +`./bin/cli.sh`. 
+ +Then, to compile the library: +```bash +inv dev.cmake --shared +inv dev.cc faabricmpi_native --shared +inv dev.install faabricmpi_native --shared +``` + +## Adapting the source code + +## Running the binary + +To run an example, run this command _outside_ the container: +```bash +# The --clean flag re-creates _all_ containers +inv mpi.execute mpi_helloworld --clean --np 5 +``` + +To clean the cluster and set the development one again: +```bash +inv mpi.clean +``` + +Using the `--force` flag will recreate _all_ containers hence finishing any +sessions you may have open: +```bash +inv mpi.clean --force +``` From 7534a0dfd7c06742d94f43f243ecc4420beb0a57 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 07:11:32 +0000 Subject: [PATCH 15/27] fixes --- docs/native_mpi.md | 8 ++++++++ examples/mpi_helloworld.cpp | 4 +++- src/mpi-native/MpiExecutor.cpp | 5 ++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/native_mpi.md b/docs/native_mpi.md index 26d171632..72a26690b 100644 --- a/docs/native_mpi.md +++ b/docs/native_mpi.md @@ -41,3 +41,11 @@ sessions you may have open: ```bash inv mpi.clean --force ``` + +## Debugging + +If at some point you reach an unstable state of the cluster, stop it completely +using: +```basg +docker-compose down +``` diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index f5ae361c2..28c13c893 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -21,6 +21,7 @@ int main(int argc, char** argv) } else if (argc < 3) { logger->debug("Root process started without specifying world size!"); } else { + logger->debug("Root process started"); __worldSize = std::stoi(argv[2]); __isRoot = true; logger->debug("MPI World Size: {}", __worldSize); @@ -29,7 +30,8 @@ int main(int argc, char** argv) // Pre-load message to bootstrap execution if (__isRoot) { faabric::Message msg = faabric::util::messageFactory("mpi", "exec"); - msg.set_mpiworldsize(2); + //msg.set_mpiworldsize(__worldSize); + msg.set_mpiworldsize(__worldSize); scheduler.callFunction(msg); } diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index 770c4d9fc..96ac0f0e3 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -30,6 +30,8 @@ void MpiExecutor::postFinishCall() auto logger = faabric::util::getLogger(); logger->debug("Finished MPI execution."); // TODO shutdown everything + // Stops function server, call server, and thread pool + // singletonPool->shutdown(); } SingletonPool::SingletonPool() @@ -54,7 +56,8 @@ SingletonPool::~SingletonPool() logger->debug("Destructor for singleton pool."); // scheduler.clear(); - this->shutdown(); + // Stops function server and call server + // this->shutdown(); // TODO finish endpoint } From 448c1f3254c260a517997c77c8d25b99da431824 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 08:12:23 +0000 Subject: [PATCH 16/27] gracefuly exit --- docs/native_mpi.md | 2 +- examples/mpi_helloworld.cpp | 12 ++++-- include/faabric/endpoint/Endpoint.h | 7 +++- include/faabric/executor/FaabricPool.h | 2 +- include/faabric/mpi-native/MpiExecutor.h | 4 +- src/endpoint/Endpoint.cpp | 47 +++++++++++++++++------- src/executor/FaabricPool.cpp | 13 ++++++- src/mpi-native/MpiExecutor.cpp | 26 ++++++++----- src/mpi-native/mpi_native.cpp | 5 ++- 9 files changed, 82 insertions(+), 36 deletions(-) diff --git a/docs/native_mpi.md b/docs/native_mpi.md index 72a26690b..d4d963c2d 100644 --- a/docs/native_mpi.md +++ b/docs/native_mpi.md @@ -46,6 +46,6 @@ 
inv mpi.clean --force If at some point you reach an unstable state of the cluster, stop it completely using: -```basg +```bash docker-compose down ``` diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index 28c13c893..8fa0b61d6 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -19,7 +19,8 @@ int main(int argc, char** argv) logger->debug("Non-root process started"); __isRoot = false; } else if (argc < 3) { - logger->debug("Root process started without specifying world size!"); + logger->error("Root process started without specifying world size!"); + return 1; } else { logger->debug("Root process started"); __worldSize = std::stoi(argv[2]); @@ -30,13 +31,16 @@ int main(int argc, char** argv) // Pre-load message to bootstrap execution if (__isRoot) { faabric::Message msg = faabric::util::messageFactory("mpi", "exec"); - //msg.set_mpiworldsize(__worldSize); msg.set_mpiworldsize(__worldSize); scheduler.callFunction(msg); } - faabric::executor::SingletonPool p; - p.startPool(false); + { + faabric::executor::SingletonPool p; + p.startPool(false); + } + + return 0; } int faabric::executor::mpiFunc() diff --git a/include/faabric/endpoint/Endpoint.h b/include/faabric/endpoint/Endpoint.h index a7e809294..05ae49cb9 100644 --- a/include/faabric/endpoint/Endpoint.h +++ b/include/faabric/endpoint/Endpoint.h @@ -5,6 +5,7 @@ #include #include +#include #include namespace faabric::endpoint { @@ -15,15 +16,19 @@ class Endpoint Endpoint(int port, int threadCount); - void start(bool background = false); + void start(bool background = true); void doStart(); + void stop(); + virtual std::shared_ptr getHandler() = 0; private: + bool isBackground; int port = faabric::util::getSystemConfig().endpointPort; int threadCount = faabric::util::getSystemConfig().endpointNumThreads; std::thread servingThread; + std::unique_ptr httpEndpoint; }; } diff --git a/include/faabric/executor/FaabricPool.h b/include/faabric/executor/FaabricPool.h index ca9f117ba..5195a91b3 100644 --- a/include/faabric/executor/FaabricPool.h +++ b/include/faabric/executor/FaabricPool.h @@ -14,7 +14,7 @@ class FaabricPool void startFunctionCallServer(); - void startThreadPool(); + void startThreadPool(bool background = true); void startStateServer(); diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h index be4819f5e..fc167f151 100644 --- a/include/faabric/mpi-native/MpiExecutor.h +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -15,6 +15,8 @@ class MpiExecutor final : public faabric::executor::FaabricExecutor bool doExecute(faabric::Message& msg) override; void postFinishCall() override; + + void postFinish() override; }; class SingletonPool : public faabric::executor::FaabricPool @@ -33,8 +35,8 @@ class SingletonPool : public faabric::executor::FaabricPool } private: - faabric::scheduler::Scheduler& scheduler; faabric::endpoint::FaabricEndpoint endpoint; + faabric::scheduler::Scheduler& scheduler; }; extern faabric::Message* executingCall; diff --git a/src/endpoint/Endpoint.cpp b/src/endpoint/Endpoint.cpp index 6d74b0eda..f7eb00c1d 100644 --- a/src/endpoint/Endpoint.cpp +++ b/src/endpoint/Endpoint.cpp @@ -14,6 +14,7 @@ Endpoint::Endpoint(int portIn, int threadCountIn) void Endpoint::start(bool background) { auto logger = faabric::util::getLogger(); + this->isBackground = background; if (background) { logger->debug("Starting HTTP endpoint in background thread"); @@ -46,23 +47,41 @@ void Endpoint::doStart() .backlog(256) 
.flags(Pistache::Tcp::Options::ReuseAddr); - Pistache::Http::Endpoint httpEndpoint(addr); - httpEndpoint.init(opts); + httpEndpoint = std::make_unique(addr); + httpEndpoint->init(opts); // Configure and start endpoint - httpEndpoint.setHandler(this->getHandler()); - httpEndpoint.serveThreaded(); - - // Wait for a signal - logger->info("Awaiting signal"); - int signal = 0; - int status = sigwait(&signals, &signal); - if (status == 0) { - logger->info("Received signal: {}", signal); - } else { - logger->info("Sigwait return value: {}", signal); + httpEndpoint->setHandler(this->getHandler()); + httpEndpoint->serveThreaded(); + + // Wait for a signal if running in foreground (this blocks execution) + if (!isBackground) { + logger->info("Awaiting signal"); + int signal = 0; + int status = sigwait(&signals, &signal); + if (status == 0) { + logger->info("Received signal: {}", signal); + } else { + logger->info("Sigwait return value: {}", signal); + } + + stop(); } +} - httpEndpoint.shutdown(); +void Endpoint::stop() +{ + // Shut down the endpoint + auto logger = faabric::util::getLogger(); + + logger->debug("Shutting down HTTP endpoint"); + this->httpEndpoint->shutdown(); + + if (isBackground) { + logger->debug("Waiting for HTTP endpoint background thread"); + if (servingThread.joinable()) { + servingThread.join(); + } + } } } diff --git a/src/executor/FaabricPool.cpp b/src/executor/FaabricPool.cpp index 829f60438..532d3572d 100644 --- a/src/executor/FaabricPool.cpp +++ b/src/executor/FaabricPool.cpp @@ -39,7 +39,7 @@ void FaabricPool::startStateServer() stateServer.start(); } -void FaabricPool::startThreadPool() +void FaabricPool::startThreadPool(bool background) { const std::shared_ptr& logger = faabric::util::getLogger(); logger->info("Starting worker thread pool"); @@ -65,7 +65,11 @@ void FaabricPool::startThreadPool() createExecutor(threadIdx); // Worker will now run for a long time - executor.get()->run(); + try { + executor.get()->run(); + } catch (faabric::util::ExecutorFinishedException& e) { + this->_shutdown = true; + } // Handle thread finishing threadTokenPool.releaseToken(executor.get()->threadIdx); @@ -82,6 +86,11 @@ void FaabricPool::startThreadPool() // Will die gracefully at this point }); + + // Make the call to startThreadPool blocking + if (!background) { + poolThread.join(); + } } void FaabricPool::reset() diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index 96ac0f0e3..f5be4ed27 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -29,9 +29,12 @@ void MpiExecutor::postFinishCall() { auto logger = faabric::util::getLogger(); logger->debug("Finished MPI execution."); - // TODO shutdown everything - // Stops function server, call server, and thread pool - // singletonPool->shutdown(); + throw faabric::util::ExecutorFinishedException("Finished MPI Execution!"); +} + +void MpiExecutor::postFinish() +{ + throw faabric::util::ExecutorFinishedException("Finished MPI Execution!"); } SingletonPool::SingletonPool() @@ -46,6 +49,7 @@ SingletonPool::SingletonPool() faabric::redis::Redis::getState().ping(); // Add host to the list of global sets and print configuration + logger->debug("Adding host to global set"); this->scheduler.addHostToGlobalSet(); conf.print(); } @@ -54,19 +58,21 @@ SingletonPool::~SingletonPool() { auto logger = faabric::util::getLogger(); - logger->debug("Destructor for singleton pool."); - // scheduler.clear(); - // Stops function server and call server - // this->shutdown(); - // TODO finish endpoint + 
logger->debug("Destructor for singleton pool"); + this->shutdown(); + this->endpoint.stop(); + this->scheduler.shutdown(); } void SingletonPool::startPool(bool background) { + auto logger = faabric::util::getLogger(); + // Start singleton thread pool - this->startThreadPool(); + logger->debug("Starting signleton thread pool"); this->startStateServer(); this->startFunctionCallServer(); - this->endpoint.start(background); + this->endpoint.start(); + this->startThreadPool(false); } } diff --git a/src/mpi-native/mpi_native.cpp b/src/mpi-native/mpi_native.cpp index 1a9d987f7..8dc90f51e 100644 --- a/src/mpi-native/mpi_native.cpp +++ b/src/mpi-native/mpi_native.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include @@ -18,7 +18,8 @@ faabric::Message* getExecutingCall() faabric::scheduler::MpiWorld& getExecutingWorld() { int worldId = executingContext.getWorldId(); - faabric::scheduler::MpiWorldRegistry& reg = faabric::scheduler::getMpiWorldRegistry(); + faabric::scheduler::MpiWorldRegistry& reg = + faabric::scheduler::getMpiWorldRegistry(); return reg.getOrInitialiseWorld(*getExecutingCall(), worldId); } From 38cd510e06247ee6fe86bb1731c4a1e395499249 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 14:17:21 +0000 Subject: [PATCH 17/27] adding build in test file --- .github/workflows/tests.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index dca4e7020..133c6897a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -105,6 +105,10 @@ jobs: run: inv dev.cc faabric --shared - name: "Install Faabric shared library" run: inv dev.install faabric --shared + - name: "Build MPI native library" + run: inv dev.cc faabricmpi_native --shared + - name: "Install MPI native library" + run: inv dev.install faabricmpi_native --shared - name: "Build examples" run: inv examples - name: "Run example to check" From 8622705af4f33d83902cde2b2bf1df5024f9f3e7 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 15:33:50 +0000 Subject: [PATCH 18/27] fixes --- examples/mpi_helloworld.cpp | 2 +- include/faabric/mpi-native/MpiExecutor.h | 2 +- src/mpi-native/MpiExecutor.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/mpi_helloworld.cpp b/examples/mpi_helloworld.cpp index 8fa0b61d6..404dda673 100644 --- a/examples/mpi_helloworld.cpp +++ b/examples/mpi_helloworld.cpp @@ -37,7 +37,7 @@ int main(int argc, char** argv) { faabric::executor::SingletonPool p; - p.startPool(false); + p.startPool(); } return 0; diff --git a/include/faabric/mpi-native/MpiExecutor.h b/include/faabric/mpi-native/MpiExecutor.h index fc167f151..3a845987a 100644 --- a/include/faabric/mpi-native/MpiExecutor.h +++ b/include/faabric/mpi-native/MpiExecutor.h @@ -26,7 +26,7 @@ class SingletonPool : public faabric::executor::FaabricPool ~SingletonPool(); - void startPool(bool background); + void startPool(); protected: std::unique_ptr createExecutor(int threadIdx) override diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi-native/MpiExecutor.cpp index f5be4ed27..28ebdd0c3 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi-native/MpiExecutor.cpp @@ -64,7 +64,7 @@ SingletonPool::~SingletonPool() this->scheduler.shutdown(); } -void SingletonPool::startPool(bool background) +void SingletonPool::startPool() { auto logger = faabric::util::getLogger(); From a7aa748bf47452374a7258da14a2879b69621e3f Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 15 Jan 2021 15:40:43 +0000 Subject: 
[PATCH 19/27] adding checks in dev commands

---
 tasks/dev.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tasks/dev.py b/tasks/dev.py
index 375781d8f..9b0b93e77 100644
--- a/tasks/dev.py
+++ b/tasks/dev.py
@@ -39,7 +39,7 @@ def cmake(ctx, clean=False, shared=False):
 PROJ_ROOT,
 ]

- run(" ".join(cmd), shell=True, cwd=build_dir)
+ run(" ".join(cmd), check=True, shell=True, cwd=build_dir)


 @task
@@ -53,6 +53,7 @@ def cc(ctx, target, shared=False):

 run(
 "cmake --build . --target {}".format(target),
+ check=True,
 cwd=build_dir,
 shell=True,
 )
@@ -69,6 +70,7 @@ def install(ctx, target, shared=False):

 run(
 "ninja install {}".format(target),
+ check=True,
 cwd=build_dir,
 shell=True,
 )

From 6f0afcd704a409712de50127f82a31b4ddaf9c8c Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 15 Jan 2021 15:43:06 +0000
Subject: [PATCH 20/27] adding link directory so cmake can find pistache

---
 cmake/ExternalProjects.cmake | 1 +
 docs/native_mpi.md | 2 +-
 src/mpi-native/CMakeLists.txt | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake
index b494689ba..37c4609d2 100644
--- a/cmake/ExternalProjects.cmake
+++ b/cmake/ExternalProjects.cmake
@@ -30,6 +30,7 @@ set(PROTOC_EXE /usr/local/bin/protoc)
 set(GRPC_PLUGIN /usr/local/bin/grpc_cpp_plugin)

 # Pistache
+link_directories(${CMAKE_INSTALL_PREFIX}/lib)
 ExternalProject_Add(pistache_ext
 GIT_REPOSITORY "https://github.com/oktal/pistache.git"
 GIT_TAG "2ef937c434810858e05d446e97acbdd6cc1a5a36"
 CMAKE_CACHE_ARGS "-DCMAKE_INSTALL_PREFIX:STRING=${CMAKE_INSTALL_PREFIX}"
diff --git a/docs/native_mpi.md b/docs/native_mpi.md
index d4d963c2d..7459c714c 100644
--- a/docs/native_mpi.md
+++ b/docs/native_mpi.md
@@ -28,7 +28,7 @@ inv dev.install faabricmpi_native --shared
 To run an example, run this command _outside_ the container:
 ```bash
 # The --clean flag re-creates _all_ containers
-inv mpi.execute mpi_helloworld --clean --np 5
+inv mpi.execute mpi_helloworld --np 5 --clean
 ```

 To clean the cluster and set the development one again:
diff --git a/src/mpi-native/CMakeLists.txt b/src/mpi-native/CMakeLists.txt
index a255a69c6..da8ccf47f 100644
--- a/src/mpi-native/CMakeLists.txt
+++ b/src/mpi-native/CMakeLists.txt
@@ -32,7 +32,7 @@ set_target_properties(faabricmpi_native
 target_link_libraries(faabricmpi_native
 faabric
 faabricmpi
- /build/faabric/install/lib/libpistache.so
+ pistache
 )

 install(TARGETS faabricmpi_native

From 1f6e8a3644edc331301b33be3f319eaafd11b464 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Mon, 18 Jan 2021 11:17:35 +0000
Subject: [PATCH 21/27] changes suggested in pr comments

---
 CMakeLists.txt | 4 +++-
 cmake/ExternalProjects.cmake | 1 -
 docker-compose.yml | 1 -
 docs/native_mpi.md | 2 --
 include/faabric/executor/FaabricPool.h | 8 ++++++++
 src/executor/FaabricPool.cpp | 4 ++--
 src/{mpi-native => mpi_native}/CMakeLists.txt | 0
 src/{mpi-native => mpi_native}/MpiExecutor.cpp | 3 ++-
 src/{mpi-native => mpi_native}/mpi_native.cpp | 0
 9 files changed, 15 insertions(+), 8 deletions(-)
 rename src/{mpi-native => mpi_native}/CMakeLists.txt (100%)
 rename src/{mpi-native => mpi_native}/MpiExecutor.cpp (95%)
 rename src/{mpi-native => mpi_native}/mpi_native.cpp (100%)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index c8141456f..83949444d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23,6 +23,8 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)

 # External libraries
 include(cmake/ExternalProjects.cmake)
+# TODO change
+link_directories(${CMAKE_INSTALL_PREFIX}/lib)

 # Library funcs
 function(faabric_lib lib_name
lib_deps) @@ -56,7 +58,7 @@ endfunction() add_subdirectory(src/endpoint) add_subdirectory(src/executor) add_subdirectory(src/mpi) -add_subdirectory(src/mpi-native) +add_subdirectory(src/mpi_native) add_subdirectory(src/proto) add_subdirectory(src/redis) add_subdirectory(src/scheduler) diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake index 37c4609d2..b494689ba 100644 --- a/cmake/ExternalProjects.cmake +++ b/cmake/ExternalProjects.cmake @@ -30,7 +30,6 @@ set(PROTOC_EXE /usr/local/bin/protoc) set(GRPC_PLUGIN /usr/local/bin/grpc_cpp_plugin) # Pistache -link_directories(${CMAKE_INSTALL_PREFIX}/lib) ExternalProject_Add(pistache_ext GIT_REPOSITORY "https://github.com/oktal/pistache.git" GIT_TAG "2ef937c434810858e05d446e97acbdd6cc1a5a36" diff --git a/docker-compose.yml b/docker-compose.yml index 0ebb817e3..bb1dbba16 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,6 @@ services: volumes: - /var/run/docker.sock:/var/run/docker.sock - /usr/bin/docker:/usr/bin/docker - - /usr/local/bin/docker-compose:/usr/bin/docker-compose - .:/code/faabric - ./build:/build/faabric working_dir: /code/faabric diff --git a/docs/native_mpi.md b/docs/native_mpi.md index 7459c714c..7a3f2d24d 100644 --- a/docs/native_mpi.md +++ b/docs/native_mpi.md @@ -21,8 +21,6 @@ inv dev.cc faabricmpi_native --shared inv dev.install faabricmpi_native --shared ``` -## Adapting the source code - ## Running the binary To run an example, run this command _outside_ the container: diff --git a/include/faabric/executor/FaabricPool.h b/include/faabric/executor/FaabricPool.h index 5195a91b3..9da4eaa15 100644 --- a/include/faabric/executor/FaabricPool.h +++ b/include/faabric/executor/FaabricPool.h @@ -42,6 +42,14 @@ class FaabricPool std::thread poolThread; std::vector poolThreads; }; + +class ExecutorPoolFinishedException : public faabric::util::FaabricException +{ + public: + explicit ExecutorPoolFinishedException(std::string message) + : FaabricException(std::move(message)) + {} +}; } // Macro for quickly defining functions diff --git a/src/executor/FaabricPool.cpp b/src/executor/FaabricPool.cpp index 532d3572d..b147a79ec 100644 --- a/src/executor/FaabricPool.cpp +++ b/src/executor/FaabricPool.cpp @@ -67,7 +67,7 @@ void FaabricPool::startThreadPool(bool background) // Worker will now run for a long time try { executor.get()->run(); - } catch (faabric::util::ExecutorFinishedException& e) { + } catch (faabric::executor::ExecutorPoolFinishedException& e) { this->_shutdown = true; } @@ -87,7 +87,7 @@ void FaabricPool::startThreadPool(bool background) // Will die gracefully at this point }); - // Make the call to startThreadPool blocking + // Block if in foreground if (!background) { poolThread.join(); } diff --git a/src/mpi-native/CMakeLists.txt b/src/mpi_native/CMakeLists.txt similarity index 100% rename from src/mpi-native/CMakeLists.txt rename to src/mpi_native/CMakeLists.txt diff --git a/src/mpi-native/MpiExecutor.cpp b/src/mpi_native/MpiExecutor.cpp similarity index 95% rename from src/mpi-native/MpiExecutor.cpp rename to src/mpi_native/MpiExecutor.cpp index 28ebdd0c3..1ff0a0b1c 100644 --- a/src/mpi-native/MpiExecutor.cpp +++ b/src/mpi_native/MpiExecutor.cpp @@ -34,7 +34,8 @@ void MpiExecutor::postFinishCall() void MpiExecutor::postFinish() { - throw faabric::util::ExecutorFinishedException("Finished MPI Execution!"); + throw faabric::executor::ExecutorPoolFinishedException( + "SingletonPool finished"); } SingletonPool::SingletonPool() diff --git a/src/mpi-native/mpi_native.cpp 
b/src/mpi_native/mpi_native.cpp similarity index 100% rename from src/mpi-native/mpi_native.cpp rename to src/mpi_native/mpi_native.cpp From 4efc512e4c40643303af4a2162e00630f983b0e5 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 18 Jan 2021 15:17:26 +0000 Subject: [PATCH 22/27] switched deployment method to use custom docker compose file and docker image --- bin/clean_mpi_native.sh | 17 +++++++++++ tasks/mpi.py => bin/mpi_native_dev.py | 28 ++++++++++++----- bin/run_mpi_native.sh | 25 ++++++++++++++++ docker/faabric-mpi-native.dockerfile | 30 +++++++++++++++++++ docker/mpi-native-docker-compose.yml | 34 +++++++++++++++++++++ docker/mpi-native.env | 10 +++++++ docs/native_mpi.md | 43 +++++---------------------- tasks/__init__.py | 2 -- tasks/container.py | 9 ++++++ 9 files changed, 152 insertions(+), 46 deletions(-) create mode 100755 bin/clean_mpi_native.sh rename tasks/mpi.py => bin/mpi_native_dev.py (81%) mode change 100644 => 100755 create mode 100755 bin/run_mpi_native.sh create mode 100644 docker/faabric-mpi-native.dockerfile create mode 100644 docker/mpi-native-docker-compose.yml create mode 100644 docker/mpi-native.env diff --git a/bin/clean_mpi_native.sh b/bin/clean_mpi_native.sh new file mode 100755 index 000000000..4c63d574e --- /dev/null +++ b/bin/clean_mpi_native.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -e + +THIS_DIR=$(dirname $(readlink -f $0)) +PROJ_ROOT=${THIS_DIR}/.. + +pushd ${PROJ_ROOT} >> /dev/null + +source ./docker/mpi-native.env + +docker-compose \ + --file ${COMPOSE_FILE} \ + --env-file ${ENV_FILE} \ + down + +popd >> /dev/null diff --git a/tasks/mpi.py b/bin/mpi_native_dev.py old mode 100644 new mode 100755 similarity index 81% rename from tasks/mpi.py rename to bin/mpi_native_dev.py index 75794bd28..0c76a0b25 --- a/tasks/mpi.py +++ b/bin/mpi_native_dev.py @@ -1,11 +1,13 @@ +#!/usr/bin/python3 + +import argparse from copy import copy from os import environ from os.path import exists, join from subprocess import run -from tasks.util.env import PROJ_ROOT, FAABRIC_INSTALL_PREFIX - -from invoke import task +PROJ_ROOT = "/code/faabric" +FAABRIC_INSTALL_PREFIX = "/build/faabric/install" EXAMPLES_DIR = join(PROJ_ROOT, "examples") BUILD_DIR = join(EXAMPLES_DIR, "build") @@ -15,7 +17,6 @@ # As it appears in the docker-compose file FAABRIC_WORKER_NAME = "cli" -MPI_DEFAULT_WORLD_SIZE = 1 def _find_mpi_hosts(): @@ -41,8 +42,7 @@ def _find_mpi_hosts(): # TODO: eventually move all MPI examples to ./examples/mpi -@task -def execute(ctx, example, clean=False, np=MPI_DEFAULT_WORLD_SIZE): +def execute(example, np, clean=False): """ Runs an MPI example """ @@ -90,8 +90,7 @@ def execute(ctx, example, clean=False, np=MPI_DEFAULT_WORLD_SIZE): run(exe_cmd, shell=True) -@task -def clean(ctx, force=False): +def clean(force=False): """ Clean environment from failed deployments """ @@ -100,3 +99,16 @@ def clean(ctx, force=False): ) print(docker_cmd) run(docker_cmd, shell=True, check=True) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Run MPI in development.') + parser.add_argument("example", default="mpi_helloworld") + parser.add_argument("--np", type=int, default=3) + parser.add_argument("--clean", action="store_true", default=False) + args = parser.parse_args() + + if args.clean: + clean() + else: + print(args.example, args.np) + execute(args.example, args.np) diff --git a/bin/run_mpi_native.sh b/bin/run_mpi_native.sh new file mode 100755 index 000000000..7550befe0 --- /dev/null +++ b/bin/run_mpi_native.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -e + 
+THIS_DIR=$(dirname $(readlink -f $0)) +PROJ_ROOT=${THIS_DIR}/.. + +pushd ${PROJ_ROOT} >> /dev/null + +source ./docker/mpi-native.env + +# Check for command line arguments +docker-compose \ + --file ${COMPOSE_FILE} \ + --env-file ${ENV_FILE} \ + up -d \ + --scale worker=$((${MPI_WORLD_SIZE} -1)) \ + --force-recreate + +docker-compose \ + --file ${COMPOSE_FILE} \ + --env-file ${ENV_FILE} \ + logs master worker + +popd >> /dev/null diff --git a/docker/faabric-mpi-native.dockerfile b/docker/faabric-mpi-native.dockerfile new file mode 100644 index 000000000..894604854 --- /dev/null +++ b/docker/faabric-mpi-native.dockerfile @@ -0,0 +1,30 @@ +FROM faasm/grpc-root:0.0.16 +ARG FAABRIC_VERSION + +# Note - the version of grpc-root here can be quite behind as it's rebuilt very +# rarely + +# Redis +RUN apt install -y \ + libpython3-dev \ + python3-dev \ + python3-pip \ + python3-venv \ + redis-tools + +# Put the code in place +WORKDIR /code +# RUN git clone -b v${FAABRIC_VERSION} https://github.com/faasm/faabric +RUN git clone -b standalone-mpi https://github.com/csegarragonz/faabric + +WORKDIR /code/faabric + +RUN pip3 install invoke + +# Build MPI native lib +RUN inv dev.cmake --shared +RUN inv dev.cc faabricmpi_native --shared +RUN inv dev.install faabricmpi_native --shared + +# Build examples +RUN inv examples.build diff --git a/docker/mpi-native-docker-compose.yml b/docker/mpi-native-docker-compose.yml new file mode 100644 index 000000000..c6ca1aed1 --- /dev/null +++ b/docker/mpi-native-docker-compose.yml @@ -0,0 +1,34 @@ +version: "3" + +services: + redis: + image: redis + + worker: + image: ${FAABRIC_MPI_NATIVE_IMAGE} + entrypoint: /code/faabric/examples/build/${MPI_EXAMPLE} + working_dir: /code/faabric + privileged: true + environment: + - LD_LIBRARY_PATH=/usr/local/lib:/build/faabric/install/lib + - FUNCTION_STORAGE=local + - LOG_LEVEL=debug + - REDIS_STATE_HOST=redis + - REDIS_QUEUE_HOST=redis + depends_on: + - redis + + master: + image: ${FAABRIC_MPI_NATIVE_IMAGE} + entrypoint: ['/code/faabric/examples/build/${MPI_EXAMPLE}', 'master', '${MPI_WORLD_SIZE}'] + working_dir: /code/faabric + privileged: true + environment: + - LD_LIBRARY_PATH=/usr/local/lib:/build/faabric/install/lib + - FUNCTION_STORAGE=local + - LOG_LEVEL=debug + - REDIS_STATE_HOST=redis + - REDIS_QUEUE_HOST=redis + depends_on: + - redis + - worker diff --git a/docker/mpi-native.env b/docker/mpi-native.env new file mode 100644 index 000000000..99ef838b6 --- /dev/null +++ b/docker/mpi-native.env @@ -0,0 +1,10 @@ +FAABRIC_VERSION=0.0.16 +FAABRIC_MPI_NATIVE_IMAGE=faasm/faabric-mpi-native:0.0.16 +COMPOSE_PROJECT_NAME=faabric-mpi + +COMPOSE_FILE="./docker/mpi-native-docker-compose.yml" +ENV_FILE="./docker/mpi-native.env" + +# Deployment-specific +MPI_WORLD_SIZE=1 +MPI_EXAMPLE=mpi_helloworld diff --git a/docs/native_mpi.md b/docs/native_mpi.md index 7a3f2d24d..1dafa9427 100644 --- a/docs/native_mpi.md +++ b/docs/native_mpi.md @@ -5,45 +5,16 @@ used in [Faasm](https://github.com/faasm/faasm). This way, you can test the compliance of your MPI application with our API (a subset of the standard) without the burden of cross-compiling to WebAssembly. -To run native MPI applications you need to: compile the dynamic library, and -slightly modify the original soource code. - -## Compiling the library - -Compilation should be smooth if you are running our recommended containerised -[development environment](../README.md). You may access the container running -`./bin/cli.sh`. 
-
-Then, to compile the library:
-```bash
-inv dev.cmake --shared
-inv dev.cc faabricmpi_native --shared
-inv dev.install faabricmpi_native --shared
-```
-
-## Running the binary
-
-To run an example, run this command _outside_ the container:
-```bash
-# The --clean flag re-creates _all_ containers
-inv mpi.execute mpi_helloworld --np 5 --clean
-```
-
-To clean the cluster and set the development one again:
-```bash
-inv mpi.clean
-```
-
-Using the `--force` flag will recreate _all_ containers hence finishing any
-sessions you may have open:
-```bash
-inv mpi.clean --force
+To run native MPI applications you first need to modify your binary to match
+the examples provided, and then build the worker image by running:
 ```
-
-## Debugging
-
-If at some point you reach an unstable state of the cluster, stop it completely
-using:
-```bash
-docker-compose down
+inv container.build-mpi-native
 ```
+
+Then you may run arbitrary deployments setting the right values in
+`docker/mpi-native.env` and running `./bin/run_mpi_native.sh`.
+
+You may remove all stopped and running container images with:
+```bash
+docker-compose -f docker/mpi-native-docker-compose.yml --env-file docker/mpi-native.env down
+```
diff --git a/tasks/__init__.py b/tasks/__init__.py
index a033b4979..6637db9c5 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -5,7 +5,6 @@
 from . import dev
 from . import examples
 from . import git
-from . import mpi

 ns = Collection(
 call,
@@ -13,5 +12,4 @@
 dev,
 examples,
 git,
- mpi,
 )
diff --git a/tasks/container.py b/tasks/container.py
index c9d5f1a85..ed75a05a7 100644
--- a/tasks/container.py
+++ b/tasks/container.py
@@ -6,6 +6,7 @@
 FAABRIC_IMAGE_NAME = "faabric"
 GRPC_IMAGE_NAME = "grpc-root"
+MPI_NATIVE_IMAGE_NAME = "faabric-mpi-native"


 def _get_docker_tag(img_name):
@@ -65,6 +66,14 @@ def build_grpc(ctx, nocache=False, push=False):
 _do_container_build(GRPC_IMAGE_NAME, nocache=nocache, push=push)


+@task
+def build_mpi_native(ctx, nocache=False, push=False):
+ """
+ Build current MPI native container
+ """
+ _do_container_build(MPI_NATIVE_IMAGE_NAME, nocache=nocache, push=push)
+
+
 @task
 def push(ctx):
 """

From 35f34471f41362f93454ded819c1b6664aa53385 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Tue, 19 Jan 2021 07:05:54 +0000
Subject: [PATCH 23/27] reverting changes in endpoint, and updating the docs

---
 bin/mpi_native_dev.py | 114 ----------------------------
 docs/native_mpi.md | 2 +-
 include/faabric/endpoint/Endpoint.h | 12 +--
 src/endpoint/Endpoint.cpp | 64 +++++-----------
 src/mpi_native/MpiExecutor.cpp | 2 -
 src/mpi_native/mpi_native.cpp | 2 +-
 6 files changed, 20 insertions(+), 176 deletions(-)
 delete mode 100755 bin/mpi_native_dev.py

diff --git a/bin/mpi_native_dev.py b/bin/mpi_native_dev.py
deleted file mode 100755
index 0c76a0b25..000000000
--- a/bin/mpi_native_dev.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/usr/bin/python3
-
-import argparse
-from copy import copy
-from os import environ
-from os.path import exists, join
-from subprocess import run
-
-PROJ_ROOT = "/code/faabric"
-FAABRIC_INSTALL_PREFIX = "/build/faabric/install"
-
-EXAMPLES_DIR = join(PROJ_ROOT, "examples")
-BUILD_DIR = join(EXAMPLES_DIR, "build")
-
-INCLUDE_DIR = "{}/include".format(FAABRIC_INSTALL_PREFIX)
-LIB_DIR = "{}/lib".format(FAABRIC_INSTALL_PREFIX)
-
-# As it appears in the docker-compose file
-FAABRIC_WORKER_NAME = "cli"
-
-
-def _find_mpi_hosts():
- """
- Return all deployed MPI hosts. This is, active containers based on the
- client image. This won't work if we change the container name used
- to deploying replicas.
- """ - docker_cmd = "docker ps --format '{{.Names}}'" - - active_containers = run( - docker_cmd, shell=True, check=True, capture_output=True - ) - - # Format output and filter only 'client' containers - mpi_hosts = [ - c - for c in active_containers.stdout.decode("utf-8").split("\n") - if FAABRIC_WORKER_NAME in c - ] - - return mpi_hosts - - -# TODO: eventually move all MPI examples to ./examples/mpi -def execute(example, np, clean=False): - """ - Runs an MPI example - """ - exe_path = join("./examples/build/", example) - - if not exists(exe_path): - raise RuntimeError("Did not find {} as expected".format(exe_path)) - - shell_env = copy(environ) - shell_env.update( - { - "LD_LIBRARY_PATH": LIB_DIR, - } - ) - - # Start up `np` faabric instances. Note that if there are any other - # scaled out processes they will be stopped and removed. Additionally, - # running /bin/cli.sh whilst having a scaled out cluster will stop & remove - # them as well. - scale_up_cmd = "docker-compose up -d --scale {}={} {}".format( - FAABRIC_WORKER_NAME, - np, - "--force-recreate" if clean else "--no-recreate", - ) - print(scale_up_cmd) - run(scale_up_cmd, shell=True, check=True) - - # Get a list of all hosts - mpi_hosts = _find_mpi_hosts() - - # Run the binary in each host - exe_cmd_fmt = ( - "docker exec -{}t {} bash -c 'export LD_LIBRARY_PATH={}; {} {} {}'" - ) - # Start first all non-root instances in detached mode (-dt) - for host in mpi_hosts[1:]: - exe_cmd = exe_cmd_fmt.format("d", host, LIB_DIR, exe_path, "", "") - print(exe_cmd) - run(exe_cmd, shell=True) - # Start the root process (this will bootstrap the execution) - exe_cmd = ( - exe_cmd_fmt.format("", mpi_hosts[0], LIB_DIR, exe_path, "root", np), - ) - print(exe_cmd) - run(exe_cmd, shell=True) - - -def clean(force=False): - """ - Clean environment from failed deployments - """ - docker_cmd = "docker-compose up -d {}".format( - "--force-recreate" if force else "--no-recreate" - ) - print(docker_cmd) - run(docker_cmd, shell=True, check=True) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Run MPI in development.') - parser.add_argument("example", default="mpi_helloworld") - parser.add_argument("--np", type=int, default=3) - parser.add_argument("--clean", action="store_true", default=False) - args = parser.parse_args() - - if args.clean: - clean() - else: - print(args.example, args.np) - execute(args.example, args.np) diff --git a/docs/native_mpi.md b/docs/native_mpi.md index 1dafa9427..a87c62bc1 100644 --- a/docs/native_mpi.md +++ b/docs/native_mpi.md @@ -16,5 +16,5 @@ Then you may run arbitrary deployments setting the right values in You may remove all stopped and running container images with: ```bash -docker-compose -f docker/mpi-native-docker-compose.yml --env-file docker/mpi-native.env down +./bin/clean_mpi_native.sh ``` diff --git a/include/faabric/endpoint/Endpoint.h b/include/faabric/endpoint/Endpoint.h index 05ae49cb9..62448b13b 100644 --- a/include/faabric/endpoint/Endpoint.h +++ b/include/faabric/endpoint/Endpoint.h @@ -5,9 +5,6 @@ #include #include -#include -#include - namespace faabric::endpoint { class Endpoint { @@ -16,19 +13,12 @@ class Endpoint Endpoint(int port, int threadCount); - void start(bool background = true); - - void doStart(); - - void stop(); + void start(); virtual std::shared_ptr getHandler() = 0; private: - bool isBackground; int port = faabric::util::getSystemConfig().endpointPort; int threadCount = faabric::util::getSystemConfig().endpointNumThreads; - std::thread servingThread; - 
std::unique_ptr httpEndpoint; }; } diff --git a/src/endpoint/Endpoint.cpp b/src/endpoint/Endpoint.cpp index f7eb00c1d..b007287f2 100644 --- a/src/endpoint/Endpoint.cpp +++ b/src/endpoint/Endpoint.cpp @@ -11,23 +11,11 @@ Endpoint::Endpoint(int portIn, int threadCountIn) , threadCount(threadCountIn) {} -void Endpoint::start(bool background) +void Endpoint::start() { - auto logger = faabric::util::getLogger(); - this->isBackground = background; + const std::shared_ptr& logger = faabric::util::getLogger(); - if (background) { - logger->debug("Starting HTTP endpoint in background thread"); - servingThread = std::thread([this] { doStart(); }); - } else { - logger->debug("Starting HTTP endpoint in this thread"); - doStart(); - } -} - -void Endpoint::doStart() -{ - auto logger = faabric::util::getLogger(); + logger->info("Starting HTTP endpoint"); // Set up signal handler sigset_t signals; @@ -47,41 +35,23 @@ void Endpoint::doStart() .backlog(256) .flags(Pistache::Tcp::Options::ReuseAddr); - httpEndpoint = std::make_unique(addr); - httpEndpoint->init(opts); + Pistache::Http::Endpoint httpEndpoint(addr); + httpEndpoint.init(opts); // Configure and start endpoint - httpEndpoint->setHandler(this->getHandler()); - httpEndpoint->serveThreaded(); - - // Wait for a signal if running in foreground (this blocks execution) - if (!isBackground) { - logger->info("Awaiting signal"); - int signal = 0; - int status = sigwait(&signals, &signal); - if (status == 0) { - logger->info("Received signal: {}", signal); - } else { - logger->info("Sigwait return value: {}", signal); - } - - stop(); + httpEndpoint.setHandler(this->getHandler()); + httpEndpoint.serveThreaded(); + + // Wait for a signal + logger->info("Awaiting signal"); + int signal = 0; + int status = sigwait(&signals, &signal); + if (status == 0) { + logger->info("Received signal: {}", signal); + } else { + logger->info("Sigwait return value: {}", signal); } -} - -void Endpoint::stop() -{ - // Shut down the endpoint - auto logger = faabric::util::getLogger(); - - logger->debug("Shutting down HTTP endpoint"); - this->httpEndpoint->shutdown(); - if (isBackground) { - logger->debug("Waiting for HTTP endpoint background thread"); - if (servingThread.joinable()) { - servingThread.join(); - } - } + httpEndpoint.shutdown(); } } diff --git a/src/mpi_native/MpiExecutor.cpp b/src/mpi_native/MpiExecutor.cpp index 1ff0a0b1c..d3c243521 100644 --- a/src/mpi_native/MpiExecutor.cpp +++ b/src/mpi_native/MpiExecutor.cpp @@ -61,7 +61,6 @@ SingletonPool::~SingletonPool() logger->debug("Destructor for singleton pool"); this->shutdown(); - this->endpoint.stop(); this->scheduler.shutdown(); } @@ -73,7 +72,6 @@ void SingletonPool::startPool() logger->debug("Starting signleton thread pool"); this->startStateServer(); this->startFunctionCallServer(); - this->endpoint.start(); this->startThreadPool(false); } } diff --git a/src/mpi_native/mpi_native.cpp b/src/mpi_native/mpi_native.cpp index 8dc90f51e..4026f0046 100644 --- a/src/mpi_native/mpi_native.cpp +++ b/src/mpi_native/mpi_native.cpp @@ -59,7 +59,7 @@ int MPI_Comm_size(MPI_Comm comm, int* size) auto logger = faabric::util::getLogger(); logger->debug("MPI_Comm_size"); - auto& world = getExecutingWorld(); + faabric::scheduler::MpiWorld& world = getExecutingWorld(); *size = world.getSize(); return MPI_SUCCESS; From 2bd711f2c99979ce416216f94f20f91c060968ba Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 19 Jan 2021 08:06:10 +0000 Subject: [PATCH 24/27] moving to top-level directory to encapsulate things --- 
bin/clean_mpi_native.sh => mpi-native/clean.sh | 2 +- {docker => mpi-native}/faabric-mpi-native.dockerfile | 0 {docker => mpi-native}/mpi-native-docker-compose.yml | 0 {docker => mpi-native}/mpi-native.env | 6 +++--- bin/run_mpi_native.sh => mpi-native/run.sh | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) rename bin/clean_mpi_native.sh => mpi-native/clean.sh (86%) rename {docker => mpi-native}/faabric-mpi-native.dockerfile (100%) rename {docker => mpi-native}/mpi-native-docker-compose.yml (100%) rename {docker => mpi-native}/mpi-native.env (58%) rename bin/run_mpi_native.sh => mpi-native/run.sh (92%) diff --git a/bin/clean_mpi_native.sh b/mpi-native/clean.sh similarity index 86% rename from bin/clean_mpi_native.sh rename to mpi-native/clean.sh index 4c63d574e..413984e82 100755 --- a/bin/clean_mpi_native.sh +++ b/mpi-native/clean.sh @@ -7,7 +7,7 @@ PROJ_ROOT=${THIS_DIR}/.. pushd ${PROJ_ROOT} >> /dev/null -source ./docker/mpi-native.env +source ./mpi-native/mpi-native.env docker-compose \ --file ${COMPOSE_FILE} \ diff --git a/docker/faabric-mpi-native.dockerfile b/mpi-native/faabric-mpi-native.dockerfile similarity index 100% rename from docker/faabric-mpi-native.dockerfile rename to mpi-native/faabric-mpi-native.dockerfile diff --git a/docker/mpi-native-docker-compose.yml b/mpi-native/mpi-native-docker-compose.yml similarity index 100% rename from docker/mpi-native-docker-compose.yml rename to mpi-native/mpi-native-docker-compose.yml diff --git a/docker/mpi-native.env b/mpi-native/mpi-native.env similarity index 58% rename from docker/mpi-native.env rename to mpi-native/mpi-native.env index 99ef838b6..d3532f053 100644 --- a/docker/mpi-native.env +++ b/mpi-native/mpi-native.env @@ -2,9 +2,9 @@ FAABRIC_VERSION=0.0.16 FAABRIC_MPI_NATIVE_IMAGE=faasm/faabric-mpi-native:0.0.16 COMPOSE_PROJECT_NAME=faabric-mpi -COMPOSE_FILE="./docker/mpi-native-docker-compose.yml" -ENV_FILE="./docker/mpi-native.env" +COMPOSE_FILE="./mpi-native/mpi-native-docker-compose.yml" +ENV_FILE="./mpi-native/mpi-native.env" # Deployment-specific -MPI_WORLD_SIZE=1 +MPI_WORLD_SIZE=3 MPI_EXAMPLE=mpi_helloworld diff --git a/bin/run_mpi_native.sh b/mpi-native/run.sh similarity index 92% rename from bin/run_mpi_native.sh rename to mpi-native/run.sh index 7550befe0..3fa4fb343 100755 --- a/bin/run_mpi_native.sh +++ b/mpi-native/run.sh @@ -7,7 +7,7 @@ PROJ_ROOT=${THIS_DIR}/.. 
pushd ${PROJ_ROOT} >> /dev/null -source ./docker/mpi-native.env +source ./mpi-native/mpi-native.env # Check for command line arguments docker-compose \ From 1ecd8f4b6bc638722f372bd35d49cde3c0c0f788 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 20 Jan 2021 06:15:44 +0000 Subject: [PATCH 25/27] modified cmake and added new check in actions --- .github/workflows/tests.yml | 20 +++++++++++++++++++ CMakeLists.txt | 4 +--- cmake/ExternalProjects.cmake | 8 +++++++- .../faabric-mpi-native.dockerfile | 3 +-- docs/native_mpi.md | 4 ++-- mpi-native/build.sh | 17 ++++++++++++++++ tasks/container.py | 8 ++++++++ 7 files changed, 56 insertions(+), 8 deletions(-) rename {mpi-native => docker}/faabric-mpi-native.dockerfile (78%) create mode 100755 mpi-native/build.sh diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 133c6897a..bdd03ac5e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -113,3 +113,23 @@ jobs: run: inv examples - name: "Run example to check" run: inv examples.execute check + + mpi_native: + if: github.event.pull_request.draft == false + runs-on: ubuntu-latest + env: + HOST_TYPE: ci + REDIS_QUEUE_HOST: redis + REDIS_STATE_HOST: redis + services: + redis: + image: redis + steps: + # --- Code update --- + - name: "Fetch ref" + uses: actions/checkout@v2 + # --- Examples --- + - name: "Build docker compose images" + run: ./mpi-native/build.sh + - name: "Run a sample example" + run: ./mpi-native/run.sh diff --git a/CMakeLists.txt b/CMakeLists.txt index 83949444d..dc60aad39 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,8 +23,6 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) # External libraries include(cmake/ExternalProjects.cmake) -# TODO change -link_directories(${CMAKE_INSTALL_PREFIX}/lib) # Library funcs function(faabric_lib lib_name lib_deps) @@ -85,7 +83,7 @@ add_library(faabric $ ) -add_dependencies(faabric pistache_ext spdlog_ext) +add_dependencies(faabric pistache spdlog_ext) target_link_libraries(faabric ${Protobuf_LIBRARIES} diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake index b494689ba..f7518b6d3 100644 --- a/cmake/ExternalProjects.cmake +++ b/cmake/ExternalProjects.cmake @@ -31,12 +31,18 @@ set(GRPC_PLUGIN /usr/local/bin/grpc_cpp_plugin) # Pistache ExternalProject_Add(pistache_ext - GIT_REPOSITORY "https://github.com/oktal/pistache.git" + GIT_REPOSITORY "https://github.com/pistacheio/pistache.git" GIT_TAG "2ef937c434810858e05d446e97acbdd6cc1a5a36" CMAKE_CACHE_ARGS "-DCMAKE_INSTALL_PREFIX:STRING=${CMAKE_INSTALL_PREFIX}" + BUILD_BYPRODUCTS ${CMAKE_INSTALL_PREFIX}/lib/libpistache.so ) ExternalProject_Get_Property(pistache_ext SOURCE_DIR) set(PISTACHE_INCLUDE_DIR ${SOURCE_DIR}/include) +add_library(pistache SHARED IMPORTED) +add_dependencies(pistache pistache_ext) +set_target_properties(pistache + PROPERTIES IMPORTED_LOCATION ${CMAKE_INSTALL_PREFIX}/lib/libpistache.so +) # RapidJSON ExternalProject_Add(rapidjson_ext diff --git a/mpi-native/faabric-mpi-native.dockerfile b/docker/faabric-mpi-native.dockerfile similarity index 78% rename from mpi-native/faabric-mpi-native.dockerfile rename to docker/faabric-mpi-native.dockerfile index 894604854..1f8091f99 100644 --- a/mpi-native/faabric-mpi-native.dockerfile +++ b/docker/faabric-mpi-native.dockerfile @@ -14,8 +14,7 @@ RUN apt install -y \ # Put the code in place WORKDIR /code -# RUN git clone -b v${FAABRIC_VERSION} https://github.com/faasm/faabric -RUN git clone -b standalone-mpi 
https://github.com/csegarragonz/faabric +RUN git clone -b v${FAABRIC_VERSION} https://github.com/faasm/faabric WORKDIR /code/faabric diff --git a/docs/native_mpi.md b/docs/native_mpi.md index a87c62bc1..ad6037e3a 100644 --- a/docs/native_mpi.md +++ b/docs/native_mpi.md @@ -12,9 +12,9 @@ inv container.build-mpi-native ``` Then you may run arbitrary deployments setting the right values in -`docker/mpi-native.env` and running `./bin/run_mpi_native.sh`. +`mpi-native/mpi-native.env` and running `./mpi-native/run.sh`. You may remove all stopped and running container images with: ```bash -./bin/clean_mpi_native.sh +./mpi-native/clean.sh ``` diff --git a/mpi-native/build.sh b/mpi-native/build.sh new file mode 100755 index 000000000..59dbac754 --- /dev/null +++ b/mpi-native/build.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -e + +THIS_DIR=$(dirname $(readlink -f $0)) +PROJ_ROOT=${THIS_DIR}/.. + +pushd ${PROJ_ROOT} >> /dev/null + +source ./mpi-native/mpi-native.env + +docker-compose \ + --file ${COMPOSE_FILE} \ + --env-file ${ENV_FILE} \ + build + +popd >> /dev/null diff --git a/tasks/container.py b/tasks/container.py index ed75a05a7..c84743ac0 100644 --- a/tasks/container.py +++ b/tasks/container.py @@ -88,3 +88,11 @@ def push_grpc(ctx): Push current version of gRPC container """ _do_push(GRPC_IMAGE_NAME) + + +@task +def push_mpi_native(ctx): + """ + Push current version of gRPC container + """ + _do_push(MPI_NATIVE_IMAGE_NAME) From ca25dade09909e4431dd7f14cd62480103d2d78d Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 20 Jan 2021 09:27:01 +0000 Subject: [PATCH 26/27] building container each new release --- .github/workflows/release.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 424ccbb64..9cefc5cad 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -32,3 +32,11 @@ jobs: context: . tags: faasm/faabric:${{ env.TAG_VERSION }} build-args: FAABRIC_VERSION=${{ env.TAG_VERSION }} + - name: "Build native MPI container" + uses: docker/build-push-action@v2 + with: + push: true + file: docker/faabric-mpi-native.dockerfile + context: . + tags: faasm/faabric-mpi-native:${{ env.TAG_VERSION }} + build-args: FAABRIC_VERSION=${{ env.TAG_VERSION }} From 027af25f5a80a0173a1eda49f4c0232e2d4799f9 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 21 Jan 2021 15:56:20 +0000 Subject: [PATCH 27/27] tiny typo --- tasks/container.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tasks/container.py b/tasks/container.py index c84743ac0..4518a5c64 100644 --- a/tasks/container.py +++ b/tasks/container.py @@ -69,7 +69,7 @@ def build_grpc(ctx, nocache=False, push=False): @task def build_mpi_native(ctx, nocache=False, push=False): """ - Build current MPI native container + Build current native MPI container """ _do_container_build(MPI_NATIVE_IMAGE_NAME, nocache=nocache, push=push) @@ -93,6 +93,6 @@ def push_grpc(ctx): @task def push_mpi_native(ctx): """ - Push current version of gRPC container + Push current version of the native MPI container """ _do_push(MPI_NATIVE_IMAGE_NAME)