Feature/unsequa multiprocessing #763

Merged · 34 commits · Aug 28, 2023
Changes from 18 commits
8a325ae
Update pandas iteritems to items
Aug 3, 2023
b3d2f00
Change pd append to concat
Aug 3, 2023
0456873
Replace pathos with multiprocess for unsequa
Aug 3, 2023
4ab0972
Fix minimal chunksize at 1
Aug 3, 2023
85c5a28
Remove global matplotlib styles
Aug 3, 2023
9bc4589
Remove deprecated tot_value
Aug 3, 2023
fb0e361
Adjust tests remove tot_valu
Aug 3, 2023
91c4f1e
Remove tot_value from unsequa
Aug 3, 2023
c36f0f6
Add docstring for processes
Aug 3, 2023
416e949
Add changelog details
Aug 4, 2023
301c0b0
Merge branch 'develop' into feature/unsequa_multiprocessing
emanuel-schmid Aug 4, 2023
429fe76
Add chunked version of parallel computing.
Aug 21, 2023
13a4a9f
Make chunksize more efficient by default
Aug 21, 2023
8c45b49
Update docstring
Aug 22, 2023
4e77b86
Change cost benefit to use chunks in parallel
Aug 22, 2023
c678791
Merge branch 'develop' into feature/unsequa_multiprocessing
Aug 23, 2023
2d6130c
Remove deprecated numpy vstack of objects
Aug 24, 2023
a21abe3
Feature/order samples unsequa (#766)
chahank Aug 24, 2023
e869f95
Update climada/engine/unsequa/calc_impact.py
chahank Aug 25, 2023
7fb0835
Update climada/engine/unsequa/calc_cost_benefit.py
chahank Aug 25, 2023
6ad26a0
Make sample iterator to single function
Aug 25, 2023
d67651e
Add comment on copy statement
Aug 25, 2023
afb8147
Remove not needed parallel pool chunksize argument
Aug 25, 2023
9ca7081
Make chunksize base function
Aug 25, 2023
ede1c56
Make transpose data function
Aug 25, 2023
6e7d24f
Import pathos instead of multiprocessing
Aug 25, 2023
261415c
Make method for uncertainty computation step
Aug 25, 2023
f90a0b4
Improve docstrings
Aug 25, 2023
15b5579
Improve docstring
Aug 28, 2023
abcc548
Add description of parallelization logic
Aug 28, 2023
b6bf301
Update climada/engine/unsequa/calc_base.py
chahank Aug 28, 2023
e324fbc
Update climada/engine/unsequa/calc_base.py
chahank Aug 28, 2023
01d5a85
Update climada/engine/unsequa/calc_impact.py
chahank Aug 28, 2023
6b6512f
Update docstrings
Aug 28, 2023
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ Removed:
- Added method `Exposures.centroids_total_value` to replace the functionality of `Exposures.affected_total_value`. This method is temporary and deprecated. [#702](https://github.com/CLIMADA-project/climada_python/pull/702)
- New method `climada.util.api_client.Client.purge_cache`: utility function to remove outdated files from the local file system to free disk space.
([#737](https://github.com/CLIMADA-project/climada_python/pull/737))
- Added advanced examples in the unsequa tutorial for coupled input variables and for efficiently loading multiple large files [#766](https://github.com/CLIMADA-project/climada_python/pull/766)

### Changed

@@ -57,6 +58,7 @@ Removed:
- `list_dataset_infos` from `climada.util.api_client.Client`: the `properties` argument, a `dict`, can now have `None` as values. Before, only strings and lists of strings were allowed. Setting a particular property to `None` triggers a search for datasets where this property is not assigned. [#752](https://github.com/CLIMADA-project/climada_python/pull/752)
- Reduce memory requirements of `TropCyclone.from_tracks` [#749](https://github.com/CLIMADA-project/climada_python/pull/749)
- Support for different wind speed and pressure units in `TCTracks` when running `TropCyclone.from_tracks` [#749](https://github.com/CLIMADA-project/climada_python/pull/749)
- Changed the parallel package from Pathos to Multiprocess in the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

### Fixed

@@ -65,6 +67,9 @@ Removed:
- Correctly handle assertion errors in `Centroids.values_from_vector_files` and fix the associated test [#768](https://github.com/CLIMADA-project/climada_python/pull/768/)
- Text in `Forecast` class plots can now be adjusted [#769](https://github.com/CLIMADA-project/climada_python/issues/769)
- `Impact.impact_at_reg` now supports impact matrices where all entries are zero [#773](https://github.com/CLIMADA-project/climada_python/pull/773)
- Upgrade pathos 0.3.0 -> 0.3.1 to fix issue [#761](https://github.com/CLIMADA-project/climada_python/issues/761) (for the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763))
- Fix bugs with pandas 2.0 (`iteritems` -> `items`, `append` -> `concat`) (fixes issue [#700](https://github.com/CLIMADA-project/climada_python/issues/700) for the unsequa module) [#763](https://github.com/CLIMADA-project/climada_python/pull/763)
- Remove matplotlib styles in unsequa module (fixes issue [#758](https://github.com/CLIMADA-project/climada_python/issues/758)) [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

### Deprecated

@@ -77,6 +82,7 @@ Removed:
- `Centroids.set_raster_from_pix_bounds` [#721](https://github.com/CLIMADA-project/climada_python/pull/721)
- `requirements/env_developer.yml` environment specs. Use 'extra' requirements when installing the Python package instead [#712](https://github.com/CLIMADA-project/climada_python/pull/712)
- `Impact.tag` attribute. This change is not backwards-compatible with respect to the files written and read by the `Impact` class [#743](https://github.com/CLIMADA-project/climada_python/pull/743)
- `Impact.tot_value` attribute removed from the unsequa module [#763](https://github.com/CLIMADA-project/climada_python/pull/763)

## v3.3.2

33 changes: 29 additions & 4 deletions climada/engine/unsequa/calc_base.py
@@ -21,6 +21,7 @@

import logging
import copy
import itertools

import datetime as dt

@@ -126,7 +127,7 @@ def distr_dict(self):
distr_dict.update(input_var.distr_dict)
return distr_dict

def est_comp_time(self, n_samples, time_one_run, pool=None):
def est_comp_time(self, n_samples, time_one_run, processes=1):
"""
Estimate the computation time

@@ -154,8 +155,7 @@ def est_comp_time(self, n_samples, time_one_run, pool=None):
"\n If computation cannot be reduced, consider using"
" a surrogate model https://www.uqlab.com/", time_one_run)

ncpus = pool.ncpus if pool else 1
total_time = n_samples * time_one_run / ncpus
total_time = n_samples * time_one_run / processes
LOGGER.info("\n\nEstimated computation time: %s\n",
dt.timedelta(seconds=total_time))

@@ -354,11 +354,36 @@ def sensitivity(self, unc_output, sensitivity_method = 'sobol',

return sens_output

def _sample_parallel_iterator(self, samples, chunksize, **kwargs):
"""
Make an iterator over chunks of the samples dataframe, each paired with repeated kwargs, for parallel computing

Parameters
----------
samples : pd.DataFrame
Dataframe of samples
chunksize : int
Number of samples per chunk
**kwargs : arguments to repeat
Arguments to repeat for parallel computations

Returns
-------
iterator
suitable for methods _map_impact_calc and _map_costben_calc

"""
return zip(
_chunker(samples, chunksize),
*(itertools.repeat(item) for item in kwargs.values())
)

def _chunker(seq, size):
for pos in range(0, len(seq), size):
yield seq.iloc[pos:pos + size]
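Taken together, the two helpers build a chunked task iterator: each chunk of samples is paired with one repeated copy of every keyword argument, ready for `starmap`. A minimal standalone sketch, using plain lists for brevity (the PR's `_chunker` slices a DataFrame with `.iloc`):

```python
import itertools

def _chunker(seq, size):
    # Yield successive chunks; the PR's version slices a DataFrame with .iloc.
    for pos in range(0, len(seq), size):
        yield seq[pos:pos + size]

def _sample_parallel_iterator(samples, chunksize, **kwargs):
    # Pair every chunk with one repeated copy of each keyword argument.
    # itertools.repeat is infinite; zip stops when the chunks run out.
    return zip(
        _chunker(samples, chunksize),
        *(itertools.repeat(item) for item in kwargs.values())
    )

tasks = list(_sample_parallel_iterator(list(range(5)), chunksize=2, label="haz"))
# tasks == [([0, 1], "haz"), ([2, 3], "haz"), ([4], "haz")]
```

Because the kwargs are repeated lazily, no per-chunk copies of the (potentially large) input variables are made before the workers consume them.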

def _calc_sens_df(method, problem_sa, sensitivity_kwargs, param_labels, X, unc_df):
sens_first_order_dict = {}
sens_second_order_dict = {}
for (submetric_name, metric_unc) in unc_df.iteritems():
for (submetric_name, metric_unc) in unc_df.items():
Y = metric_unc.to_numpy()
if X is not None:
sens_indices = method.analyze(problem_sa, X, Y,
Expand Down
176 changes: 113 additions & 63 deletions climada/engine/unsequa/calc_cost_benefit.py
@@ -23,10 +23,13 @@

import logging
import time
from functools import partial
from typing import Optional, Union
import itertools

from typing import Optional, Union
import pandas as pd
import numpy as np
import multiprocess as mp
# use multiprocess fork of multiprocessing because of https://stackoverflow.com/a/65001152/12454103

from climada.engine.cost_benefit import CostBenefit
from climada.engine.unsequa import Calc, InputVar, UncCostBenefitOutput
@@ -50,13 +53,13 @@ class CalcCostBenefit(Calc):
----------
value_unit : str
Unit of the exposures value
haz_input_var : climada.engine.uncertainty.input_var.InputVar
haz_input_var : InputVar or Hazard
Present Hazard uncertainty variable
ent_input_var : climada.engine.uncertainty.input_var.InputVar
ent_input_var : InputVar or Entity
Present Entity uncertainty variable
haz_unc_fut_Var: climada.engine.uncertainty.input_var.InputVar
haz_unc_fut_Var: InputVar or Hazard
Future Hazard uncertainty variable
ent_fut_input_var : climada.engine.uncertainty.input_var.InputVar
ent_fut_input_var : InputVar or Entity
Future Entity uncertainty variable
_input_var_names : tuple(str)
Names of the required uncertainty variables
@@ -127,7 +130,11 @@ def __init__(



def uncertainty(self, unc_data, pool=None, **cost_benefit_kwargs):
def uncertainty(self,
unc_sample,
processes=1,
chunksize=None,
**cost_benefit_kwargs):
"""
Computes the cost benefit for each sample in unc_sample.samples_df.

@@ -145,13 +152,17 @@ def uncertainty(self, unc_data, pool=None, **cost_benefit_kwargs):

Parameters
----------
unc_data : climada.engine.uncertainty.unc_output.UncOutput
unc_sample : climada.engine.uncertainty.unc_output.UncOutput
Uncertainty data object with the input parameters samples
pool : pathos.pools.ProcessPool, optional
Pool of CPUs for parralel computations. Default is None.
The default is None.
processes : int, optional
Number of CPUs to use for parallel computations.
The default is 1 (not parallel).
chunksize : int, optional
Size of the sample chunks for parallel processing.
Default is equal to the number of samples divided by the
number of processes.
cost_benefit_kwargs : keyword arguments
Keyword arguments passed on to climada.engine.CostBenefit.calc()

Returns
-------
@@ -168,52 +179,81 @@ def uncertainty(self, unc_data, pool=None, **cost_benefit_kwargs):
See Also
--------
climada.engine.cost_benefit:
Compute risk and adaptation option cost benefits.
compute risk and adaptation option cost benefits.

"""

if unc_data.samples_df.empty:
if unc_sample.samples_df.empty:
raise ValueError("No sample was found. Please create one first "
"using UncImpact.make_sample(N)")

samples_df = unc_data.samples_df.copy(deep=True)
if chunksize is None:
    chunksize = np.ceil(
        unc_sample.samples_df.shape[0] / processes
    ).astype(int)
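The default chunk size is the number of samples divided by the number of worker processes, rounded up, so that each worker receives roughly one chunk. A hedged sketch of that rule, with stdlib `math.ceil` standing in for `np.ceil(...).astype(int)` (`default_chunksize` is a hypothetical name, not part of the PR):

```python
import math

def default_chunksize(n_samples, processes, chunksize=None):
    # One chunk per worker process unless the caller overrides it.
    if chunksize is None:
        chunksize = math.ceil(n_samples / processes)
    return chunksize

default_chunksize(1000, 4)              # 250: each of 4 workers gets one chunk
default_chunksize(10, 3)                # 4: rounding up covers all 10 samples
default_chunksize(10, 3, chunksize=2)   # 2: an explicit value wins
```

Rounding up matters: with integer floor division, `10 // 3 == 3` would leave one sample without a chunk slot.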

samples_df = unc_sample.samples_df.copy(deep=True)
unit = self.value_unit

LOGGER.info("The freq_curve is not saved. Please "
"change the risk_func (see climada.engine.cost_benefit) "
"if return period information is needed")

start = time.time()
one_sample = samples_df.iloc[0:1].iterrows()
cb_metrics = map(self._map_costben_calc, one_sample)
one_sample = samples_df.iloc[0:1]
p_iterator = self._sample_parallel_iterator(
one_sample,
chunksize=chunksize,
ent_input_var=self.ent_input_var,
haz_input_var=self.haz_input_var,
ent_fut_input_var=self.ent_fut_input_var,
haz_fut_input_var=self.haz_fut_input_var,
cost_benefit_kwargs=cost_benefit_kwargs
)
cb_metrics = itertools.starmap(_map_costben_calc, p_iterator)
[imp_meas_present,
imp_meas_future,
tot_climate_risk,
benefit,
cost_ben_ratio] = list(zip(*cb_metrics))
cost_ben_ratio] = [
list(itertools.chain.from_iterable(x))
for x in list(zip(*cb_metrics))
]
elapsed_time = (time.time() - start)
self.est_comp_time(unc_data.n_samples, elapsed_time, pool)
self.est_comp_time(unc_sample.n_samples, elapsed_time, processes)

#Compute impact distributions
with log_level(level='ERROR', name_prefix='climada'):
if pool:
LOGGER.info('Using %s CPUs.', pool.ncpus)
chunksize = max(min(unc_data.n_samples // pool.ncpus, 100), 1)
cb_metrics = pool.map(partial(self._map_costben_calc, **cost_benefit_kwargs),
samples_df.iterrows(),
chunksize=chunksize)

p_iterator = self._sample_parallel_iterator(
[Review thread, resolved]
Member: Suggestion: Make the entire iterator-computation-datamerge part a (local?) function that only takes a dataframe and the process number as arguments. Then you can pass the first DF row with processes=1 to estimate the compute time and afterwards pass the rest of the dataframe with processes=processes. No need to double the code.
Member (author): Done in 261415c
Member: With "local" I meant a function that is defined (only) in the scope of uncertainty. This way, it would need fewer parameters. But it works this way nonetheless.
samples_df,
chunksize=chunksize,
ent_input_var=self.ent_input_var,
haz_input_var=self.haz_input_var,
ent_fut_input_var=self.ent_fut_input_var,
haz_fut_input_var=self.haz_fut_input_var,
cost_benefit_kwargs=cost_benefit_kwargs
)
if processes > 1:
with mp.Pool(processes=processes) as pool:
LOGGER.info('Using %s CPUs.', processes)
chunksize = min(unc_sample.n_samples // processes + 1, 100)
cb_metrics = pool.starmap(
_map_costben_calc, p_iterator, chunksize=chunksize
)
else:
cb_metrics = map(partial(self._map_costben_calc, **cost_benefit_kwargs),
samples_df.iterrows())
cb_metrics = itertools.starmap(
_map_costben_calc, p_iterator
)

#Perform the actual computation
with log_level(level='ERROR', name_prefix='climada'):
[imp_meas_present,
imp_meas_future,
tot_climate_risk,
benefit,
cost_ben_ratio] = list(zip(*cb_metrics)) #Transpose list of list
cost_ben_ratio] = [
list(itertools.chain.from_iterable(x))
for x in list(zip(*cb_metrics))
]
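The `zip`/`chain` merge in this comprehension can be illustrated in isolation. Each worker returns one sequence per metric covering the samples in its chunk; `zip(*...)` regroups those results metric by metric across chunks, and `chain.from_iterable` flattens each metric back into a single list over all samples:

```python
import itertools

# Each parallel worker returns its chunk's metrics already transposed:
# one sequence per metric, one entry per sample in the chunk.
chunk_results = [
    (["p1", "p2"], ["f1", "f2"], [1.0, 2.0]),  # worker 1: chunk of 2 samples
    (["p3"], ["f3"], [3.0]),                   # worker 2: chunk of 1 sample
]

# zip(*...) groups the chunks metric-by-metric; chain.from_iterable
# then flattens each metric into one list across all samples.
imp_meas_present, imp_meas_future, tot_climate_risk = [
    list(itertools.chain.from_iterable(x)) for x in zip(*chunk_results)
]
# tot_climate_risk == [1.0, 2.0, 3.0]
```

The sample order is preserved because chunks are produced, processed, and merged in the same order.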

# Assign computed impact distribution data to self
tot_climate_risk_unc_df = \
@@ -248,8 +288,10 @@ def uncertainty(self, unc_data, pool=None, **cost_benefit_kwargs):
in zip(imp_metric_names, metrics)
}
met_dic.update(dic_tmp)
df_imp_meas = df_imp_meas.append(
pd.DataFrame(met_dic), ignore_index=True
df_imp_meas = pd.concat(
[df_imp_meas, pd.DataFrame(met_dic)],
ignore_index=True,
sort=False
)
im_periods[name + '_unc_df'] = df_imp_meas
cost_benefit_kwargs = {
@@ -266,48 +308,56 @@ def uncertainty(self, unc_data, pool=None, **cost_benefit_kwargs):
unit=unit,
cost_benefit_kwargs=cost_benefit_kwargs)

def _map_costben_calc(self, param_sample, **kwargs):
"""
Map to compute cost benefit for all parameter samples in parallel

Parameters
----------
param_sample : pd.DataFrame.iterrows()
Generator of the parameter samples
kwargs :
Keyword arguments passed on to climada.engine.CostBenefit.calc()

Returns
-------
list
cost benefit metrics list for all samples containing
imp_meas_present, imp_meas_future, tot_climate_risk,
benefit, cost_ben_ratio
def _map_costben_calc(
sample_chunks, ent_input_var, haz_input_var,
ent_fut_input_var, haz_fut_input_var, cost_benefit_kwargs
):
"""
Map to compute cost benefit for all parameter samples in parallel

"""
Parameters
----------
sample_chunks : pd.DataFrame
Dataframe of the parameter samples
ent_input_var, haz_input_var, ent_fut_input_var, haz_fut_input_var : InputVar
Present and future entity and hazard uncertainty variables
cost_benefit_kwargs : dict
Keyword arguments passed on to climada.engine.CostBenefit.calc()

Returns
-------
list
cost benefit metrics list for all samples containing
imp_meas_present, imp_meas_future, tot_climate_risk,
benefit, cost_ben_ratio

"""

# [1] only the rows of the dataframe passed by pd.DataFrame.iterrows()
haz_samples = param_sample[1][self.haz_input_var.labels].to_dict()
ent_samples = param_sample[1][self.ent_input_var.labels].to_dict()
haz_fut_samples = param_sample[1][self.haz_fut_input_var.labels].to_dict()
ent_fut_samples = param_sample[1][self.ent_fut_input_var.labels].to_dict()
uncertainty_values = []
for _, sample in sample_chunks.iterrows():
haz_samples = sample[haz_input_var.labels].to_dict()
ent_samples = sample[ent_input_var.labels].to_dict()
haz_fut_samples = sample[haz_fut_input_var.labels].to_dict()
ent_fut_samples = sample[ent_fut_input_var.labels].to_dict()

haz = self.haz_input_var.evaluate(**haz_samples)
ent = self.ent_input_var.evaluate(**ent_samples)
haz_fut = self.haz_fut_input_var.evaluate(**haz_fut_samples)
ent_fut = self.ent_fut_input_var.evaluate(**ent_fut_samples)
haz = haz_input_var.evaluate(**haz_samples)
ent = ent_input_var.evaluate(**ent_samples)
haz_fut = haz_fut_input_var.evaluate(**haz_fut_samples)
ent_fut = ent_fut_input_var.evaluate(**ent_fut_samples)

cb = CostBenefit()
ent.exposures.assign_centroids(haz, overwrite=False)
if ent_fut:
ent_fut.exposures.assign_centroids(haz_fut if haz_fut else haz, overwrite=False)
cb.calc(hazard=haz, entity=ent, haz_future=haz_fut, ent_future=ent_fut,
save_imp=False, assign_centroids=False, **kwargs)

save_imp=False, assign_centroids=False, **cost_benefit_kwargs)
# Extract from climada.impact the chosen metrics
return [cb.imp_meas_present,
cb.imp_meas_future,
cb.tot_climate_risk,
cb.benefit,
cb.cost_ben_ratio
]
uncertainty_values.append([
cb.imp_meas_present,
cb.imp_meas_future,
cb.tot_climate_risk,
cb.benefit,
cb.cost_ben_ratio
])

return list(zip(*uncertainty_values))
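The worker's final transpose mirrors the merge performed on the caller's side. A minimal sketch of what `list(zip(*uncertainty_values))` produces:

```python
# Inside a worker, metrics are collected per sample ...
uncertainty_values = [
    ("p1", "f1", 1.0),  # metrics for sample 1 of the chunk
    ("p2", "f2", 2.0),  # metrics for sample 2 of the chunk
]

# ... and transposed on return, so each element of the worker's output
# is one metric over the whole chunk (the shape the caller's merge expects).
transposed = list(zip(*uncertainty_values))
# transposed == [("p1", "p2"), ("f1", "f2"), (1.0, 2.0)]
```

Returning per-metric tuples rather than per-sample rows is what lets the caller flatten all chunks with one `chain.from_iterable` pass per metric.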