Skip to content

Commit

Permalink
enable multiprocess function to use kwargs (#137)
Browse files Browse the repository at this point in the history
* enable multiprocess function to use kwargs

* fix docstring of only public function

* need keywoard arbuments in cpus and verbose now
  • Loading branch information
ismael-mendoza authored Apr 26, 2021
1 parent 6c4d029 commit 3e565d2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 10 deletions.
4 changes: 2 additions & 2 deletions btk/draw_blends.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ def __next__(self):
mini_batch_results = multiprocess(
self.render_mini_batch,
input_args,
self.cpus,
self.verbose,
cpus=self.cpus,
verbose=self.verbose,
)

# join results across mini-batches.
Expand Down
4 changes: 2 additions & 2 deletions btk/measure.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ def __next__(self):
measure_results = multiprocess(
self.run_batch,
input_args,
self.cpus,
self.verbose,
cpus=self.cpus,
verbose=self.verbose,
)
if self.verbose:
print("Measurement performed on batch")
Expand Down
41 changes: 35 additions & 6 deletions btk/multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,49 @@
"""Tools for multiprocessing in BTK."""
import multiprocessing as mp
from itertools import repeat
from itertools import starmap


def multiprocess(func, input_args, cpus, verbose=False):
"""Sole Function that implements multiprocessing across mini-batches for BTK."""
def _apply_args_and_kwargs(fn, args, kwargs):
return fn(*args, **kwargs)


def _pool_starmap_with_kwargs(pool, fn, args_iter, kwargs_iter):
args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter)
return pool.starmap(_apply_args_and_kwargs, args_for_starmap)


def _starmap_with_kwargs(fn, args_iter, kwargs_iter):
args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter)
return starmap(_apply_args_and_kwargs, args_for_starmap)


def multiprocess(fn, args_iter, kwargs_iter=None, cpus=1, verbose=False):
"""Sole function that implements multiprocessing across mini-batches/batches for BTK.
Args:
fn (function): Function to run in parallel on each positional arguments returned by
`args_iter` and each keyword arguments returned by `kwargs_iter`.
args_iter (iter): Iterator returning positional arguments to be passed in to function for
multiprocessing. This iterator must have a `__len__` method implemented. Each
argument returned by the iterator must be unpackable like: `*args`.
kwargs_iter (iter): Iterator returning keyword arguments to be passed in to
function for multiprocessing. Default value `None` means that no keyword arguments
are passed in. Each element returned by the iterator must be a `dict`.
cpus (int): # of cpus to use for multiprocessing.
verbose (bool): Whether to print information related to multiprocessing
"""
kwargs_iter = repeat({}) if kwargs_iter is None else kwargs_iter
if cpus > 1:
if verbose:
print(
f"Running mini-batch of size {len(input_args)} with multiprocessing with "
f"Running mini-batch of size {len(args_iter)} with multiprocessing with "
f"pool {cpus}"
)
with mp.Pool(processes=cpus) as pool:
results = pool.starmap(func, input_args)
results = _pool_starmap_with_kwargs(pool, fn, args_iter, kwargs_iter)
else:
if verbose:
print(f"Running mini-batch of size {len(input_args)} serial {cpus} times")
results = list(starmap(func, input_args))
print(f"Running mini-batch of size {len(args_iter)} serial {cpus} times")
results = list(_starmap_with_kwargs(fn, args_iter, kwargs_iter))
return results

0 comments on commit 3e565d2

Please sign in to comment.