Skip to content

Commit

Permalink
refactor PositiveOutput for performances
Browse files Browse the repository at this point in the history
  • Loading branch information
CyrilJl committed Jan 10, 2025
1 parent 6896eb4 commit bec2e21
Showing 1 changed file with 101 additions and 78 deletions.
179 changes: 101 additions & 78 deletions timefiller/_positive_output_transformer.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,147 @@
import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin
from numba import njit, prange
from sklearn.base import BaseEstimator, TransformerMixin

__all__ = ["PositiveOutput"]


class PositiveOutput(TransformerMixin, BaseEstimator):
    """Extend values lying below a per-column threshold into the negative domain.

    For a column with threshold ``t``, ``transform`` maps every value ``x < t``
    to ``2 * x - t`` and leaves values ``>= t`` unchanged. ``inverse_transform``
    maps every value ``y < t`` back to ``0.5 * y + 0.5 * t`` and then clips the
    whole result at zero, so the reconstructed output is never negative.

    Missing values (NaN) are ignored when the thresholds are computed.

    Args:
        q (int or float, optional): Percentile, in ``[0, 100]``, used to compute
            one threshold per column during ``fit``. Ignored when ``v`` is given.
            Default is 10.
        v (float, optional): Fixed threshold applied to every column. Default is
            None, in which case the thresholds are computed from the data.
        columns (list, optional): Column names to process when the input is a
            DataFrame. If None, all columns are processed. Ignored for array
            inputs. Default is None.

    Attributes:
        thresholds_ (np.ndarray or float or None): One threshold per processed
            column once ``fit`` has run. Before fitting it holds ``v`` (so a
            fixed threshold can be used without calling ``fit``) or None.

    Raises:
        ValueError: If both ``q`` and ``v`` are None, or if ``q`` lies outside
            ``[0, 100]``.
    """

    def __init__(self, q=10, v=None, columns=None):
        if q is None and v is None:
            raise ValueError("At least one of the arguments 'q' or 'v' must be provided.")
        if q is not None and (q < 0 or q > 100):
            raise ValueError("The quantile must be between 0 and 100.")
        self.q = q
        self.v = v
        self.columns = columns
        # A fixed threshold is usable straight away, without calling fit().
        self.thresholds_ = v

    def fit(self, X, y=None):
        """Compute one threshold per processed column.

        Args:
            X (np.ndarray or pd.DataFrame): 2D training data. The processed
                columns must not contain negative values; NaN is allowed.
            y (ignored): Not used, present for compatibility with the
                scikit-learn API.

        Returns:
            PositiveOutput: The fitted instance.

        Raises:
            ValueError: If the input is not 2D or if the processed columns
                contain negative values.
        """
        values = self._as_float_2d(self._selected(X))

        # Only the columns that will actually be transformed are validated, so
        # an unrelated column cannot make fitting fail.
        if values.size and np.nanmin(values) < 0:
            raise ValueError("The data must not contain negative values.")

        if self.v is None:
            self.thresholds_ = np.nanpercentile(values, q=self.q, axis=0)
        else:
            self.thresholds_ = np.full(shape=values.shape[1], fill_value=self.v, dtype=float)
        return self

    # NOTE: ``fastmath`` is deliberately left off. It allows the compiler to
    # assume that no NaN is present, while this transformer is meant to run on
    # data with missing values.
    @staticmethod
    @njit(parallel=True, boundscheck=False, cache=True)
    def transform_numpy(X, thresholds):
        """Compiled kernel: map ``x < t`` to ``2 * x - t``, column by column.

        Bounds checking is disabled, so ``thresholds`` must be a scalar or hold
        exactly one value per column of ``X``.
        """
        result = np.empty_like(X)
        if isinstance(thresholds, (float, int)):
            thresholds = np.full(shape=X.shape[1], fill_value=thresholds)
        for i in prange(X.shape[0]):
            for j in range(X.shape[1]):
                if X[i, j] < thresholds[j]:
                    result[i, j] = 2 * X[i, j] - thresholds[j]
                else:
                    result[i, j] = X[i, j]
        return result

    @staticmethod
    @njit(parallel=True, boundscheck=False, cache=True)
    def inverse_transform_numpy(X, thresholds):
        """Compiled kernel: map ``y < t`` to ``0.5 * y + 0.5 * t``, then clip at 0.

        Bounds checking is disabled, so ``thresholds`` must be a scalar or hold
        exactly one value per column of ``X``.
        """
        result = np.empty_like(X)
        if isinstance(thresholds, (float, int)):
            thresholds = np.full(shape=X.shape[1], fill_value=thresholds)
        for i in prange(X.shape[0]):
            for j in range(X.shape[1]):
                if X[i, j] < thresholds[j]:
                    result[i, j] = 0.5 * X[i, j] + 0.5 * thresholds[j]
                else:
                    result[i, j] = X[i, j]
        return np.maximum(0, result)

    def transform(self, X, y=None):
        """Extend the values below the thresholds into the negative domain.

        Args:
            X (np.ndarray or pd.DataFrame): 2D data to transform.
            y (ignored): Not used, present for compatibility with the
                scikit-learn API.

        Returns:
            np.ndarray or pd.DataFrame: The transformed data. A DataFrame input
            yields a DataFrame with the same index and column order.
        """
        return self._apply(X, self.transform_numpy)

    def inverse_transform(self, X, y=None):
        """Bring extended values back into the non-negative domain.

        Args:
            X (np.ndarray or pd.DataFrame): 2D data to invert.
            y (ignored): Not used, present for compatibility with the
                scikit-learn API.

        Returns:
            np.ndarray or pd.DataFrame: The inverted data, clipped at zero. A
            DataFrame input yields a DataFrame with the same index and column
            order.
        """
        return self._apply(X, self.inverse_transform_numpy)

    def _selected(self, X):
        """Return the part of ``X`` that this transformer processes."""
        if isinstance(X, pd.DataFrame) and self.columns is not None:
            return X[list(self.columns)]
        return X

    @staticmethod
    def _as_float_2d(X):
        """Return ``X`` as a 2D floating-point array."""
        values = X.to_numpy() if isinstance(X, pd.DataFrame) else np.asarray(X)
        if values.ndim != 2:
            raise ValueError(f"Expected a 2D input, got an array with {values.ndim} dimension(s).")
        # Integer or object data would truncate the results written by the
        # kernels (or fail to compile), so it is promoted to float64. Existing
        # floating dtypes are kept as they are.
        if not np.issubdtype(values.dtype, np.floating):
            values = values.astype(np.float64)
        return values

    def _thresholds_for(self, n_columns):
        """Return the thresholds as a float array holding one value per column."""
        if self.thresholds_ is None:
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                "This PositiveOutput instance is not fitted yet. Call 'fit' first or provide a fixed threshold 'v'."
            )
        thresholds = np.asarray(self.thresholds_, dtype=float)
        if thresholds.ndim == 0:
            thresholds = np.full(shape=n_columns, fill_value=float(thresholds))
        # The kernels run without bounds checking: a length mismatch would read
        # out of bounds instead of raising.
        if thresholds.shape != (n_columns,):
            raise ValueError(
                f"Expected {n_columns} threshold(s), one per processed column, got shape {thresholds.shape}."
            )
        return thresholds

    def _apply(self, X, kernel):
        """Run ``kernel`` on the processed part of ``X`` and restore its container."""
        values = self._as_float_2d(self._selected(X))
        converted = kernel(values, self._thresholds_for(values.shape[1]))

        if not isinstance(X, pd.DataFrame):
            return converted
        if self.columns is None:
            return pd.DataFrame(converted, index=X.index, columns=X.columns)

        # Untouched columns are carried over as they are; index and column
        # order of the input are preserved.
        result = X.copy()
        result[list(self.columns)] = converted
        return result

0 comments on commit bec2e21

Please sign in to comment.