Skip to content

Commit

Permalink
add fluctuation compensation (bam)
Browse files Browse the repository at this point in the history
  • Loading branch information
steinnymir committed Nov 4, 2023
1 parent 80a2ad4 commit 859a929
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 1 deletion.
78 changes: 77 additions & 1 deletion sed/calibrator/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union

Expand All @@ -12,6 +13,8 @@
import numpy as np
import pandas as pd

from sed.core import dfops


class DelayCalibrator:
"""
Expand All @@ -37,7 +40,7 @@ def __init__(
if self.loader == "hextof":
self._append_delay_axis_docstring: str = self.append_delay_axis_hextof.__doc__
elif self.loader == "mpes":
self._append_delay_axis_docstring: str = self.append_delay_axis_mpes.__doc__
self._append_delay_axis_docstring = self.append_delay_axis_mpes.__doc__
else:
raise NotImplementedError(f"Loader '{self.loader}' not implemented.")

Expand All @@ -47,6 +50,7 @@ def __init__(
raise ValueError("No delay stage column specified.")
self.delay_column: str = self._config["dataframe"]["delay_column"]
self.calibration: Dict[str, Any] = {}
self.fluctuations: Dict[str, Any] = self._config["delay"].get("fluctuations", {})

def append_delay_axis(
self,
Expand Down Expand Up @@ -246,6 +250,78 @@ def append_delay_axis_mpes(

return df, metadata

def correct_timing_fluctuation(
self,
df: Union[pd.DataFrame, dask.dataframe.DataFrame],
delay_column: str = None,
columns: Union[str, Sequence[str]] = None,
signs: Union[int, Sequence[int]] = None,
preserve_mean: Union[bool, Sequence[bool]] = None,
reductions: Union[str, Sequence[str]] = None,
inplace: bool = True,
rename: str = None,
) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
"""Corrects fluctuations on the delay axis based on other monitored parameters.
An example application is the correction of the SASE jitter based on the
values of the Beam Arrival Monitor (BAM) at FLASH, or the correction of the
long term drifts using the "Streak Camera".
Args:
df (Union[pd.DataFrame, dask.dataframe.DataFrame]): The dataframe where
to apply the delay calibration to.
delay_column (str, optional): Destination column for delay calibration.
Defaults to config["dataframe"]["delay_column"].
fluctuation_column (str, optional): Source column for fluctuation correction.
sign (int, optional): Sign of the jitter correction. Defaults to 1.
preserve_mean (bool, optional): Subtract mean value of fluctuation column.
Using this ensures the average time of the delay axis is not changed.
reductions (str, optional): Reduction to apply to the fluctuation column.
inplace (bool, optional): Apply the correction inplace. If False, a new column will be
generated. The name will depend on the rename argument.
rename (str, optional): New name for the column generated not in place.
Returns:
Union[pd.DataFrame, dask.dataframe.DataFrame]: dataframe with corrected
delay axis.
"""
delay_column = delay_column or self.delay_column

if columns is None:
# load from config
columns = []
signs = []
preserve_mean = []
reductions = []
for k, v in self.fluctuations.items():
columns.append(k)
try:
signs.append(v["sign"])
except KeyError as exc:
raise KeyError(f"Missing sign for fluctuation column {k} in config.") from exc
preserve_mean.append(v.get("preserve_mean", False))
reductions.append(v.get("reduction", None))

df = dfops.offset_by_other_columns(
df=df,
target_column=delay_column,
offset_columns=columns,
signs=signs,
reductions=reductions,
preserve_mean=preserve_mean,
inplace=inplace,
rename=rename,
)

metadata: Dict[str, Any] = {
"fluctuations": {
"columns": columns,
"preserve_mean": preserve_mean,
"signs": signs,
},
}
return df, metadata


def extract_delay_stage_parameters(
file: str,
Expand Down
46 changes: 46 additions & 0 deletions tests/calibrator/test_delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,49 @@ def test_hextof_append_delay():
assert metadata["calibration"]["time0"] == 1
assert metadata["calibration"]["flip_time_axis"] is False
np.testing.assert_allclose(df["delay"], np.linspace(0, 1, 100) - 1)


def test_correct_timing_fluctuation():
"""test that the timing fluctuation is corrected for correctly"""
cfg = {
"core": {"loader": "hextof"},
"dataframe": {"delay_column": "delay", "delay_stage_column": "delayStage"},
"delay": {
"time0": 1,
"fluctuations": {
"bam": {
"sign": 1,
"preserve_mean": False,
},
},
},
}
config = parse_config(
config=cfg,
folder_config={},
user_config={},
system_config={},
)
df = dask.dataframe.from_pandas(
pd.DataFrame({"bam": np.random.normal(100) + 5, "delayStage": np.linspace(0, 1, 100)}),
npartitions=2,
)
dc = DelayCalibrator(config=config)
df, _ = dc.append_delay_axis(df)
assert "delay" in df.columns
df, meta = dc.correct_timing_fluctuation(df)
expected = df["delayStage"] + df["bam"] - 1
np.testing.assert_allclose(df["delay"], expected)

cfg["delay"]["fluctuations"]["bam"]["preserve_mean"] = True
config = parse_config(
config=cfg,
folder_config={},
user_config={},
system_config={},
)
dc = DelayCalibrator(config=config)
df, _ = dc.append_delay_axis(df)
assert "delay" in df.columns
df, meta = dc.correct_timing_fluctuation(df)
expected = df["delayStage"] - 1 + df["bam"] - 5

0 comments on commit 859a929

Please sign in to comment.