Switch to the subpar library for parallelization.
Users now have a central place to understand the parallelization scheme, rather
than having the logic split between tatami::parallelize() and our internal
OpenMP calls. Overriding the scheme now just requires defining
SINGLEPP_CUSTOM_PARALLEL.

This change allows us to remove all the OpenMP macros, as subpar::parallelize
handles all of that under the hood. We also get rid of all the special
parallelization test executables as there's nothing to test anymore.

The expectations for SINGLEPP_CUSTOM_PARALLEL have been changed to follow the
subpar::parallelize() signature. This adds an extra argument to pass along a
thread-specific workspace, which provides some opportunities for optimization
when re-using a thread. Some code reorganization is required as a result.
LTLA committed Aug 26, 2024
1 parent b5738ce commit dc97caf
Showing 15 changed files with 367 additions and 354 deletions.
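For anyone defining their own SINGLEPP_CUSTOM_PARALLEL under the new expectations, the call sites in this diff suggest a four-argument contract: a thread count, a task count, a factory that builds one workspace per thread, and a function invoked on a contiguous range of tasks together with that workspace. The sketch below is inferred from those call sites rather than from singlepp's documentation, so treat the exact signature as an assumption; my_parallelize is a hypothetical name, and the macro would presumably need to be defined before including any singlepp headers.

#include <algorithm>
#include <thread>
#include <vector>

// Hypothetical stand-in for the subpar-based default, matching the
// four-argument SINGLEPP_CUSTOM_PARALLEL call sites visible in this diff.
// Assumes num_threads >= 1 and an integral Task_ type.
template<typename Task_, class Create_, class Run_>
void my_parallelize(int num_threads, Task_ num_tasks, Create_ create_workspace, Run_ run_range) {
    // Split the tasks into at most 'num_threads' contiguous ranges.
    Task_ per_thread = num_tasks / num_threads + (num_tasks % num_threads != 0);
    std::vector<std::thread> pool;
    pool.reserve(num_threads);

    Task_ start = 0;
    for (int t = 0; t < num_threads && start < num_tasks; ++t) {
        Task_ length = std::min(per_thread, num_tasks - start);
        pool.emplace_back([&create_workspace, &run_range, t, start, length]() -> void {
            auto work = create_workspace(); // one workspace per thread, re-used across its whole range
            run_range(t, start, length, work);
        });
        start += length;
    }

    for (auto& th : pool) {
        th.join();
    }
}

#define SINGLEPP_CUSTOM_PARALLEL ::my_parallelize

A production definition should also propagate exceptions out of the worker threads, which this sketch omits for brevity.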
5 changes: 5 additions & 0 deletions .github/workflows/doxygenate.yaml
@@ -26,6 +26,11 @@ jobs:
         with:
           args: -O docs/tatami.tag https://tatami-inc.github.io/tatami/tatami.tag

+      - name: Add subpar tagfile
+        uses: wei/wget@v1
+        with:
+          args: -O docs/subpar.tag https://ltla.github.io/subpar/subpar.tag
+
       - name: Doxygen Action
         uses: mattnotmitt/doxygen-action@v1
         with:
3 changes: 2 additions & 1 deletion CMakeLists.txt
@@ -24,9 +24,10 @@ if(SINGLEPP_FETCH_EXTERN)
 else()
     find_package(knncolle_knncolle 2.0.0 CONFIG REQUIRED)
     find_package(tatami_tatami 3.0.0 CONFIG REQUIRED)
+    find_package(ltla_subpar 0.1.0 CONFIG REQUIRED)
 endif()

-target_link_libraries(singlepp INTERFACE knncolle::knncolle tatami::tatami)
+target_link_libraries(singlepp INTERFACE knncolle::knncolle tatami::tatami ltla::subpar)

 # Tests
 if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
1 change: 1 addition & 0 deletions cmake/Config.cmake.in
@@ -3,5 +3,6 @@
 include(CMakeFindDependencyMacro)
 find_dependency(knncolle_knncolle 2.0.0 CONFIG REQUIRED)
 find_dependency(tatami_tatami 3.0.0 CONFIG REQUIRED)
+find_dependency(ltla_subpar 0.1.0 CONFIG REQUIRED)

 include("${CMAKE_CURRENT_LIST_DIR}/singler_singleppTargets.cmake")
3 changes: 2 additions & 1 deletion docs/Doxyfile
@@ -2375,7 +2375,8 @@ SKIP_FUNCTION_MACROS = YES
 # run, you must also specify the path to the tagfile here.

 TAGFILES = knncolle.tag=https://knncolle.github.io/knncolle \
-           tatami.tag=https://tatami-inc.github.io/tatami
+           tatami.tag=https://tatami-inc.github.io/tatami \
+           subpar.tag=https://ltla.github.io/subpar

 # When a file name is specified after GENERATE_TAGFILE, doxygen will create a
 # tag file that is based on the input files it reads. See section "Linking to
12 changes: 10 additions & 2 deletions extern/CMakeLists.txt
@@ -1,16 +1,24 @@
 include(FetchContent)

+FetchContent_Declare(
+  subpar
+  GIT_REPOSITORY https://github.com/LTLA/subpar
+  GIT_TAG master
+)
+
+
 FetchContent_Declare(
   knncolle
-  GIT_REPOSITORY https://github.com/LTLA/knncolle
+  GIT_REPOSITORY https://github.com/knncolle/knncolle
   GIT_TAG master
 )

 FetchContent_Declare(
   tatami
-  GIT_REPOSITORY https://github.com/LTLA/tatami
+  GIT_REPOSITORY https://github.com/tatami-inc/tatami
   GIT_TAG master
 )

+FetchContent_MakeAvailable(subpar)
 FetchContent_MakeAvailable(knncolle)
 FetchContent_MakeAvailable(tatami)
133 changes: 83 additions & 50 deletions include/singlepp/annotate_cells_integrated.hpp
@@ -160,69 +160,102 @@ void annotate_cells_integrated(
{
auto NR = test.nrow();
auto nref = trained.markers.size();
tatami::VectorPtr<Index_> universe_ptr(tatami::VectorPtr<Index_>{}, &(trained.universe));

tatami::parallelize([&](size_t, Index_ start, Index_ len) -> void {
// We perform an indexed extraction, so all subsequent indices
// will refer to indices into this subset (i.e., 'universe').
tatami::VectorPtr<Index_> universe_ptr(tatami::VectorPtr<Index_>{}, &(trained.universe));
auto wrk = tatami::consecutive_extractor<false>(&test, false, start, len, std::move(universe_ptr));
std::vector<Value_> buffer(trained.universe.size());

PerReferenceIntegratedWorkspace<Index_, Value_, Float_> workspace;
workspace.test_ranked.reserve(NR);
workspace.ref_ranked.reserve(NR);
struct PerThreadWorkspace {
PerThreadWorkspace(size_t universe_size, size_t test_ngenes) : buffer(universe_size) {
workspace.test_ranked.reserve(test_ngenes);
workspace.ref_ranked.reserve(test_ngenes);
test_ranked_full.reserve(test_ngenes);
}

RankedVector<Value_, Index_> test_ranked_full;
test_ranked_full.reserve(NR);
std::unordered_set<Index_> miniverse_tmp;
std::vector<Index_> miniverse;

RankedVector<Value_, Index_> test_ranked_full;
std::vector<Value_> buffer;

PerReferenceIntegratedWorkspace<Index_, Value_, Float_> workspace;
std::vector<Float_> all_scores;
std::vector<RefLabel_> reflabels_in_use;
};

SINGLEPP_CUSTOM_PARALLEL(
num_threads,
test.ncol(),
[&]() -> PerThreadWorkspace {
return PerThreadWorkspace(trained.universe.size(), NR);
},
[&](size_t, Index_ start, Index_ len, PerThreadWorkspace& thread_work) -> void {
// We perform an indexed extraction, so all subsequent indices
// will refer to indices into this subset (i.e., 'universe').
auto mat_work = tatami::consecutive_extractor<false>(&test, false, start, len, universe_ptr);

for (Index_ i = start, end = start + len; i < end; ++i) {
// Extracting only the markers of the best labels for this cell.
thread_work.miniverse_tmp.clear();
for (size_t r = 0; r < nref; ++r) {
auto curassigned = assigned[r][i];
const auto& curmarkers = trained.markers[r][curassigned];
thread_work.miniverse_tmp.insert(curmarkers.begin(), curmarkers.end());
}

for (Index_ i = start, end = start + len; i < end; ++i) {
// Extracting only the markers of the best labels for this cell.
miniverse_tmp.clear();
for (size_t r = 0; r < nref; ++r) {
auto curassigned = assigned[r][i];
const auto& curmarkers = trained.markers[r][curassigned];
miniverse_tmp.insert(curmarkers.begin(), curmarkers.end());
}

miniverse.clear();
miniverse.insert(miniverse.end(), miniverse_tmp.begin(), miniverse_tmp.end());
std::sort(miniverse.begin(), miniverse.end()); // sorting for consistency in floating-point summation within scaled_ranks().
thread_work.miniverse.clear();
thread_work.miniverse.insert(thread_work.miniverse.end(), thread_work.miniverse_tmp.begin(), thread_work.miniverse_tmp.end());
std::sort(thread_work.miniverse.begin(), thread_work.miniverse.end()); // sorting for consistency in floating-point summation within scaled_ranks().

test_ranked_full.clear();
auto ptr = wrk->fetch(buffer.data());
for (auto u : miniverse) {
test_ranked_full.emplace_back(ptr[u], u);
}
std::sort(test_ranked_full.begin(), test_ranked_full.end());

// Scanning through each reference and computing the score for the best group.
all_scores.clear();
workspace.direct_mapping_filled = false;
for (size_t r = 0; r < nref; ++r) {
auto score = compute_single_reference_score_integrated(r, assigned[r][i], test_ranked_full, trained, miniverse, workspace, quantile);
all_scores.push_back(score);
if (scores[r]) {
scores[r][i] = score;
thread_work.test_ranked_full.clear();
auto ptr = mat_work->fetch(thread_work.buffer.data());
for (auto u : thread_work.miniverse) {
thread_work.test_ranked_full.emplace_back(ptr[u], u);
}
std::sort(thread_work.test_ranked_full.begin(), thread_work.test_ranked_full.end());

// Scanning through each reference and computing the score for the best group.
thread_work.all_scores.clear();
thread_work.workspace.direct_mapping_filled = false;
for (size_t r = 0; r < nref; ++r) {
auto score = compute_single_reference_score_integrated(
r,
assigned[r][i],
thread_work.test_ranked_full,
trained,
thread_work.miniverse,
thread_work.workspace,
quantile
);
thread_work.all_scores.push_back(score);
if (scores[r]) {
scores[r][i] = score;
}
}
}

std::pair<Label_, Float_> candidate;
if (!fine_tune) {
candidate = find_best_and_delta<Label_>(all_scores);
} else {
candidate = fine_tune_integrated(i, test_ranked_full, all_scores, trained, assigned, reflabels_in_use, miniverse_tmp, miniverse, workspace, quantile, threshold);
}
std::pair<Label_, Float_> candidate;
if (!fine_tune) {
candidate = find_best_and_delta<Label_>(thread_work.all_scores);
} else {
candidate = fine_tune_integrated(
i,
thread_work.test_ranked_full,
thread_work.all_scores,
trained,
assigned,
thread_work.reflabels_in_use,
thread_work.miniverse_tmp,
thread_work.miniverse,
thread_work.workspace,
quantile,
threshold
);
}

best[i] = candidate.first;
if (delta) {
delta[i] = candidate.second;
best[i] = candidate.first;
if (delta) {
delta[i] = candidate.second;
}
}
}
}, test.ncol(), num_threads);
);
}

}
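The PerThreadWorkspace above is the "thread-specific workspace" optimization mentioned in the commit message: every container is constructed or reserved once per thread and only clear()ed between cells, so its heap allocation is retained across the whole range and the per-cell loop does not reallocate. A stand-alone sketch of the same idea, with hypothetical names:

#include <cstddef>
#include <vector>

// Hypothetical workspace: acquire scratch capacity once per thread.
struct Workspace {
    explicit Workspace(std::size_t capacity) {
        scratch.reserve(capacity); // one up-front allocation
    }
    std::vector<int> scratch;
};

// Hypothetical range runner: clear() keeps the capacity, so push_back()
// stops allocating once 'scratch' has reached its working size.
void run_range(int start, int length, Workspace& work) {
    for (int i = start, end = start + length; i < end; ++i) {
        work.scratch.clear();
        for (int j = 0; j < 100; ++j) {
            work.scratch.push_back(i + j); // stand-in for collecting markers/ranks per cell
        }
        // ... compute on work.scratch for cell 'i' ...
    }
}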
102 changes: 58 additions & 44 deletions include/singlepp/annotate_cells_single.hpp
@@ -140,61 +140,75 @@ void annotate_cells_single(

std::vector<Index_> subcopy(subset, subset + num_subset);
SubsetSanitizer<Index_> subsorted(subcopy);

tatami::parallelize([&](size_t, Index_ start, Index_ length) -> void {
std::vector<Value_> buffer(num_subset);
tatami::VectorPtr<Index_> mock_ptr(tatami::VectorPtr<Index_>{}, &(subsorted.extraction_subset()));
auto wrk = tatami::consecutive_extractor<false>(&test, false, start, length, std::move(mock_ptr));

std::vector<std::unique_ptr<knncolle::Searcher<Index_, Float_> > > searchers(num_labels);
for (size_t r = 0; r < num_labels; ++r) {
searchers[r] = ref[r].index->initialize();
tatami::VectorPtr<Index_> subset_ptr(tatami::VectorPtr<Index_>{}, &(subsorted.extraction_subset()));

struct PerThreadWorkspace {
PerThreadWorkspace(size_t num_subset, size_t num_labels, const std::vector<PerLabelReference<Index_, Float_> >& ref) :
buffer(num_subset),
curscores(num_labels)
{
vec.reserve(num_subset);
searchers.reserve(num_labels);
for (size_t r = 0; r < num_labels; ++r) {
searchers.emplace_back(ref[r].index->initialize());
}
}
std::vector<Float_> distances;

std::vector<Value_> buffer;
std::vector<std::unique_ptr<knncolle::Searcher<Index_, Float_> > > searchers;
std::vector<Float_> distances;
RankedVector<Value_, Index_> vec;
vec.reserve(num_subset);
FineTuneSingle<Label_, Index_, Float_, Value_> ft;
std::vector<Float_> curscores(num_labels);

for (Index_ c = start, end = start + length; c < end; ++c) {
auto ptr = wrk->fetch(buffer.data());
subsorted.fill_ranks(ptr, vec);
scaled_ranks(vec, buffer.data()); // 'buffer' can be re-used for output here, as all data is already extracted to 'vec'.
std::vector<Float_> curscores;
};

SINGLEPP_CUSTOM_PARALLEL(
num_threads,
test.ncol(),
[&]() -> PerThreadWorkspace {
return PerThreadWorkspace(num_subset, num_labels, ref);
},
[&](size_t, Index_ start, Index_ length, PerThreadWorkspace& work) -> void {
auto ext = tatami::consecutive_extractor<false>(&test, false, start, length, subset_ptr);

for (Index_ c = start, end = start + length; c < end; ++c) {
auto ptr = ext->fetch(work.buffer.data());
subsorted.fill_ranks(ptr, work.vec);
scaled_ranks(work.vec, work.buffer.data()); // 'buffer' can be re-used for output here, as all data is already extracted to 'vec'.

work.curscores.resize(num_labels);
for (size_t r = 0; r < num_labels; ++r) {
size_t k = search_k[r];
work.searchers[r]->search(work.buffer.data(), k, NULL, &(work.distances));

Float_ last = work.distances[k - 1];
last = 1 - 2 * last * last;
if (k == 1) {
work.curscores[r] = last;
} else {
Float_ next = work.distances[k - 2];
next = 1 - 2 * next * next;
work.curscores[r] = coeffs[r].first * next + coeffs[r].second * last;
}

curscores.resize(num_labels);
for (size_t r = 0; r < num_labels; ++r) {
size_t k = search_k[r];
searchers[r]->search(buffer.data(), k, NULL, &distances);
if (scores[r]) {
scores[r][c] = work.curscores[r];
}
}

Float_ last = distances[k - 1];
last = 1 - 2 * last * last;
if (k == 1) {
curscores[r] = last;
std::pair<Label_, Float_> chosen;
if (!fine_tune) {
chosen = find_best_and_delta<Label_>(work.curscores);
} else {
Float_ next = distances[k - 2];
next = 1 - 2 * next * next;
curscores[r] = coeffs[r].first * next + coeffs[r].second * last;
chosen = work.ft.run(work.vec, ref, markers, work.curscores, quantile, threshold);
}

if (scores[r]) {
scores[r][c] = curscores[r];
best[c] = chosen.first;
if (delta) {
delta[c] = chosen.second;
}
}

std::pair<Label_, Float_> chosen;
if (!fine_tune) {
chosen = find_best_and_delta<Label_>(curscores);
} else {
chosen = ft.run(vec, ref, markers, curscores, quantile, threshold);
}
best[c] = chosen.first;
if (delta) {
delta[c] = chosen.second;
}
}

}, test.ncol(), num_threads);
);

return;
}
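A note on the scoring arithmetic in the loop above: last = 1 - 2 * last * last converts a Euclidean distance from the nearest-neighbor search into a correlation-like score. This identity holds if scaled_ranks() centers each rank vector and scales it to an L2 norm of 1/2; that scaling is an assumption consistent with the formula here, not something shown in this diff. For two such vectors x and y at distance d,

\|x - y\|^2 = \|x\|^2 + \|y\|^2 - 2\,x \cdot y = \tfrac{1}{2} - 2\,x \cdot y = d^2
\quad\Longrightarrow\quad
x \cdot y = \tfrac{1}{4} - \tfrac{d^2}{2},
\qquad
\rho = \frac{x \cdot y}{\|x\|\,\|y\|} = \frac{1/4 - d^2/2}{1/4} = 1 - 2d^2.

The scores at the k-th and (k-1)-th neighbors are then linearly interpolated via coeffs[r] to approximate the score at the requested quantile.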