Skip to content

Commit

Permalink
add smooth functions
Browse files Browse the repository at this point in the history
  • Loading branch information
steinnymir committed Oct 18, 2023
1 parent 8c75446 commit 9a3677f
Show file tree
Hide file tree
Showing 5 changed files with 515 additions and 61 deletions.
95 changes: 34 additions & 61 deletions sed/calibrator/energy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import xarray as xr
from bokeh.io import output_notebook
from bokeh.palettes import Category10 as ColorCycle
from dask.diagnostics import ProgressBar
from fastdtw import fastdtw
from IPython.display import display
from lmfit import Minimizer
Expand Down Expand Up @@ -2172,10 +2171,7 @@ def apply_energy_offset(
columns: Union[str, Sequence[str]],
signs: Union[int, Sequence[int]],
energy_column: str = None,
mode: Union[str, Sequence[str]] = "direct",
window: float = None,
sigma: float = 2,
rolling_group_channel: str = None,
reductions: Union[str, Sequence[str]] = None,
config: dict = None,
) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
"""Apply an energy shift to the given column(s).
Expand All @@ -2184,12 +2180,15 @@ def apply_energy_offset(
Args:
df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
energy_column (str): Name of the column containing the energy values.
column_name (Union[str,Sequence[str]]): Name of the column(s) to apply the shift to.
sign (Union[int,Sequence[int]]): Sign of the shift to apply. (+1 or -1)
mode (str): The mode of the shift. One of 'direct', 'average' or rolled.
if rolled, window and sigma must be given.
config (dict): Configuration dictionary.
columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift to.
signs (Union[int, Sequence[int]]): Sign of the shift to apply. (+1 or -1)
energy_column (str, optional): Name of the column containing the energy values.
reduce (str): The reduction to apply to the column. If "rolled" it searches for columns with
suffix "_rolled", e.g. "sampleBias_rolled", as those generated by the
``SedProcessor.smooth_columns()`` function. Otherwise should be an available method of
dask.dataframe.Series. For example "mean". In this case the function is applied to the
column to generate a single value for the whole dataset. If None, the shift is applied
per-dataframe-row. Defaults to None.
**kwargs: Additional arguments for the rolling average function.
"""
if energy_column is None:
Expand All @@ -2200,58 +2199,32 @@ def apply_energy_offset(
columns = [columns]
if isinstance(signs, int):
signs = [signs]
# if isinstance(mode, str):
# mode = [mode] * len(columns)
if len(columns) != len(signs):
raise ValueError("column_name and sign must have the same length.")
with ProgressBar(
minimum=5,
):
if mode == "rolled":
if window is None:
if config is None:
raise ValueError("Either window or config must be given.")
window = config["dataframe"]["rolling_window"]
if sigma is None:
if config is None:
raise ValueError("Either sigma or config must be given.")
sigma = config["dataframe"]["rolling_sigma"]
if rolling_group_channel is None:
if config is None:
raise ValueError("Either rolling_group_channel or config must be given.")
rolling_group_channel = config["dataframe"]["rolling_group_channel"]
print("rolling averages...")
df = dfops.rolling_average_on_acquisition_time(
df,
rolling_group_channel=rolling_group_channel,
columns=columns,
window=window,
sigma=sigma,
)

if mode in ["rolled", "direct"]:

def apply_shift(x, cols, signs):
shifts = [x[c] * s for c, s in zip(cols, signs)]
shift = None
for s in shifts:
shift = shift + s if shift is not None else s
return x[energy_column] + shift

use_cols = columns if mode == "direct" else [c + "_rolled" for c in columns]
df[energy_column] = df.map_partitions(
apply_shift,
cols=use_cols,
signs=signs,
meta=(energy_column, np.float32),
)
elif mode == "mean":
with ProgressBar():
print("Computing means...")
means = dask.compute([df[c].mean() for c in columns])
df[energy_column] = df[energy_column] + signs * means
columns_ = []
reductions_ = []
to_roll = []

for c, r in zip(columns, reductions):
if r == "rolled":
cname = c + "_rolled"
if cname not in df.columns:
to_roll.append(cname)
else:
columns_.append(cname)
reductions_.append(None)
else:
raise ValueError(f"mode must be one of 'direct', 'mean' or 'rolled'. Got {mode}.")
columns_.append(c)
reductions_.append(r)
if len(to_roll) > 0:
raise RuntimeError(f"Columns {to_roll} have not been smoothed. please run `smooth_column`")

df = dfops.apply_offset_from_columns(
tartget_column=energy_column,
offset_columns=columns_,
signs=signs,
reductions=reductions_,
inplace=True,
)
metadata: Dict[str, Any] = {
"applied": True,
"energy_column": energy_column,
Expand Down
43 changes: 43 additions & 0 deletions sed/core/dfops.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,46 @@ def rolling_average_on_acquisition_time(
if c + "_rolled" in df.columns:
df = df.drop(c + "_rolled", axis=1)
return df.merge(df_, left_on="timeStamp", right_on="ts").drop(["ts", "dt"], axis=1)


def apply_offset_from_columns(
    df: Union[pd.DataFrame, dask.dataframe.DataFrame],
    target_column: str,
    offset_columns: Union[str, Sequence[str]],
    signs: Union[int, Sequence[int]],
    reductions: Union[str, Sequence[str]],
    inplace: bool = True,
) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
    """Apply an offset to a column based on the values of other columns.

    Args:
        df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
        target_column (str): Name of the column to apply the offset to.
        offset_columns (Union[str, Sequence[str]]): Name of the column(s) to use
            for the offset.
        signs (Union[int, Sequence[int]]): Sign (+1 or -1) with which each
            offset column is added to the target column.
        reductions (Union[str, Sequence[str]]): Per-column reduction. If None,
            defaults to "mean" for all columns. A per-column entry of None
            applies the offset row-wise; otherwise the entry must name an
            aggregation method of the column series (e.g. "mean"), whose scalar
            result is added to every row.
        inplace (bool, optional): If True, shift ``target_column`` directly.
            Otherwise, write the shifted values to a new column named
            ``target_column + "_offset"``. Defaults to True.

    Raises:
        ValueError: If ``signs`` or ``reductions`` does not match the length of
            ``offset_columns``.
        KeyError: If an offset column is not present in the dataframe.

    Returns:
        Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the
        shifted column.
    """
    if isinstance(offset_columns, str):
        offset_columns = [offset_columns]
    if not inplace:
        # Keep the original column untouched; accumulate into a copy instead.
        df[target_column + "_offset"] = df[target_column]
        target_column = target_column + "_offset"
    if reductions is None:
        reductions = "mean"
    if isinstance(reductions, str):
        reductions = [reductions] * len(offset_columns)
    if isinstance(signs, int):
        signs = [signs]
    if len(signs) != len(offset_columns):
        raise ValueError("signs and offset_columns must have the same length!")
    if len(reductions) != len(offset_columns):
        raise ValueError("reductions and offset_columns must have the same length!")

    # BUG FIX: the original zipped only (offset_columns, signs) while unpacking
    # three names, raising ValueError on the first iteration.
    for col, sign, red in zip(offset_columns, signs, reductions):
        if col not in df.columns:
            # raise instead of assert: assert is stripped under "python -O"
            raise KeyError(f"{col} not in dataframe!")
        if red is not None:
            # scalar reduction (e.g. "mean") of the whole column, added row-wise
            df[target_column] = df[target_column] + sign * df[col].agg(red)
        else:
            # per-row offset from the column values themselves
            df[target_column] = df[target_column] + sign * df[col]
    return df
39 changes: 39 additions & 0 deletions sed/core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import cast
from typing import Dict
from typing import List
from typing import Literal
from typing import Sequence
from typing import Tuple
from typing import Union
Expand All @@ -25,6 +26,7 @@
from sed.core.config import parse_config
from sed.core.config import save_config
from sed.core.dfops import apply_jitter
from sed.core.dfops import rolling_average_on_acquisition_time
from sed.core.metadata import MetaHandler
from sed.diagnostics import grid_histogram
from sed.io import to_h5
Expand Down Expand Up @@ -1340,6 +1342,43 @@ def add_jitter(self, cols: Sequence[str] = None):
metadata.append(col)
self._attributes.add(metadata, "jittering", duplicate_policy="append")

def smooth_columns(
    self,
    columns: Union[str, Sequence[str]] = None,
    method: Literal["rolling"] = "rolling",
    **kwargs,
) -> None:
    """Apply a filter along one or more columns of the dataframe.

    Currently only supports a rolling average on acquisition time.

    Args:
        columns (Union[str, Sequence[str]], optional): The columns onto which
            to apply the filter. If None, falls back to the "columns" entry of
            the "smooth" section of the config. Defaults to None.
        method (Literal["rolling"], optional): The filter method.
            Defaults to "rolling".
        **kwargs: Keyword arguments passed to the filter method. These take
            precedence over values found in the "smooth" section of the config.

    Raises:
        ValueError: If no columns are given (neither as argument nor in the
            config), if a column is not present in the dataframe, or if the
            method is not supported.
    """
    # Config supplies defaults; explicitly passed kwargs take precedence.
    kwargs = {**self._config.get("smooth", {}), **kwargs}
    if columns is None:
        # BUG FIX: the original iterated over None when no columns were
        # passed, despite defaulting to None and reading the config later.
        columns = kwargs.pop("columns", None)
    if columns is None:
        raise ValueError(
            "No columns to smooth. Provide them as argument or in the config.",
        )
    if isinstance(columns, str):
        columns = [columns]
    for column in columns:
        if column not in self._dataframe.columns:
            raise ValueError(f"Cannot smooth {column}. Column not in dataframe!")
    if method == "rolling":
        self._dataframe = rolling_average_on_acquisition_time(
            df=self._dataframe,
            rolling_group_channel=kwargs.get("rolling_group_channel", None),
            columns=columns,
            window=kwargs.get("window", None),
            sigma=kwargs.get("sigma", None),
        )
    else:
        raise ValueError(f"Method {method} not supported!")
    # Record the smoothed columns in the workflow metadata.
    self._attributes.add(
        columns,
        "smooth",
        duplicate_policy="append",
    )

def pre_binning(
self,
df_partitions: int = 100,
Expand Down
Loading

0 comments on commit 9a3677f

Please sign in to comment.