Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove remaining mpi code #35

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,6 @@ endif()
set (CMAKE_CXX_STANDARD 17)
set (CMAKE_CXX_STANDARD_REQUIRED ON)

### MPI
if (CMAKE_ENABLE_MPI_SUPPORT)
find_package(MPI REQUIRED)
set(MSWEEP_MPI_SUPPORT 1)
include_directories(MPI_C_INCLUDE_DIRS)
if (CMAKE_MPI_MAX_PROCESSES)
set(MSWEEP_MPI_MAX_PROCESSES ${CMAKE_MPI_MAX_PROCESSES})
else()
set(MSWEEP_MPI_MAX_PROCESSES 1024)
endif()
else()
set(MSWEEP_MPI_SUPPORT 0)
endif()

set(LIBRARY_OUTPUT_PATH ${CMAKE_CURRENT_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/bin)

Expand All @@ -103,8 +89,6 @@ string(TIMESTAMP _BUILD_TIMESTAMP)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/mSWEEP_version.h.in ${CMAKE_CURRENT_BINARY_DIR}/include/mSWEEP_version.h @ONLY)
## Configure OpenMP if it supported on the system.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/mSWEEP_openmp_config.hpp.in ${CMAKE_CURRENT_BINARY_DIR}/include/mSWEEP_openmp_config.hpp @ONLY)
## Configure MPI if it's supported on the system.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/mSWEEP_mpi_config.hpp.in ${CMAKE_CURRENT_BINARY_DIR}/include/mSWEEP_mpi_config.hpp @ONLY)

add_executable(mSWEEP ${CMAKE_CURRENT_SOURCE_DIR}/src/mSWEEP.cpp)

Expand Down Expand Up @@ -300,8 +284,7 @@ else()
SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/external/rcgpar"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/external/rcgpar"
BUILD_IN_SOURCE 0
CMAKE_ARGS -D CMAKE_ENABLE_MPI_SUPPORT=${MSWEEP_MPI_SUPPORT}
-D CMAKE_SEAMAT_HEADERS=${CMAKE_SEAMAT_HEADERS}
CMAKE_ARGS -D CMAKE_SEAMAT_HEADERS=${CMAKE_SEAMAT_HEADERS}
-D CMAKE_BITMAGIC_HEADERS=${CMAKE_BITMAGIC_HEADERS}
-D CMAKE_LIBTORCH_PATH=${CMAKE_LIBTORCH_PATH}
-D CMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
Expand Down Expand Up @@ -368,11 +351,6 @@ include_directories(
${CMAKE_CURRENT_BINARY_DIR}/include
)

if(MPI_FOUND)
add_dependencies(mSWEEP rcgmpi bigmpi)
target_link_libraries(mSWEEP rcgmpi ${LIBRARY_OUTPUT_PATH}/libbigmpi.a)
endif()

add_library(libmsweep
${CMAKE_CURRENT_SOURCE_DIR}/src/Sample.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/PlainSample.cpp
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Estimation options:
--no-fit-model Do not estimate the abundances. Useful if only the likelihood matrix is required (default: false).
--max-iters Maximum number of iterations to run the abundance estimation optimizer for (default: 5000).
--tol Optimization terminates when the bound changes by less than the given tolerance (default: 0.000001).
--algorithm Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcggpu).
--algorithm Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcgcpu).
--emprecision Precision to use for the emgpu algorithm (one of float, double; default: double).

Bootstrapping options:
Expand Down
8 changes: 1 addition & 7 deletions include/mSWEEP_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <string>

#include "cxxio.hpp"
#include "mSWEEP_mpi_config.hpp"

namespace mSWEEP {
class Log : public cxxio::Out {
Expand Down Expand Up @@ -43,12 +42,7 @@ class Log : public cxxio::Out {

template <typename T>
Log& operator<<(Log &os, T t) {
int rank = 0;
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
// Only root will log
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
#endif
if (os.verbose && rank == 0) {
if (os.verbose) {
if (os.log_time){
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
std::time_t now_t = std::chrono::system_clock::to_time_t(now);
Expand Down
36 changes: 0 additions & 36 deletions include/mSWEEP_mpi_config.hpp.in

This file was deleted.

77 changes: 12 additions & 65 deletions src/mSWEEP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

#include "mSWEEP_alignment.hpp"

#include "mSWEEP_mpi_config.hpp"
#include "mSWEEP_openmp_config.hpp"
#include "mSWEEP_version.h"

Expand Down Expand Up @@ -125,7 +124,7 @@ void parse_args(int argc, char* argv[], cxxargs::Arguments &args) {
// Tolerance for abundance estimation convergence
args.add_long_argument<double>("tol", "Optimization terminates when the bound changes by less than the given tolerance (default: 0.000001).", (double)0.000001);
// Algorithm to use for abundance estimation
args.add_long_argument<std::string>("algorithm", "Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcggpu).", "rcggpu");
args.add_long_argument<std::string>("algorithm", "Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcgcpu).", "rcgcpu");
// Precision for abundance estimation with emgpu algorithm
args.add_long_argument<std::string>("emprecision", "Precision to use for the emgpu algorithm (one of float, double; default: double).\n\nBootstrapping options:", "double");

Expand Down Expand Up @@ -162,30 +161,20 @@ void parse_args(int argc, char* argv[], cxxargs::Arguments &args) {

void finalize(const std::string &msg, mSWEEP::Log &log, bool abort = false) {
// Set the state of the program so that it can finish correctly:
// - Finalizes (or aborts) any/all MPI processes.
// - Writes a potential message `msg` to the log.
// - Flushes the log (ensure all messages are displayed).
//
// Input:
// `msg`: message to print.
// `log`: logger (see msweep_log.hpp).
// `abort`: terminate MPI (from any process).
//
if (abort != 0) {
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
MPI_Abort(MPI_COMM_WORLD, 1);
#endif
}
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
MPI_Finalize();
#endif
if (!msg.empty())
std::cerr << msg;
log.flush();
}

seamat::DenseMatrix<double> rcg_optl(const cxxargs::Arguments &args, const seamat::Matrix<double> &ll_mat, const std::vector<double> &log_ec_counts, const std::vector<double> &prior_counts, mSWEEP::Log &log) {
// Wrapper for calling rcgpar with omp or mpi depending on config.
// Wrapper for calling rcgpar with omp or gpu depending on config.
//
// Input:
// `args`: commandl line arguments.
Expand All @@ -198,49 +187,29 @@ seamat::DenseMatrix<double> rcg_optl(const cxxargs::Arguments &args, const seama
// `ec_probs`: read-lineage probability matrix. Use rcgpar::mixture_components to
// transform this into the relative abundances vector.
//
std::ofstream of; // Silence output from ranks > 1 with an empty ofstream
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
// MPI parallelization (hybrid with OpenMP if enabled).
int rank;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
const seamat::DenseMatrix<double> &ec_probs = rcgpar::rcg_optl_mpi(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (rank == 0 && args.value<bool>("verbose") ? log.stream() : of));
return ec_probs;

#else
// Only OpenMP parallelization (if enabled).
std::ofstream of;

if (args.value<std::string>("algorithm") == "rcggpu") {
// Run rcg on CPU or GPU if present
const seamat::DenseMatrix<double> &ec_probs = rcgpar::rcg_optl_torch(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (args.value<bool>("verbose") ? log.stream() : of));
return ec_probs;
} else if (args.value<std::string>("algorithm") == "rcgcpu") {
// Run rcg on CPU, use OpenMP if supported
const seamat::DenseMatrix<double> &ec_probs = rcgpar::rcg_optl_omp(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (args.value<bool>("verbose") ? log.stream() : of));
return ec_probs;
} else {
// Run em on CPU or GPU if present
const seamat::DenseMatrix<double> &ec_probs = rcgpar::em_torch(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (args.value<bool>("verbose") ? log.stream() : of), args.value<std::string>("emprecision"));
return ec_probs;
}
#endif
}

int main (int argc, char *argv[]) {
// mSWEEP executable main

int rank = 0; // If MPI is not supported then we are always on the root process
mSWEEP::Log log(std::cerr, CmdOptionPresent(argv, argv+argc, "--verbose"), false); // logger class from msweep_log.hpp

// Initialize MPI if enabled
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
int rc = MPI_Init(&argc, &argv);
if (rc != MPI_SUCCESS) {
finalize("MPI initialization failed\n\n", log);
return 1;
}
int ntasks;
rc=MPI_Comm_size(MPI_COMM_WORLD,&ntasks);
rc=MPI_Comm_rank(MPI_COMM_WORLD,&rank);
#endif

// Use the cxxargs library to parse the command line arguments.
// Note: if MPI is enabled the arguments are currently parsed to all processes.
cxxargs::Arguments args("mSWEEP", "Usage: mSWEEP --themisto-1 <forwardPseudoalignments> --themisto-2 <reversePseudoalignments> -i <groupIndicatorsFile>");

// Print version when running if `--verbose` is used.
Expand Down Expand Up @@ -290,42 +259,28 @@ int main (int argc, char *argv[]) {
std::unique_ptr<mSWEEP::Reference> reference;
log << "Reading the input files" << '\n';
try {
if (rank == 0) { // Only root reads in data
log << " reading group indicators" << '\n';
cxxio::In indicators(args.value<std::string>('i'));
reference = std::move(mSWEEP::ConstructAdaptiveReference(&indicators.stream(), '\t'));
if (reference->get_n_groupings() > 1) {
log << " read " << reference->get_n_groupings() << " groupings" << '\n';
}
log << " read " << reference->get_n_refs() << " group indicators" << '\n';
}
} catch (std::exception &e) {
finalize("Reading group indicators failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
return 1;
}

// Estimate abundances with all groupings that were provided
uint16_t n_groupings;
if (rank == 0) { // rank 0
n_groupings = reference->get_n_groupings();
}
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
// Only root process has read in the input.
MPI_Bcast(&n_groupings, 1, MPI_UINT16_T, 0, MPI_COMM_WORLD);
#endif
n_groupings = reference->get_n_groupings();

// Wrapper class for ensuring the outfile names are set consistently and correctly
mSWEEP::OutfileDesignator out(args.value<std::string>('o'), n_groupings, args.value<std::string>("compress"), args.value<int>("compression-level"));

// Estimate abundances with all groupings
for (uint16_t i = 0; i < n_groupings; ++i) {
// Send the number of groups in this grouping from root to all processes
uint16_t n_groups;
if (rank == 0) // rank 0
n_groups = reference->n_groups(i);
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
MPI_Bcast(&n_groups, 1, MPI_UINT16_T, 0, MPI_COMM_WORLD);
#endif
size_t n_groups = reference->n_groups(i);

// `Sample` and its children are classes for storing data from
// the pseudoalignment that are needed or not needed depending on
Expand All @@ -339,9 +294,7 @@ int main (int argc, char *argv[]) {
std::unique_ptr<mSWEEP::Likelihood<double>> log_likelihoods;

// Check if reading likelihood from file.
// In MPI configurations, only root needs to read in the data. Distributing the values
// is handled by the rcgpar::rcg_optl_mpi implementation.
if (rank == 0 && !CmdOptionPresent(argv, argv+argc, "--read-likelihood")) {
if (!CmdOptionPresent(argv, argv+argc, "--read-likelihood")) {
// Start from the pseudoalignments.
// To save memory, the alignment can go out of scope.
// The necessary values are stored in the Sample class.
Expand Down Expand Up @@ -397,7 +350,6 @@ int main (int argc, char *argv[]) {
}

// Initialize Sample depending on how the alignment needs to be processed.
// Note: this is also only used by the root process in MPI configuration.
mSWEEP::ConstructSample(alignment, args.value<size_t>("iters"), args.value<size_t>("bootstrap-count"), args.value<size_t>("seed"), bin_reads, sample);

log.flush();
Expand All @@ -419,7 +371,7 @@ int main (int argc, char *argv[]) {

try {
// Write the likelihood to disk here if it was requested.
if (rank == 0 && (args.value<bool>("write-likelihood") || args.value<bool>("write-likelihood-bitseq")))
if (args.value<bool>("write-likelihood") || args.value<bool>("write-likelihood-bitseq"))
log_likelihoods->write((args.value<bool>("write-likelihood-bitseq") ? "bitseq" : "mSWEEP"), out.likelihoods((args.value<bool>("write-likelihood-bitseq") ? "bitseq" : "mSWEEP")));
} catch (std::exception &e) {
finalize("Writing the likelihood to file failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
Expand Down Expand Up @@ -462,7 +414,6 @@ int main (int argc, char *argv[]) {
}

// Run binning if requested and write results to files.
if (rank == 0) { // root performs the rest.
// Turn the probs into relative abundances
if (args.value<std::string>("algorithm") == "rcgcpu") {
sample->store_abundances(rcgpar::mixture_components(sample->get_probs(), log_likelihoods->log_counts()));
Expand Down Expand Up @@ -539,7 +490,6 @@ int main (int argc, char *argv[]) {
finalize("Writing the probabilities failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
return 1;
}
}

// Bootstrap the ec_counts and estimate from the bootstrapped data if required
if (bootstrap_mode) {
Expand All @@ -548,7 +498,6 @@ int main (int argc, char *argv[]) {
// Bootstrap the counts
log << "Bootstrap" << " iter " << k + 1 << "/" << args.value<size_t>("iters") << '\n';
std::vector<double> resampled_counts;
if (rank == 0)
resampled_counts = std::move(static_cast<mSWEEP::BootstrapSample*>(&(*sample))->resample_counts());

// Estimate with the bootstrapped counts
Expand All @@ -559,19 +508,17 @@ int main (int argc, char *argv[]) {
finalize("Bootstrap iteration " + std::to_string(k) + "/" + std::to_string(args.value<size_t>("iters")) + " failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
return 1;
}
if (rank == 0) {
if (args.value<std::string>("algorithm") == "rcgcpu") {
sample->store_abundances(rcgpar::mixture_components(sample->get_probs(), resampled_counts));
} else {
sample->store_abundances(rcgpar::mixture_components_torch(sample->get_probs(), resampled_counts));
}
}
}
}
}

// Write relative abundances
if (rank == 0 && !args.value<bool>("no-fit-model")) {
if (!args.value<bool>("no-fit-model")) {
try {
if (sample->get_rate_run()) {
const std::vector<double> &log_kld = sample->get_log_klds();
Expand Down