diff --git a/.github/workflows/doxygenate.yaml b/.github/workflows/doxygenate.yaml index 489ec40..b06d10d 100644 --- a/.github/workflows/doxygenate.yaml +++ b/.github/workflows/doxygenate.yaml @@ -26,6 +26,11 @@ jobs: with: args: -O docs/tatami.tag https://tatami-inc.github.io/tatami/tatami.tag + - name: Add subpar tagfile + uses: wei/wget@v1 + with: + args: -O docs/subpar.tag https://ltla.github.io/subpar/subpar.tag + - name: Doxygen Action uses: mattnotmitt/doxygen-action@v1 with: diff --git a/CMakeLists.txt b/CMakeLists.txt index b737e4c..c2267a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,9 +24,10 @@ if(SINGLEPP_FETCH_EXTERN) else() find_package(knncolle_knncolle 2.0.0 CONFIG REQUIRED) find_package(tatami_tatami 3.0.0 CONFIG REQUIRED) + find_package(ltla_subpar 0.2.0 CONFIG REQUIRED) endif() -target_link_libraries(singlepp INTERFACE knncolle::knncolle tatami::tatami) +target_link_libraries(singlepp INTERFACE knncolle::knncolle tatami::tatami ltla::subpar) # Tests if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) diff --git a/cmake/Config.cmake.in b/cmake/Config.cmake.in index 1f263b8..ce857f3 100644 --- a/cmake/Config.cmake.in +++ b/cmake/Config.cmake.in @@ -3,5 +3,6 @@ include(CMakeFindDependencyMacro) find_dependency(knncolle_knncolle 2.0.0 CONFIG REQUIRED) find_dependency(tatami_tatami 3.0.0 CONFIG REQUIRED) +find_dependency(ltla_subpar 0.2.0 CONFIG REQUIRED) include("${CMAKE_CURRENT_LIST_DIR}/singler_singleppTargets.cmake") diff --git a/docs/Doxyfile b/docs/Doxyfile index 0b22440..a9aa25f 100644 --- a/docs/Doxyfile +++ b/docs/Doxyfile @@ -2375,7 +2375,8 @@ SKIP_FUNCTION_MACROS = YES # run, you must also specify the path to the tagfile here. 
TAGFILES = knncolle.tag=https://knncolle.github.io/knncolle \ - tatami.tag=https://tatami-inc.github.io/tatami + tatami.tag=https://tatami-inc.github.io/tatami \ + subpar.tag=https://ltla.github.io/subpar # When a file name is specified after GENERATE_TAGFILE, doxygen will create a # tag file that is based on the input files it reads. See section "Linking to diff --git a/extern/CMakeLists.txt b/extern/CMakeLists.txt index 35a160e..acd5571 100644 --- a/extern/CMakeLists.txt +++ b/extern/CMakeLists.txt @@ -1,16 +1,24 @@ include(FetchContent) +FetchContent_Declare( + subpar + GIT_REPOSITORY https://github.com/LTLA/subpar + GIT_TAG master +) + + FetchContent_Declare( knncolle - GIT_REPOSITORY https://github.com/LTLA/knncolle + GIT_REPOSITORY https://github.com/knncolle/knncolle GIT_TAG master ) FetchContent_Declare( tatami - GIT_REPOSITORY https://github.com/LTLA/tatami + GIT_REPOSITORY https://github.com/tatami-inc/tatami GIT_TAG master ) +FetchContent_MakeAvailable(subpar) FetchContent_MakeAvailable(knncolle) FetchContent_MakeAvailable(tatami) diff --git a/include/singlepp/annotate_cells_integrated.hpp b/include/singlepp/annotate_cells_integrated.hpp index afc2c25..339fd7d 100644 --- a/include/singlepp/annotate_cells_integrated.hpp +++ b/include/singlepp/annotate_cells_integrated.hpp @@ -160,69 +160,102 @@ void annotate_cells_integrated( { auto NR = test.nrow(); auto nref = trained.markers.size(); + tatami::VectorPtr universe_ptr(tatami::VectorPtr{}, &(trained.universe)); - tatami::parallelize([&](size_t, Index_ start, Index_ len) -> void { - // We perform an indexed extraction, so all subsequent indices - // will refer to indices into this subset (i.e., 'universe'). 
- tatami::VectorPtr universe_ptr(tatami::VectorPtr{}, &(trained.universe)); - auto wrk = tatami::consecutive_extractor(&test, false, start, len, std::move(universe_ptr)); - std::vector buffer(trained.universe.size()); - - PerReferenceIntegratedWorkspace workspace; - workspace.test_ranked.reserve(NR); - workspace.ref_ranked.reserve(NR); + struct PerThreadWorkspace { + PerThreadWorkspace(size_t universe_size, size_t test_ngenes) : buffer(universe_size) { + workspace.test_ranked.reserve(test_ngenes); + workspace.ref_ranked.reserve(test_ngenes); + test_ranked_full.reserve(test_ngenes); + } - RankedVector test_ranked_full; - test_ranked_full.reserve(NR); std::unordered_set miniverse_tmp; std::vector miniverse; + + RankedVector test_ranked_full; + std::vector buffer; + + PerReferenceIntegratedWorkspace workspace; std::vector all_scores; std::vector reflabels_in_use; + }; + + SINGLEPP_CUSTOM_PARALLEL( + num_threads, + test.ncol(), + [&]() -> PerThreadWorkspace { + return PerThreadWorkspace(trained.universe.size(), NR); + }, + [&](size_t, Index_ start, Index_ len, PerThreadWorkspace& thread_work) -> void { + // We perform an indexed extraction, so all subsequent indices + // will refer to indices into this subset (i.e., 'universe'). + auto mat_work = tatami::consecutive_extractor(&test, false, start, len, universe_ptr); + + for (Index_ i = start, end = start + len; i < end; ++i) { + // Extracting only the markers of the best labels for this cell. + thread_work.miniverse_tmp.clear(); + for (size_t r = 0; r < nref; ++r) { + auto curassigned = assigned[r][i]; + const auto& curmarkers = trained.markers[r][curassigned]; + thread_work.miniverse_tmp.insert(curmarkers.begin(), curmarkers.end()); + } - for (Index_ i = start, end = start + len; i < end; ++i) { - // Extracting only the markers of the best labels for this cell. 
- miniverse_tmp.clear(); - for (size_t r = 0; r < nref; ++r) { - auto curassigned = assigned[r][i]; - const auto& curmarkers = trained.markers[r][curassigned]; - miniverse_tmp.insert(curmarkers.begin(), curmarkers.end()); - } - - miniverse.clear(); - miniverse.insert(miniverse.end(), miniverse_tmp.begin(), miniverse_tmp.end()); - std::sort(miniverse.begin(), miniverse.end()); // sorting for consistency in floating-point summation within scaled_ranks(). + thread_work.miniverse.clear(); + thread_work.miniverse.insert(thread_work.miniverse.end(), thread_work.miniverse_tmp.begin(), thread_work.miniverse_tmp.end()); + std::sort(thread_work.miniverse.begin(), thread_work.miniverse.end()); // sorting for consistency in floating-point summation within scaled_ranks(). - test_ranked_full.clear(); - auto ptr = wrk->fetch(buffer.data()); - for (auto u : miniverse) { - test_ranked_full.emplace_back(ptr[u], u); - } - std::sort(test_ranked_full.begin(), test_ranked_full.end()); - - // Scanning through each reference and computing the score for the best group. - all_scores.clear(); - workspace.direct_mapping_filled = false; - for (size_t r = 0; r < nref; ++r) { - auto score = compute_single_reference_score_integrated(r, assigned[r][i], test_ranked_full, trained, miniverse, workspace, quantile); - all_scores.push_back(score); - if (scores[r]) { - scores[r][i] = score; + thread_work.test_ranked_full.clear(); + auto ptr = mat_work->fetch(thread_work.buffer.data()); + for (auto u : thread_work.miniverse) { + thread_work.test_ranked_full.emplace_back(ptr[u], u); + } + std::sort(thread_work.test_ranked_full.begin(), thread_work.test_ranked_full.end()); + + // Scanning through each reference and computing the score for the best group. 
+ thread_work.all_scores.clear(); + thread_work.workspace.direct_mapping_filled = false; + for (size_t r = 0; r < nref; ++r) { + auto score = compute_single_reference_score_integrated( + r, + assigned[r][i], + thread_work.test_ranked_full, + trained, + thread_work.miniverse, + thread_work.workspace, + quantile + ); + thread_work.all_scores.push_back(score); + if (scores[r]) { + scores[r][i] = score; + } } - } - std::pair candidate; - if (!fine_tune) { - candidate = find_best_and_delta(all_scores); - } else { - candidate = fine_tune_integrated(i, test_ranked_full, all_scores, trained, assigned, reflabels_in_use, miniverse_tmp, miniverse, workspace, quantile, threshold); - } + std::pair candidate; + if (!fine_tune) { + candidate = find_best_and_delta(thread_work.all_scores); + } else { + candidate = fine_tune_integrated( + i, + thread_work.test_ranked_full, + thread_work.all_scores, + trained, + assigned, + thread_work.reflabels_in_use, + thread_work.miniverse_tmp, + thread_work.miniverse, + thread_work.workspace, + quantile, + threshold + ); + } - best[i] = candidate.first; - if (delta) { - delta[i] = candidate.second; + best[i] = candidate.first; + if (delta) { + delta[i] = candidate.second; + } } } - }, test.ncol(), num_threads); + ); } } diff --git a/include/singlepp/annotate_cells_single.hpp b/include/singlepp/annotate_cells_single.hpp index 12aee4e..e8d1c3c 100644 --- a/include/singlepp/annotate_cells_single.hpp +++ b/include/singlepp/annotate_cells_single.hpp @@ -140,61 +140,75 @@ void annotate_cells_single( std::vector subcopy(subset, subset + num_subset); SubsetSanitizer subsorted(subcopy); - - tatami::parallelize([&](size_t, Index_ start, Index_ length) -> void { - std::vector buffer(num_subset); - tatami::VectorPtr mock_ptr(tatami::VectorPtr{}, &(subsorted.extraction_subset())); - auto wrk = tatami::consecutive_extractor(&test, false, start, length, std::move(mock_ptr)); - - std::vector > > searchers(num_labels); - for (size_t r = 0; r < num_labels; ++r) { 
- searchers[r] = ref[r].index->initialize(); + tatami::VectorPtr subset_ptr(tatami::VectorPtr{}, &(subsorted.extraction_subset())); + + struct PerThreadWorkspace { + PerThreadWorkspace(size_t num_subset, size_t num_labels, const std::vector >& ref) : + buffer(num_subset), + curscores(num_labels) + { + vec.reserve(num_subset); + searchers.reserve(num_labels); + for (size_t r = 0; r < num_labels; ++r) { + searchers.emplace_back(ref[r].index->initialize()); + } } - std::vector distances; + std::vector buffer; + std::vector > > searchers; + std::vector distances; RankedVector vec; - vec.reserve(num_subset); FineTuneSingle ft; - std::vector curscores(num_labels); - - for (Index_ c = start, end = start + length; c < end; ++c) { - auto ptr = wrk->fetch(buffer.data()); - subsorted.fill_ranks(ptr, vec); - scaled_ranks(vec, buffer.data()); // 'buffer' can be re-used for output here, as all data is already extracted to 'vec'. + std::vector curscores; + }; + + SINGLEPP_CUSTOM_PARALLEL( + num_threads, + test.ncol(), + [&]() -> PerThreadWorkspace { + return PerThreadWorkspace(num_subset, num_labels, ref); + }, + [&](size_t, Index_ start, Index_ length, PerThreadWorkspace& work) -> void { + auto ext = tatami::consecutive_extractor(&test, false, start, length, subset_ptr); + + for (Index_ c = start, end = start + length; c < end; ++c) { + auto ptr = ext->fetch(work.buffer.data()); + subsorted.fill_ranks(ptr, work.vec); + scaled_ranks(work.vec, work.buffer.data()); // 'buffer' can be re-used for output here, as all data is already extracted to 'vec'. 
+ + work.curscores.resize(num_labels); + for (size_t r = 0; r < num_labels; ++r) { + size_t k = search_k[r]; + work.searchers[r]->search(work.buffer.data(), k, NULL, &(work.distances)); + + Float_ last = work.distances[k - 1]; + last = 1 - 2 * last * last; + if (k == 1) { + work.curscores[r] = last; + } else { + Float_ next = work.distances[k - 2]; + next = 1 - 2 * next * next; + work.curscores[r] = coeffs[r].first * next + coeffs[r].second * last; + } - curscores.resize(num_labels); - for (size_t r = 0; r < num_labels; ++r) { - size_t k = search_k[r]; - searchers[r]->search(buffer.data(), k, NULL, &distances); + if (scores[r]) { + scores[r][c] = work.curscores[r]; + } + } - Float_ last = distances[k - 1]; - last = 1 - 2 * last * last; - if (k == 1) { - curscores[r] = last; + std::pair chosen; + if (!fine_tune) { + chosen = find_best_and_delta(work.curscores); } else { - Float_ next = distances[k - 2]; - next = 1 - 2 * next * next; - curscores[r] = coeffs[r].first * next + coeffs[r].second * last; + chosen = work.ft.run(work.vec, ref, markers, work.curscores, quantile, threshold); } - - if (scores[r]) { - scores[r][c] = curscores[r]; + best[c] = chosen.first; + if (delta) { + delta[c] = chosen.second; } } - - std::pair chosen; - if (!fine_tune) { - chosen = find_best_and_delta(curscores); - } else { - chosen = ft.run(vec, ref, markers, curscores, quantile, threshold); - } - best[c] = chosen.first; - if (delta) { - delta[c] = chosen.second; - } } - - }, test.ncol(), num_threads); + ); return; } diff --git a/include/singlepp/build_indices.hpp b/include/singlepp/build_indices.hpp index bc09f02..49a7cf0 100644 --- a/include/singlepp/build_indices.hpp +++ b/include/singlepp/build_indices.hpp @@ -62,62 +62,59 @@ std::vector > build_indices( } SubsetSanitizer subsorter(subset); - - tatami::parallelize([&](size_t, Index_ start, Index_ len) -> void { - std::vector buffer(NR); - tatami::VectorPtr mock_ptr(tatami::VectorPtr{}, &(subsorter.extraction_subset())); - auto wrk = 
tatami::consecutive_extractor(&ref, false, start, len, std::move(mock_ptr)); - RankedVector ranked(NR); - - for (Index_ c = start, end = start + len; c < end; ++c) { - auto ptr = wrk->fetch(buffer.data()); - subsorter.fill_ranks(ptr, ranked); - - auto curlab = labels[c]; - auto curoff = label_offsets[c]; - auto scaled = nndata[curlab].data() + curoff * NR; // these are already size_t's, so no need to cast. - scaled_ranks(ranked, scaled); - - // Storing as a pair of ints to save space; as long - // as we respect ties, everything should be fine. - auto& stored_ranks = nnrefs[curlab].ranked[curoff]; - stored_ranks.reserve(ranked.size()); - simplify_ranks(ranked, stored_ranks); + tatami::VectorPtr subset_ptr(tatami::VectorPtr{}, &(subsorter.extraction_subset())); + struct PerThreadWorkspace { + PerThreadWorkspace(size_t num_rows) : buffer(num_rows) { + ranked.reserve(num_rows); } - }, ref.ncol(), num_threads); - -#ifndef SINGLEPP_CUSTOM_PARALLEL -#ifdef _OPENMP - #pragma omp parallel num_threads(num_threads) -#endif - { -#else - SINGLEPP_CUSTOM_PARALLEL([&](int, size_t start, size_t len) -> void { -#endif - -#ifndef SINGLEPP_CUSTOM_PARALLEL -#ifdef _OPENMP - #pragma omp for -#endif - for (size_t l = 0; l < nlabels; ++l) { -#else - for (size_t l = start, end = start + len; l < end; ++l) { -#endif - - nnrefs[l].index = builder.build_shared(knncolle::SimpleMatrix(NR, label_count[l], nndata[l].data())); - - // Trying to free the memory as we go along, to offset the copying - // of nndata into the memory store owned by the knncolle index. 
- nndata[l].clear(); - nndata[l].shrink_to_fit(); - -#ifndef SINGLEPP_CUSTOM_PARALLEL + std::vector buffer; + RankedVector ranked; + }; + + SINGLEPP_CUSTOM_PARALLEL( + num_threads, + ref.ncol(), + [&]() -> PerThreadWorkspace { + return PerThreadWorkspace(NR); + }, + [&](size_t, Index_ start, Index_ len, PerThreadWorkspace& work) -> void { + auto ext = tatami::consecutive_extractor(&ref, false, start, len, subset_ptr); + + for (Index_ c = start, end = start + len; c < end; ++c) { + auto ptr = ext->fetch(work.buffer.data()); + subsorter.fill_ranks(ptr, work.ranked); + + auto curlab = labels[c]; + auto curoff = label_offsets[c]; + auto scaled = nndata[curlab].data() + curoff * NR; // these are already size_t's, so no need to cast. + scaled_ranks(work.ranked, scaled); + + // Storing as a pair of ints to save space; as long + // as we respect ties, everything should be fine. + auto& stored_ranks = nnrefs[curlab].ranked[curoff]; + stored_ranks.reserve(work.ranked.size()); + simplify_ranks(work.ranked, stored_ranks); + } } - } -#else + ); + + SINGLEPP_CUSTOM_PARALLEL( + num_threads, + nlabels, + []() -> bool { + return false; + }, + [&](int, size_t start, size_t len, bool&) -> void { + for (size_t l = start, end = start + len; l < end; ++l) { + nnrefs[l].index = builder.build_shared(knncolle::SimpleMatrix(NR, label_count[l], nndata[l].data())); + + // Trying to free the memory as we go along, to offset the copying + // of nndata into the memory store owned by the knncolle index. + nndata[l].clear(); + nndata[l].shrink_to_fit(); + } } - }, nlabels, num_threads); -#endif + ); return nnrefs; } diff --git a/include/singlepp/choose_classic_markers.hpp b/include/singlepp/choose_classic_markers.hpp index a6a837f..2a2d259 100644 --- a/include/singlepp/choose_classic_markers.hpp +++ b/include/singlepp/choose_classic_markers.hpp @@ -46,6 +46,7 @@ struct ChooseClassicMarkersOptions { /** * Number of threads to use. 
+ * Parallelization is performed using the `SINGLEPP_CUSTOM_PARALLEL` macro function, which defaults to `subpar::parallelize()` if not defined by the user. */ int num_threads = 1; }; @@ -80,9 +81,6 @@ Markers choose_classic_markers( const std::vector& labels, const ChooseClassicMarkersOptions& options) { - /** - * @cond - */ size_t nrefs = representatives.size(); if (nrefs != labels.size()) { throw std::runtime_error("'representatives' and 'labels' should have the same length"); @@ -155,106 +153,97 @@ Markers choose_classic_markers( pairs.insert(pairs.end(), pairs0.begin(), pairs0.end()); // already sorted by the std::set. } - size_t npairs = pairs.size(); - -#ifndef SINGLEPP_CUSTOM_PARALLEL -#ifdef _OPENMP - #pragma omp parallel num_threads(options.num_threads) -#endif - { -#else - SINGLEPP_CUSTOM_PARALLEL([&](int, size_t start, size_t len) -> void { -#endif - - std::vector > sorter(ngenes), sorted_tmp(ngenes); - std::vector rbuffer(ngenes), lbuffer(ngenes); - std::vector > > rworks(nrefs), lworks(nrefs); - -#ifndef SINGLEPP_CUSTOM_PARALLEL -#ifdef _OPENMP - #pragma omp for -#endif - for (size_t p = 0; p < npairs; ++p) { -#else - for (size_t p = start, end = start + len; p < end; ++p) { -#endif + struct PerThreadWorkspace { + PerThreadWorkspace(size_t ngenes, size_t nrefs) : + sorter(ngenes), sorted_tmp(ngenes), + rbuffer(ngenes), lbuffer(ngenes), + rextractors(nrefs), lextractors(nrefs) + {} + + std::vector > sorter, sorted_tmp; + std::vector rbuffer, lbuffer; + std::vector > > rextractors, lextractors; + }; + + SINGLEPP_CUSTOM_PARALLEL( + options.num_threads, + pairs.size(), + [&]() -> PerThreadWorkspace { + return PerThreadWorkspace(ngenes, nrefs); + }, + [&](int, size_t start, size_t len, PerThreadWorkspace& work) -> void { + auto& sorter = work.sorter; + auto& sorted_tmp = work.sorted_tmp; + + for (size_t p = start, end = start + len; p < end; ++p) { + auto curleft = pairs[p].first; + auto curright = pairs[p].second; - auto curleft = pairs[p].first; - auto 
curright = pairs[p].second; - - for (size_t g = 0; g < ngenes; ++g) { - sorter[g].first = 0; - sorter[g].second = g; - } - - for (size_t i = 0; i < nrefs; ++i) { - const auto& curavail = labels_to_index[i]; - auto lcol = curavail[curleft]; - auto rcol = curavail[curright]; - if (!lcol.first || !rcol.first) { - continue; + for (size_t g = 0; g < ngenes; ++g) { + sorter[g].first = 0; + sorter[g].second = g; } - // Initialize extractors as needed. - auto& lwrk = lworks[i]; - if (!lwrk) { - lwrk = representatives[i]->dense_column(); - } - auto lptr = lwrk->fetch(lcol.second, lbuffer.data()); + for (size_t i = 0; i < nrefs; ++i) { + const auto& curavail = labels_to_index[i]; + auto lcol = curavail[curleft]; + auto rcol = curavail[curright]; + if (!lcol.first || !rcol.first) { + continue; + } - auto& rwrk = rworks[i]; - if (!rwrk) { - rwrk = representatives[i]->dense_column(); - } - auto rptr = rwrk->fetch(rcol.second, rbuffer.data()); + // Initialize extractors as needed. + auto& lext = work.lextractors[i]; + if (!lext) { + lext = representatives[i]->dense_column(); + } + auto lptr = lext->fetch(lcol.second, work.lbuffer.data()); - for (size_t g = 0; g < ngenes; ++g) { - sorter[g].first += lptr[g] - rptr[g]; - } - } + auto& rext = work.rextractors[i]; + if (!rext) { + rext = representatives[i]->dense_column(); + } + auto rptr = rext->fetch(rcol.second, work.rbuffer.data()); - // At flip = 0, we're looking for genes upregulated in right over left, - // as we're sorting on left - right in increasing order. At flip = 1, - // we reverse the signs to we sort on right - left. 
- for (int flip = 0; flip < 2; ++flip) { - if (flip) { - sorter.swap(sorted_tmp); - for (auto& s : sorter) { - s.first *= -1; + for (size_t g = 0; g < ngenes; ++g) { + sorter[g].first += lptr[g] - rptr[g]; } - } else { - // We do a copy so that the treatment of tries would be the same as if - // we had sorted on the reversed log-fold changes in the first place; - // otherwise the first sort might change the order of ties. - std::copy(sorter.begin(), sorter.end(), sorted_tmp.begin()); } - // partial sort is guaranteed to be stable due to the second index resolving ties. - std::partial_sort(sorter.begin(), sorter.begin() + actual_number, sorter.end()); + // At flip = 0, we're looking for genes upregulated in right over left, + // as we're sorting on left - right in increasing order. At flip = 1, + // we reverse the signs to we sort on right - left. + for (int flip = 0; flip < 2; ++flip) { + if (flip) { + sorter.swap(sorted_tmp); + for (auto& s : sorter) { + s.first *= -1; + } + } else { + // We do a copy so that the treatment of tries would be the same as if + // we had sorted on the reversed log-fold changes in the first place; + // otherwise the first sort might change the order of ties. + std::copy(sorter.begin(), sorter.end(), sorted_tmp.begin()); + } - std::vector stuff; - stuff.reserve(actual_number); - for (int g = 0; g < actual_number && sorter[g].first < 0; ++g) { // only negative values (i.e., positive log-fold changes for our comparison). - stuff.push_back(sorter[g].second); - } + // partial sort is guaranteed to be stable due to the second index resolving ties. + std::partial_sort(sorter.begin(), sorter.begin() + actual_number, sorter.end()); - if (flip) { - output[curleft][curright] = std::move(stuff); - } else { - output[curright][curleft] = std::move(stuff); + std::vector stuff; + stuff.reserve(actual_number); + for (int g = 0; g < actual_number && sorter[g].first < 0; ++g) { // only negative values (i.e., positive log-fold changes for our comparison). 
+ stuff.push_back(sorter[g].second); + } + + if (flip) { + output[curleft][curright] = std::move(stuff); + } else { + output[curright][curleft] = std::move(stuff); + } } } - -#ifndef SINGLEPP_CUSTOM_PARALLEL } - } -#else - } - }, npairs, options.num_threads); -#endif - /** - * @endcond - */ + ); return output; } diff --git a/include/singlepp/classify_integrated.hpp b/include/singlepp/classify_integrated.hpp index ebb775d..fc8398e 100644 --- a/include/singlepp/classify_integrated.hpp +++ b/include/singlepp/classify_integrated.hpp @@ -45,6 +45,7 @@ struct ClassifyIntegratedOptions { /** * Number of threads to use. + * Parallelization is performed using the `SINGLEPP_CUSTOM_PARALLEL` macro function, which defaults to `subpar::parallelize()` if not defined by the user. */ int num_threads = 1; }; diff --git a/include/singlepp/classify_single.hpp b/include/singlepp/classify_single.hpp index 8780e3a..416b4f1 100644 --- a/include/singlepp/classify_single.hpp +++ b/include/singlepp/classify_single.hpp @@ -50,6 +50,7 @@ struct ClassifySingleOptions { /** * Number of threads to use. + * Parallelization is performed using the `SINGLEPP_CUSTOM_PARALLEL` macro function, which defaults to `subpar::parallelize()` if not defined by the user. */ int num_threads = 1; }; diff --git a/include/singlepp/macros.hpp b/include/singlepp/macros.hpp index 8ac1ddd..25a48ce 100644 --- a/include/singlepp/macros.hpp +++ b/include/singlepp/macros.hpp @@ -1,42 +1,9 @@ #ifndef SINGLEPP_MACROS_HPP #define SINGLEPP_MACROS_HPP -/** - * @file macros.hpp - * - * @brief Set common macros used throughout **singlepp**. - * - * @details - * The `SINGLEPP_CUSTOM_PARALLEL` macro can be set to a function that specifies a custom parallelization scheme. - * This function should be a template that accept three arguments: - * - * - `fun`, a lambda that accepts three arguments: - * - `thread`, the thread number. - * - `start`, the index of the first job for this thread. 
- * - `length`, the number of jobs for this thread. - * - `njobs`, an integer specifying the number of jobs. - * - `nthreads`, an integer specifying the number of threads to use. - * - * The function should split `[0, njobs)` into any number of contiguous, non-overlapping intervals, and call `fun` on each interval, possibly in different threads. - * The details of the splitting and evaluation are left to the discretion of the developer defining the macro. - * The function should only return once all evaluations of `fun` are complete. - * - * If `SINGLEPP_CUSTOM_PARALLEL` is set, the following macros are also set (if they are not already defined): - * - * - `TATAMI_CUSTOM_PARALLEL`, from the [**tatami**](https://ltla.github.io/tatami) library. - * - * This ensures that any custom parallelization scheme is propagated to all of **singlepp**'s dependencies. - * If these libraries are used outside of **singlepp**, some care is required to ensure that the macros are consistently defined through the client library/application; - * otherwise, developers may observe ODR compilation errors. - */ - -// Synchronizing all parallelization schemes. -#ifdef SINGLEPP_CUSTOM_PARALLEL - -#ifndef TATAMI_CUSTOM_PARALLEL -#define TATAMI_CUSTOM_PARALLEL SINGLEPP_CUSTOM_PARALLEL -#endif - +#ifndef SINGLEPP_CUSTOM_PARALLEL +#include "subpar/subpar.hpp" +#define SINGLEPP_CUSTOM_PARALLEL subpar::parallelize #endif #endif diff --git a/include/singlepp/train_integrated.hpp b/include/singlepp/train_integrated.hpp index 8e3493e..873a171 100644 --- a/include/singlepp/train_integrated.hpp +++ b/include/singlepp/train_integrated.hpp @@ -263,6 +263,7 @@ class TrainedIntegrated { struct TrainIntegratedOptions { /** * Number of threads to use. + * Parallelization is performed using the `SINGLEPP_CUSTOM_PARALLEL` macro function, which defaults to `subpar::parallelize()` if not defined by the user. 
*/ int num_threads = 1; }; @@ -273,7 +274,7 @@ struct TrainIntegratedOptions { namespace internal { template -void train_integrated( +void train_integrated_per_reference( size_t ref_i, Input_& curinput, TrainedIntegrated& output, @@ -318,29 +319,42 @@ void train_integrated( } if (!curinput.with_intersection) { - tatami::parallelize([&](size_t, Index_ start, Index_ len) -> void { + // The universe is guaranteed to be sorted and unique, see its derivation + // in internal::train_integrated() below. This means we can directly use it + // for indexed extraction from a tatami::Matrix. + tatami::VectorPtr universe_ptr(tatami::VectorPtr{}, &(output.universe)); + + struct PerThreadWorkspace { + PerThreadWorkspace(size_t universe_size) : buffer(universe_size) { + tmp_ranked.reserve(universe_size); + } internal::RankedVector tmp_ranked; - tmp_ranked.reserve(output.universe.size()); - - // The universe is guaranteed to be sorted and unique, see its derivation above. - // This means we can directly use it for indexed extraction. 
- tatami::VectorPtr universe_ptr(tatami::VectorPtr{}, &(output.universe)); - auto wrk = tatami::consecutive_extractor(&ref, false, start, len, std::move(universe_ptr)); - std::vector buffer(output.universe.size()); - - for (Index_ c = start, end = start + len; c < end; ++c) { - auto ptr = wrk->fetch(buffer.data()); - - tmp_ranked.clear(); - for (int i = 0, end = output.universe.size(); i < end; ++i, ++ptr) { - tmp_ranked.emplace_back(*ptr, i); + std::vector buffer; + }; + + SINGLEPP_CUSTOM_PARALLEL( + options.num_threads, + ref.ncol(), + [&]() -> PerThreadWorkspace { + return PerThreadWorkspace(output.universe.size()); + }, + [&](int, Index_ start, Index_ len, PerThreadWorkspace& work) -> void { + auto ext = tatami::consecutive_extractor(&ref, false, start, len, universe_ptr); + + for (Index_ c = start, end = start + len; c < end; ++c) { + auto ptr = ext->fetch(work.buffer.data()); + + work.tmp_ranked.clear(); + for (int i = 0, end = output.universe.size(); i < end; ++i, ++ptr) { + work.tmp_ranked.emplace_back(*ptr, i); + } + std::sort(work.tmp_ranked.begin(), work.tmp_ranked.end()); + + auto& final_ranked = cur_ranked[curlab[c]][positions[c]]; + simplify_ranks(work.tmp_ranked, final_ranked); } - std::sort(tmp_ranked.begin(), tmp_ranked.end()); - - auto& final_ranked = cur_ranked[curlab[c]][positions[c]]; - simplify_ranks(tmp_ranked, final_ranked); } - }, ref.ncol(), options.num_threads); + ); } else { output.check_availability[ref_i] = 1; @@ -373,29 +387,40 @@ void train_integrated( for (const auto& p : intersection_in_universe) { to_extract.push_back(p.first); } + tatami::VectorPtr to_extract_ptr(tatami::VectorPtr{}, &to_extract); - tatami::parallelize([&](size_t, Index_ start, Index_ len) -> void { + struct PerThreadWorkspace { + PerThreadWorkspace(size_t extract_size) : buffer(extract_size) { + tmp_ranked.reserve(extract_size); + } internal::RankedVector tmp_ranked; - tmp_ranked.reserve(to_extract.size()); - - std::vector buffer(to_extract.size()); - 
tatami::VectorPtr to_extract_ptr(tatami::VectorPtr{}, &to_extract); - auto wrk = tatami::consecutive_extractor(&ref, false, start, len, std::move(to_extract_ptr)); - - for (size_t c = start, end = start + len; c < end; ++c) { - auto ptr = wrk->fetch(buffer.data()); - - tmp_ranked.clear(); - for (const auto& p : intersection_in_universe) { - tmp_ranked.emplace_back(*ptr, p.second); // remember, 'p.second' corresponds to indices into the universe. - ++ptr; + std::vector buffer; + }; + + SINGLEPP_CUSTOM_PARALLEL( + options.num_threads, + ref.ncol(), + [&]() -> PerThreadWorkspace { + return PerThreadWorkspace(to_extract.size()); + }, + [&](size_t, Index_ start, Index_ len, PerThreadWorkspace& work) -> void { + auto ext = tatami::consecutive_extractor(&ref, false, start, len, to_extract_ptr); + + for (size_t c = start, end = start + len; c < end; ++c) { + auto ptr = ext->fetch(work.buffer.data()); + + work.tmp_ranked.clear(); + for (const auto& p : intersection_in_universe) { + work.tmp_ranked.emplace_back(*ptr, p.second); // remember, 'p.second' corresponds to indices into the universe. 
+ ++ptr; + } + std::sort(work.tmp_ranked.begin(), work.tmp_ranked.end()); + + auto& final_ranked = cur_ranked[curlab[c]][positions[c]]; + simplify_ranks(work.tmp_ranked, final_ranked); } - std::sort(tmp_ranked.begin(), tmp_ranked.end()); - - auto& final_ranked = cur_ranked[curlab[c]][positions[c]]; - simplify_ranks(tmp_ranked, final_ranked); } - }, ref.ncol(), options.num_threads); + ); } } @@ -425,7 +450,7 @@ TrainedIntegrated train_integrated(Inputs_& inputs, const TrainIntegrate } for (size_t r = 0; r < nrefs; ++r) { - train_integrated(r, inputs[r], output, remap_to_universe, options); + train_integrated_per_reference(r, inputs[r], output, remap_to_universe, options); } return output; diff --git a/include/singlepp/train_single.hpp b/include/singlepp/train_single.hpp index ab9e729..11491af 100644 --- a/include/singlepp/train_single.hpp +++ b/include/singlepp/train_single.hpp @@ -44,6 +44,7 @@ struct TrainSingleOptions { /** * Number of threads to use. + * Parallelization is performed using the `SINGLEPP_CUSTOM_PARALLEL` macro function, which defaults to `subpar::parallelize()` if not defined by the user. 
*/ int num_threads = 1; }; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 470b3ab..d07b873 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -31,45 +31,14 @@ add_executable( src/choose_classic_markers.cpp ) -include(GoogleTest) +target_link_libraries(libtest gtest_main singlepp tatami_stats) +target_compile_options(libtest PRIVATE -Wall -Werror -Wpedantic -Wextra) option(CODE_COVERAGE "Enable coverage testing" OFF) -set(DO_CODE_COVERAGE OFF) if(CODE_COVERAGE AND CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") - set(DO_CODE_COVERAGE ON) + target_compile_options(libtest PRIVATE -O0 -g --coverage) + target_link_options(libtest PRIVATE --coverage) endif() -macro(decorate_test name) - target_link_libraries(${name} gtest_main singlepp) - target_compile_options(${name} PRIVATE -Wall -Werror -Wpedantic -Wextra) - if(DO_CODE_COVERAGE) - target_compile_options(${name} PRIVATE -O0 -g --coverage) - target_link_options(${name} PRIVATE --coverage) - endif() - gtest_discover_tests(${name}) -endmacro() - -target_link_libraries(libtest tatami_stats) -decorate_test(libtest) - -# Check that the custom parallelization schemes are properly set up. -add_executable( - cuspartest - src/classify_single.cpp - src/classify_integrated.cpp - src/choose_classic_markers.cpp -) -set_target_properties(cuspartest PROPERTIES COMPILE_DEFINITIONS "TEST_SINGLEPP_CUSTOM_PARALLEL") -decorate_test(cuspartest) - -find_package(OpenMP) -if (OpenMP_FOUND) - add_executable( - omptest - src/classify_single.cpp - src/classify_integrated.cpp - src/choose_classic_markers.cpp - ) - target_link_libraries(omptest OpenMP::OpenMP_CXX) - decorate_test(omptest) -endif() +include(GoogleTest) +gtest_discover_tests(libtest)