apply suggested changes and remove rolling avg
steinnymir committed Oct 30, 2023
1 parent 050f89c commit 61045d0
Showing 6 changed files with 74 additions and 168 deletions.
81 changes: 36 additions & 45 deletions sed/calibrator/energy.py
@@ -911,6 +911,10 @@ def append_tof_ns_axis(
time-of-flight steps. Defaults to config["dataframe"]["tof_column"].
tof_ns_column (str, optional): Name of the column to store the
time-of-flight in nanoseconds. Defaults to config["dataframe"]["tof_ns_column"].
binwidth (float, optional): Time-of-flight binwidth in ns.
Defaults to config["energy"]["binwidth"].
binning (int, optional): Time-of-flight binning factor.
Defaults to config["energy"]["binning"].
Returns:
dask.dataframe.DataFrame: Dataframe with the new columns.
@@ -926,8 +930,6 @@ def append_tof_ns_axis(

if tof_ns_column is None:
tof_ns_column = self.tof_ns_column
if tof_ns_column is None:
raise AttributeError("tof_ns_column not set!")

df[tof_ns_column] = tof2ns(
binwidth,
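The hunk is collapsed at this point. Given the tof2ns(binwidth, binning, t) signature shown at the bottom of this file, the call presumably completes as in this sketch (an inference, not the visible diff):

df[tof_ns_column] = tof2ns(
    binwidth,
    binning,
    df[tof_column],
)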
@@ -1423,26 +1425,34 @@ def gather_correction_metadata(self, correction: dict = None) -> dict:
def align_dld_sectors(
self,
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
**kwds,
tof_column: str = None,
sector_id_column: str = None,
sector_delays: np.ndarray = None,
) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
"""Aligns the time-of-flight axis of the different sections of a detector.
Args:
df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
tof_column (str, optional): Name of the column containing the time-of-flight values.
Defaults to config["dataframe"]["tof_column"].
sector_id_column (str, optional): Name of the column containing the sector id values.
Defaults to config["dataframe"]["sector_id_column"].
sector_delays (np.ndarray, optional): Array containing the sector delays. Defaults to
config["dataframe"]["sector_delays"].
Returns:
dask.dataframe.DataFrame: Dataframe with the new columns.
dict: Metadata dictionary.
"""
sector_delays = kwds.pop("sector_delays", self.sector_delays)
sector_id_column = kwds.pop("sector_id_column", self.sector_id_column)
sector_delays = sector_delays if sector_delays is not None else self.sector_delays
sector_id_column = sector_id_column or self.sector_id_column

if sector_delays is None or sector_id_column is None:
raise ValueError(
"No value for sector_delays or sector_id_column found in config."
"config file is not properly configured for dld sector correction.",
"Config file is not properly configured for dld sector correction.",
)
tof_column = kwds.pop("tof_column", self.tof_column)
tof_column = tof_column or self.tof_column

# align the 8 sectors
sector_delays_arr = dask.array.from_array(sector_delays)
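The rest of this hunk is collapsed. A minimal sketch of the alignment it sets up, assuming one delay per sector indexed by the sector id column (the align_sector helper and the plain-numpy variant are illustrative, not the committed code):

import numpy as np

sector_delays_np = np.asarray(sector_delays)

def align_sector(partition):
    # look up each event's sector delay and subtract it from its TOF value
    delays = sector_delays_np[partition[sector_id_column].to_numpy(dtype=int)]
    return partition[tof_column] - delays

df[tof_column] = df.map_partitions(align_sector, meta=(tof_column, np.float64))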
@@ -1467,8 +1477,8 @@ def apply_energy_offset(
subtract_mean: Union[bool, Sequence[bool]] = None,
energy_column: str = None,
reductions: Union[str, Sequence[str]] = None,
) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
"""Apply an energy shift to the given column(s).
) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
"""Apply an offset to the energy column by the values of the provided columns.
If no parameter is passed to this function, the offset is applied as defined in the
config file. If parameters are passed, they are used to generate a new offset dictionary
@@ -1479,19 +1489,16 @@ def apply_energy_offset(
Args:
df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
constant (float, optional): The constant to shift the energy axis by.
columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift to.
columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift from.
signs (Union[int, Sequence[int]]): Sign of the shift to apply (+1 or -1). A positive
sign shifts the energy axis to higher kinetic energies. Defaults to +1.
energy_column (str, optional): Name of the column containing the energy values.
reductions (str): The reduction to apply to the column. If "rolled" it searches for
columns with suffix "_rolled", e.g. "sampleBias_rolled", as those generated by the
``SedProcessor.smooth_columns()`` function. Otherwise should be an available method
reductions (str): The reduction to apply to the column. Should be an available method
of dask.dataframe.Series. For example "mean". In this case the function is applied
to the column to generate a single value for the whole dataset. If None, the shift
is applied per-dataframe-row. Defaults to None.
subtract_mean (bool): Whether to subtract the mean of the column before applying the
shift. Defaults to False.
**kwargs: Additional arguments for the rolling average function.
"""
if energy_column is None:
energy_column = self.energy_column
@@ -1506,16 +1513,12 @@
constant = v
print(f"Applying constant offset of {constant} to energy axis.")
else:
assert k in df.columns, f"Column {k} not found in dataframe."
if k not in df.columns:
raise KeyError(f"Column {k} not found in dataframe.")
columns.append(k)
signs.append(v.get("sign", 1))
subtract_mean.append(v.get("subtract_mean", False))
reductions.append(v.get("reduction", None))
s = "+" if signs[-1] > 0 else "-"
msg = f"Shifting {energy_column} by {s} {k}"
if subtract_mean[-1]:
msg += " and subtracting mean"
print(msg)
else:
# use passed parameters
if columns is not None and (signs is None or subtract_mean is None):
@@ -1539,34 +1542,16 @@
elif energy_scale == "kinetic":
pass
elif energy_scale is None:
raise ValueError("Energy scale not set. Please run `set_energy_scale` first.")
# check if columns have been smoothed
columns_: List[str] = []
reductions_: List[str] = []
to_roll: List[str] = []
for c, r in zip(columns, reductions):
if r == "rolled":
cname = c + "_rolled"
if cname not in df.columns:
to_roll.append(cname)
else:
columns_.append(cname)
reductions_.append(None)
else:
columns_.append(c)
reductions_.append(r)
if len(to_roll) > 0:
raise RuntimeError(
f"Columns {to_roll} have not been smoothed. please run `smooth_column`",
)
raise ValueError("Energy scale not set. I don't know how to interpret the sign.")

# apply offset
df = dfops.apply_offset_from_columns(
df=df,
target_column=energy_column,
offset_columns=columns_,
offset_columns=columns,
signs=signs,
subtract_mean=subtract_mean,
reductions=reductions_,
reductions=reductions,
inplace=True,
)
# apply constant
@@ -2062,6 +2047,12 @@ def fit_energy_calibation(
- **'kinetic'**: increasing energy with decreasing TOF.
- **'binding'**: increasing energy with increasing TOF.
t0 (float, optional): constraints and initial values for the fit parameter t0, corresponding
to the time-of-flight offset. Defaults to 1e-6.
E0 (float, optional): constraints and initial values for the fit parameter E0, corresponding
to the energy offset. Defaults to min(vals).
d (float, optional): constraints and initial values for the fit parameter d, corresponding
to the drift distance. Defaults to 1.
Returns:
dict: A dictionary of fitting parameters including the following,
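The remainder of the returns list is collapsed. For orientation, the parameters above suggest the textbook time-of-flight relation; a hedged sketch of that model in eV (this is the standard form, not necessarily the exact expression fitted here):

import scipy.constants as const

def tof_model(t, d, t0, E0):
    # E(t) = m_e * d**2 / (2 * (t - t0)**2) / e + E0
    return 0.5 * const.m_e * (d / (t - t0)) ** 2 / const.e + E0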
@@ -2337,7 +2328,7 @@ def tof2ns(
binwidth: float,
binning: int,
t: float,
) -> Union[List[float], np.ndarray]:
) -> float:
"""Converts the time-of-flight steps to time-of-flight in nanoseconds.
Designed for use with dask.dataframe.DataFrame.map_partitions.
@@ -2349,5 +2340,5 @@
Returns:
float: Converted time in nanoseconds.
"""
val = t * 1e9 * binwidth * 2**binning
val = t * 1e9 * binwidth * 2.0**binning
return val
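A quick usage sketch with the FLASH example values from the config change further down (binwidth in seconds, binning as a power of two; the step count 1000 is arbitrary):

binwidth = 2.0576131995767355e-11  # seconds per base time-of-flight bin
binning = 3  # 2**3 = 8 base bins per step
print(tof2ns(binwidth, binning, t=1000.0))
# 1000 * 1e9 * 2.0576e-11 * 8 ≈ 164.6 ns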
2 changes: 2 additions & 0 deletions sed/config/default.yaml
@@ -9,6 +9,8 @@ dataframe:
y_column: "Y"
# dataframe column containing time-of-flight data
tof_column: "t"
# dataframe column containing time-of-flight data in nanoseconds
tof_ns_column: "t_ns"
# dataframe column containing analog-to-digital data
adc_column: "ADC"
# dataframe column containing bias voltage data
7 changes: 2 additions & 5 deletions sed/config/flash_example_config.yaml
@@ -36,22 +36,19 @@ dataframe:
# dataframe column containing kx coordinates
kx_column: "kx"
# dataframe column containing y coordinates

y_column: dldPosY
# dataframe column containing corrected y coordinates
corrected_y_column: "Y"
# dataframe column containing ky coordinates
ky_column: "ky"
# dataframe column containing time-of-flight data

tof_column: dldTimeSteps
# dataframe column containing time-of-flight data in ns
tof_ns_column: dldTime
# dataframe column containing corrected time-of-flight data
corrected_tof_column: "tm"

# time length of a base time-of-flight bin in ns
tof_binwidth: 0.020576131995767355 # 0.16460905596613884
# time length of a base time-of-flight bin in seconds
tof_binwidth: 2.0576131995767355E-11
# binning parameter for time-of-flight data. 2**tof_binning base bins per step
tof_binning: 3 # power of 2, 3 means 8 bins per step
# dataframe column containing sector ID. obtained from dldTimeSteps column
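The binwidth changed from nanoseconds to seconds here. A quick consistency check of the two values and of the 8-bins-per-step comment, assuming the old value was in ns:

old_ns = 0.020576131995767355  # previous value, in ns
new_s = 2.0576131995767355e-11  # new value, in seconds
assert abs(old_ns * 1e-9 - new_s) < 1e-24  # identical physical bin width
print(new_s * 2**3 * 1e9)  # ≈ 0.1646 ns per step at tof_binning = 3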
75 changes: 11 additions & 64 deletions sed/core/dfops.py
@@ -265,66 +265,6 @@ def backward_fill_partition(df):
return df


def rolling_average_on_acquisition_time(
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
rolling_group_channel: str = None,
columns: Union[str, Sequence[str]] = None,
window: float = None,
sigma: float = 2,
config: dict = None,
) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
"""Perform a rolling average with a gaussian weighted window.
The rolling average is performed on the acquisition time instead of the index.
This can be a time-stamp or similar, such as the trainID at FLASH.
This is necessary because the recorded electrons do not arrive at regular time
intervals, and even more so when loading multiple datasets with gaps in the acquisition.
In order to preserve the number of points, the first and last "window"
number of points are substituted with the original signal.
# TODO: this is currently very slow, and could do with a remake.
Args:
df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
rolling_group_channel (str): Name of the column on which to group the data.
columns (str): Name of the column(s) on which to perform the rolling average.
window (float): Size of the rolling average window
sigma (float): number of standard deviations for the gaussian weighting of the window.
a value of 2 corresponds to a gaussian with sigma equal to half the window size.
Smaller values reduce the weighting in the window frame.
Returns:
Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the new columns.
"""
if rolling_group_channel is None:
if config is None:
raise ValueError("Either group_channel or config must be given.")
rolling_group_channel = config["dataframe"]["rolling_group_channel"]
if isinstance(columns, str):
columns = [columns]
s = f"rolling average over {rolling_group_channel} on "
for c in columns:
s += f"{c}, "
print(s)
with ProgressBar():
df_ = df.groupby(rolling_group_channel).agg({c: "mean" for c in columns}).compute()
df_["dt"] = pd.to_datetime(df_.index, unit="s")
df_["ts"] = df_.index
for c in columns:
df_[c + "_rolled"] = (
df_[c]
.interpolate(method="nearest")
.rolling(window, center=True, win_type="gaussian")
.mean(std=window / sigma)
.fillna(df_[c])
)
df_ = df_.drop(c, axis=1)
if c + "_rolled" in df.columns:
df = df.drop(c + "_rolled", axis=1)
return df.merge(df_, left_on="timeStamp", right_on="ts").drop(["ts", "dt"], axis=1)


def apply_offset_from_columns(
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
target_column: str,
@@ -342,7 +282,9 @@ def apply_offset_from_columns(
offset_columns (str): Name of the column(s) to use for the offset.
signs (int): Sign of the offset. Defaults to 1.
reductions (str): Reduction function to use for the offset. Defaults to "mean".
subtract_mean (bool): Whether to subtract the mean of the offset column. Defaults to False.
If a list is given, it must have the same length as offset_columns. Otherwise the value
passed is used for all columns.
Returns:
Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the new column.
"""
Expand All @@ -351,8 +293,7 @@ def apply_offset_from_columns(
if not inplace:
df[target_column + "_offset"] = df[target_column]
target_column = target_column + "_offset"
if reductions is None:
reductions = "mean"

if isinstance(reductions, str):
reductions = [reductions] * len(offset_columns)
if isinstance(signs, int):
@@ -363,11 +304,17 @@
subtract_mean = [subtract_mean] * len(offset_columns)

for col, sign, red, submean in zip(offset_columns, signs, reductions, subtract_mean):
assert col in df.columns, f"{col} not in dataframe!"
if col not in df.columns:
raise KeyError(f"{col} not in dataframe!")
if red is not None:
df[target_column] = df[target_column] + sign * df[col].agg(red)
else:
df[target_column] = df[target_column] + sign * df[col]
if submean:
df[target_column] = df[target_column] - sign * df[col].mean()
s = "+" if sign > 0 else "-"
msg = f"Shifting {target_column} by {s} {col}"
if submean:
msg += " and subtracting mean"
print(msg)
return df
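A hypothetical call, matching the parameter names shown above (the column names are illustrative only):

df = apply_offset_from_columns(
    df,
    target_column="energy",
    offset_columns=["sampleBias"],
    signs=[1],
    reductions=[None],
    subtract_mean=[True],
    inplace=True,
)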