From 772d13716a2065972b96f3849439f944a772c16f Mon Sep 17 00:00:00 2001 From: rettigl Date: Wed, 1 Nov 2023 21:50:56 +0100 Subject: [PATCH] add processor function to add time-stamped data either from directly provided data or from data extracted from an EPICS archiver instance, and add tests for it --- sed/config/mpes_example_config.yaml | 2 + sed/core/processor.py | 62 +++++++++++++++++++++++++++++ tests/test_processor.py | 27 +++++++++++++ 3 files changed, 91 insertions(+) diff --git a/sed/config/mpes_example_config.yaml b/sed/config/mpes_example_config.yaml index 5b9eca7b..b3e47670 100644 --- a/sed/config/mpes_example_config.yaml +++ b/sed/config/mpes_example_config.yaml @@ -216,6 +216,8 @@ histogram: ranges: [[0, 1800], [0, 1800], [128000, 138000], [0, 32000]] metadata: + # URL of the epics archiver request engine + archiver_url: "http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv=" # EPICS channels to collect from EPICS archiver epics_pvs: ["KTOF:Lens:Extr:I", "trARPES:Carving:TEMP_RBV", "trARPES:XGS600:PressureAC:P_RD", "KTOF:Lens:UDLD:V", "KTOF:Lens:Sample:V", "KTOF:Apertures:m1.RBV", "KTOF:Apertures:m2.RBV", "KTOF:Apertures:m3.RBV", "trARPES:Carving:TRX.RBV", "trARPES:Carving:TRY.RBV", "trARPES:Carving:TRZ.RBV", "trARPES:Carving:THT.RBV", "trARPES:Carving:PHI.RBV", "trARPES:Carving:OMG.RBV"] # hdf5 attribute containing the field aperture "in" motor position diff --git a/sed/core/processor.py b/sed/core/processor.py index 15efc245..0c82188c 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -25,6 +25,7 @@ from sed.calibrator import MomentumCorrector from sed.core.config import parse_config from sed.core.config import save_config +from sed.core.dfops import add_time_stamped_data from sed.core.dfops import apply_jitter from sed.core.metadata import MetaHandler from sed.diagnostics import grid_histogram @@ -33,6 +34,8 @@ from sed.io import to_tiff from sed.loader import CopyTool from sed.loader import get_loader +from sed.loader.mpes.loader import get_archiver_data +from sed.loader.mpes.loader import MpesLoader N_CPU = psutil.cpu_count() @@ -1503,6 +1506,65 @@ def add_jitter( metadata.append(col) self._attributes.add(metadata, "jittering", duplicate_policy="append") + def add_time_stamped_data( + self, + dest_column: str, + time_stamps: np.ndarray = None, + data: np.ndarray = None, + archiver_channel: str = None, + **kwds, + ): + """Add data in form of timestamp/value pairs to the dataframe using interpolation to the + timestamps in the dataframe. The time-stamped data can either be provided, or fetched from + an EPICS archiver instance. + + Args: + dest_column (str): destination column name + time_stamps (np.ndarray, optional): Time stamps of the values to add. If omitted, + time stamps are retrieved from the epics archiver + data (np.ndarray, optional): Values corresponding at the time stamps in time_stamps. + If omitted, data are retrieved from the epics archiver. + archiver_channel (str, optional): EPICS archiver channel from which to retrieve data. + Either this or data and time_stamps have to be present. + **kwds: additional keyword arguments passed to add_time_stamped_data + """ + time_stamp_column = kwds.pop( + "time_stamp_column", + self._config["dataframe"].get("time_stamp_alias", ""), + ) + + if time_stamps is None and data is None: + if archiver_channel is None: + raise ValueError( + "Either archiver_channel or both time_stamps and data have to be present!", + ) + if self.loader.__name__ != "mpes": + raise NotImplementedError( + "This function is currently only implemented for the mpes loader!", + ) + ts_from, ts_to = cast(MpesLoader, self.loader).get_start_and_end_time() + # get channel data with +-5 seconds safety margin + time_stamps, data = get_archiver_data( + archiver_url=self._config["metadata"].get("archiver_url", ""), + archiver_channel=archiver_channel, + ts_from=ts_from - 5, + ts_to=ts_to + 5, + ) + + self._dataframe = add_time_stamped_data( + self._dataframe, + time_stamps=time_stamps, + data=data, + dest_column=dest_column, + time_stamp_column=time_stamp_column, + **kwds, + ) + metadata: List[Any] = [] + metadata.append(dest_column) + metadata.append(time_stamps) + metadata.append(data) + self._attributes.add(metadata, "time_stamped_data", duplicate_policy="append") + def pre_binning( self, df_partitions: int = 100, diff --git a/tests/test_processor.py b/tests/test_processor.py index dcf8cd0a..bfbb6779 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -701,6 +701,33 @@ def test_add_jitter(): np.testing.assert_allclose(res1a, res2a) +def test_add_time_stamped_data(): + """Test the function to add time-stamped data""" + processor = SedProcessor( + folder=df_folder + "../mpes/", + config=package_dir + "/config/mpes_example_config.yaml", + folder_config={}, + user_config={}, + system_config={}, + time_stamps=True, + ) + df_ts = processor.dataframe.timeStamps.compute().values + data = np.linspace(0, 1, 20) + time_stamps = np.linspace(df_ts[0], df_ts[-1], 20) + processor.add_time_stamped_data( + time_stamps=time_stamps, + data=data, + dest_column="time_stamped_data", + ) + assert "time_stamped_data" in processor.dataframe + res = processor.dataframe["time_stamped_data"].compute().values + assert res[0] == 0 + assert res[-1] == 1 + assert processor.attributes["time_stamped_data"][0] == "time_stamped_data" + np.testing.assert_array_equal(processor.attributes["time_stamped_data"][1], time_stamps) + np.testing.assert_array_equal(processor.attributes["time_stamped_data"][2], data) + + def test_event_histogram(): """Test histogram plotting function""" config = parse_config(