Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New interpolation util functionality #930

Merged
merged 8 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 254 additions & 0 deletions climada/util/interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

define fit functionalities for (local) exceedance frequencies and return periods
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line supposed to be here? As it is now I don't find it very explanatory for an external user. It's a matter of taste I guess but maybe something around the lines of: "Define interpolation and extrapolation functions for calculating (local) exceedance frequencies and return periods."?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, I believe this was the description of the original PR, now it should be adapted as you say :)

"""


import logging

import numpy as np
from scipy import interpolate

from climada.util.value_representation import sig_dig_list

LOGGER = logging.getLogger(__name__)

def interpolate_ev(

Check warning on line 32 in climada/util/interpolation.py

View check run for this annotation

Jenkins - WCR / Pylint

too-many-arguments

LOW: Too many arguments (9/7)
Raw output
Used when a function or method takes too many arguments.
x_test,
x_train,
y_train,
logx = False,
logy = False,
x_threshold = None,
y_threshold = None,
extrapolation = False,
y_asymptotic = np.nan
):
"""
Util function to interpolate (and extrapolate) training data (x_train, y_train)
to new points x_test with several options (log scale, thresholds)

Parameters:
-------
x_test : array_like
1-D array of x-values for which training data should be interpolated
x_train : array_like
1-D array of x-values of training data
y_train : array_like
1-D array of y-values of training data
logx : bool, optional
If set to True, x_values are convert to log scale. Defaults to False.
logy : bool, optional
If set to True, x_values are convert to log scale. Defaults to False.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If set to True, x_values are convert to log scale. Defaults to False.
If set to True, y_values are convert to log scale. Defaults to False.

x_threshold : float, optional
Lower threshold to filter x_train. Defaults to None.
y_threshold : float, optional
Lower threshold to filter y_train. Defaults to None.
extrapolation : bool, optional
If set to True, values will be extrapolated. If set to False, x_test values
smaller than x_train will be assigned y_train[0] (x_train must be sorted in
ascending order), and x_test values larger than x_train will be assigned
y_asymptotic. Defaults to False
y_asymptotic : float, optional
Return value and if extrapolation is True or x_train.size < 2, for x_test
values larger than x_train. Defaults to np.nan.

Returns
-------
np.array
interpolated values y_test for the test points x_test
"""

# preprocess interpolation data
x_test, x_train, y_train = _preprocess_interpolation_data(
x_test, x_train, y_train, logx, logy, x_threshold, y_threshold
)

# handle case of small training data sizes
if x_train.size < 2:
LOGGER.warning('Data is being extrapolated.')
return _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic)

# calculate fill values
if extrapolation:
fill_value = 'extrapolate'
if np.min(x_test) < np.min(x_train) or np.max(x_test) > np.max(x_train):
LOGGER.warning('Data is being extrapolated.')
else:
if not all(sorted(x_train) == x_train):
raise ValueError('x_train array must be sorted in ascending order.')

Check warning on line 95 in climada/util/interpolation.py

View check run for this annotation

Jenkins - WCR / Code Coverage

Not covered line

Line 95 is not covered by tests
fill_value = (y_train[0], np.log10(y_asymptotic) if logy else y_asymptotic)

Check warning on line 96 in climada/util/interpolation.py

View check run for this annotation

Jenkins - WCR / Pylint

trailing-whitespace

LOW: Trailing whitespace
Raw output
Used when there is whitespace between the end of a line and the newline.

interpolation = interpolate.interp1d(
x_train, y_train, fill_value=fill_value, bounds_error=False)
y_test = interpolation(x_test)

# adapt output scale
if logy:
y_test = np.power(10., y_test)
return y_test

def stepfunction_ev(
x_test,
x_train,
y_train,
x_threshold = None,
y_threshold = None,
y_asymptotic = np.nan
):
"""
Util function to interpolate and extrapolate training data (x_train, y_train)
to new points x_test using a step function

Parameters:
-------
x_test : array_like
1-D array of x-values for which training data should be interpolated
x_train : array_like
1-D array of x-values of training data
y_train : array_like
1-D array of y-values of training data
x_threshold : float, optional
Lower threshold to filter x_train. Defaults to None.
y_threshold : float, optional
Lower threshold to filter y_train. Defaults to None.
y_asymptotic : float, optional
Return value if x_test > x_train. Defaults to np.nan.

Returns
-------
np.array
interpolated values y_test for the test points x_test
"""

# preprocess interpolation data
x_test, x_train, y_train = _preprocess_interpolation_data(
x_test, x_train, y_train, None, None, x_threshold, y_threshold
)

# handle case of small training data sizes
if x_train.size < 2:
return _interpolate_small_input(x_test, x_train, y_train, None, y_asymptotic)

# find indeces of x_test if sorted into x_train
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# find indeces of x_test if sorted into x_train
# find indices of x_test if sorted into x_train

if not all(sorted(x_train) == x_train):
raise ValueError('Input array x_train must be sorted in ascending order.')

Check warning on line 151 in climada/util/interpolation.py

View check run for this annotation

Jenkins - WCR / Code Coverage

Not covered line

Line 151 is not covered by tests
indx = np.searchsorted(x_train, x_test)
y_test = y_train[indx.clip(max = len(x_train) - 1)]
y_test[indx == len(x_train)] = y_asymptotic

return y_test

def _preprocess_interpolation_data(
x_test,
x_train,
y_train,
logx,
logy,
x_threshold,
y_threshold
):
"""
helper function to preprocess interpolation training and test data by filtering data below
thresholds and converting to log scale if required
"""

if x_train.shape != y_train.shape:
raise ValueError(f'Incompatible shapes of input data, x_train {x_train.shape} '

Check warning on line 173 in climada/util/interpolation.py

View check run for this annotation

Jenkins - WCR / Code Coverage

Not covered line

Line 173 is not covered by tests
f'and y_train {y_train.shape}. Should be the same')

# transform input to float arrays
x_test, x_train, y_train = (np.array(x_test).astype(float),
np.array(x_train).astype(float),
np.array(y_train).astype(float))

# cut x and y above threshold
if x_threshold or x_threshold==0:
x_th = np.asarray(x_train > x_threshold).squeeze()
x_train = x_train[x_th]
y_train = y_train[x_th]

if y_threshold or y_threshold==0:
y_th = np.asarray(y_train > y_threshold).squeeze()
x_train = x_train[y_th]
y_train = y_train[y_th]

# convert to log scale
if logx:
x_train, x_test = np.log10(x_train), np.log10(x_test)
if logy:
y_train = np.log10(y_train)

return (x_test, x_train, y_train)

def _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic):
"""
helper function to handle if interpolation data is small (empty or one point)
"""
# return y_asymptotic if x_train and y_train empty
if x_train.size == 0:
return np.full_like(x_test, y_asymptotic)

# reconvert logarithmic y_train to original y_train
if logy:
y_train = np.power(10., y_train)

Check warning on line 210 in climada/util/interpolation.py

View check run for this annotation

Jenkins - WCR / Code Coverage

Not covered line

Line 210 is not covered by tests

# if only one (x_train, y_train), return stepfunction with
# y_train if x_test < x_train and y_asymtotic if x_test > x_train
y_test = np.full_like(x_test, y_train[0])
y_test[np.squeeze(x_test) > np.squeeze(x_train)] = y_asymptotic
return y_test

def group_frequency(frequency, value, n_sig_dig=2):
"""
Util function to aggregate (add) frequencies for equal values

Parameters:
------
frequency : array_like
Frequency array
value : array_like
Value array in ascending order
n_sig_dig : int
number of significant digits for value when grouping frequency.
Defaults to 2.

Returns:
------
tuple
(frequency array after aggregation,
unique value array in ascending order)
"""
frequency, value = np.array(frequency), np.array(value)
if frequency.size == 0 and value.size == 0:
return ([], [])

if len(value) != len(np.unique(sig_dig_list(value, n_sig_dig=n_sig_dig))):
#check ordering of value
if not all(sorted(value) == value):
raise ValueError('Value array must be sorted in ascending order.')
# add frequency for equal value
value, start_indices = np.unique(
sig_dig_list(value, n_sig_dig=n_sig_dig), return_index=True)
start_indices = np.insert(start_indices, len(value), len(frequency))
frequency = np.array([
sum(frequency[start_indices[i]:start_indices[i+1]])
for i in range(len(value))
])
return frequency, value
Loading