From 513729ad2d26e5d55102408420f6c22f261108c6 Mon Sep 17 00:00:00 2001 From: nickjcroucher Date: Thu, 7 May 2020 11:39:00 +0100 Subject: [PATCH] Use numba pthreads for parallelisation of assignment to refined models --- PopPUNK/models.py | 50 ++++++++++++++++++++++++++--------------------- PopPUNK/refine.py | 16 +++++++-------- PopPUNK/utils.py | 2 ++ 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/PopPUNK/models.py b/PopPUNK/models.py index 2e6ee4cb..466c16c1 100644 --- a/PopPUNK/models.py +++ b/PopPUNK/models.py @@ -16,6 +16,7 @@ import scipy.optimize from scipy.spatial.distance import euclidean from scipy import stats +from numba import prange, njit import collections from functools import partial try: @@ -619,10 +620,10 @@ def fit(self, X, sample_names, model, max_move, min_move, startFile = None, indi sys.stderr.write("Could not separately refine core and accessory boundaries. " "Using joint 2D refinement only.\n") - y = self.assign(X) + y = self.assign(X, num_processes = threads) return y - def apply_threshold(self, X, threshold): + def apply_threshold(self, X, threshold, num_processes = 1): '''Applies a boundary threshold, given by user. Does not run optimisation. @@ -659,7 +660,7 @@ def apply_threshold(self, X, threshold): self.threshold = True self.indiv_fitted = False - y = self.assign(X) + y = self.assign(X, num_processes = 1) return y def save(self): @@ -727,8 +728,7 @@ def plot(self, X, y=None): self.max_move, self.scale, self.threshold, self.indiv_fitted, "Refined fit boundary", self.outPrefix + "/" + os.path.basename(self.outPrefix) + "_refined_fit") - - def assign(self, distMat, slope=None): + def assign(self, distMat, slope = None, num_processes = 1): '''Assign the clustering of new samples using :func:`~PopPUNK.refine.withinBoundary` Args: @@ -750,33 +750,39 @@ def assign(self, distMat, slope=None): x_max = self.optimal_x y_max = self.optimal_y if slope == 2 or (slope == None and self.slope == 2): -# y = withinBoundary(X/self.scale, self.optimal_x, self.optimal_y) slope = 2 elif slope == 0 or (slope == None and self.slope == 0): -# y = withinBoundary(X/self.scale, self.core_boundary, 0, slope=0) x_max = self.core_boundary y_max = 0 slope = 0 elif slope == 1 or (slope == None and self.slope == 1): -# y = withinBoundary(X/self.scale, 0, self.accessory_boundary, slope=1) x_max = 0 y_max = self.accessory_boundary slope = 1 # run parallelised assignment - with SharedMemoryManager() as smm: - num_processes = 4 - scaled_distMat = distMat/self.scale - coord_ranges = get_chunk_ranges(scaled_distMat.shape[0], num_processes) - scaled_distMat_array, scaled_distMat_raw = get_shared_memory_version(scaled_distMat, smm) - - with Pool(processes = num_processes) as pool: - pooled_boundary_assignments = pool.map(partial(withinBoundary, - dists = scaled_distMat_array, - x_max = x_max, - y_max = y_max, - slope = slope), - coord_ranges) - y = np.concatenate(pooled_boundary_assignments) + scaled_distMat = distMat/self.scale + pooled_boundary_assignments = parallel_assign_boundaries(scaled_distMat = scaled_distMat, + x_max = x_max, + y_max = y_max, + slope = slope, + num_processes = num_processes) + + y = np.concatenate(pooled_boundary_assignments) return y + +@njit(parallel=True) +def parallel_assign_boundaries(scaled_distMat = None, x_max = None, y_max = None, slope = None, num_processes = None): + coord_ranges = get_chunk_ranges(scaled_distMat.shape[0], num_processes) + pooled_boundary_assignments = [] + for i in range(num_processes): + pooled_boundary_assignments.append(np.ones(coord_ranges[i][1] - coord_ranges[i][0])) + for i in prange(num_processes): + pooled_boundary_assignments[i] = withinBoundary(coord_ranges[i], + dists = scaled_distMat, + x_max = x_max, + y_max = y_max, + slope = slope) + return pooled_boundary_assignments + diff --git a/PopPUNK/refine.py b/PopPUNK/refine.py index 0c6895e1..79b8ee3c 100644 --- a/PopPUNK/refine.py +++ b/PopPUNK/refine.py @@ -26,7 +26,7 @@ from .utils import get_chunk_ranges def refineFit(distMat, sample_names, start_s, mean0, mean1, - max_move, min_move, slope = 2, no_local = False, num_processes = 4): + max_move, min_move, slope = 2, no_local = False, num_processes = 1): """Try to refine a fit by maximising a network score based on transitivity and density. Iteratively move the decision boundary to do this, using starting point from existing model. @@ -126,8 +126,8 @@ def refineFit(distMat, sample_names, start_s, mean0, mean1, return start_point, optimal_x, optimal_y -#@jit(nopython=False) -def withinBoundary(coords, dists = None, x_max = None, y_max = None, slope=2): +@jit(nopython=True) +def withinBoundary(coords, dists = None, x_max = None, y_max = None, slope = 2): """Classifies points as within or outside of a refined boundary. Numba JIT compiled for speed. @@ -149,8 +149,8 @@ def withinBoundary(coords, dists = None, x_max = None, y_max = None, slope=2): 0 if exactly on boundary. """ # load distMat slice - dists_shm = shared_memory.SharedMemory(name = dists.name) - dists = np.ndarray(dists.shape, dtype = dists.dtype, buffer = dists_shm.buf) +# dists_shm = shared_memory.SharedMemory(name = dists.name) +# dists = np.ndarray(dists.shape, dtype = dists.dtype, buffer = dists_shm.buf) # See https://stackoverflow.com/questions/2049582/how-to-determine-if-a-point-is-in-a-2d-triangle # x_max and y_max from decisionBoundary @@ -197,9 +197,9 @@ def newNetwork(s, sample_names, distMat, start_point, mean1, gradient, slope=2): score (float) -1 * network score. Where network score is from :func:`~PopPUNK.network.networkSummary` """ -# if isinstance(distMat, NumpyShared): -# distMat_shm = shared_memory.SharedMemory(name = distMat.name) -# distMat = np.ndarray(distMat.shape, dtype = distMat.dtype, buffer = distMat_shm.buf) + if isinstance(distMat, NumpyShared): + distMat_shm = shared_memory.SharedMemory(name = distMat.name) + distMat = np.ndarray(distMat.shape, dtype = distMat.dtype, buffer = distMat_shm.buf) # Set up boundary new_intercept = transformLine(s, start_point, mean1) diff --git a/PopPUNK/utils.py b/PopPUNK/utils.py index 99b0f818..cf895fc2 100644 --- a/PopPUNK/utils.py +++ b/PopPUNK/utils.py @@ -12,6 +12,7 @@ from collections import defaultdict from tempfile import mkstemp from functools import partial +from numba import jit import collections try: from multiprocessing import Pool, shared_memory @@ -574,6 +575,7 @@ def readRfile(rFile, oneSeq=False): return (names, sequences) +@jit(nopython=True) def get_chunk_ranges(N, nb): """ Calculates boundaries for dividing distances array into chunks for parallelisation.