Skip to content

Commit

Permalink
refactor and move apply_energy_offset
Browse files Browse the repository at this point in the history
  • Loading branch information
steinnymir committed Oct 25, 2023
1 parent 5b866ca commit fa1cd5c
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 115 deletions.
237 changes: 144 additions & 93 deletions sed/calibrator/energy.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(
self.color_clip = self._config["energy"]["color_clip"]
self.sector_delays = self._config["dataframe"].get("sector_delays", None)
self.sector_id_column = self._config["dataframe"].get("sector_id_column", None)

self.offset: Dict[str, Any] = self._config["energy"].get("offset", {})
self.correction: Dict[Any, Any] = {}

@property
Expand Down Expand Up @@ -773,6 +773,26 @@ def view( # pylint: disable=dangerous-default-value

pbk.show(fig)

def get_current_calibration(self) -> dict:
    """Return a copy of the calibration dictionary currently in effect.

    The calibration stored on the instance takes precedence. If it is empty or unset,
    the ``"calibration"`` entry of the ``"energy"`` config section is used instead. If
    that entry is missing as well, an empty dictionary is returned.

    Returns:
        dict: Deep copy of the selected calibration dictionary, so that callers can
        modify it without affecting the instance or the config.
    """
    selected = self.calibration
    if not selected:
        # nothing stored on the instance: fall back to the config (or to {})
        selected = self._config["energy"].get("calibration", {})
    return deepcopy(selected)

def append_energy_axis(
self,
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
Expand Down Expand Up @@ -816,17 +836,7 @@ def append_energy_axis(
binwidth = kwds.pop("binwidth", self.binwidth)
binning = kwds.pop("binning", self.binning)

# pylint: disable=duplicate-code
if calibration is None:
if self.calibration:
calibration = deepcopy(self.calibration)
else:
calibration = deepcopy(
self._config["energy"].get(
"calibration",
{},
),
)
calibration = self.get_current_calibration()

for key, value in kwds.items():
calibration[key] = value
Expand Down Expand Up @@ -1413,14 +1423,9 @@ def align_dld_sectors(
self,
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
**kwds,
# sector_delays: Sequence[float] = None,
# sector_id_column: str = None,
# tof_column: str = None,
) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
"""Aligns the time-of-flight axis of the different sections of a detector.
# TODO: move inside the ec class
Args:
df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
Expand Down Expand Up @@ -1452,6 +1457,128 @@ def align_sector(x):
}
return df, metadata

def apply_energy_offset(
    self,
    df: Union[pd.DataFrame, dask.dataframe.DataFrame] = None,
    constant: float = None,
    columns: Union[str, Sequence[str]] = None,
    signs: Union[int, Sequence[int]] = None,
    subtract_mean: Union[bool, Sequence[bool]] = None,
    energy_column: str = None,
    reductions: Union[str, Sequence[str]] = None,
) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
    """Apply an energy shift to the energy column of the dataframe.

    If neither ``constant`` nor ``columns`` is passed, the offset is applied as defined
    in the config (``self.offset``). If parameters are passed, only these are used, and
    the column-based offset is applied using ``dfops.apply_offset_from_columns()``.

    Args:
        df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
        constant (float, optional): The constant to shift the energy axis by.
        columns (Union[str, Sequence[str]], optional): Name of the column(s) whose
            values are used as offset.
        signs (Union[int, Sequence[int]], optional): Sign of the shift to apply
            (+1 or -1), one per column. A positive sign shifts the energy axis to
            higher kinetic energies. Defaults to +1.
        subtract_mean (Union[bool, Sequence[bool]], optional): Whether to subtract the
            mean of the column before applying the shift. A single bool applies to all
            columns. Defaults to False.
        energy_column (str, optional): Name of the column containing the energy
            values. Defaults to ``self.energy_column``.
        reductions (Union[str, Sequence[str]], optional): The reduction to apply to
            the column. If "rolled" it searches for columns with suffix "_rolled",
            e.g. "sampleBias_rolled", as those generated by the
            ``SedProcessor.smooth_columns()`` function. Otherwise should be an
            available method of dask.dataframe.Series. For example "mean". In this
            case the function is applied to the column to generate a single value for
            the whole dataset. If None, the shift is applied per-dataframe-row. A
            single string applies to all columns. Defaults to None.

    Raises:
        ValueError: If no dataframe is given, if the energy scale of the current
            calibration is missing or invalid, if a column from the config is not in
            the dataframe, or if the per-column parameters do not match ``columns``
            in length.
        RuntimeError: If a "rolled" reduction is requested for a column which has not
            been smoothed yet.

    Returns:
        Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: Dataframe with the
        shifted energy column, and a metadata dictionary describing the applied offset.
    """
    if df is None:
        raise ValueError("No dataframe provided.")
    if energy_column is None:
        energy_column = self.energy_column

    # The sign convention depends on the energy scale, so validate it before touching
    # the dataframe or printing anything.
    energy_scale = self.get_current_calibration().get("energy_scale", None)
    if energy_scale is None:
        raise ValueError("Energy scale not set. Please run `set_energy_scale` first.")
    if energy_scale not in ("binding", "kinetic"):
        raise ValueError(f"Invalid energy scale: {energy_scale!r}")

    if columns is None and constant is None:
        # nothing passed explicitly: load the offset definition from the config
        columns, signs, subtract_mean, reductions = [], [], [], []
        for key, entry in self.offset.items():
            if key == "constant":
                constant = entry
                print(f"Applying constant offset of {constant} to energy axis.")
                continue
            if key not in df.columns:
                raise ValueError(f"Column {key} not found in dataframe.")
            columns.append(key)
            signs.append(entry.get("sign", 1))
            subtract_mean.append(entry.get("subtract_mean", False))
            reductions.append(entry.get("reduction", None))
            s = "+" if signs[-1] > 0 else "-"
            msg = f"Shifting {energy_column} by {s} {key}"
            if subtract_mean[-1]:
                msg += " and subtracting mean"
            print(msg)
    else:
        # use passed parameters only; a lone constant must not pull in config columns
        if columns is None:
            columns = []
        elif isinstance(columns, str):
            columns = [columns]
        else:
            columns = list(columns)
        n_columns = len(columns)

        if signs is None:
            signs = [1] * n_columns
        elif isinstance(signs, int):
            signs = [signs]
        else:
            signs = list(signs)
        if len(signs) != n_columns:
            raise ValueError("signs and columns must have the same length.")

        if subtract_mean is None:
            subtract_mean = [False] * n_columns
        elif isinstance(subtract_mean, bool):
            subtract_mean = [subtract_mean] * n_columns
        else:
            subtract_mean = list(subtract_mean)
        if len(subtract_mean) != n_columns:
            raise ValueError("subtract_mean and columns must have the same length.")

        if reductions is None:
            reductions = [None] * n_columns
        elif isinstance(reductions, str):
            # a bare string would otherwise be zipped character by character
            reductions = [reductions] * n_columns
        else:
            reductions = list(reductions)
        if len(reductions) != n_columns:
            raise ValueError("reductions and columns must have the same length.")

    # flip sign for binding energy scale
    if energy_scale == "binding":
        signs = [-s for s in signs]

    # check if columns have been smoothed
    columns_: List[str] = []
    reductions_: List[str] = []
    to_roll: List[str] = []
    for c, r in zip(columns, reductions):
        if r == "rolled":
            cname = c + "_rolled"
            if cname not in df.columns:
                to_roll.append(cname)
            else:
                # the smoothed column is applied row-wise, without further reduction
                columns_.append(cname)
                reductions_.append(None)
        else:
            columns_.append(c)
            reductions_.append(r)
    if len(to_roll) > 0:
        raise RuntimeError(
            f"Columns {to_roll} have not been smoothed. please run `smooth_column`",
        )

    # apply offset; skipped when there is no column to apply (e.g. constant only)
    if columns_:
        df = dfops.apply_offset_from_columns(
            df=df,
            target_column=energy_column,
            offset_columns=columns_,
            signs=signs,
            subtract_mean=subtract_mean,
            reductions=reductions_,
            inplace=True,
        )

    # apply constant
    # NOTE(review): the constant is added as-is, without the binding-energy sign flip
    # applied to the column offsets above -- confirm this is intended.
    if constant is not None:
        df[energy_column] += constant

    metadata: Dict[str, Any] = {
        "applied": True,
        "constant": constant,
        "energy_column": energy_column,
        "column_names": columns,
        "signs": signs,
        "subtract_mean": subtract_mean,
        "reductions": reductions,
    }
    return df, metadata


def extract_bias(files: List[str], bias_key: str) -> np.ndarray:
"""Read bias values from hdf5 files
Expand Down Expand Up @@ -2219,79 +2346,3 @@ def tof2ns(
"""
val = t * 1e9 * binwidth * 2**binning
return val


def apply_energy_offset(
    df: Union[pd.DataFrame, dask.dataframe.DataFrame],
    columns: Union[str, Sequence[str]],
    signs: Union[int, Sequence[int]],
    subtract_mean: Union[bool, Sequence[bool]] = True,
    energy_column: str = None,
    reductions: Union[str, Sequence[str]] = None,
    config: dict = None,
) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
    """Apply an energy shift to the energy column, using the values of other column(s).

    The shift is applied using the ``dfops.apply_offset_from_columns()`` function.

    Args:
        df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
        columns (Union[str, Sequence[str]]): Name of the column(s) whose values are
            used as offset.
        signs (Union[int, Sequence[int]]): Sign of the shift to apply. (+1 or -1),
            one per column.
        subtract_mean (Union[bool, Sequence[bool]], optional): Whether to subtract the
            mean of the column before applying the shift. A single bool applies to all
            columns. Defaults to True.
        energy_column (str, optional): Name of the column containing the energy values.
            If not given, it is read from ``config["dataframe"]["energy_column"]``.
        reductions (Union[str, Sequence[str]], optional): The reduction to apply to the
            column. If "rolled" it searches for columns with suffix "_rolled", e.g.
            "sampleBias_rolled", as those generated by the
            ``SedProcessor.smooth_columns()`` function. Otherwise should be an available
            method of dask.dataframe.Series. For example "mean". In this case the
            function is applied to the column to generate a single value for the whole
            dataset. If None, the shift is applied per-dataframe-row. A single string
            applies to all columns. Defaults to None.
        config (dict, optional): Config dictionary, only used to look up the energy
            column name if ``energy_column`` is not given.

    Raises:
        ValueError: If neither ``energy_column`` nor ``config`` is given, or if the
            per-column parameters do not match ``columns`` in length.
        RuntimeError: If a "rolled" reduction is requested for a column which has not
            been smoothed yet.

    Returns:
        Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: Dataframe with the
        shifted energy column, and a metadata dictionary describing the applied offset.
    """
    if energy_column is None:
        if config is None:
            raise ValueError("Either energy_column or config must be given.")
        energy_column = config["dataframe"]["energy_column"]

    if isinstance(columns, str):
        columns = [columns]
    n_columns = len(columns)

    if isinstance(signs, int):
        signs = [signs]
    if len(signs) != n_columns:
        raise ValueError("signs and columns must have the same length.")

    if isinstance(subtract_mean, bool):
        subtract_mean = [subtract_mean] * n_columns
    if len(subtract_mean) != n_columns:
        raise ValueError("subtract_mean and columns must have the same length.")

    if reductions is None:
        reductions = [None] * n_columns
    elif isinstance(reductions, str):
        # a bare string would otherwise be zipped character by character below
        reductions = [reductions] * n_columns
    if len(reductions) != n_columns:
        raise ValueError("reductions and columns must have the same length.")

    # resolve "rolled" reductions to the pre-smoothed "<name>_rolled" columns
    columns_: List[str] = []
    reductions_: List[str] = []
    to_roll: List[str] = []
    for c, r in zip(columns, reductions):
        if r == "rolled":
            cname = c + "_rolled"
            if cname not in df.columns:
                to_roll.append(cname)
            else:
                # the smoothed column is applied row-wise, without further reduction
                columns_.append(cname)
                reductions_.append(None)
        else:
            columns_.append(c)
            reductions_.append(r)
    if len(to_roll) > 0:
        raise RuntimeError(f"Columns {to_roll} have not been smoothed. please run `smooth_column`")

    df = dfops.apply_offset_from_columns(
        df=df,
        target_column=energy_column,
        offset_columns=columns_,
        signs=signs,
        subtract_mean=subtract_mean,
        reductions=reductions_,
        inplace=True,
    )
    metadata: Dict[str, Any] = {
        "applied": True,
        "energy_column": energy_column,
        "column_names": columns,
        "sign": signs,
    }
    return df, metadata
25 changes: 11 additions & 14 deletions sed/core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from sed.binning import bin_dataframe
from sed.calibrator import DelayCalibrator
from sed.calibrator import energy
from sed.calibrator import EnergyCalibrator
from sed.calibrator import MomentumCorrector
from sed.core.config import parse_config
Expand Down Expand Up @@ -1182,6 +1181,8 @@ def apply_energy_offset(
of dask.dataframe.Series. For example "mean". In this case the function is applied
to the column to generate a single value for the whole dataset. If None, the shift
is applied per-dataframe-row. Defaults to None.
subtract_mean (bool): Whether to subtract the mean of the column before applying the
shift. Defaults to False.
Raises:
ValueError: If the energy column is not in the dataframe.
"""
Expand All @@ -1192,19 +1193,15 @@ def apply_energy_offset(
"Run energy calibration first",
)
metadata = {}
if columns is not None:
self._dataframe, metadata = energy.apply_energy_offset(
df=self._dataframe,
columns=columns,
energy_column=energy_column,
signs=signs,
reductions=reductions,
subtract_mean=subtract_mean,
config=self._config,
)
if constant is not None:
self._dataframe[energy_column] += constant
metadata["offset"] = constant
self._dataframe, metadata = self.ec.apply_energy_offset(
df=self._dataframe,
constant=constant,
columns=columns,
energy_column=energy_column,
signs=signs,
reductions=reductions,
subtract_mean=subtract_mean,
)
if len(metadata) > 0:
self._attributes.add(
metadata,
Expand Down
Loading

0 comments on commit fa1cd5c

Please sign in to comment.