
Feature/unsequa multiprocessing #763

Merged: 34 commits into develop from feature/unsequa_multiprocessing, Aug 28, 2023

Changes from all commits (34 commits)
- 8a325ae Update pandas iteritems to items (Aug 3, 2023)
- b3d2f00 Change pd append to concat (Aug 3, 2023)
- 0456873 Replace pathos with multiprocess for unsequa (Aug 3, 2023)
- 4ab0972 Fix minimal chunksize at 1 (Aug 3, 2023)
- 85c5a28 Remove global matplotlib styles (Aug 3, 2023)
- 9bc4589 Remove deprecated tot_value (Aug 3, 2023)
- fb0e361 Adjust tests remove tot_valu (Aug 3, 2023)
- 91c4f1e Remove tot_value from unsequa (Aug 3, 2023)
- c36f0f6 Add docstring for processes (Aug 3, 2023)
- 416e949 Add changelog details (Aug 4, 2023)
- 301c0b0 Merge branch 'develop' into feature/unsequa_multiprocessing (emanuel-schmid, Aug 4, 2023)
- 429fe76 Add chunked version of parallel computing. (Aug 21, 2023)
- 13a4a9f Make chunksize more efficient by default (Aug 21, 2023)
- 8c45b49 Update docstring (Aug 22, 2023)
- 4e77b86 Change cost benefit to use chunks in parallel (Aug 22, 2023)
- c678791 Merge branch 'develop' into feature/unsequa_multiprocessing (Aug 23, 2023)
- 2d6130c Remove deprecated numpy vstack of objects (Aug 24, 2023)
- a21abe3 Feature/order samples unsequa (#766) (chahank, Aug 24, 2023)
- e869f95 Update climada/engine/unsequa/calc_impact.py (chahank, Aug 25, 2023)
- 7fb0835 Update climada/engine/unsequa/calc_cost_benefit.py (chahank, Aug 25, 2023)
- 6ad26a0 Make sample iterator to single function (Aug 25, 2023)
- d67651e Add comment on copy statement (Aug 25, 2023)
- afb8147 Remove not needed parallel pool chunksize argument (Aug 25, 2023)
- 9ca7081 Make chunksize base function (Aug 25, 2023)
- ede1c56 Make transpose data function (Aug 25, 2023)
- 6e7d24f Import pathos instead of multiprocessing (Aug 25, 2023)
- 261415c Make method for uncertainty computation step (Aug 25, 2023)
- f90a0b4 Improve docstrings (Aug 25, 2023)
- 15b5579 Improve docstring (Aug 28, 2023)
- abcc548 Add description of parallelization logic (Aug 28, 2023)
- b6bf301 Update climada/engine/unsequa/calc_base.py (chahank, Aug 28, 2023)
- e324fbc Update climada/engine/unsequa/calc_base.py (chahank, Aug 28, 2023)
- 01d5a85 Update climada/engine/unsequa/calc_impact.py (chahank, Aug 28, 2023)
- 6b6512f Update docstrings (Aug 28, 2023)
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ Removed:
- Added method `Exposures.centroids_total_value` to replace the functionality of `Exposures.affected_total_value`. This method is temporary and deprecated. [#702](https://github.com/CLIMADA-project/climada_python/pull/702)
- New method `climada.util.api_client.Client.purge_cache`: utility function to remove outdated files from the local file system to free disk space.
([#737](https://github.com/CLIMADA-project/climada_python/pull/737))
- Added advanced examples in the unsequa tutorial for coupled input variables and for efficiently loading multiple large files [#766](https://github.com/CLIMADA-project/climada_python/pull/766)

### Changed

@@ -57,6 +58,7 @@ Removed:
- `list_dataset_infos` from `climada.util.api_client.Client`: the `properties` argument, a `dict`, can now have `None` as values. Before, only strings and lists of strings were allowed. Setting a particular property to `None` triggers a search for datasets where this property is not assigned. [#752](https://github.com/CLIMADA-project/climada_python/pull/752)
- Reduce memory requirements of `TropCyclone.from_tracks` [#749](https://github.com/CLIMADA-project/climada_python/pull/749)
- Support for different wind speed and pressure units in `TCTracks` when running `TropCyclone.from_tracks` [#749](https://github.com/CLIMADA-project/climada_python/pull/749)
- Changed the parallel package from pathos to multiprocess in the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

### Fixed

@@ -65,6 +67,9 @@ Removed:
- Correctly handle assertion errors in `Centroids.values_from_vector_files` and fix the associated test [#768](https://github.com/CLIMADA-project/climada_python/pull/768/)
- Text in `Forecast` class plots can now be adjusted [#769](https://github.com/CLIMADA-project/climada_python/issues/769)
- `Impact.impact_at_reg` now supports impact matrices where all entries are zero [#773](https://github.com/CLIMADA-project/climada_python/pull/773)
- Upgrade pathos 0.3.0 -> 0.3.1 (issue [#761](https://github.com/CLIMADA-project/climada_python/issues/761), for the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763))
- Fix bugs with pandas 2.0 (`iteritems` -> `items`, `append` -> `concat`) (fixes issue [#700](https://github.com/CLIMADA-project/climada_python/issues/700) for the unsequa module) [#763](https://github.com/CLIMADA-project/climada_python/pull/763)
- Remove matplotlib styles in unsequa module (fixes issue [#758](https://github.com/CLIMADA-project/climada_python/issues/758)) [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

### Deprecated

@@ -77,6 +82,7 @@ Removed:
- `Centroids.set_raster_from_pix_bounds` [#721](https://github.com/CLIMADA-project/climada_python/pull/721)
- `requirements/env_developer.yml` environment specs. Use 'extra' requirements when installing the Python package instead [#712](https://github.com/CLIMADA-project/climada_python/pull/712)
- `Impact.tag` attribute. This change is not backwards-compatible with respect to the files written and read by the `Impact` class [#743](https://github.com/CLIMADA-project/climada_python/pull/743)
- `Impact.tot_value` attribute removed from the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

## v3.3.2

170 changes: 166 additions & 4 deletions climada/engine/unsequa/calc_base.py
@@ -21,6 +21,7 @@

import logging
import copy
import itertools

import datetime as dt

@@ -47,6 +48,31 @@ class Calc():
Names of the required uncertainty variables.
_metric_names : tuple(str)
Names of the output metrics.

Notes
-----
Parallelization logic: for the computation of the uncertainty, users may
specify a number N of processes on which to perform the computations in
parallel. Since the computation for each individual sample of the
input parameters is independent of one another, we implemented a simple
distribution on the processes.

1. The samples are divided into N equal sub-sample chunks
2. Each chunk of samples is sent as one to a node for processing

Hence, this is equivalent to the user running the computation N times,
once for each sub-sample.
Note that for each process, all the input variables must be copied once,
and hence each parallel process requires roughly the same amount of memory
as if a single process were used.

This approach differs from the usual parallelization strategy (where individual
samples are distributed), because each sample requires the entire input data.
With this method, copying data between processes is reduced to a minimum.

Parallelization is currently not available for the sensitivity computation,
as this requires all samples simultaneously in the current implementation
of the SALib library.
"""

_input_var_names = ()
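The chunked dispatch described in the class Notes can be sketched standalone. Below is a minimal sketch using the standard library `multiprocessing` (the PR itself routes this through the multiprocess/pathos packages); `run_chunk` and the sample values are illustrative stand-ins, not CLIMADA API:

```python
# Minimal sketch of the chunked dispatch described in the Notes above.
# Assumptions: run_chunk is an illustrative stand-in for the per-chunk
# uncertainty computation; the PR uses multiprocess/pathos rather than
# the standard library shown here.
import math
import multiprocessing as mp

import pandas as pd

def run_chunk(chunk: pd.DataFrame) -> list:
    # Stand-in for computing the metrics of every sample in the chunk.
    return [row.sum() for _, row in chunk.iterrows()]

if __name__ == "__main__":
    samples_df = pd.DataFrame({"a": range(10), "b": range(10)})
    processes = 4
    chunksize = math.ceil(len(samples_df) / processes)  # equal sub-sample chunks
    chunks = [
        samples_df.iloc[pos:pos + chunksize]
        for pos in range(0, len(samples_df), chunksize)
    ]
    with mp.Pool(processes) as pool:
        per_chunk = pool.map(run_chunk, chunks)  # one task per chunk, not per sample
    results = [x for chunk in per_chunk for x in chunk]  # flatten chunk results
```

Each worker receives one DataFrame chunk, so the heavy input variables cross the process boundary once per worker rather than once per sample.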
@@ -126,7 +152,7 @@ def distr_dict(self):
distr_dict.update(input_var.distr_dict)
return distr_dict

def est_comp_time(self, n_samples, time_one_run, pool=None):
def est_comp_time(self, n_samples, time_one_run, processes=None):
"""
Estimate the computation time

@@ -154,8 +180,7 @@ def est_comp_time(self, n_samples, time_one_run, processes=None):
"\n If computation cannot be reduced, consider using"
" a surrogate model https://www.uqlab.com/", time_one_run)

ncpus = pool.ncpus if pool else 1
total_time = n_samples * time_one_run / ncpus
total_time = n_samples * time_one_run / processes
LOGGER.info("\n\nEstimated computaion time: %s\n",
dt.timedelta(seconds=total_time))
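For orientation, the estimate is the plain division shown above; a quick check with made-up numbers:

```python
# Illustrative numbers only: 2**10 samples at 0.5 s per run on 4 processes.
import datetime as dt

n_samples, time_one_run, processes = 2**10, 0.5, 4
total_time = n_samples * time_one_run / processes
print(dt.timedelta(seconds=total_time))  # 0:02:08
```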

@@ -354,11 +379,118 @@ def sensitivity(self, unc_output, sensitivity_method = 'sobol',

return sens_output

def _multiprocess_chunksize(samples_df, processes):
"""Divides the samples into chunks for multiprocesses computing

The goal is to send to each processing node an equal number
of samples to process. This make the parallel processing anologous
to running the uncertainty assessment independently on each nodes
for a subset of the samples, instead of distributing individual samples
on the nodes dynamically. Hence, all the heavy input variables
are copied/sent once to each node only.

Parameters
----------
samples_df : pd.DataFrame
samples dataframe
processes : int
number of processes

Returns
-------
int
the number of samples in each chunk
"""
return np.ceil(
samples_df.shape[0] / processes
).astype(int)
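A quick check of the ceiling division with illustrative numbers (only the number of rows matters):

```python
import numpy as np
import pandas as pd

samples_df = pd.DataFrame({"x": range(1000)})  # 1000 samples, illustrative
processes = 8
chunksize = np.ceil(samples_df.shape[0] / processes).astype(int)  # -> 125
```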

def _transpose_chunked_data(metrics):
"""Transposes the output metrics lists from one list per
chunk of samples to one list per output metric

[ [x1, [y1, z1]], [x2, [y2, z2]] ] ->
[ [x1, x2], [[y1, z1], [y2, z2]] ]

Parameters
----------
metrics : list
list of lists as returned by the uncertainty mappings

Returns
-------
list
list of climada output uncertainty

See Also
--------
calc_impact._map_impact_calc
map for impact uncertainty
calc_cost_benefits._map_costben_calc
map for cost benefit uncertainty
"""
return [
list(itertools.chain.from_iterable(x))
for x in zip(*metrics)
]
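A concrete illustration of the transpose, with made-up metric values (each chunk returns one list per output metric):

```python
import itertools

# Two chunks, two output metrics: a flat metric (e.g. aai_agg values) and a
# nested one (e.g. one frequency curve per sample). All values are made up.
chunk_results = [
    [[0.1, 0.2], [[1, 2], [3, 4]]],  # chunk 1 holds two samples
    [[0.3], [[5, 6]]],               # chunk 2 holds one sample
]
transposed = [
    list(itertools.chain.from_iterable(x))
    for x in zip(*chunk_results)
]
assert transposed == [[0.1, 0.2, 0.3], [[1, 2], [3, 4], [5, 6]]]
```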

def _sample_parallel_iterator(samples, chunksize, **kwargs):
"""
Make iterator over chunks of samples
with repeated kwargs for each chunk.

Parameters
----------
samples : pd.DataFrame
Dataframe of samples
chunksize : int
Number of samples in each chunk
**kwargs : arguments to repeat
Arguments to repeat for parallel computations

Returns
-------
iterator
suitable for methods _map_impact_calc and _map_costben_calc

"""
def _chunker(df, size):
"""
Divide the dataframe into chunks of size number of lines
"""
for pos in range(0, len(df), size):
yield df.iloc[pos:pos + size]

return zip(
_chunker(samples, chunksize),
*(itertools.repeat(item) for item in kwargs.values())
)
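A usage sketch of this iterator, reconstructed standalone; the keyword value is a placeholder, not a real CLIMADA input variable:

```python
import itertools

import pandas as pd

# Re-create the iterator logic standalone, for illustration only.
samples = pd.DataFrame({"x": range(5)})
chunksize = 2

def _chunker(df, size):
    for pos in range(0, len(df), size):
        yield df.iloc[pos:pos + size]

iterator = zip(
    _chunker(samples, chunksize),
    *(itertools.repeat(item) for item in {"exp": "placeholder_exposure"}.values()),
)
for chunk, exp in iterator:
    print(len(chunk), exp)  # chunks of 2, 2, 1 rows; kwargs repeated per chunk
```

Because `itertools.repeat` is infinite and `zip` stops at the shortest iterable, the kwargs are paired with every chunk without being copied per sample.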


def _calc_sens_df(method, problem_sa, sensitivity_kwargs, param_labels, X, unc_df):
"""Compute the sensitifity indices

Parameters
----------
method : str
SALib sensitivity method name
problem_sa : dict
dictionary of the sensitivity problem for SALib
sensitivity_kwargs : kwargs
passed on to SALib.method.analyze
param_labels : list(str)
list of names of the uncertainty input parameters
X : numpy.ndarray
array of input parameter samples
unc_df : DataFrame
Dataframe containing the uncertainty values

Returns
-------
DataFrame
Values of the sensitivity indices
"""
sens_first_order_dict = {}
sens_second_order_dict = {}
for (submetric_name, metric_unc) in unc_df.iteritems():
for (submetric_name, metric_unc) in unc_df.items():
Y = metric_unc.to_numpy()
if X is not None:
sens_indices = method.analyze(problem_sa, X, Y,
@@ -404,6 +536,21 @@ def _calc_sens_df(method, problem_sa, sensitivity_kwargs, param_labels, X, unc_d
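The `analyze` call above follows the SALib pattern. A minimal self-contained Sobol' run for orientation (problem definition, toy model, and sample size are made up; Sobol' takes only `Y`, while the `X is not None` branch above serves methods such as `rbd_fast` that also need the inputs):

```python
# Toy SALib run mirroring the analyze call above (assumes the SALib ~1.4 API;
# the problem definition, model, and sample size are illustrative).
from SALib.analyze import sobol
from SALib.sample import saltelli

problem_sa = {
    "num_vars": 2,
    "names": ["x1", "x2"],
    "bounds": [[0.0, 1.0], [0.0, 1.0]],
}
X = saltelli.sample(problem_sa, 128)         # (128 * (2*2 + 2), 2) input samples
Y = X[:, 0] + 2.0 * X[:, 1]                  # toy model output, one value per sample
sens_indices = sobol.analyze(problem_sa, Y)  # dict with 'S1', 'ST', 'S2', ...
```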


def _si_param_first(param_labels, sens_indices):
"""Extract the first order sensivity indices from SALib ouput

Parameters
----------
param_labels : list(str)
name of the unceratinty input parameters
sens_indices : dict
sensitivity indidices dictionnary as produced by SALib

Returns
-------
si_names_first_order, param_names_first_order: list, list
Names of the sensivity indices of first order for all input parameters
and Parameter names for each sentivity index
"""
n_params = len(param_labels)

si_name_first_order_list = [
Expand All @@ -421,6 +568,21 @@ def _si_param_first(param_labels, sens_indices):


def _si_param_second(param_labels, sens_indices):
"""Extract second order sensitivity indices

Parameters
----------
param_labels : list(str)
name of the unceratinty input parameters
sens_indices : dict
sensitivity indidices dictionnary as produced by SALib

Returns
-------
si_names_second_order, param_names_second_order, param_names_second_order_2: list, list, list
Names of the sensivity indices of second order for all input parameters
and Pairs of parameter names for each 2nd order sentivity index
"""
n_params = len(param_labels)
si_name_second_order_list = [
key