Skip to content

Commit

Permalink
Use numba pthreads for parallelisation of assignment to refined models
Browse files Browse the repository at this point in the history
  • Loading branch information
nickjcroucher committed May 7, 2020
1 parent 8cb08c9 commit 513729a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 30 deletions.
50 changes: 28 additions & 22 deletions PopPUNK/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import scipy.optimize
from scipy.spatial.distance import euclidean
from scipy import stats
from numba import prange, njit
import collections
from functools import partial
try:
Expand Down Expand Up @@ -619,10 +620,10 @@ def fit(self, X, sample_names, model, max_move, min_move, startFile = None, indi
sys.stderr.write("Could not separately refine core and accessory boundaries. "
"Using joint 2D refinement only.\n")

y = self.assign(X)
y = self.assign(X, num_processes = threads)
return y

def apply_threshold(self, X, threshold):
def apply_threshold(self, X, threshold, num_processes = 1):
'''Applies a boundary threshold, given by user. Does not run
optimisation.
Expand Down Expand Up @@ -659,7 +660,7 @@ def apply_threshold(self, X, threshold):
self.threshold = True
self.indiv_fitted = False

y = self.assign(X)
y = self.assign(X, num_processes = 1)
return y

def save(self):
Expand Down Expand Up @@ -727,8 +728,7 @@ def plot(self, X, y=None):
self.max_move, self.scale, self.threshold, self.indiv_fitted, "Refined fit boundary",
self.outPrefix + "/" + os.path.basename(self.outPrefix) + "_refined_fit")


def assign(self, distMat, slope=None):
def assign(self, distMat, slope = None, num_processes = 1):
'''Assign the clustering of new samples using :func:`~PopPUNK.refine.withinBoundary`
Args:
Expand All @@ -750,33 +750,39 @@ def assign(self, distMat, slope=None):
x_max = self.optimal_x
y_max = self.optimal_y
if slope == 2 or (slope == None and self.slope == 2):
# y = withinBoundary(X/self.scale, self.optimal_x, self.optimal_y)
slope = 2
elif slope == 0 or (slope == None and self.slope == 0):
# y = withinBoundary(X/self.scale, self.core_boundary, 0, slope=0)
x_max = self.core_boundary
y_max = 0
slope = 0
elif slope == 1 or (slope == None and self.slope == 1):
# y = withinBoundary(X/self.scale, 0, self.accessory_boundary, slope=1)
x_max = 0
y_max = self.accessory_boundary
slope = 1

# run parallelised assignment
with SharedMemoryManager() as smm:
num_processes = 4
scaled_distMat = distMat/self.scale
coord_ranges = get_chunk_ranges(scaled_distMat.shape[0], num_processes)
scaled_distMat_array, scaled_distMat_raw = get_shared_memory_version(scaled_distMat, smm)

with Pool(processes = num_processes) as pool:
pooled_boundary_assignments = pool.map(partial(withinBoundary,
dists = scaled_distMat_array,
x_max = x_max,
y_max = y_max,
slope = slope),
coord_ranges)
y = np.concatenate(pooled_boundary_assignments)
scaled_distMat = distMat/self.scale
pooled_boundary_assignments = parallel_assign_boundaries(scaled_distMat = scaled_distMat,
x_max = x_max,
y_max = y_max,
slope = slope,
num_processes = num_processes)

y = np.concatenate(pooled_boundary_assignments)

return y

@njit(parallel=True)
def parallel_assign_boundaries(scaled_distMat = None, x_max = None, y_max = None, slope = None, num_processes = None):
coord_ranges = get_chunk_ranges(scaled_distMat.shape[0], num_processes)
pooled_boundary_assignments = []
for i in range(num_processes):
pooled_boundary_assignments.append(np.ones(coord_ranges[i][1] - coord_ranges[i][0]))
for i in prange(num_processes):
pooled_boundary_assignments[i] = withinBoundary(coord_ranges[i],
dists = scaled_distMat,
x_max = x_max,
y_max = y_max,
slope = slope)
return pooled_boundary_assignments

16 changes: 8 additions & 8 deletions PopPUNK/refine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .utils import get_chunk_ranges

def refineFit(distMat, sample_names, start_s, mean0, mean1,
max_move, min_move, slope = 2, no_local = False, num_processes = 4):
max_move, min_move, slope = 2, no_local = False, num_processes = 1):
"""Try to refine a fit by maximising a network score based on transitivity and density.
Iteratively move the decision boundary to do this, using starting point from existing model.
Expand Down Expand Up @@ -126,8 +126,8 @@ def refineFit(distMat, sample_names, start_s, mean0, mean1,
return start_point, optimal_x, optimal_y


#@jit(nopython=False)
def withinBoundary(coords, dists = None, x_max = None, y_max = None, slope=2):
@jit(nopython=True)
def withinBoundary(coords, dists = None, x_max = None, y_max = None, slope = 2):
"""Classifies points as within or outside of a refined boundary.
Numba JIT compiled for speed.
Expand All @@ -149,8 +149,8 @@ def withinBoundary(coords, dists = None, x_max = None, y_max = None, slope=2):
0 if exactly on boundary.
"""
# load distMat slice
dists_shm = shared_memory.SharedMemory(name = dists.name)
dists = np.ndarray(dists.shape, dtype = dists.dtype, buffer = dists_shm.buf)
# dists_shm = shared_memory.SharedMemory(name = dists.name)
# dists = np.ndarray(dists.shape, dtype = dists.dtype, buffer = dists_shm.buf)

# See https://stackoverflow.com/questions/2049582/how-to-determine-if-a-point-is-in-a-2d-triangle
# x_max and y_max from decisionBoundary
Expand Down Expand Up @@ -197,9 +197,9 @@ def newNetwork(s, sample_names, distMat, start_point, mean1, gradient, slope=2):
score (float)
-1 * network score. Where network score is from :func:`~PopPUNK.network.networkSummary`
"""
# if isinstance(distMat, NumpyShared):
# distMat_shm = shared_memory.SharedMemory(name = distMat.name)
# distMat = np.ndarray(distMat.shape, dtype = distMat.dtype, buffer = distMat_shm.buf)
if isinstance(distMat, NumpyShared):
distMat_shm = shared_memory.SharedMemory(name = distMat.name)
distMat = np.ndarray(distMat.shape, dtype = distMat.dtype, buffer = distMat_shm.buf)

# Set up boundary
new_intercept = transformLine(s, start_point, mean1)
Expand Down
2 changes: 2 additions & 0 deletions PopPUNK/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from collections import defaultdict
from tempfile import mkstemp
from functools import partial
from numba import jit
import collections
try:
from multiprocessing import Pool, shared_memory
Expand Down Expand Up @@ -574,6 +575,7 @@ def readRfile(rFile, oneSeq=False):

return (names, sequences)

@jit(nopython=True)
def get_chunk_ranges(N, nb):
""" Calculates boundaries for dividing distances array
into chunks for parallelisation.
Expand Down

0 comments on commit 513729a

Please sign in to comment.