Update multiprocessing in unsequa (#763)
* Update pandas iteritems to items
* Change pd append to concat
* Fix minimal chunksize at 1
* Remove unneeded parallel pool chunksize argument
* Remove global matplotlib styles
* Remove deprecated tot_value from unsequa
* Add chunked version of parallel computing
* Remove deprecated numpy vstack of objects
* Feature/order samples unsequa (#766)
* Allow setting the log level
* Add method to sort samples
* Add advanced examples for unsequa
* Remove logging control
* Remove unnecessary output prints
* Update CHANGELOG.md

---------

Co-authored-by: Chahan Kropf <[email protected]>
Co-authored-by: emanuel-schmid <[email protected]>
Co-authored-by: Lukas Riedel <[email protected]>
4 people authored Aug 28, 2023
1 parent ef410e1 commit 8e0d851
Showing 8 changed files with 2,152 additions and 1,678 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ Removed:
- Added method `Exposures.centroids_total_value` to replace the functionality of `Exposures.affected_total_value`. This method is temporary and deprecated. [#702](https://github.com/CLIMADA-project/climada_python/pull/702)
- New method `climada.util.api_client.Client.purge_cache`: utility function to remove outdated files from the local file system to free disk space.
([#737](https://github.com/CLIMADA-project/climada_python/pull/737))
- Added advanced examples to the unsequa tutorial for coupled input variables and for efficiently loading multiple large files [#766](https://github.com/CLIMADA-project/climada_python/pull/766)

### Changed

@@ -57,6 +58,7 @@ Removed:
- `list_dataset_infos` from `climada.util.api_client.Client`: the `properties` argument, a `dict`, can now have `None` as values. Before, only strings and lists of strings were allowed. Setting a particular property to `None` triggers a search for datasets where this property is not assigned. [#752](https://github.com/CLIMADA-project/climada_python/pull/752)
- Reduce memory requirements of `TropCyclone.from_tracks` [#749](https://github.com/CLIMADA-project/climada_python/pull/749)
- Support for different wind speed and pressure units in `TCTracks` when running `TropCyclone.from_tracks` [#749](https://github.com/CLIMADA-project/climada_python/pull/749)
- Changed the parallel package from Pathos to Multiprocess in the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

### Fixed

@@ -65,6 +67,9 @@ Removed:
- Correctly handle assertion errors in `Centroids.values_from_vector_files` and fix the associated test [#768](https://github.com/CLIMADA-project/climada_python/pull/768/)
- Text in `Forecast` class plots can now be adjusted [#769](https://github.com/CLIMADA-project/climada_python/issues/769)
- `Impact.impact_at_reg` now supports impact matrices where all entries are zero [#773](https://github.com/CLIMADA-project/climada_python/pull/773)
- Fix the pathos 0.3.0 -> 0.3.1 upgrade issue [#761](https://github.com/CLIMADA-project/climada_python/issues/761) for the unsequa module ([#763](https://github.com/CLIMADA-project/climada_python/pull/763))
- Fix bugs with pandas 2.0 (`iteritems` -> `items`, `append` -> `concat`) (fixes issue [#700](https://github.com/CLIMADA-project/climada_python/issues/700) for the unsequa module) [#763](https://github.com/CLIMADA-project/climada_python/pull/763)
- Remove matplotlib styles in unsequa module (fixes issue [#758](https://github.com/CLIMADA-project/climada_python/issues/758)) [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

### Deprecated

@@ -77,6 +82,7 @@ Removed:
- `Centroids.set_raster_from_pix_bounds` [#721](https://github.com/CLIMADA-project/climada_python/pull/721)
- `requirements/env_developer.yml` environment specs. Use 'extra' requirements when installing the Python package instead [#712](https://github.com/CLIMADA-project/climada_python/pull/712)
- `Impact.tag` attribute. This change is not backwards-compatible with respect to the files written and read by the `Impact` class [#743](https://github.com/CLIMADA-project/climada_python/pull/743)
- `impact.tot_value` attribute removed from the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

## v3.3.2

170 changes: 166 additions & 4 deletions climada/engine/unsequa/calc_base.py
@@ -21,6 +21,7 @@

import logging
import copy
import itertools

import datetime as dt

@@ -47,6 +48,31 @@ class Calc():
Names of the required uncertainty variables.
_metric_names : tuple(str)
Names of the output metrics.
Notes
-----
Parallelization logic: to compute the uncertainty, users may
specify a number N of processes on which to perform the computations in
parallel. Since the computation for each individual sample of the
input parameters is independent of the others, we implemented a simple
distribution over the processes:
1. The samples are divided into N equal sub-sample chunks.
2. Each chunk of samples is sent as a whole to one process.
Hence, this is equivalent to the user running the computation N times,
once for each sub-sample.
Note that all the input variables must be copied once for each process,
and hence each parallel process requires roughly the same amount of memory
as a single process would.
This approach differs from the usual parallelization strategy (where
individual samples are distributed), because each sample requires the
entire input data. With this method, copying data between processes is
reduced to a minimum.
Parallelization is currently not available for the sensitivity computation,
as the current implementation of the SALib library requires all samples
simultaneously.
"""

_input_var_names = ()
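The chunked strategy described in the Notes above can be sketched in a few lines. This is a minimal illustration only, not CLIMADA's actual implementation: the per-sample work in `_run_chunk` and the use of a `multiprocess` pool are assumptions made for the example.

```python
# Minimal sketch of chunked parallel evaluation, assuming the `multiprocess`
# package (same API as the standard library's multiprocessing).
import multiprocess as mp
import numpy as np
import pandas as pd

def _run_chunk(samples_chunk):
    # Stand-in for the real per-sample computation (e.g. an impact calculation):
    # here each sample's "result" is simply the sum of its parameter values.
    return [row.sum() for _, row in samples_chunk.iterrows()]

def run_uncertainty_chunked(samples_df, processes=2):
    # 1. Divide the samples into `processes` chunks of (roughly) equal size
    chunksize = int(np.ceil(samples_df.shape[0] / processes))
    chunks = [
        samples_df.iloc[pos:pos + chunksize]
        for pos in range(0, samples_df.shape[0], chunksize)
    ]
    # 2. Send each chunk as a whole to one process
    with mp.Pool(processes) as pool:
        per_chunk_results = pool.map(_run_chunk, chunks)
    # Flatten the per-chunk result lists back into one result per sample
    return [res for chunk_res in per_chunk_results for res in chunk_res]

if __name__ == "__main__":
    samples = pd.DataFrame(np.random.rand(10, 3), columns=["p1", "p2", "p3"])
    print(run_uncertainty_chunked(samples, processes=2))
```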
@@ -126,7 +152,7 @@ def distr_dict(self):
distr_dict.update(input_var.distr_dict)
return distr_dict

def est_comp_time(self, n_samples, time_one_run, pool=None):
def est_comp_time(self, n_samples, time_one_run, processes=None):
"""
Estimate the computation time
@@ -154,8 +180,7 @@ def est_comp_time(self, n_samples, time_one_run, processes=None):
"\n If computation cannot be reduced, consider using"
" a surrogate model https://www.uqlab.com/", time_one_run)

ncpus = pool.ncpus if pool else 1
total_time = n_samples * time_one_run / ncpus
total_time = n_samples * time_one_run / processes
LOGGER.info("\n\nEstimated computation time: %s\n",
dt.timedelta(seconds=total_time))
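For a rough sense of scale of this estimate: 10,000 samples at 0.5 s per run spread over 8 processes give 10,000 × 0.5 / 8 = 625 s, i.e. a little over 10 minutes.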

@@ -354,11 +379,118 @@ def sensitivity(self, unc_output, sensitivity_method = 'sobol',

return sens_output

def _multiprocess_chunksize(samples_df, processes):
"""Divides the samples into chunks for multiprocesses computing
The goal is to send to each processing node an equal number
of samples to process. This make the parallel processing anologous
to running the uncertainty assessment independently on each nodes
for a subset of the samples, instead of distributing individual samples
on the nodes dynamically. Hence, all the heavy input variables
are copied/sent once to each node only.
Parameters
----------
samples_df : pd.DataFrame
samples dataframe
processes : int
number of processes
Returns
-------
int
the number of samples in each chunk
"""
return np.ceil(
samples_df.shape[0] / processes
).astype(int)
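A quick, purely illustrative check of the chunk size formula above:

```python
import numpy as np

# 10 samples on 4 processes -> chunks of ceil(10 / 4) = 3 samples each,
# with the last chunk holding the remaining single sample.
print(np.ceil(10 / 4).astype(int))  # 3
```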

def _transpose_chunked_data(metrics):
"""Transposes the output metrics lists from one list per
chunk of samples to one list per output metric
[ [x1, [y1, z1]], [x2, [y2, z2]] ] ->
[ [x1, x2], [[y1, z1], [y2, z2]] ]
Parameters
----------
metrics : list
list of lists as returned by the uncertainty mappings
Returns
-------
list
list of CLIMADA output uncertainties
See Also
--------
calc_impact._map_impact_calc
map for impact uncertainty
calc_cost_benefits._map_costben_calc
map for cost benefit uncertainty
"""
return [
list(itertools.chain.from_iterable(x))
for x in zip(*metrics)
]
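A concrete illustration of this transposition, with two chunks and two placeholder metrics (the string values stand in for real CLIMADA outputs):

```python
import itertools

# Two chunks; each chunk returns one list per metric (here "imp" and "eai"),
# with one entry per sample processed in that chunk.
metrics = [
    [["imp_s1", "imp_s2"], ["eai_s1", "eai_s2"]],  # chunk 1 (samples 1-2)
    [["imp_s3"], ["eai_s3"]],                      # chunk 2 (sample 3)
]
transposed = [list(itertools.chain.from_iterable(x)) for x in zip(*metrics)]
print(transposed)
# [['imp_s1', 'imp_s2', 'imp_s3'], ['eai_s1', 'eai_s2', 'eai_s3']]
```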

def _sample_parallel_iterator(samples, chunksize, **kwargs):
"""
Make iterator over chunks of samples
with repeated kwargs for each chunk.
Parameters
----------
samples : pd.DataFrame
Dataframe of samples
chunksize : int
number of samples per chunk
**kwargs : arguments to repeat
Arguments to repeat for parallel computations
Returns
-------
iterator
suitable for methods _map_impact_calc and _map_costben_calc
"""
def _chunker(df, size):
"""
Divide the dataframe into chunks of `size` lines each
"""
for pos in range(0, len(df), size):
yield df.iloc[pos:pos + size]

return zip(
_chunker(samples, chunksize),
*(itertools.repeat(item) for item in kwargs.values())
)
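A small usage sketch of this iterator; the repeated values are placeholders standing in for the heavy input variables (exposures, hazard, ...), not the real call sites:

```python
import itertools
import pandas as pd

samples = pd.DataFrame({"x": range(5)})

def _chunker(df, size):
    for pos in range(0, len(df), size):
        yield df.iloc[pos:pos + size]

# Pair each 2-row chunk with the same repeated keyword-argument values.
iterator = zip(
    _chunker(samples, 2),
    itertools.repeat("exposures_stub"),
    itertools.repeat("hazard_stub"),
)
for chunk, exp, haz in iterator:
    print(len(chunk), exp, haz)  # prints "2 ...", "2 ...", then "1 ..."
```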


def _calc_sens_df(method, problem_sa, sensitivity_kwargs, param_labels, X, unc_df):
"""Compute the sensitifity indices
Parameters
----------
method : str
SALib sensitivity method name
problem_sa : dict
dictionary describing the problem for the SALib sensitivity method
sensitivity_kwargs : kwargs
keyword arguments passed on to the SALib analyze method
param_labels : list(str)
list of names of the uncertainty input parameters
X : numpy.ndarray
array of input parameter samples
unc_df : DataFrame
Dataframe containing the uncertainty values
Returns
-------
DataFrame
Values of the sensitivity indices
"""
sens_first_order_dict = {}
sens_second_order_dict = {}
for (submetric_name, metric_unc) in unc_df.iteritems():
for (submetric_name, metric_unc) in unc_df.items():
Y = metric_unc.to_numpy()
if X is not None:
sens_indices = method.analyze(problem_sa, X, Y,
@@ -404,6 +536,21 @@ def _calc_sens_df(method, problem_sa, sensitivity_kwargs, param_labels, X, unc_d


def _si_param_first(param_labels, sens_indices):
"""Extract the first order sensivity indices from SALib ouput
Parameters
----------
param_labels : list(str)
name of the unceratinty input parameters
sens_indices : dict
sensitivity indidices dictionnary as produced by SALib
Returns
-------
si_names_first_order, param_names_first_order: list, list
Names of the sensivity indices of first order for all input parameters
and Parameter names for each sentivity index
"""
n_params = len(param_labels)

si_name_first_order_list = [
@@ -421,6 +568,21 @@ def _si_param_first(param_labels, sens_indices):


def _si_param_second(param_labels, sens_indices):
"""Extract second order sensitivity indices
Parameters
----------
param_labels : list(str)
names of the uncertainty input parameters
sens_indices : dict
sensitivity indices dictionary as produced by SALib
Returns
-------
si_names_second_order, param_names_second_order, param_names_second_order_2: list, list, list
Names of the second order sensitivity indices for all input parameters,
and pairs of parameter names for each second order sensitivity index
"""
n_params = len(param_labels)
si_name_second_order_list = [
key
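The sensitivity helpers above (`_calc_sens_df`, `_si_param_first`, `_si_param_second`) unpack a SALib result dictionary. A hedged, minimal Sobol example of what such a dictionary looks like, using a toy problem and assuming a SALib version that still ships the Saltelli sampler:

```python
from SALib.sample import saltelli
from SALib.analyze import sobol

problem = {
    "num_vars": 2,
    "names": ["x1", "x2"],
    "bounds": [[0.0, 1.0], [0.0, 1.0]],
}
X = saltelli.sample(problem, 256)   # shape: (256 * (2*2 + 2), 2)
Y = X[:, 0] + 2.0 * X[:, 1] ** 2    # toy model output, one value per sample
sens_indices = sobol.analyze(problem, Y)

print(sens_indices["S1"])  # first order index per parameter (cf. _si_param_first)
print(sens_indices["S2"])  # (n_params x n_params) second order matrix (cf. _si_param_second)
```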
