Skip to content

Commit

Permalink
add processor function to add time-stamped data either from directly …
Browse files Browse the repository at this point in the history
…provided data or from data extracted from an EPICS archiver instance, and add tests for it
  • Loading branch information
rettigl committed Nov 1, 2023
1 parent 13633c4 commit 421a0df
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sed/config/mpes_example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ histogram:
ranges: [[0, 1800], [0, 1800], [128000, 138000], [0, 32000]]

metadata:
# URL of the epics archiver request engine
archiver_url: "http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv="
# EPICS channels to collect from EPICS archiver
epics_pvs: ["KTOF:Lens:Extr:I", "trARPES:Carving:TEMP_RBV", "trARPES:XGS600:PressureAC:P_RD", "KTOF:Lens:UDLD:V", "KTOF:Lens:Sample:V", "KTOF:Apertures:m1.RBV", "KTOF:Apertures:m2.RBV", "KTOF:Apertures:m3.RBV", "trARPES:Carving:TRX.RBV", "trARPES:Carving:TRY.RBV", "trARPES:Carving:TRZ.RBV", "trARPES:Carving:THT.RBV", "trARPES:Carving:PHI.RBV", "trARPES:Carving:OMG.RBV"]
# hdf5 attribute containing the field aperture "in" motor position
Expand Down
62 changes: 62 additions & 0 deletions sed/core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from sed.calibrator import MomentumCorrector
from sed.core.config import parse_config
from sed.core.config import save_config
from sed.core.dfops import add_time_stamped_data
from sed.core.dfops import apply_jitter
from sed.core.metadata import MetaHandler
from sed.diagnostics import grid_histogram
Expand All @@ -31,6 +32,8 @@
from sed.io import to_tiff
from sed.loader import CopyTool
from sed.loader import get_loader
from sed.loader.mpes.loader import get_archiver_data
from sed.loader.mpes.loader import MpesLoader

N_CPU = psutil.cpu_count()

Expand Down Expand Up @@ -1253,6 +1256,65 @@ def add_jitter(
metadata.append(col)
self._attributes.add(metadata, "jittering", duplicate_policy="append")

def add_time_stamped_data(
self,
dest_column: str,
time_stamps: np.ndarray = None,
data: np.ndarray = None,
archiver_channel: str = None,
**kwds,
):
"""Add data in form of timestamp/value pairs to the dataframe using interpolation to the
timestamps in the dataframe. The time-stamped data can either be provided, or fetched from
an EPICS archiver instance.
Args:
dest_column (str): destination column name
time_stamps (np.ndarray, optional): Time stamps of the values to add. If omitted,
time stamps are retrieved from the epics archiver
data (np.ndarray, optional): Values corresponding at the time stamps in time_stamps.
If omitted, data are retrieved from the epics archiver.
archiver_channel (str, optional): EPICS archiver channel from which to retrieve data.
Either this or data and time_stamps have to be present.
**kwds: additional keyword arguments passed to add_time_stamped_data
"""
time_stamp_column = kwds.pop(
"time_stamp_column",
self._config["dataframe"].get("time_stamp_alias", ""),
)

if time_stamps is None and data is None:
if archiver_channel is None:
raise ValueError(
"Either archiver_channel or both time_stamps and data have to be present!",
)
if self.loader.__name__ != "mpes":
raise NotImplementedError(
"This function is currently only implemented for the mpes loader!",
)
ts_from, ts_to = cast(MpesLoader, self.loader).get_start_and_end_time()
# get channel data with +-5 seconds safety margin
time_stamps, data = get_archiver_data(
archiver_url=self._config["metadata"].get("archiver_url", ""),
archiver_channel=archiver_channel,
ts_from=ts_from - 5,
ts_to=ts_to + 5,
)

self._dataframe = add_time_stamped_data(
self._dataframe,
time_stamps=time_stamps,
data=data,
dest_column=dest_column,
time_stamp_column=time_stamp_column,
**kwds,
)
metadata: List[Any] = []
metadata.append(dest_column)
metadata.append(time_stamps)
metadata.append(data)
self._attributes.add(metadata, "time_stamped_data", duplicate_policy="append")

def pre_binning(
self,
df_partitions: int = 100,
Expand Down
27 changes: 27 additions & 0 deletions tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,33 @@ def test_add_jitter():
np.testing.assert_allclose(res1a, res2a)


def test_add_time_stamped_data():
"""Test the function to add time-stamped data"""
processor = SedProcessor(
folder=df_folder + "../mpes/",
config=package_dir + "/config/mpes_example_config.yaml",
folder_config={},
user_config={},
system_config={},
time_stamps=True,
)
df_ts = processor.dataframe.timeStamps.compute().values
data = np.linspace(0, 1, 20)
time_stamps = np.linspace(df_ts[0], df_ts[-1], 20)
processor.add_time_stamped_data(
time_stamps=time_stamps,
data=data,
dest_column="time_stamped_data",
)
assert "time_stamped_data" in processor.dataframe
res = processor.dataframe["time_stamped_data"].compute().values
assert res[0] == 0
assert res[-1] == 1
assert processor.attributes["time_stamped_data"][0] == "time_stamped_data"
np.testing.assert_array_equal(processor.attributes["time_stamped_data"][1], time_stamps)
np.testing.assert_array_equal(processor.attributes["time_stamped_data"][2], data)


def test_event_histogram():
"""Test histogram plotting function"""
config = parse_config(
Expand Down

0 comments on commit 421a0df

Please sign in to comment.