diff --git a/CMakeLists.txt b/CMakeLists.txt
index 88ddd57e..6f82f992 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -35,7 +35,8 @@ add_library("${TARGET_NAME}" MODULE)
 # Compile CPU library
 target_sources("${TARGET_NAME}" PRIVATE src/python_bindings.cpp
-               src/boundary.cpp)
+               src/boundary.cpp
+               src/extend.cpp)
 
 set_target_properties("${TARGET_NAME}" PROPERTIES
     CXX_VISIBILITY_PRESET "hidden"
diff --git a/PopPUNK/__init__.py b/PopPUNK/__init__.py
index 82bed522..79da77c1 100644
--- a/PopPUNK/__init__.py
+++ b/PopPUNK/__init__.py
@@ -3,7 +3,7 @@
 
 '''PopPUNK (POPulation Partitioning Using Nucleotide Kmers)'''
 
-__version__ = '2.4.3'
+__version__ = '2.4.4'
 
 # Minimum sketchlib version
 SKETCHLIB_MAJOR = 1
diff --git a/PopPUNK/models.py b/PopPUNK/models.py
index 40e32260..edb73156 100644
--- a/PopPUNK/models.py
+++ b/PopPUNK/models.py
@@ -971,6 +971,10 @@ def assign(self, X, slope=None):
 
         return y
 
+# Wrapper function for LineageFit.__reduce_rank__ to be called by
+# multiprocessing threads
+def reduce_rank(lower_rank, fit, higher_rank_sparse_mat, n_samples, dtype):
+    fit.__reduce_rank__(higher_rank_sparse_mat, lower_rank, n_samples, dtype)
 
 class LineageFit(ClusterFit):
     '''Class for fits using the lineage assignment model. Inherits from
     :class:`ClusterFit`.
@@ -998,6 +1002,37 @@ def __init__(self, outPrefix, ranks, use_gpu = False):
             self.ranks.append(int(rank))
         self.use_gpu = use_gpu
 
+    def __save_sparse__(self, data, row, col, rank, n_samples, dtype):
+        '''Save a sparse matrix in coo format
+        '''
+        if self.use_gpu:
+            data = cp.array(data)
+            data[data < epsilon] = epsilon
+            self.nn_dists[rank] = cupyx.scipy.sparse.coo_matrix((data, (cp.array(row), cp.array(col))),
+                                                                shape=(n_samples, n_samples),
+                                                                dtype = dtype)
+        else:
+            data = np.array(data)
+            data[data < epsilon] = epsilon
+            self.nn_dists[rank] = scipy.sparse.coo_matrix((data, (row, col)),
+                                                          shape=(n_samples, n_samples),
+                                                          dtype = dtype)
+
+    def __reduce_rank__(self, higher_rank_sparse_mat, lower_rank, n_samples, dtype):
+        '''Lowers the rank of a fit and saves it
+        '''
+        lower_rank_sparse_mat = \
+            poppunk_refine.lowerRank(
+                higher_rank_sparse_mat,
+                n_samples,
+                lower_rank)
+        self.__save_sparse__(lower_rank_sparse_mat[2],
+                             lower_rank_sparse_mat[0],
+                             lower_rank_sparse_mat[1],
+                             lower_rank,
+                             n_samples,
+                             dtype)
+
     def fit(self, X, accessory):
         '''Extends :func:`~ClusterFit.fit`
 
@@ -1016,7 +1051,7 @@ def fit(self, X, accessory):
         '''
         # Check if model requires GPU
         check_and_set_gpu(self.use_gpu, gpu_lib, quit_on_fail = True)
-
+
         ClusterFit.fit(self, X)
         sample_size = int(round(0.5 * (1 + np.sqrt(1 + 8 * X.shape[0]))))
         if (max(self.ranks) >= sample_size):
@@ -1036,18 +1071,7 @@
                 0,
                 rank
             )
-            if self.use_gpu:
-                data = cp.array(data)
-                data[data < epsilon] = epsilon
-                self.nn_dists[rank] = cupyx.scipy.sparse.coo_matrix((data,(cp.array(row),cp.array(col))),
-                                                                    shape=(sample_size, sample_size),
-                                                                    dtype = X.dtype)
-            else:
-                data = np.array(data)
-                data[data < epsilon] = epsilon
-                self.nn_dists[rank] = scipy.sparse.coo_matrix((data, (row, col)),
-                                                              shape=(sample_size, sample_size),
-                                                              dtype = X.dtype)
+            self.__save_sparse__(data, row, col, rank, sample_size, X.dtype)
 
         self.fitted = True
 
@@ -1152,7 +1176,6 @@ def extend(self, qqDists, qrDists):
             y (list of tuples)
                 Edges to include in network
         '''
-
         # Check if model requires GPU
         check_and_set_gpu(self.use_gpu, gpu_lib, quit_on_fail = True)
 
@@ -1160,71 +1183,38 @@
         if self.use_gpu:
             qqDists = cp.array(qqDists)
             qrDists = cp.array(qrDists)
-
+
         # Reshape qq and qr dist matrices
         qqSquare = pp_sketchlib.longToSquare(qqDists[:, [self.dist_col]], self.threads)
         qqSquare[qqSquare < epsilon] = epsilon
 
         n_ref = self.nn_dists[self.ranks[0]].shape[0]
         n_query = qqSquare.shape[1]
-        qrRect = qrDists[:, [self.dist_col]].reshape(n_query, n_ref)
+        qrRect = qrDists[:, [self.dist_col]].reshape(n_query, n_ref).T
         qrRect[qrRect < epsilon] = epsilon
 
-        for rank in self.ranks:
-            # Add the matrices together to make a large square matrix
-            if self.use_gpu:
-                full_mat = cupyx.scipy.sparse.bmat([[self.nn_dists[rank],
-                                                     qrRect.transpose()],
-                                                    [qrRect,qqSquare]],
-                                                   format = 'csr',
-                                                   dtype = self.nn_dists[rank].dtype)
-            else:
-                full_mat = scipy.sparse.bmat([[self.nn_dists[rank],
-                                               qrRect.transpose()],
-                                              [qrRect,qqSquare]],
-                                             format = 'csr',
-                                             dtype = self.nn_dists[rank].dtype)
-
-            # Reapply the rank to each row, using sparse matrix functions
-            data = []
-            row = []
-            col = []
-            for row_idx in range(full_mat.shape[0]):
-                sample_row = full_mat.getrow(row_idx)
-                if self.use_gpu:
-                    dist_row, dist_col, dist = cupyx.scipy.sparse.find(sample_row)
-                else:
-                    dist_row, dist_col, dist = scipy.sparse.find(sample_row)
-                dist[dist < epsilon] = epsilon
-                dist_idx_sort = np.argsort(dist)
-
-                # Identical to C++ code in matrix_ops.cpp:sparsify_dists
-                neighbours = 0
-                prev_val = -1
-                for sort_idx in dist_idx_sort:
-                    if row_idx == dist_col[sort_idx]:
-                        continue
-                    new_val = abs(dist[sort_idx] - prev_val) < epsilon
-                    if (neighbours < rank or new_val):
-                        data.append(dist[sort_idx])
-                        row.append(row_idx)
-                        col.append(dist_col[sort_idx])
-
-                        if not new_val:
-                            neighbours += 1
-                            prev_val = data[-1]
-                    else:
-                        break
-
-            if self.use_gpu:
-                self.nn_dists[rank] = cupyx.scipy.sparse.coo_matrix(
-                    (cp.array(data), (cp.array(row), cp.array(col))),
-                    shape=(full_mat.shape[0], full_mat.shape[0]),
-                    dtype = self.nn_dists[rank].dtype)
-            else:
-                self.nn_dists[rank] = scipy.sparse.coo_matrix((data, (row, col)),
-                                                              shape=(full_mat.shape[0], full_mat.shape[0]),
-                                                              dtype = self.nn_dists[rank].dtype)
+        max_rank = max(self.ranks)
+        rrSparse = self.nn_dists[max_rank]
+        higher_rank = \
+            poppunk_refine.extend(
+                (rrSparse.row, rrSparse.col, rrSparse.data),
+                qqSquare,
+                qrRect,
+                max_rank,
+                self.threads)
+        self.__save_sparse__(higher_rank[2], higher_rank[0], higher_rank[1],
+                             max_rank, n_ref + n_query, rrSparse.dtype)
+
+        # Apply lower ranks
+        thread_map(partial(reduce_rank,
+                           fit=self,
+                           higher_rank_sparse_mat=higher_rank,
+                           n_samples=n_ref + n_query,
+                           dtype=rrSparse.dtype),
+                   sorted(self.ranks, reverse=True)[1:],
+                   max_workers=self.threads,
+                   disable=True)
 
         y = self.assign(min(self.ranks))
         return y
+
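For reference, the CPU branch of the new __save_sparse__ helper amounts to clipping zero and near-zero distances up to epsilon and storing the k-nearest-neighbour triplets as a square COO matrix. A minimal standalone sketch of that behaviour (the function name save_sparse_cpu and the toy values are illustrative only, not part of the patch):

    import numpy as np
    import scipy.sparse

    epsilon = 1e-10

    def save_sparse_cpu(data, row, col, n_samples, dtype=np.float32):
        # Clip distances below epsilon so identical samples still produce a
        # non-zero edge weight, then keep the triplets as a square COO matrix
        data = np.array(data, dtype=dtype)
        data[data < epsilon] = epsilon
        return scipy.sparse.coo_matrix((data, (row, col)),
                                       shape=(n_samples, n_samples),
                                       dtype=dtype)

    # e.g. three samples with nearest-neighbour edges 0->1, 1->2, 2->0
    mat = save_sparse_cpu([0.0, 0.02, 0.05], [0, 1, 2], [1, 2, 0], 3)
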
diff --git a/src/boundary.cpp b/src/boundary.cpp
index d1991fa7..915bb4be 100644
--- a/src/boundary.cpp
+++ b/src/boundary.cpp
@@ -4,18 +4,13 @@
  * Functions to move a network boundary
  *
  */
-#include 
 #include <cassert> // assert
 #include <cmath>   // floor/sqrt
 #include <cstddef> // size_t
 #include 
-#include 
 #include 
 #include 
-// Parallel sort
-#include <boost/sort/sort.hpp>
-
 #include "boundary.hpp"
 
 const float epsilon = 1E-10;
@@ -41,20 +36,6 @@ inline size_t square_to_condensed(const size_t i, const size_t j,
   return (n * i - ((i * (i + 1)) >> 1) + j - 1 - i);
 }
 
-// https://stackoverflow.com/a/12399290
-template <typename T>
-std::vector<size_t> sort_indexes(const T &v, const uint32_t n_threads) {
-  // initialize original index locations
-  std::vector<size_t> idx(v.size());
-  std::iota(idx.begin(), idx.end(), 0);
-
-  boost::sort::parallel_stable_sort(
-      idx.begin(), idx.end(), [&v](long i1, long i2) { return v[i1] < v[i2]; },
-      n_threads);
-
-  return idx;
-}
-
 // Unnormalised (signed) distance between a point (x0, y0) and a line defined
 // by the two points (xmax, 0) and (0, ymax)
 // Divide by 1/sqrt(xmax^2 + ymax^2) to get distance
diff --git a/src/boundary.hpp b/src/boundary.hpp
index d29b2448..6aff8162 100644
--- a/src/boundary.hpp
+++ b/src/boundary.hpp
@@ -1,7 +1,7 @@
 /*
  *
- * matrix.hpp
- * functions in matrix_ops.cpp
+ * boundary.hpp
+ * prototypes for boundary fns
  *
  */
 #pragma once
@@ -10,15 +10,33 @@
 #include 
 #include 
 #include 
+#include 
+#include 
 #include 
+// Parallel sort
+#include <boost/sort/sort.hpp>
 
 typedef Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>
     NumpyMatrix;
 typedef std::tuple<std::vector<long>, std::vector<long>, std::vector<long>>
     network_coo;
 typedef std::vector<std::tuple<long, long>> edge_tuple;
 
+// https://stackoverflow.com/a/12399290
+template <typename T>
+std::vector<size_t> sort_indexes(const T &v, const uint32_t n_threads) {
+  // initialize original index locations
+  std::vector<size_t> idx(v.size());
+  std::iota(idx.begin(), idx.end(), 0);
+
+  boost::sort::parallel_stable_sort(
+      idx.begin(), idx.end(), [&v](long i1, long i2) { return v[i1] < v[i2]; },
+      n_threads);
+
+  return idx;
+}
+
 Eigen::VectorXf assign_threshold(const NumpyMatrix &distMat, const int slope,
                                  const float x_max, const float y_max,
                                  unsigned int num_threads);
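The sort_indexes template moved into boundary.hpp is the C++ counterpart of numpy's argsort: it returns the indices that order a vector ascending, and is now shared with the new extend.cpp below. A rough, single-threaded Python equivalent, for illustration only:

    import numpy as np

    def sort_indexes(v):
        # Indices that would sort v ascending; the boundary.hpp template does
        # the same with Boost's parallel stable sort
        return np.argsort(v, kind="stable")

    sort_indexes(np.array([0.3, 0.1, 0.2]))  # -> array([1, 2, 0])
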
diff --git a/src/extend.cpp b/src/extend.cpp
new file mode 100644
index 00000000..8f704703
--- /dev/null
+++ b/src/extend.cpp
@@ -0,0 +1,174 @@
+/*
+ *
+ * extend.cpp
+ * Functions to extend a sparse distance matrix
+ *
+ */
+#include <cstddef> // size_t
+#include 
+#include 
+
+#include "extend.hpp"
+
+const float epsilon = 1E-10;
+
+// Get indices where each row starts in the sparse matrix
+std::vector<size_t> row_start_indices(const sparse_coo &sparse_rr_mat,
+                                      const size_t nr_samples) {
+  const std::vector<long> i_vec = std::get<0>(sparse_rr_mat);
+  std::vector<size_t> row_start_idx(nr_samples + 1);
+  size_t i_idx = 0;
+  row_start_idx[0] = 0;
+  row_start_idx[nr_samples] = i_vec.size();
+  for (long i = 1; i < nr_samples; ++i) {
+    while (i_vec[i_idx] < i) {
+      i_idx++;
+    }
+    row_start_idx[i] = i_idx;
+  }
+  return row_start_idx;
+}
+
+template <typename T>
+std::vector<T> combine_vectors(const std::vector<std::vector<T>> &vec,
+                               const size_t len) {
+  std::vector<T> all(len);
+  auto all_it = all.begin();
+  for (size_t i = 0; i < vec.size(); ++i) {
+    std::copy(vec[i].cbegin(), vec[i].cend(), all_it);
+    all_it += vec[i].size();
+  }
+  return all;
+}
+
+sparse_coo extend(const sparse_coo &sparse_rr_mat,
+                  const NumpyMatrix &qq_mat_square,
+                  const NumpyMatrix &qr_mat_rect, const size_t kNN,
+                  const size_t num_threads) {
+  const size_t nr_samples = qr_mat_rect.rows();
+  const size_t nq_samples = qr_mat_rect.cols();
+
+  std::vector<size_t> row_start_idx =
+      row_start_indices(sparse_rr_mat, nr_samples);
+
+  // ijv vectors
+  std::vector<std::vector<float>> dists(nr_samples + nq_samples);
+  std::vector<std::vector<long>> i_vec(nr_samples + nq_samples);
+  std::vector<std::vector<long>> j_vec(nr_samples + nq_samples);
+  size_t len = 0;
+
+  std::vector<float> dist_vec = std::get<2>(sparse_rr_mat);
+#pragma omp parallel for schedule(static) num_threads(num_threads) reduction(+:len)
+  for (long i = 0; i < nr_samples + nq_samples; ++i) {
+    // Extract the dists for the row from the qr (dense) and rr (sparse)
+    // matrices
+    Eigen::VectorXf rr_dists, qr_dists;
+    if (i < nr_samples) {
+      qr_dists = qr_mat_rect.row(i);
+      Eigen::Map<Eigen::VectorXf> rr_map(dist_vec.data() + row_start_idx[i],
+                                         row_start_idx[i + 1] -
+                                             row_start_idx[i]);
+      rr_dists = rr_map;
+    } else {
+      rr_dists = qr_mat_rect.col(i - nr_samples);
+      qr_dists = qq_mat_square.row(i - nr_samples);
+    }
+
+    // Sort these. Then do a merge below
+    std::vector<size_t> qr_ordered_idx = sort_indexes(qr_dists, 1);
+    std::vector<size_t> rr_ordered_idx = sort_indexes(rr_dists, 1);
+
+    // See sparsify_dists in pp_sketchlib.
+    // This is very similar, but merging two lists as input
+    long unique_neighbors = 0;
+    float prev_value = -1;
+    auto rr_it = rr_ordered_idx.cbegin();
+    auto qr_it = qr_ordered_idx.cbegin();
+    while (qr_it != qr_ordered_idx.cend() && rr_it != rr_ordered_idx.cend()) {
+      // Get the next smallest dist, and corresponding j
+      long j;
+      float dist;
+      if (rr_it == rr_ordered_idx.cend() ||
+          (!(qr_it == qr_ordered_idx.cend()) &&
+           qr_dists[*qr_it] <= rr_dists[*rr_it])) {
+        j = *qr_it + nr_samples;
+        dist = qr_dists[*qr_it];
+        ++qr_it;
+      } else {
+        if (i < nr_samples) {
+          j = std::get<1>(sparse_rr_mat)[row_start_idx[i] + *rr_it];
+        } else {
+          j = *rr_it;
+        }
+        dist = rr_dists[*rr_it];
+        ++rr_it;
+      }
+
+      if (j == i) {
+        continue;
+      }
+      bool new_val = abs(dist - prev_value) >= epsilon;
+      if (unique_neighbors < kNN || !new_val) {
+        dists[i].push_back(dist);
+        i_vec[i].push_back(i);
+        j_vec[i].push_back(j);
+        if (new_val) {
+          unique_neighbors++;
+          prev_value = dist;
+        }
+      } else {
+        break; // next i
+      }
+    }
+    len += dists[i].size();
+  }
+
+  // Combine the lists from each thread
+  std::vector<float> dists_all = combine_vectors(dists, len);
+  std::vector<long> i_vec_all = combine_vectors(i_vec, len);
+  std::vector<long> j_vec_all = combine_vectors(j_vec, len);
+
+  return (std::make_tuple(i_vec_all, j_vec_all, dists_all));
+}
+
+sparse_coo lower_rank(const sparse_coo &sparse_rr_mat, const size_t n_samples,
+                      const size_t kNN) {
+  std::vector<size_t> row_start_idx = row_start_indices(sparse_rr_mat, n_samples);
+
+  // ijv vectors
+  std::vector<float> dists;
+  std::vector<long> i_vec;
+  std::vector<long> j_vec;
+  std::vector<float> dist_vec = std::get<2>(sparse_rr_mat);
+  const std::vector<long> j_sparse = std::get<1>(sparse_rr_mat);
+  for (long i = 0; i < n_samples; ++i) {
+    Eigen::Map<Eigen::VectorXf> rr_dists(dist_vec.data() + row_start_idx[i],
+                                         row_start_idx[i + 1] -
+                                             row_start_idx[i]);
+    std::vector<size_t> rr_ordered_idx = sort_indexes(rr_dists, 1);
+
+    long unique_neighbors = 0;
+    float prev_value = -1;
+    for (auto rr_it = rr_ordered_idx.cbegin(); rr_it != rr_ordered_idx.cend();
+         ++rr_it) {
+      long j = std::get<1>(sparse_rr_mat)[row_start_idx[i] + *rr_it];
+      float dist = rr_dists[*rr_it];
+      if (j == i) {
+        continue;
+      }
+      bool new_val = abs(dist - prev_value) >= epsilon;
+      if (unique_neighbors < kNN || !new_val) {
+        dists.push_back(dist);
+        i_vec.push_back(i);
+        j_vec.push_back(j);
+        if (new_val) {
+          unique_neighbors++;
+          prev_value = dist;
+        }
+      } else {
+        break; // next i
+      }
+    }
+  }
+  return (std::make_tuple(i_vec, j_vec, dists));
+}
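Both extend() and lower_rank() apply the same per-row rule when rebuilding the sparse matrix: walk the distances in ascending order, keep entries until kNN distinct values have been taken, and retain any ties of the last kept value. A rough Python rendering of that rule for a single row (self-matches assumed already removed; the function name sparsify_row and the toy values are illustrative only):

    import numpy as np

    epsilon = 1e-10

    def sparsify_row(dists, cols, kNN):
        # Keep the kNN smallest *unique* distances in one row, retaining ties,
        # mirroring the neighbour-counting loop in extend.cpp
        keep_j, keep_d = [], []
        unique_neighbours, prev = 0, -1.0
        for idx in np.argsort(dists):
            new_val = abs(dists[idx] - prev) >= epsilon
            if unique_neighbours < kNN or not new_val:
                keep_d.append(float(dists[idx]))
                keep_j.append(cols[idx])
                if new_val:
                    unique_neighbours += 1
                    prev = float(dists[idx])
            else:
                break
        return keep_j, keep_d

    # Ties at the cut-off are kept: rank 2 here returns three edges
    sparsify_row(np.array([0.5, 0.1, 0.1, 0.3]), [3, 1, 2, 4], kNN=2)
    # -> ([1, 2, 4], [0.1, 0.1, 0.3])
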
diff --git a/src/extend.hpp b/src/extend.hpp
new file mode 100644
index 00000000..9dd4cb49
--- /dev/null
+++ b/src/extend.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "boundary.hpp"
+
+typedef std::tuple<std::vector<long>, std::vector<long>, std::vector<float>>
+    sparse_coo;
+
+sparse_coo extend(const sparse_coo &sparse_rr_mat,
+                  const NumpyMatrix &qq_mat_square,
+                  const NumpyMatrix &qr_mat_rect, const size_t kNN,
+                  const size_t num_threads);
+
+sparse_coo lower_rank(const sparse_coo &sparse_rr_mat, const size_t n_samples,
+                      const size_t kNN);
diff --git a/src/python_bindings.cpp b/src/python_bindings.cpp
index 1fdf6634..cdf1ddd1 100644
--- a/src/python_bindings.cpp
+++ b/src/python_bindings.cpp
@@ -12,6 +12,7 @@ namespace py = pybind11;
 
 #include "boundary.hpp"
+#include "extend.hpp"
 
 // Wrapper which makes a ref to the python/numpy array
 Eigen::VectorXf assignThreshold(const Eigen::Ref<NumpyMatrix> &distMat,
@@ -31,27 +32,18 @@ edge_tuple edgeThreshold(const Eigen::Ref<NumpyMatrix> &distMat,
 }
 
 edge_tuple generateTuples(const std::vector &assignments,
-                          const int within_label,
-                          bool self,
-                          const int num_ref,
-                          const int int_offset) {
-  edge_tuple edges = generate_tuples(assignments,
-                                     within_label,
-                                     self,
-                                     num_ref,
-                                     int_offset);
-  return (edges);
+                          const int within_label, bool self, const int num_ref,
+                          const int int_offset) {
+  edge_tuple edges =
+      generate_tuples(assignments, within_label, self, num_ref, int_offset);
+  return (edges);
 }
 
-edge_tuple generateAllTuples(const int num_ref,
-                             const int num_queries,
-                             bool self = true,
-                             const int int_offset = 0) {
-  edge_tuple edges = generate_all_tuples(num_ref,
-                                         num_queries,
-                                         self,
-                                         int_offset);
-  return (edges);
+edge_tuple generateAllTuples(const int num_ref, const int num_queries,
+                             bool self = true, const int int_offset = 0) {
+  edge_tuple edges =
+      generate_all_tuples(num_ref, num_queries, self, int_offset);
+  return (edges);
 }
 
 network_coo thresholdIterate1D(const Eigen::Ref<NumpyMatrix> &distMat,
@@ -99,21 +91,15 @@ PYBIND11_MODULE(poppunk_refine, m) {
 
   m.def("generateTuples", &generateTuples,
         py::return_value_policy::reference_internal,
-        "Return edge tuples based on assigned groups",
-        py::arg("assignments"),
-        py::arg("within_label"),
-        py::arg("self") = true,
-        py::arg("num_ref") = 0,
+        "Return edge tuples based on assigned groups", py::arg("assignments"),
+        py::arg("within_label"), py::arg("self") = true, py::arg("num_ref") = 0,
         py::arg("int_offset") = 0);
 
   m.def("generateAllTuples", &generateAllTuples,
-        py::return_value_policy::reference_internal,
-        "Return all edge tuples",
-        py::arg("num_ref"),
-        py::arg("num_queries") = 0,
-        py::arg("self") = true,
+        py::return_value_policy::reference_internal, "Return all edge tuples",
+        py::arg("num_ref"), py::arg("num_queries") = 0, py::arg("self") = true,
         py::arg("int_offset") = 0);
-
+
   m.def("thresholdIterate1D", &thresholdIterate1D,
         py::return_value_policy::reference_internal,
         "Move a 2D boundary to grow a network by adding edges at each offset",
@@ -125,4 +111,14 @@
         py::return_value_policy::reference_internal,
         "Move a 2D boundary to grow a network by adding edges at each offset",
         py::arg("distMat").noconvert(), py::arg("x_max"), py::arg("y_max"));
+
+  m.def("extend", &extend, py::return_value_policy::reference_internal,
+        "Extend a sparse distance matrix keeping k nearest-neighbours",
+        py::arg("rr_mat"), py::arg("qq_mat").noconvert(),
+        py::arg("qr_mat").noconvert(), py::arg("kNN"),
+        py::arg("num_threads") = 1);
+
+  m.def("lowerRank", &lower_rank, py::return_value_policy::reference_internal,
+        "Reduce a sparse distance matrix to a lower rank, keeping k nearest-neighbours",
+        py::arg("rr_mat"), py::arg("n_samples"), py::arg("kNN"));
 }
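Once the extension module is rebuilt, the two new bindings are driven from LineageFit.extend and __reduce_rank__ as in the models.py hunk above. A minimal sketch of the expected calls, assuming two references, one query, float32 row-major dense blocks and a starting rank of 2 (all values and shapes below are made up for illustration):

    import numpy as np
    import poppunk_refine

    # ref-ref distances as COO triplets (row, col, data), sorted by row
    rr = (np.array([0, 1]), np.array([1, 0]),
          np.array([0.02, 0.02], dtype=np.float32))
    qq = np.array([[0.0]], dtype=np.float32)           # query-query, square
    qr = np.array([[0.01], [0.03]], dtype=np.float32)  # n_ref x n_query

    # Extend the rank-2 sparse matrix to cover all 3 samples, then derive rank 1
    row, col, data = poppunk_refine.extend(rr, qq, qr, 2, 1)
    row1, col1, data1 = poppunk_refine.lowerRank((row, col, data), 3, 1)
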
diff --git a/test/test-update.py b/test/test-update.py
index 22aa630e..12b1b9d0 100755
--- a/test/test-update.py
+++ b/test/test-update.py
@@ -33,6 +33,11 @@ def compare_sparse_matrices(d1,d2,r1,r2):
     d2_pairs = get_seq_tuples(d2.row,d2.col,r2)
     d1_dists = []
     d2_dists = []
+    if (len(d1_pairs) != len(d2_pairs)):
+        sys.stderr.write("Distance matrix number of entries differ!\n")
+        print(d1_pairs)
+        print(d2_pairs)
+        sys.exit(1)
 
     for (pair1,dist1) in zip(d1_pairs,d1.data):
         for (pair2,dist2) in zip(d2_pairs,d2.data):
@@ -91,6 +96,11 @@ def old_get_seq_tuples(rows,cols):
 S2 = scipy.sparse.load_npz("batch2/batch2_rank2_fit.npz")
 compare_sparse_matrices(S1,S2,rlist1,rlist2)
 
+# Check rank 1
+S3 = scipy.sparse.load_npz("batch12/batch12_rank1_fit.npz")
+S4 = scipy.sparse.load_npz("batch2/batch2_rank1_fit.npz")
+compare_sparse_matrices(S3,S4,rlist1,rlist2)
+
 # Check distances after second query
 # Check that order is the same after doing 1 + 2 + 3 with --update-db, as doing
 # all of 1 + 2 + 3 together
scipy.sparse.load_npz("batch2/batch2_rank1_fit.npz") +compare_sparse_matrices(S3,S4,rlist1,rlist2) + # Check distances after second query # Check that order is the same after doing 1 + 2 + 3 with --update-db, as doing all of 1 + 2 + 3 together @@ -118,7 +128,12 @@ def old_get_seq_tuples(rows,cols): # Check sparse distances after second query with open("batch123/batch123.dists.pkl", 'rb') as pickle_file: rlist3, qlist, self = pickle.load(pickle_file) -S3 = scipy.sparse.load_npz("batch123/batch123_rank2_fit.npz") -S4 = scipy.sparse.load_npz("batch3/batch3_rank2_fit.npz") +S5 = scipy.sparse.load_npz("batch123/batch123_rank2_fit.npz") +S6 = scipy.sparse.load_npz("batch3/batch3_rank2_fit.npz") + +compare_sparse_matrices(S5,S6,rlist3,rlist4) -compare_sparse_matrices(S3,S4,rlist3,rlist4) +# Check rank 1 +S7 = scipy.sparse.load_npz("batch123/batch123_rank1_fit.npz") +S8 = scipy.sparse.load_npz("batch3/batch3_rank1_fit.npz") +compare_sparse_matrices(S7,S8,rlist3,rlist4)