From 93ebed6d61e7d0274e8e8e3a91599b95c3061013 Mon Sep 17 00:00:00 2001 From: rettigl Date: Mon, 3 Apr 2023 23:56:11 +0200 Subject: [PATCH 01/13] Adds support for a timed dataframe, and histogram calculation from this dataframe --- sed/calibrator/energy.py | 2 +- sed/core/processor.py | 96 +++++++++- sed/loader/base/loader.py | 30 ++- sed/loader/generic/loader.py | 14 +- sed/loader/mpes/loader.py | 174 +++++++++++++++++- tests/calibrator/test_delay.py | 10 +- tests/calibrator/test_energy.py | 6 +- tests/calibrator/test_momentum.py | 8 +- tests/loader/test_loaders.py | 14 +- ...for example time-resolved ARPES data.ipynb | 41 ++++- 10 files changed, 355 insertions(+), 40 deletions(-) diff --git a/sed/calibrator/energy.py b/sed/calibrator/energy.py index d09d5923..2a2e5c95 100644 --- a/sed/calibrator/energy.py +++ b/sed/calibrator/energy.py @@ -232,7 +232,7 @@ def bin_data( "Either Bias Values or a valid bias_key has to be present!", ) from exc - dataframe, _ = self.loader.read_dataframe( + dataframe, _, _ = self.loader.read_dataframe( files=data_files, collect_metadata=False, ) diff --git a/sed/core/processor.py b/sed/core/processor.py index 6866843f..eafe09cf 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -18,6 +18,7 @@ import xarray as xr from sed.binning import bin_dataframe +from sed.binning.utils import bin_centers_to_bin_edges from sed.calibrator import DelayCalibrator from sed.calibrator import EnergyCalibrator from sed.calibrator import MomentumCorrector @@ -96,6 +97,7 @@ def __init__( self._config["binning"]["num_cores"] = num_cores self._dataframe: Union[pd.DataFrame, ddf.DataFrame] = None + self._timed_dataframe: Union[pd.DataFrame, ddf.DataFrame] = None self._files: List[str] = [] self._binned: xr.DataArray = None @@ -281,12 +283,13 @@ def load( metadata = {} if dataframe is not None: self._dataframe = dataframe + self._timed_dataframe = None elif runs is not None: # If runs are provided, we only use the copy tool if also folder is provided. # In that case, we copy the whole provided base folder tree, and pass the copied # version to the loader as base folder to look for the runs. 
if folder is not None: - dataframe, metadata = self.loader.read_dataframe( + dataframe, timed_dataframe, metadata = self.loader.read_dataframe( folders=cast(str, self.cpy(folder)), runs=runs, metadata=metadata, @@ -294,7 +297,7 @@ def load( **kwds, ) else: - dataframe, metadata = self.loader.read_dataframe( + dataframe, timed_dataframe, metadata = self.loader.read_dataframe( runs=runs, metadata=metadata, collect_metadata=collect_metadata, @@ -302,7 +305,7 @@ def load( ) elif folder is not None: - dataframe, metadata = self.loader.read_dataframe( + dataframe, timed_dataframe, metadata = self.loader.read_dataframe( folders=cast(str, self.cpy(folder)), metadata=metadata, collect_metadata=collect_metadata, @@ -310,7 +313,7 @@ def load( ) elif files is not None: - dataframe, metadata = self.loader.read_dataframe( + dataframe, timed_dataframe, metadata = self.loader.read_dataframe( files=cast(List[str], self.cpy(files)), metadata=metadata, collect_metadata=collect_metadata, @@ -323,6 +326,7 @@ def load( ) self._dataframe = dataframe + self._timed_dataframe = timed_dataframe self._files = self.loader.files for key in metadata: @@ -582,6 +586,10 @@ def apply_momentum_correction( self._dataframe, metadata = self.mc.apply_corrections( df=self._dataframe, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.mc.apply_corrections( + self._timed_dataframe, + ) # Add Metadata self._attributes.add( metadata, @@ -705,6 +713,11 @@ def apply_momentum_calibration( df=self._dataframe, calibration=calibration, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.mc.append_k_axis( + df=self._timed_dataframe, + calibration=calibration, + ) # Add Metadata self._attributes.add( @@ -823,6 +836,12 @@ def apply_energy_correction( correction=correction, **kwds, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.ec.apply_energy_correction( + df=self._timed_dataframe, + correction=correction, + **kwds, + ) # Add Metadata self._attributes.add( @@ -1147,6 +1166,12 @@ def append_energy_axis( calibration=calibration, **kwds, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.ec.append_energy_axis( + df=self._timed_dataframe, + calibration=calibration, + **kwds, + ) # Add Metadata self._attributes.add( @@ -1187,6 +1212,12 @@ def calibrate_delay_axis( delay_range=delay_range, **kwds, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.dc.append_delay_axis( + self._timed_dataframe, + delay_range=delay_range, + **kwds, + ) else: if datafile is None: try: @@ -1203,6 +1234,12 @@ def calibrate_delay_axis( datafile=datafile, **kwds, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.dc.append_delay_axis( + self._timed_dataframe, + datafile=datafile, + **kwds, + ) # Add Metadata self._attributes.add( @@ -1413,6 +1450,57 @@ def compute( return self._binned + def get_normalization_histogram( + self, + axis: str = "delay", + **kwds, + ) -> np.ndarray: + """Generates a normalization histogram from the TimeStamps column of the + dataframe. + + Args: + axis (str, optional): The axis for which to compute histogram. + Defaults to "delay". + **kwds: Keyword arguments: + + -df_partitions (int, optional): Number of dataframe partitions to use. + Defaults to all. + + Raises: + ValueError: Raised if no data are binned. + ValueError: Raised if 'axis' not in binned coordinates. + ValueError: Raised if config["dataframe"]["time_stamp_alias"] not found + in Dataframe. 
+ + Returns: + np.ndarray: The computed normalization histogram (in TimeStamp units + per bin). + """ + + if self._binned is None: + raise ValueError("Need to bin data first!") + if axis not in self._binned.coords: + raise ValueError(f"Axis '{axis}' not found in binned data!") + + df_partitions = kwds.pop("df_partitions", None) + if df_partitions is not None: + timed_dataframe = self._timed_dataframe.partitions[ + 0 : min(df_partitions, self._timed_dataframe.npartitions) + ] + else: + timed_dataframe = self._timed_dataframe + + bins = timed_dataframe[axis].map_partitions( + pd.cut, + bins=bin_centers_to_bin_edges(self._binned.coords[axis].values), + ) + + histogram = ( + timed_dataframe[axis].groupby([bins]).count().compute().values + ) / 1000 + + return histogram + def view_event_histogram( self, dfpid: int, diff --git a/sed/loader/base/loader.py b/sed/loader/base/loader.py index dfff9c5f..1f905d82 100644 --- a/sed/loader/base/loader.py +++ b/sed/loader/base/loader.py @@ -54,7 +54,7 @@ def read_dataframe( metadata: dict = None, collect_metadata: bool = False, **kwds, - ) -> Tuple[ddf.DataFrame, dict]: + ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]: """Reads data from given files, folder, or runs and returns a dask dataframe and corresponding metadata. @@ -77,8 +77,8 @@ def read_dataframe( **kwds: keyword arguments. See description in respective loader. Returns: - Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from - specified files. + Tuple[ddf.DataFrame, dict]: Dask dataframe, timed dataframe and metadata + read from specified files. """ if metadata is None: @@ -123,7 +123,29 @@ def read_dataframe( if not files: raise FileNotFoundError("No valid files or runs found!") - return None, None + return None, None, None + + @abstractmethod + def get_files_from_run_id( + self, + run_id: str, + folders: Union[str, Sequence[str]] = None, + extension: str = None, + **kwds, + ) -> List[str]: + """Locate the files for a given run identifier. + + Args: + run_id (str): The run identifier to locate. + folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw + data is located. Defaults to None. + extension (str, optional): The file extension. Defaults to None. + kwds: Keyword arguments + + Return: + List[str]: List of files for the given run. + """ + raise NotImplementedError @abstractmethod def get_files_from_run_id( diff --git a/sed/loader/generic/loader.py b/sed/loader/generic/loader.py index 3212bebe..7eb92e72 100644 --- a/sed/loader/generic/loader.py +++ b/sed/loader/generic/loader.py @@ -36,7 +36,7 @@ def read_dataframe( metadata: dict = None, collect_metadata: bool = False, **kwds, - ) -> Tuple[ddf.DataFrame, dict]: + ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]: """Read stored files from a folder into a dataframe. Args: @@ -64,8 +64,8 @@ def read_dataframe( ValueError: Raised if the file type is not supported. Returns: - Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from specified - files. + Tuple[ddf.DataFrame, dict]: Dask dataframe, timed dataframe and metadata + read from specified files. 
""" # pylint: disable=duplicate-code super().read_dataframe( @@ -84,16 +84,16 @@ def read_dataframe( self.metadata = self.metadata if ftype == "parquet": - return (ddf.read_parquet(self.files, **kwds), self.metadata) + return (ddf.read_parquet(self.files, **kwds), None, self.metadata) if ftype == "json": - return (ddf.read_json(self.files, **kwds), self.metadata) + return (ddf.read_json(self.files, **kwds), None, self.metadata) if ftype == "csv": - return (ddf.read_csv(self.files, **kwds), self.metadata) + return (ddf.read_csv(self.files, **kwds), None, self.metadata) try: - return (ddf.read_table(self.files, **kwds), self.metadata) + return (ddf.read_table(self.files, **kwds), None, self.metadata) except (TypeError, ValueError, NotImplementedError) as exc: raise ValueError( "The file format cannot be understood!", diff --git a/sed/loader/mpes/loader.py b/sed/loader/mpes/loader.py index 965abe91..82c1a99d 100644 --- a/sed/loader/mpes/loader.py +++ b/sed/loader/mpes/loader.py @@ -105,6 +105,87 @@ def hdf5_to_dataframe( return ddf.from_dask_array(array_stack, columns=column_names) +def hdf5_to_timed_dataframe( + files: Sequence[str], + group_names: Sequence[str] = None, + alias_dict: Dict[str, str] = None, + time_stamps: bool = False, + time_stamp_alias: str = "timeStamps", + ms_markers_group: str = "msMarkers", + first_event_time_stamp_key: str = "FirstEventTimeStamp", + **kwds, +) -> ddf.DataFrame: + """Function to read a selection of hdf5-files, and generate a delayed dask + dataframe from provided groups in the files. Optionally, aliases can be defined. + Returns a dataframe for evenly spaced time intervals. + + Args: + files (List[str]): A list of the file paths to load. + group_names (List[str], optional): hdf5 group names to load. Defaults to load + all groups containing "Stream" + alias_dict (Dict[str, str], optional): Dictionary of aliases for the dataframe + columns. Keys are the hdf5 groupnames, and values the aliases. If an alias + is not found, its group name is used. Defaults to read the attribute + "Name" from each group. + time_stamps (bool, optional): Option to calculate time stamps. Defaults to + False. + time_stamp_alias (str): Alias name for the timestamp column. + Defaults to "timeStamps". + ms_markers_group (str): h5 column containing timestamp information. + Defaults to "msMarkers". + first_event_time_stamp_key (str): h5 attribute containing the start + timestamp of a file. Defaults to "FirstEventTimeStamp". 
+ + Returns: + ddf.DataFrame: The delayed Dask DataFrame + """ + if group_names is None: + group_names = [] + if alias_dict is None: + alias_dict = {} + + # Read a file to parse the file structure + test_fid = kwds.pop("test_fid", 0) + test_proc = h5py.File(files[test_fid]) + if group_names == []: + group_names, alias_dict = get_groups_and_aliases( + h5file=test_proc, + seach_pattern="Stream", + ) + + column_names = [alias_dict.get(group, group) for group in group_names] + + if time_stamps: + column_names.append(time_stamp_alias) + + test_array = hdf5_to_timed_array( + h5file=test_proc, + group_names=group_names, + time_stamps=time_stamps, + ms_markers_group=ms_markers_group, + first_event_time_stamp_key=first_event_time_stamp_key, + ) + + # Delay-read all files + arrays = [ + da.from_delayed( + dask.delayed(hdf5_to_timed_array)( + h5file=h5py.File(f), + group_names=group_names, + time_stamps=time_stamps, + ms_markers_group=ms_markers_group, + first_event_time_stamp_key=first_event_time_stamp_key, + ), + dtype=test_array.dtype, + shape=(test_array.shape[0], np.nan), + ) + for f in files + ] + array_stack = da.concatenate(arrays, axis=1).T + + return ddf.from_dask_array(array_stack, columns=column_names) + + def get_groups_and_aliases( h5file: h5py.File, seach_pattern: str = None, @@ -224,6 +305,81 @@ def hdf5_to_array( return np.asarray(data_list) +def hdf5_to_timed_array( + h5file: h5py.File, + group_names: Sequence[str], + data_type: str = "float32", + time_stamps=False, + ms_markers_group: str = "msMarkers", + first_event_time_stamp_key: str = "FirstEventTimeStamp", +) -> np.ndarray: + """Reads the content of the given groups in an hdf5 file, and returns a + timed version of a 2-dimensional array with the corresponding values. + + Args: + h5file (h5py.File): + hdf5 file handle to read from + group_names (str): + group names to read + data_type (str, optional): + Data type of the output data. Defaults to "float32". + time_stamps (bool, optional): + Option to calculate time stamps. Defaults to False. + ms_markers_group (str): h5 column containing timestamp information. + Defaults to "msMarkers". + first_event_time_stamp_key (str): h5 attribute containing the start + timestamp of a file. Defaults to "FirstEventTimeStamp". + + Returns: + np.ndarray: the array of the values at evently spaced timing obtained from + the ms_markers. + """ + + # Delayed array for loading an HDF5 file of reasonable size (e.g. 
< 1GB) + + # Read out groups: + data_list = [] + ms_marker = np.asarray(h5file[ms_markers_group]) + for group in group_names: + + g_dataset = np.asarray(h5file[group]) + if bool(data_type): + g_dataset = g_dataset.astype(data_type) + + timed_dataset = np.zeros_like(ms_marker) + for i, point in enumerate(ms_marker): + timed_dataset[i] = ( + g_dataset[point] + if point < len(g_dataset) + else g_dataset[len(g_dataset) - 1] + ) + + data_list.append(timed_dataset) + + # calculate time stamps + if time_stamps: + # try to get start timestamp from "FirstEventTimeStamp" attribute + try: + start_time_str = get_attribute(h5file, first_event_time_stamp_key) + start_time = datetime.datetime.strptime( + start_time_str, + "%Y-%m-%dT%H:%M:%S.%f%z", + ).timestamp() + except KeyError: + # get the start time of the file from its modification date if the key + # does not exist (old files) + start_time = os.path.getmtime(h5file.filename) # convert to ms + # the modification time points to the time when the file was finished, so we + # need to correct for the time it took to write the file + start_time -= len(ms_marker) / 1000 + + time_stamp_data = start_time + ms_marker / 1000 + + data_list.append(time_stamp_data) + + return np.asarray(data_list) + + def get_attribute(h5group: h5py.Group, attribute: str) -> str: """Reads, decodes and returns an attrubute from an hdf5 group @@ -323,7 +479,7 @@ def read_dataframe( collect_metadata: bool = False, time_stamps: bool = False, **kwds, - ) -> Tuple[ddf.DataFrame, dict]: + ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]: """Read stored hdf5 files from a list or from folder and returns a dask dataframe and corresponding metadata. @@ -361,8 +517,8 @@ def read_dataframe( FileNotFoundError: Raised if a file or folder is not found. Returns: - Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from specified - files. + Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed Dask + dataframe and metadata read from specified files. """ # if runs is provided, try to locate the respective files relative to the provided folder. 
if runs is not None: # pylint: disable=duplicate-code @@ -428,6 +584,16 @@ def read_dataframe( first_event_time_stamp_key=first_event_time_stamp_key, **kwds, ) + timed_df = hdf5_to_timed_dataframe( + files=self.files, + group_names=hdf5_groupnames, + alias_dict=hdf5_aliases, + time_stamps=time_stamps, + time_stamp_alias=time_stamp_alias, + ms_markers_group=ms_markers_group, + first_event_time_stamp_key=first_event_time_stamp_key, + **kwds, + ) if collect_metadata: metadata = self.gather_metadata( @@ -437,7 +603,7 @@ def read_dataframe( else: metadata = self.metadata - return df, metadata + return df, timed_df, metadata def get_files_from_run_id( self, diff --git a/tests/calibrator/test_delay.py b/tests/calibrator/test_delay.py index e702a7cc..ff4b1a85 100644 --- a/tests/calibrator/test_delay.py +++ b/tests/calibrator/test_delay.py @@ -28,7 +28,7 @@ def test_delay_parameters_from_file(): user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -51,7 +51,7 @@ def test_delay_parameters_from_delay_range(): user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -62,7 +62,7 @@ def test_delay_parameters_from_delay_range(): assert "adc_range" in metadata["calibration"] # from calibration - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -84,7 +84,7 @@ def test_delay_parameters_from_delay_range_mm(): user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -99,7 +99,7 @@ def test_delay_parameters_from_delay_range_mm(): assert "delay_range_mm" in metadata["calibration"] # from dict - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) diff --git a/tests/calibrator/test_energy.py b/tests/calibrator/test_energy.py index ba931cc2..5793d42e 100644 --- a/tests/calibrator/test_energy.py +++ b/tests/calibrator/test_energy.py @@ -183,7 +183,7 @@ def test_calibrate_append(energy_scale: str, calibration_method: str): system_config={}, ) loader = get_loader(loader_name="mpes", config=config) - df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) ec.load_data(biases=biases, traces=traces, tof=tof) ec.normalize() @@ -234,7 +234,7 @@ def test_append_energy_axis_from_dict_kwds(calib_type: str, calib_dict: dict): config = parse_config(config={}, folder_config={}, user_config={}, system_config={}) loader = get_loader(loader_name="mpes", config=config) # from dict - df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) df, metadata = ec.append_energy_axis(df, calibration=calib_dict) assert config["dataframe"]["energy_column"] in df.columns @@ -244,7 +244,7 
@@ def test_append_energy_axis_from_dict_kwds(calib_type: str, calib_dict: dict): assert metadata["calibration"]["calib_type"] == calib_type # from kwds - df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) df, metadata = ec.append_energy_axis(df, **calib_dict) assert config["dataframe"]["energy_column"] in df.columns diff --git a/tests/calibrator/test_momentum.py b/tests/calibrator/test_momentum.py index 63ddb0f2..7aad2b47 100644 --- a/tests/calibrator/test_momentum.py +++ b/tests/calibrator/test_momentum.py @@ -136,7 +136,7 @@ def test_apply_correction(): user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folders=df_folder, collect_metadata=False, ) @@ -249,7 +249,7 @@ def test_apply_registration( user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folders=df_folder, collect_metadata=False, ) @@ -321,7 +321,7 @@ def test_momentum_calibration_equiscale(): user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folders=df_folder, collect_metadata=False, ) @@ -352,7 +352,7 @@ def test_momentum_calibration_two_points(): user_config={}, system_config={}, ) - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folders=df_folder, collect_metadata=False, ) diff --git a/tests/loader/test_loaders.py b/tests/loader/test_loaders.py index 3947adfc..b55b30fb 100644 --- a/tests/loader/test_loaders.py +++ b/tests/loader/test_loaders.py @@ -109,28 +109,28 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str): extension=supported_file_type, ) if read_type == "one_file": - loaded_dataframe, loaded_metadata = loader.read_dataframe( + loaded_dataframe, _, loaded_metadata = loader.read_dataframe( files=input_files[0], ftype=supported_file_type, collect_metadata=False, ) expected_size = 1 elif read_type == "files": - loaded_dataframe, loaded_metadata = loader.read_dataframe( + loaded_dataframe, _, loaded_metadata = loader.read_dataframe( files=list(input_files), ftype=supported_file_type, collect_metadata=False, ) expected_size = len(input_files) elif read_type == "one_folder": - loaded_dataframe, loaded_metadata = loader.read_dataframe( + loaded_dataframe, _, loaded_metadata = loader.read_dataframe( folders=input_folder, ftype=supported_file_type, collect_metadata=False, ) expected_size = len(input_files) elif read_type == "folders": - loaded_dataframe, loaded_metadata = loader.read_dataframe( + loaded_dataframe, _, loaded_metadata = loader.read_dataframe( folders=[input_folder], ftype=supported_file_type, collect_metadata=False, @@ -139,7 +139,7 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str): elif read_type == "one_run": if runs[get_loader_name_from_loader_object(loader)] is None: pytest.skip("Not implemented") - loaded_dataframe, loaded_metadata = loader.read_dataframe( + loaded_dataframe, _, loaded_metadata = loader.read_dataframe( runs=runs[get_loader_name_from_loader_object(loader)][0], ftype=supported_file_type, 
collect_metadata=False, @@ -148,7 +148,7 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str): elif read_type == "runs": if runs[get_loader_name_from_loader_object(loader)] is None: pytest.skip("Not implemented") - loaded_dataframe, loaded_metadata = loader.read_dataframe( + loaded_dataframe, _, loaded_metadata = loader.read_dataframe( runs=runs[get_loader_name_from_loader_object(loader)], ftype=supported_file_type, collect_metadata=False, @@ -263,7 +263,7 @@ def test_mpes_timestamps(): loader_name = "mpes" loader = get_loader(loader_name) input_folder = os.path.join(test_data_dir, "loader", loader_name) - df, _ = loader.read_dataframe( + df, _, _ = loader.read_dataframe( folders=input_folder, collect_metadata=False, time_stamps=True, diff --git a/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb b/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb index 13633790..72e45d3f 100644 --- a/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb +++ b/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb @@ -589,7 +589,7 @@ "axes = ['kx', 'ky', 'energy', 'delay']\n", "bins = [100, 100, 200, 50]\n", "ranges = [[-2, 2], [-2, 2], [-4, 2], [-600, 1600]]\n", - "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" + "res = sp.compute(bins=bins, axes=axes, ranges=ranges, df_partitions=10)" ] }, { @@ -615,6 +615,45 @@ "res.loc[{'kx':slice(-.8, -.5), 'energy':slice(.5, 2)}].sum(axis=(0,1)).plot(ax=axs[3])" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "3073b9ba", + "metadata": {}, + "outputs": [], + "source": [ + "histogram = sp.get_normalization_histogram()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "596a3217", + "metadata": {}, + "outputs": [], + "source": [ + "ax, fig = plt.subplots(1,1)\n", + "plt.plot(histogram)\n", + "plt.plot(sp._binned.sum(axis=(0,1,2))/10000)\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0664d479", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "60dfa14c", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, From 773f402c550328869733993ea6e0f6fa78957853 Mon Sep 17 00:00:00 2001 From: rettigl Date: Tue, 16 May 2023 22:22:36 +0200 Subject: [PATCH 02/13] unified both methods for histogram calculation, and moved the actual code to binning.py --- sed/binning/binning.py | 59 ++++++++++++++++++++++++++++++++++++++++++ sed/core/processor.py | 59 +++++++++++++++++++++++++++++------------- 2 files changed, 100 insertions(+), 18 deletions(-) diff --git a/sed/binning/binning.py b/sed/binning/binning.py index 9a488312..b273e025 100644 --- a/sed/binning/binning.py +++ b/sed/binning/binning.py @@ -433,6 +433,65 @@ def bin_dataframe( return data_array +def normalization_histogram_from_timestamps( + df: dask.dataframe.DataFrame, + axis: str, + bin_centers: np.ndarray, + time_stamp_column: str, +) -> np.ndarray: + """Get a normalization histogram from the time stamps column in the dataframe. + + Args: + df (dask.dataframe.DataFrame): a dask.DataFrame on which to perform the + histogram. + axis (str): The axis (dataframe column) on which to calculate the normalization + histogram. + bin_centers (np.ndarray): Bin centers used for binning of the axis. + time_stamp_column (str): Dataframe column containing the time stamps. 
+ + Returns: + np.ndarray: Calculated normalization histogram. + """ + time_per_electron = df[time_stamp_column].diff() + + bins = df[axis].map_partitions( + pd.cut, + bins=bin_centers_to_bin_edges(bin_centers), + ) + + histogram = time_per_electron.groupby([bins]).sum().compute().values + + return histogram + + +def normalization_histogram_from_timed_dataframe( + df: dask.dataframe.DataFrame, + axis: str, + bin_centers: np.ndarray, +) -> np.ndarray: + """Get a normalization histogram from a timed datafram. + + Args: + df (dask.dataframe.DataFrame): a dask.DataFrame on which to perform the + histogram. Entries should be based on an equal time unit. + axis (str): The axis (dataframe column) on which to calculate the normalization + histogram. + bin_centers (np.ndarray): Bin centers used for binning of the axis. + + Returns: + np.ndarray: Calculated normalization histogram. + """ + bins = df[axis].map_partitions( + pd.cut, + bins=bin_centers_to_bin_edges(bin_centers), + ) + + histogram = df[axis].groupby([bins]).count().compute().values + #histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) + + return histogram + + def apply_jitter_on_column( df: Union[dask.dataframe.core.DataFrame, pd.DataFrame], amp: float, diff --git a/sed/core/processor.py b/sed/core/processor.py index eafe09cf..b763799c 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -18,7 +18,8 @@ import xarray as xr from sed.binning import bin_dataframe -from sed.binning.utils import bin_centers_to_bin_edges +from sed.binning.binning import normalization_histogram_from_timed_dataframe +from sed.binning.binning import normalization_histogram_from_timestamps from sed.calibrator import DelayCalibrator from sed.calibrator import EnergyCalibrator from sed.calibrator import MomentumCorrector @@ -1453,14 +1454,17 @@ def compute( def get_normalization_histogram( self, axis: str = "delay", + use_time_stamps: bool = False, **kwds, ) -> np.ndarray: - """Generates a normalization histogram from the TimeStamps column of the - dataframe. + """Generates a normalization histogram from the timed dataframe. Optionally, + use the TimeStamps column instead. Args: axis (str, optional): The axis for which to compute histogram. Defaults to "delay". + use_time_stamps (bool, optional): Use the TimeStamps column of the + dataframe, rather than the timed dataframe. Defaults to False. **kwds: Keyword arguments: -df_partitions (int, optional): Number of dataframe partitions to use. 
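
For illustration, the pd.cut + groupby pattern that the two normalization helpers above rely on (and that the processor method in the next hunk dispatches to) can be sketched with plain pandas on toy data; this is only a standalone sketch, not the library code, which operates on dask dataframes via map_partitions and bin_centers_to_bin_edges:

    # standalone sketch with hypothetical toy data, plain pandas instead of dask
    import numpy as np
    import pandas as pd

    df = pd.DataFrame(
        {
            "delay": np.random.uniform(-600, 1600, size=10_000),
            "timeStamps": np.linspace(0, 100, 10_000),
        },
    )

    bin_centers = np.linspace(-600, 1600, 51)
    # convert bin centers to bin edges, analogous to bin_centers_to_bin_edges
    edges = np.concatenate(
        (
            [bin_centers[0] - (bin_centers[1] - bin_centers[0]) / 2],
            (bin_centers[1:] + bin_centers[:-1]) / 2,
            [bin_centers[-1] + (bin_centers[-1] - bin_centers[-2]) / 2],
        ),
    )

    # time spent per event, summed per delay bin -> acquisition time per bin
    time_per_electron = df["timeStamps"].diff()
    bins = pd.cut(df["delay"], bins=edges)
    histogram = time_per_electron.groupby(bins).sum()
    print(histogram.head())

The timed-dataframe variant follows the same binning step but counts entries per bin and scales by the time unit instead of summing per-event time differences.
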
@@ -1482,22 +1486,41 @@ def get_normalization_histogram( if axis not in self._binned.coords: raise ValueError(f"Axis '{axis}' not found in binned data!") - df_partitions = kwds.pop("df_partitions", None) - if df_partitions is not None: - timed_dataframe = self._timed_dataframe.partitions[ - 0 : min(df_partitions, self._timed_dataframe.npartitions) - ] - else: - timed_dataframe = self._timed_dataframe - - bins = timed_dataframe[axis].map_partitions( - pd.cut, - bins=bin_centers_to_bin_edges(self._binned.coords[axis].values), - ) + df_partitions: Union[int, slice] = kwds.pop("df_partitions", None) + if isinstance(df_partitions, int): + df_partitions = slice( + 0, + min(df_partitions, self._dataframe.npartitions), + ) - histogram = ( - timed_dataframe[axis].groupby([bins]).count().compute().values - ) / 1000 + if use_time_stamps or self._timed_dataframe is None: + if df_partitions is not None: + histogram = normalization_histogram_from_timestamps( + self._dataframe.partitions[df_partitions], + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["time_stamp_alias"], + ) + else: + histogram = normalization_histogram_from_timestamps( + self._dataframe, + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["time_stamp_alias"], + ) + else: + if df_partitions is not None: + histogram = normalization_histogram_from_timed_dataframe( + self._timed_dataframe.partitions[df_partitions], + axis, + self._binned.coords[axis].values, + ) + else: + histogram = normalization_histogram_from_timed_dataframe( + self._timed_dataframe, + axis, + self._binned.coords[axis].values, + ) return histogram From 509a48976ffffbef6a79d22fbeebc9ef6d76c589 Mon Sep 17 00:00:00 2001 From: rettigl Date: Tue, 16 May 2023 23:13:17 +0200 Subject: [PATCH 03/13] add config value for unit time of timed data frame --- sed/binning/binning.py | 6 ++++-- sed/config/default.yaml | 2 ++ sed/config/mpes_example_config.yaml | 2 ++ sed/core/processor.py | 2 ++ 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/sed/binning/binning.py b/sed/binning/binning.py index b273e025..82195ac5 100644 --- a/sed/binning/binning.py +++ b/sed/binning/binning.py @@ -468,6 +468,7 @@ def normalization_histogram_from_timed_dataframe( df: dask.dataframe.DataFrame, axis: str, bin_centers: np.ndarray, + time_unit: float, ) -> np.ndarray: """Get a normalization histogram from a timed datafram. @@ -477,6 +478,7 @@ def normalization_histogram_from_timed_dataframe( axis (str): The axis (dataframe column) on which to calculate the normalization histogram. bin_centers (np.ndarray): Bin centers used for binning of the axis. + time_unit (float): Time unit the data frame entries are based on. Returns: np.ndarray: Calculated normalization histogram. @@ -487,9 +489,9 @@ def normalization_histogram_from_timed_dataframe( ) histogram = df[axis].groupby([bins]).count().compute().values - #histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) + # histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) - return histogram + return histogram * time_unit def apply_jitter_on_column( diff --git a/sed/config/default.yaml b/sed/config/default.yaml index 10c362e6..5f3899ff 100644 --- a/sed/config/default.yaml +++ b/sed/config/default.yaml @@ -37,6 +37,8 @@ dataframe: jitter_cols: ["@x_column", "@y_column", "@tof_column"] # Jitter amplitude or list of jitter amplitudes. 
Should equal half the digitial step size of each jitter_column jitter_amps: 0.5 + # Time stepping in seconds of the succesive events in the timed dataframe + timed_dataframe_unit_time: 0.001 energy: # Number of bins to use for energy calibration traces diff --git a/sed/config/mpes_example_config.yaml b/sed/config/mpes_example_config.yaml index ba46bfec..5b9eca7b 100644 --- a/sed/config/mpes_example_config.yaml +++ b/sed/config/mpes_example_config.yaml @@ -29,6 +29,8 @@ dataframe: ms_markers_group: "msMarkers" # hdf5 attribute containing the timestamp of the first event in a file first_event_time_stamp_key: "FirstEventTimeStamp" + # Time stepping in seconds of the succesive events in the timed dataframe + timed_dataframe_unit_time: 0.001 # list of columns to apply jitter to jitter_cols: ["X", "Y", "t", "ADC"] # dataframe column containing x coordinates diff --git a/sed/core/processor.py b/sed/core/processor.py index b763799c..8b37714d 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -1514,12 +1514,14 @@ def get_normalization_histogram( self._timed_dataframe.partitions[df_partitions], axis, self._binned.coords[axis].values, + self._config["dataframe"]["timed_dataframe_unit_time"], ) else: histogram = normalization_histogram_from_timed_dataframe( self._timed_dataframe, axis, self._binned.coords[axis].values, + self._config["dataframe"]["timed_dataframe_unit_time"], ) return histogram From c4ccef4ed56c725b02424742557f3d7024f1af62 Mon Sep 17 00:00:00 2001 From: rettigl Date: Wed, 17 May 2023 22:50:58 +0200 Subject: [PATCH 04/13] add normalization option to compute function of processor fix performance issue in mpes loader for timed dataframes return normalization histograms as xarrays --- sed/binning/binning.py | 26 ++-- sed/core/processor.py | 124 +++++++++++++----- sed/loader/mpes/loader.py | 6 +- ...for example time-resolved ARPES data.ipynb | 34 +---- 4 files changed, 116 insertions(+), 74 deletions(-) diff --git a/sed/binning/binning.py b/sed/binning/binning.py index 82195ac5..c57efa72 100644 --- a/sed/binning/binning.py +++ b/sed/binning/binning.py @@ -438,7 +438,7 @@ def normalization_histogram_from_timestamps( axis: str, bin_centers: np.ndarray, time_stamp_column: str, -) -> np.ndarray: +) -> xr.DataArray: """Get a normalization histogram from the time stamps column in the dataframe. Args: @@ -450,7 +450,7 @@ def normalization_histogram_from_timestamps( time_stamp_column (str): Dataframe column containing the time stamps. Returns: - np.ndarray: Calculated normalization histogram. + xr.DataArray: Calculated normalization histogram. """ time_per_electron = df[time_stamp_column].diff() @@ -461,7 +461,12 @@ def normalization_histogram_from_timestamps( histogram = time_per_electron.groupby([bins]).sum().compute().values - return histogram + data_array = xr.DataArray( + data=histogram, + coords={axis: bin_centers}, + ) + + return data_array def normalization_histogram_from_timed_dataframe( @@ -469,7 +474,7 @@ def normalization_histogram_from_timed_dataframe( axis: str, bin_centers: np.ndarray, time_unit: float, -) -> np.ndarray: +) -> xr.DataArray: """Get a normalization histogram from a timed datafram. Args: @@ -481,17 +486,22 @@ def normalization_histogram_from_timed_dataframe( time_unit (float): Time unit the data frame entries are based on. Returns: - np.ndarray: Calculated normalization histogram. + xr.DataArray: Calculated normalization histogram. 
""" bins = df[axis].map_partitions( pd.cut, bins=bin_centers_to_bin_edges(bin_centers), ) - histogram = df[axis].groupby([bins]).count().compute().values - # histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) + histogram = df[axis].groupby([bins]).count().compute().values * time_unit + # histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) * time_unit + + data_array = xr.DataArray( + data=histogram, + coords={axis: bin_centers}, + ) - return histogram * time_unit + return data_array def apply_jitter_on_column( diff --git a/sed/core/processor.py b/sed/core/processor.py index 8b37714d..3d743d92 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -103,6 +103,8 @@ def __init__( self._binned: xr.DataArray = None self._pre_binned: xr.DataArray = None + self._normalization_histogram: xr.DataArray = None + self._normalized: xr.DataArray = None self._attributes = MetaHandler(meta=metadata) @@ -1352,6 +1354,7 @@ def compute( ] = 100, axes: Union[str, Sequence[str]] = None, ranges: Sequence[Tuple[float, float]] = None, + normalize_to_acquisition_time: Union[bool, str] = False, **kwds, ) -> xr.DataArray: """Compute the histogram along the given dimensions. @@ -1373,6 +1376,10 @@ def compute( dimensions in the resulting array. Defaults to None. ranges (Sequence[Tuple[float, float]], optional): list of tuples containing the start and end point of the binning range. Defaults to None. + normalize_to_acquisition_time (Union[bool, str]): Option to normalize the + result to the acquistion time. If a "slow" axis was scanned, providing + the name of the scanned axis will compute and apply the corresponding + normalization histogram. Defaults to False. **kwds: Keyword arguments: - **hist_mode**: Histogram calculation method. "numpy" or "numba". See @@ -1391,8 +1398,8 @@ def compute( - **threadpool_api**: The API to use for multiprocessing. "blas", "openmp" or None. See ``threadpool_limit`` for details. Defaults to config["binning"]["threadpool_API"]. - - **df_partitions**: A list of dataframe partitions. Defaults to all - partitions. + - **df_partitions**: A range or list of dataframe partitions, or the + number of the dataframe partitions to use. Defaults to all partitions. Additional kwds are passed to ``bin_dataframe``. 
@@ -1418,10 +1425,13 @@ def compute( self._config["binning"]["threadpool_API"], ) df_partitions = kwds.pop("df_partitions", None) + if isinstance(df_partitions, int): + df_partitions = slice( + 0, + min(df_partitions, self._dataframe.npartitions), + ) if df_partitions is not None: - dataframe = self._dataframe.partitions[ - 0 : min(df_partitions, self._dataframe.npartitions) - ] + dataframe = self._dataframe.partitions[df_partitions] else: dataframe = self._dataframe @@ -1449,6 +1459,45 @@ def compute( self._binned.attrs["long_name"] = "photoelectron counts" self._binned.attrs["metadata"] = self._attributes.metadata + if normalize_to_acquisition_time: + if isinstance(normalize_to_acquisition_time, str): + axis = normalize_to_acquisition_time + print( + f"Calculate normalization histogram for axis '{axis}'...", + ) + self._normalization_histogram = ( + self.get_normalization_histogram( + axis=axis, + df_partitions=df_partitions, + ) + ) + # if the axes are named correctly, xarray figures out the normalization correctly + self._normalized = self._binned / self._normalization_histogram + self._attributes.add( + self._normalization_histogram.values, + name="normalization_histogram", + duplicate_policy="overwrite", + ) + else: + acquisition_time = self.loader.get_elapsed_time( + fids=df_partitions, + ) + if acquisition_time > 0: + self._normalized = self._binned / acquisition_time + self._attributes.add( + acquisition_time, + name="normalization_histogram", + duplicate_policy="overwrite", + ) + + self._normalized.attrs["units"] = "counts/second" + self._normalized.attrs[ + "long_name" + ] = "photoelectron counts per second" + self._normalized.attrs["metadata"] = self._attributes.metadata + + return self._normalized + return self._binned def get_normalization_histogram( @@ -1456,7 +1505,7 @@ def get_normalization_histogram( axis: str = "delay", use_time_stamps: bool = False, **kwds, - ) -> np.ndarray: + ) -> xr.DataArray: """Generates a normalization histogram from the timed dataframe. Optionally, use the TimeStamps column instead. @@ -1477,7 +1526,7 @@ def get_normalization_histogram( in Dataframe. Returns: - np.ndarray: The computed normalization histogram (in TimeStamp units + xr.DataArray: The computed normalization histogram (in TimeStamp units per bin). 
""" @@ -1495,36 +1544,44 @@ def get_normalization_histogram( if use_time_stamps or self._timed_dataframe is None: if df_partitions is not None: - histogram = normalization_histogram_from_timestamps( - self._dataframe.partitions[df_partitions], - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["time_stamp_alias"], + self._normalization_histogram = ( + normalization_histogram_from_timestamps( + self._dataframe.partitions[df_partitions], + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["time_stamp_alias"], + ) ) else: - histogram = normalization_histogram_from_timestamps( - self._dataframe, - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["time_stamp_alias"], + self._normalization_histogram = ( + normalization_histogram_from_timestamps( + self._dataframe, + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["time_stamp_alias"], + ) ) else: if df_partitions is not None: - histogram = normalization_histogram_from_timed_dataframe( - self._timed_dataframe.partitions[df_partitions], - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["timed_dataframe_unit_time"], + self._normalization_histogram = ( + normalization_histogram_from_timed_dataframe( + self._timed_dataframe.partitions[df_partitions], + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["timed_dataframe_unit_time"], + ) ) else: - histogram = normalization_histogram_from_timed_dataframe( - self._timed_dataframe, - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["timed_dataframe_unit_time"], + self._normalization_histogram = ( + normalization_histogram_from_timed_dataframe( + self._timed_dataframe, + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["timed_dataframe_unit_time"], + ) ) - return histogram + return self._normalization_histogram def view_event_histogram( self, @@ -1656,17 +1713,22 @@ def save( if self._binned is None: raise NameError("Need to bin data first!") + if self._normalized is not None: + data = self._normalized + else: + data = self._binned + extension = pathlib.Path(faddr).suffix if extension in (".tif", ".tiff"): to_tiff( - data=self._binned, + data=data, faddr=faddr, **kwds, ) elif extension in (".h5", ".hdf5"): to_h5( - data=self._binned, + data=data, faddr=faddr, **kwds, ) @@ -1693,7 +1755,7 @@ def save( input_files.append(kwds.pop("eln_data")) to_nexus( - data=self._binned, + data=data, faddr=faddr, reader=reader, definition=definition, diff --git a/sed/loader/mpes/loader.py b/sed/loader/mpes/loader.py index 82c1a99d..b145cb0d 100644 --- a/sed/loader/mpes/loader.py +++ b/sed/loader/mpes/loader.py @@ -348,11 +348,7 @@ def hdf5_to_timed_array( timed_dataset = np.zeros_like(ms_marker) for i, point in enumerate(ms_marker): - timed_dataset[i] = ( - g_dataset[point] - if point < len(g_dataset) - else g_dataset[len(g_dataset) - 1] - ) + timed_dataset[i] = g_dataset[int(point) - 1] data_list.append(timed_dataset) diff --git a/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb b/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb index 72e45d3f..1ae9785c 100644 --- a/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb +++ b/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb @@ -589,7 +589,7 @@ "axes = ['kx', 'ky', 'energy', 'delay']\n", "bins = [100, 100, 200, 50]\n", "ranges = [[-2, 2], [-2, 2], [-4, 2], [-600, 1600]]\n", - "res = sp.compute(bins=bins, 
axes=axes, ranges=ranges, df_partitions=10)" + "res = sp.compute(bins=bins, axes=axes, ranges=ranges, normalize_to_acquisition_time=\"delay\")" ] }, { @@ -615,16 +615,6 @@ "res.loc[{'kx':slice(-.8, -.5), 'energy':slice(.5, 2)}].sum(axis=(0,1)).plot(ax=axs[3])" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "3073b9ba", - "metadata": {}, - "outputs": [], - "source": [ - "histogram = sp.get_normalization_histogram()" - ] - }, { "cell_type": "code", "execution_count": null, @@ -632,28 +622,12 @@ "metadata": {}, "outputs": [], "source": [ - "ax, fig = plt.subplots(1,1)\n", - "plt.plot(histogram)\n", - "plt.plot(sp._binned.sum(axis=(0,1,2))/10000)\n", + "fig, ax = plt.subplots(1,1)\n", + "(sp._normalization_histogram*90000).plot(ax=ax)\n", + "sp._binned.sum(axis=(0,1,2)).plot(ax=ax)\n", "plt.show()" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "0664d479", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "60dfa14c", - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "code", "execution_count": null, From 57ac19baae3d91b27257954251c4665bad340ca6 Mon Sep 17 00:00:00 2001 From: rettigl Date: Mon, 29 May 2023 22:44:26 +0200 Subject: [PATCH 05/13] add jittering of timed dataframe --- sed/core/processor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sed/core/processor.py b/sed/core/processor.py index 3d743d92..29f91676 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -1288,6 +1288,12 @@ def add_jitter( amps=amps, **kwds, ) + if self._timed_dataframe is not None: + self._timed_dataframe = self._timed_dataframe.map_partitions( + apply_jitter, + cols=cols, + cols_jittered=cols, + ) metadata = [] for col in cols: metadata.append(col) From fc5cb2697be0b8a27eb779c3b8df7c70951cc77e Mon Sep 17 00:00:00 2001 From: rettigl Date: Sat, 12 Aug 2023 22:13:29 +0200 Subject: [PATCH 06/13] fix merge issues --- sed/core/processor.py | 65 +++++++++++++-------------------- sed/loader/base/loader.py | 22 ----------- sed/loader/flash/loader.py | 4 +- tests/calibrator/test_energy.py | 2 +- tests/test_diagnostics.py | 2 +- tests/test_processor.py | 6 +-- 6 files changed, 32 insertions(+), 69 deletions(-) diff --git a/sed/core/processor.py b/sed/core/processor.py index 29f91676..ddbe5daa 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -285,8 +285,7 @@ def load( if metadata is None: metadata = {} if dataframe is not None: - self._dataframe = dataframe - self._timed_dataframe = None + timed_dataframe = kwds.pop("timed_dataframe", None) elif runs is not None: # If runs are provided, we only use the copy tool if also folder is provided. 
# In that case, we copy the whole provided base folder tree, and pass the copied @@ -314,7 +313,6 @@ def load( collect_metadata=collect_metadata, **kwds, ) - elif files is not None: dataframe, timed_dataframe, metadata = self.loader.read_dataframe( files=cast(List[str], self.cpy(files)), @@ -322,7 +320,6 @@ def load( collect_metadata=collect_metadata, **kwds, ) - else: raise ValueError( "Either 'dataframe', 'files', 'folder', or 'runs' needs to be provided!", @@ -1471,11 +1468,9 @@ def compute( print( f"Calculate normalization histogram for axis '{axis}'...", ) - self._normalization_histogram = ( - self.get_normalization_histogram( - axis=axis, - df_partitions=df_partitions, - ) + self._normalization_histogram = self.get_normalization_histogram( + axis=axis, + df_partitions=df_partitions, ) # if the axes are named correctly, xarray figures out the normalization correctly self._normalized = self._binned / self._normalization_histogram @@ -1497,9 +1492,7 @@ def compute( ) self._normalized.attrs["units"] = "counts/second" - self._normalized.attrs[ - "long_name" - ] = "photoelectron counts per second" + self._normalized.attrs["long_name"] = "photoelectron counts per second" self._normalized.attrs["metadata"] = self._attributes.metadata return self._normalized @@ -1550,41 +1543,33 @@ def get_normalization_histogram( if use_time_stamps or self._timed_dataframe is None: if df_partitions is not None: - self._normalization_histogram = ( - normalization_histogram_from_timestamps( - self._dataframe.partitions[df_partitions], - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["time_stamp_alias"], - ) + self._normalization_histogram = normalization_histogram_from_timestamps( + self._dataframe.partitions[df_partitions], + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["time_stamp_alias"], ) else: - self._normalization_histogram = ( - normalization_histogram_from_timestamps( - self._dataframe, - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["time_stamp_alias"], - ) + self._normalization_histogram = normalization_histogram_from_timestamps( + self._dataframe, + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["time_stamp_alias"], ) else: if df_partitions is not None: - self._normalization_histogram = ( - normalization_histogram_from_timed_dataframe( - self._timed_dataframe.partitions[df_partitions], - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["timed_dataframe_unit_time"], - ) + self._normalization_histogram = normalization_histogram_from_timed_dataframe( + self._timed_dataframe.partitions[df_partitions], + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["timed_dataframe_unit_time"], ) else: - self._normalization_histogram = ( - normalization_histogram_from_timed_dataframe( - self._timed_dataframe, - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["timed_dataframe_unit_time"], - ) + self._normalization_histogram = normalization_histogram_from_timed_dataframe( + self._timed_dataframe, + axis, + self._binned.coords[axis].values, + self._config["dataframe"]["timed_dataframe_unit_time"], ) return self._normalization_histogram diff --git a/sed/loader/base/loader.py b/sed/loader/base/loader.py index 1f905d82..880be88d 100644 --- a/sed/loader/base/loader.py +++ b/sed/loader/base/loader.py @@ -147,28 +147,6 @@ def get_files_from_run_id( """ raise NotImplementedError - @abstractmethod - def get_files_from_run_id( - self, - run_id: str, - folders: Union[str, 
Sequence[str]] = None, - extension: str = None, - **kwds, - ) -> List[str]: - """Locate the files for a given run identifier. - - Args: - run_id (str): The run identifier to locate. - folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw - data is located. Defaults to None. - extension (str, optional): The file extension. Defaults to None. - kwds: Keyword arguments - - Return: - List[str]: List of files for the given run. - """ - raise NotImplementedError - @abstractmethod def get_count_rate( self, diff --git a/sed/loader/flash/loader.py b/sed/loader/flash/loader.py index c1622c87..1f78c100 100644 --- a/sed/loader/flash/loader.py +++ b/sed/loader/flash/loader.py @@ -792,7 +792,7 @@ def read_dataframe( metadata: dict = None, collect_metadata: bool = False, **kwds, - ) -> Tuple[dd.DataFrame, dict]: + ) -> Tuple[dd.DataFrame, dd.DataFrame, dict]: """ Read express data from the DAQ, generating a parquet in between. @@ -850,7 +850,7 @@ def read_dataframe( metadata = self.parse_metadata() if collect_metadata else {} print(f"loading complete in {time.time() - t0:.2f} s") - return dataframe, metadata + return dataframe, None, metadata LOADER = FlashLoader diff --git a/tests/calibrator/test_energy.py b/tests/calibrator/test_energy.py index 5793d42e..86c48e57 100644 --- a/tests/calibrator/test_energy.py +++ b/tests/calibrator/test_energy.py @@ -258,7 +258,7 @@ def test_append_energy_axis_raises(): """Test if apply_correction raises the correct errors""" config = parse_config(config={}, folder_config={}, user_config={}, system_config={}) loader = get_loader(loader_name="mpes", config=config) - df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) with pytest.raises(ValueError): df, _ = ec.append_energy_axis(df, calibration={"d": 1, "t0": 0}) diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py index ab80bde7..21b20093 100644 --- a/tests/test_diagnostics.py +++ b/tests/test_diagnostics.py @@ -31,7 +31,7 @@ def test_plot_histogram(ncols: int, backend: str): ncols (int): number of columns backend (str): plotting backend to use """ - dataframe, _ = loader.read_dataframe(files=files) + dataframe, _, _ = loader.read_dataframe(files=files) axes = config["histogram"]["axes"] ranges = config["histogram"]["ranges"] bins = config["histogram"]["bins"] diff --git a/tests/test_processor.py b/tests/test_processor.py index 4d0fca66..8bb41240 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -49,7 +49,7 @@ def test_processor_from_dataframe(): """Test generation of the processor from a dataframe object""" config = {"core": {"loader": "generic"}} - dataframe, _ = loader.read_dataframe(files=files) + dataframe, _, _ = loader.read_dataframe(files=files) processor = SedProcessor( dataframe=dataframe, config=config, @@ -64,7 +64,7 @@ def test_processor_from_dataframe(): def test_processor_from_files(): """Test generation of the processor from a list of files""" config = {"core": {"loader": "generic"}} - dataframe, _ = loader.read_dataframe(files=files) + dataframe, _, _ = loader.read_dataframe(files=files) processor = SedProcessor( files=files, config=config, @@ -79,7 +79,7 @@ def test_processor_from_files(): def test_processor_from_folders(): """Test generation of the processor from a folder""" config = {"core": {"loader": "generic"}} - dataframe, _ = loader.read_dataframe(files=files) + dataframe, _, _ = 
loader.read_dataframe(files=files) processor = SedProcessor( folder=df_folder, config=config, From 1acba31b9bc34714888b6440e0874a9a5871e392 Mon Sep 17 00:00:00 2001 From: "M. Zain Sohail" Date: Fri, 29 Sep 2023 14:26:03 +0200 Subject: [PATCH 07/13] added the timed dataframe --- sed/loader/flash/loader.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sed/loader/flash/loader.py b/sed/loader/flash/loader.py index 1f78c100..8dd289cd 100644 --- a/sed/loader/flash/loader.py +++ b/sed/loader/flash/loader.py @@ -701,7 +701,9 @@ def parquet_handler( save_parquet (bool, optional): Saves the entire dataframe into a parquet. Returns: - dataframe: Dataframe containing the loaded or processed data. + tuple: A tuple containing two dataframes: + - dataframe_electron: Dataframe containing the loaded/augmented electron data. + - dataframe_pulse: Dataframe containing the loaded/augmented timed data. Raises: FileNotFoundError: If the requested parquet file is not found. @@ -748,15 +750,22 @@ def parquet_handler( iterations=self._config["dataframe"].get("forward_fill_iterations", 2), ) # Remove the NaNs from per_electron channels - dataframe = dataframe.dropna( + dataframe_electron = dataframe.dropna( subset=self.get_channels_by_format(["per_electron"]), ) + dataframe_pulse = dataframe[ + self.multi_index + self.get_channels_by_format(["per_pulse", "per_train"]) + ] + dataframe_pulse = dataframe_pulse[ + (dataframe_pulse["electronId"] == 0) | (np.isnan(dataframe_pulse["electronId"])) + ] + # Save the dataframe as parquet if requested if save_parquet: - dataframe.compute().reset_index(drop=True).to_parquet(parquet_path) + dataframe_electron.compute().reset_index(drop=True).to_parquet(parquet_path) print("Combined parquet file saved.") - return dataframe + return dataframe_electron, dataframe_pulse def parse_metadata(self) -> dict: """Uses the MetadataRetriever class to fetch metadata from scicat for each run. @@ -845,12 +854,12 @@ def read_dataframe( metadata=metadata, ) - dataframe = self.parquet_handler(data_parquet_dir, **kwds) + df, df_timed = self.parquet_handler(data_parquet_dir, **kwds) metadata = self.parse_metadata() if collect_metadata else {} print(f"loading complete in {time.time() - t0:.2f} s") - return dataframe, None, metadata + return df, df_timed, metadata LOADER = FlashLoader From 4377ad20ebe6824b688cf2234f6fbe2bbc393ac9 Mon Sep 17 00:00:00 2001 From: rettigl Date: Wed, 11 Oct 2023 00:11:15 +0200 Subject: [PATCH 08/13] add tests for timed dataframes, histogram generation and related processor function add accessor functions for binned and normalized histograms and normalization histograms --- sed/core/processor.py | 63 ++++++++++++++++++++++++++++ tests/loader/test_loaders.py | 23 +++++++++++ tests/test_binning.py | 28 +++++++++++++ tests/test_processor.py | 79 +++++++++++++++++++++++++++++++++++- 4 files changed, 192 insertions(+), 1 deletion(-) diff --git a/sed/core/processor.py b/sed/core/processor.py index ddbe5daa..d7c61871 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -189,6 +189,34 @@ def dataframe(self, dataframe: Union[pd.DataFrame, ddf.DataFrame]): ) self._dataframe = dataframe + @property + def timed_dataframe(self) -> Union[pd.DataFrame, ddf.DataFrame]: + """Accessor to the underlying timed_dataframe. + + Returns: + Union[pd.DataFrame, ddf.DataFrame]: Timed Dataframe object. 
+ """ + return self._timed_dataframe + + @timed_dataframe.setter + def timed_dataframe(self, timed_dataframe: Union[pd.DataFrame, ddf.DataFrame]): + """Setter for the underlying timed dataframe. + + Args: + timed_dataframe (Union[pd.DataFrame, ddf.DataFrame]): The timed dataframe object to set + """ + if not isinstance(timed_dataframe, (pd.DataFrame, ddf.DataFrame)) or not isinstance( + timed_dataframe, + self._timed_dataframe.__class__, + ): + raise ValueError( + "'timed_dataframe' has to be a Pandas or Dask dataframe and has to be of the same " + "kind as the dataframe loaded into the SedProcessor!.\n" + f"Loaded type: {self._timed_dataframe.__class__}, " + f"provided type: {timed_dataframe}.", + ) + self._timed_dataframe = timed_dataframe + @property def attributes(self) -> dict: """Accessor to the metadata dict. @@ -229,6 +257,41 @@ def files(self) -> List[str]: """ return self._files + @property + def binned(self) -> xr.DataArray: + """Getter attribute for the binned data array + + Returns: + xr.DataArray: The binned data array + """ + if self._binned is None: + raise ValueError("No binned data available, need to compute histogram first!") + return self._binned + + @property + def normalized(self) -> xr.DataArray: + """Getter attribute for the normalized data array + + Returns: + xr.DataArray: The normalized data array + """ + if self._normalized is None: + raise ValueError( + "No normalized data available, compute data with normalization enabled!", + ) + return self._normalized + + @property + def normalization_histogram(self) -> xr.DataArray: + """Getter attribute for the normalization histogram + + Returns: + xr.DataArray: The normalizazion histogram + """ + if self._normalization_histogram is None: + raise ValueError("No normalization histogram available, generate histogram first!") + return self._normalization_histogram + def cpy(self, path: Union[str, List[str]]) -> Union[str, List[str]]: """Function to mirror a list of files or a folder from a network drive to a local storage. 
Returns either the original or the copied path to the given diff --git a/tests/loader/test_loaders.py b/tests/loader/test_loaders.py index b55b30fb..cc539a28 100644 --- a/tests/loader/test_loaders.py +++ b/tests/loader/test_loaders.py @@ -168,6 +168,29 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str): os.remove(Path(parquet_data_dir, "buffer", file)) +@pytest.mark.parametrize("loader", get_all_loaders()) +def test_timed_dataframe(loader: BaseLoader): + """Test if the loaders return a correct timed dataframe + + Args: + loader (BaseLoader): the loader object to test + """ + if loader.__name__ != "BaseLoader": + loader_name = get_loader_name_from_loader_object(loader) + input_folder = os.path.join(test_data_dir, "loader", loader_name) + for supported_file_type in loader.supported_file_types: + loaded_dataframe, loaded_timed_dataframe, _ = loader.read_dataframe( + folders=input_folder, + ftype=supported_file_type, + collect_metadata=False, + ) + if loaded_timed_dataframe is None: + pytest.skip("Not implemented") + assert isinstance(loaded_timed_dataframe, ddf.DataFrame) + assert set(loaded_timed_dataframe.columns).issubset(set(loaded_dataframe.columns)) + assert loaded_timed_dataframe.npartitions == loaded_dataframe.npartitions + + @pytest.mark.parametrize("loader", get_all_loaders()) def test_get_count_rate(loader: BaseLoader): """Test the get_count_rate function diff --git a/tests/test_binning.py b/tests/test_binning.py index 74062cb5..89f79f7e 100644 --- a/tests/test_binning.py +++ b/tests/test_binning.py @@ -14,6 +14,8 @@ from sed.binning.binning import bin_dataframe from sed.binning.binning import bin_partition +from sed.binning.binning import normalization_histogram_from_timed_dataframe +from sed.binning.binning import normalization_histogram_from_timestamps from sed.binning.binning import numba_histogramdd from sed.binning.binning import simplify_binning_arguments from sed.binning.numba_bin import _hist_from_bin_range @@ -504,3 +506,29 @@ def test_bin_dataframe(): np.testing.assert_allclose(res.values, res2.values) res2 = bin_dataframe(df=sample_ddf, bins=bins, axes=columns, ranges=ranges, mode="lean") np.testing.assert_allclose(res.values, res2.values) + + +def test_normalization_histogram_from_timestamps(): + """Test the function to generate the normalization histogram from timestamps""" + time_stamped_df = sample_ddf.copy() + time_stamped_df["timeStamps"] = time_stamped_df.index + res = bin_dataframe(df=sample_ddf, bins=[bins[0]], axes=[columns[0]], ranges=[ranges[0]]) + histogram = normalization_histogram_from_timestamps( + df=time_stamped_df, + axis=columns[0], + bin_centers=res.coords[columns[0]].values, + time_stamp_column="timeStamps", + ) + np.testing.assert_allclose(res / res.sum(), histogram / histogram.sum(), rtol=0.001) + + +def test_normalization_histogram_from_timed_dataframe(): + """Test the function to generate the normalization histogram from the timed dataframe""" + res = bin_dataframe(df=sample_ddf, bins=[bins[0]], axes=[columns[0]], ranges=[ranges[0]]) + histogram = normalization_histogram_from_timed_dataframe( + df=sample_ddf, + axis=columns[0], + bin_centers=res.coords[columns[0]].values, + time_unit=1, + ) + np.testing.assert_allclose(res / res.sum(), histogram / histogram.sum()) diff --git a/tests/test_processor.py b/tests/test_processor.py index 8bb41240..2bf2d228 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -24,6 +24,7 @@ # pylint: disable=duplicate-code package_dir = 
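# Illustrative sketch (not part of the patch): the timed dataframe carries one row
# per time step, so an acquisition-time histogram can be obtained by counting its
# rows per bin and scaling by the time represented by one row. The names below are
# placeholders; the actual implementation is provided by
# normalization_histogram_from_timed_dataframe in sed.binning.binning.
import numpy as np
import pandas as pd

timed_df = pd.DataFrame({"ADC": np.random.default_rng(0).uniform(650, 655, 1000)})
bin_centers = np.linspace(650.5, 654.5, 5)
step = bin_centers[1] - bin_centers[0]
bin_edges = np.concatenate(([bin_centers[0] - step / 2], bin_centers + step / 2))

time_unit = 1e-3  # seconds represented by one timed-dataframe row (assumed)
counts, _ = np.histogram(timed_df["ADC"].values, bins=bin_edges)
acquisition_time_per_bin = counts * time_unit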
os.path.dirname(find_spec("sed").origin) df_folder = package_dir + "/../tests/data/loader/generic/" +mpes_folder = package_dir + "/../tests/data/loader/mpes/" folder = package_dir + "/../tests/data/calibrator/" files = glob.glob(df_folder + "*.parquet") runs = ["43878", "43878"] @@ -655,10 +656,86 @@ def test_compute(): axes = ["X", "Y", "t", "ADC"] ranges = [[0, 2048], [0, 2048], [0, 200000], [0, 50000]] result = processor.compute(bins=bins, axes=axes, ranges=ranges, df_partitions=5) - assert result.data.shape == (10, 10, 10, 10) + assert result.data.shape == tuple(bins) assert result.data.sum(axis=(0, 1, 2, 3)) > 0 +def test_compute_with_normalization(): + """Test binning of final result with histogram normalization""" + config = parse_config( + config={"core": {"loader": "mpes"}}, + folder_config={}, + user_config={}, + system_config={}, + ) + processor = SedProcessor( + folder=mpes_folder, + config=config, + folder_config={}, + user_config={}, + system_config={}, + ) + bins = [10, 10, 10, 5] + axes = ["X", "Y", "t", "ADC"] + ranges = [[0, 2048], [0, 2048], [0, 200000], [650, 655]] + result = processor.compute( + bins=bins, + axes=axes, + ranges=ranges, + df_partitions=5, + normalize_to_acquisition_time="ADC", + ) + assert result.data.shape == tuple(bins) + assert result.data.sum(axis=(0, 1, 2, 3)) > 0 + assert processor.normalization_histogram is not None + assert processor.normalized is not None + np.testing.assert_allclose( + processor.binned.data, + (processor.normalized * processor.normalization_histogram).data, + ) + + +def test_get_normalization_histogram(): + """Test the generation function for the normalization histogram""" + config = parse_config( + config={"core": {"loader": "mpes"}, "dataframe": {"time_stamp_alias": "timeStamps"}}, + folder_config={}, + user_config={}, + system_config={}, + ) + processor = SedProcessor( + folder=mpes_folder, + config=config, + folder_config={}, + user_config={}, + system_config={}, + time_stamps=True, + ) + bins = [10, 10, 10, 5] + axes = ["X", "Y", "t", "ADC"] + ranges = [[0, 2048], [0, 2048], [0, 200000], [650, 655]] + with pytest.raises(ValueError): + processor.get_normalization_histogram(axis="ADC") + processor.compute(bins=bins, axes=axes, ranges=ranges, df_partitions=5) + with pytest.raises(ValueError): + processor.get_normalization_histogram(axis="Delay") + histogram1 = processor.get_normalization_histogram(axis="ADC", df_partitions=1) + histogram2 = processor.get_normalization_histogram( + axis="ADC", + use_time_stamps="True", + df_partitions=1, + ) + # TODO: Check why histograms are so different + np.testing.assert_allclose( + histogram1 / histogram1.sum(), + histogram2 / histogram2.sum(), + atol=0.02, + ) + # histogram1 = processor.get_normalization_histogram(axis="ADC") + # histogram2 = processor.get_normalization_histogram(axis="ADC", use_time_stamps="True") + # np.testing.assert_allclose(histogram1, histogram2) + + metadata: Dict[Any, Any] = {} metadata["entry_title"] = "Title" # User From 9c3faf086dfa0672519648c7debac23b280b4666 Mon Sep 17 00:00:00 2001 From: rettigl Date: Sat, 28 Oct 2023 23:15:45 +0200 Subject: [PATCH 09/13] add exceptions for missing columns in timed_dataframe --- sed/core/processor.py | 82 ++++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/sed/core/processor.py b/sed/core/processor.py index d7c61871..55fbdf3b 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -650,9 +650,13 @@ def apply_momentum_correction( df=self._dataframe, ) if 
self._timed_dataframe is not None: - self._timed_dataframe, _ = self.mc.apply_corrections( - self._timed_dataframe, - ) + if ( + self._config["dataframe"]["x_column"] in self._timed_dataframe.columns + and self._config["dataframe"]["y_column"] in self._timed_dataframe.columns + ): + self._timed_dataframe, _ = self.mc.apply_corrections( + self._timed_dataframe, + ) # Add Metadata self._attributes.add( metadata, @@ -777,10 +781,14 @@ def apply_momentum_calibration( calibration=calibration, ) if self._timed_dataframe is not None: - self._timed_dataframe, _ = self.mc.append_k_axis( - df=self._timed_dataframe, - calibration=calibration, - ) + if ( + self._config["dataframe"]["x_column"] in self._timed_dataframe.columns + and self._config["dataframe"]["y_column"] in self._timed_dataframe.columns + ): + self._timed_dataframe, _ = self.mc.append_k_axis( + df=self._timed_dataframe, + calibration=calibration, + ) # Add Metadata self._attributes.add( @@ -900,11 +908,12 @@ def apply_energy_correction( **kwds, ) if self._timed_dataframe is not None: - self._timed_dataframe, _ = self.ec.apply_energy_correction( - df=self._timed_dataframe, - correction=correction, - **kwds, - ) + if self._config["dataframe"]["tof_column"] in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.ec.apply_energy_correction( + df=self._timed_dataframe, + correction=correction, + **kwds, + ) # Add Metadata self._attributes.add( @@ -1230,11 +1239,12 @@ def append_energy_axis( **kwds, ) if self._timed_dataframe is not None: - self._timed_dataframe, _ = self.ec.append_energy_axis( - df=self._timed_dataframe, - calibration=calibration, - **kwds, - ) + if self._config["dataframe"]["tof_column"] in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.ec.append_energy_axis( + df=self._timed_dataframe, + calibration=calibration, + **kwds, + ) # Add Metadata self._attributes.add( @@ -1276,11 +1286,12 @@ def calibrate_delay_axis( **kwds, ) if self._timed_dataframe is not None: - self._timed_dataframe, _ = self.dc.append_delay_axis( - self._timed_dataframe, - delay_range=delay_range, - **kwds, - ) + if self._config["dataframe"]["adc_column"] in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.dc.append_delay_axis( + self._timed_dataframe, + delay_range=delay_range, + **kwds, + ) else: if datafile is None: try: @@ -1298,11 +1309,12 @@ def calibrate_delay_axis( **kwds, ) if self._timed_dataframe is not None: - self._timed_dataframe, _ = self.dc.append_delay_axis( - self._timed_dataframe, - datafile=datafile, - **kwds, - ) + if self._config["dataframe"]["adc_column"] in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.dc.append_delay_axis( + self._timed_dataframe, + datafile=datafile, + **kwds, + ) # Add Metadata self._attributes.add( @@ -1349,11 +1361,17 @@ def add_jitter( **kwds, ) if self._timed_dataframe is not None: - self._timed_dataframe = self._timed_dataframe.map_partitions( - apply_jitter, - cols=cols, - cols_jittered=cols, - ) + cols_timed = cols.copy() + for col in cols: + if col not in self._timed_dataframe.columns: + cols_timed.remove(col) + + if cols_timed: + self._timed_dataframe = self._timed_dataframe.map_partitions( + apply_jitter, + cols=cols_timed, + cols_jittered=cols_timed, + ) metadata = [] for col in cols: metadata.append(col) From aa4c35fcc803cac9cd2a79ef31c71750e6e299b3 Mon Sep 17 00:00:00 2001 From: rettigl Date: Sat, 28 Oct 2023 23:58:54 +0200 Subject: [PATCH 10/13] Update processor tests to use mpes loader and test for timed dataframe --- 
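As context for the tests updated below: the column-presence guards added in the previous patch only transform the timed dataframe when the required source columns actually exist in it. A minimal, self-contained sketch of that pattern (column names are placeholders, not the actual processor code):

import pandas as pd

# Stand-ins for the processor's config and timed dataframe.
config = {"dataframe": {"x_column": "X", "y_column": "Y"}}
timed_dataframe = pd.DataFrame({"timeStamps": [0.0, 1.0], "ADC": [650, 651]})

required = (config["dataframe"]["x_column"], config["dataframe"]["y_column"])
if timed_dataframe is not None and all(col in timed_dataframe.columns for col in required):
    pass  # here the momentum correction would be applied to the timed dataframe
else:
    print("Timed dataframe lacks the detector columns; skipping this transformation.")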
tests/test_processor.py | 73 ++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/tests/test_processor.py b/tests/test_processor.py index 2bf2d228..9371ca35 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -23,12 +23,12 @@ # pylint: disable=duplicate-code package_dir = os.path.dirname(find_spec("sed").origin) -df_folder = package_dir + "/../tests/data/loader/generic/" -mpes_folder = package_dir + "/../tests/data/loader/mpes/" +df_folder = package_dir + "/../tests/data/loader/mpes/" +df_folder_generic = package_dir + "/../tests/data/loader/generic/" folder = package_dir + "/../tests/data/calibrator/" -files = glob.glob(df_folder + "*.parquet") +files = glob.glob(df_folder + "*.h5") runs = ["43878", "43878"] -loader = get_loader(loader_name="generic") +loader = get_loader(loader_name="mpes") source_folder = package_dir + "/../" dest_folder = tempfile.mkdtemp() gid = os.getgid() @@ -49,10 +49,11 @@ def test_processor_from_dataframe(): """Test generation of the processor from a dataframe object""" - config = {"core": {"loader": "generic"}} - dataframe, _, _ = loader.read_dataframe(files=files) + config = {"core": {"loader": "mpes"}} + dataframe, timed_dataframe, _ = loader.read_dataframe(files=files) processor = SedProcessor( dataframe=dataframe, + timed_dataframe=timed_dataframe, config=config, folder_config={}, user_config={}, @@ -60,12 +61,16 @@ def test_processor_from_dataframe(): ) for column in dataframe.columns: assert (dataframe[column].compute() == processor.dataframe[column].compute()).all() + for column in timed_dataframe.columns: + assert ( + timed_dataframe[column].compute() == processor.timed_dataframe[column].compute() + ).all() def test_processor_from_files(): """Test generation of the processor from a list of files""" - config = {"core": {"loader": "generic"}} - dataframe, _, _ = loader.read_dataframe(files=files) + config = {"core": {"loader": "mpes"}} + dataframe, timed_dataframe, _ = loader.read_dataframe(files=files) processor = SedProcessor( files=files, config=config, @@ -75,12 +80,16 @@ def test_processor_from_files(): ) for column in dataframe.columns: assert (dataframe[column].compute() == processor.dataframe[column].compute()).all() + for column in timed_dataframe.columns: + assert ( + timed_dataframe[column].compute() == processor.timed_dataframe[column].compute() + ).all() def test_processor_from_folders(): """Test generation of the processor from a folder""" - config = {"core": {"loader": "generic"}} - dataframe, _, _ = loader.read_dataframe(files=files) + config = {"core": {"loader": "mpes"}} + dataframe, timed_dataframe, _ = loader.read_dataframe(files=files) processor = SedProcessor( folder=df_folder, config=config, @@ -90,6 +99,10 @@ def test_processor_from_folders(): ) for column in dataframe.columns: assert (dataframe[column].compute() == processor.dataframe[column].compute()).all() + for column in timed_dataframe.columns: + assert ( + timed_dataframe[column].compute() == processor.timed_dataframe[column].compute() + ).all() def test_processor_from_runs(): @@ -116,7 +129,7 @@ def test_additional_parameter_to_loader(): """ config = {"core": {"loader": "generic"}} processor = SedProcessor( - folder=df_folder, + folder=df_folder_generic, ftype="json", config=config, folder_config={}, @@ -128,7 +141,7 @@ def test_additional_parameter_to_loader(): def test_repr(): """test the ___repr___ method""" - config = {"core": {"loader": "generic"}} + config = {"core": {"loader": "mpes"}} processor = 
SedProcessor( config=config, folder_config={}, @@ -147,7 +160,7 @@ def test_repr(): def test_attributes_setters(): """Test class attributes and setters.""" - config = {"core": {"loader": "generic"}} + config = {"core": {"loader": "mpes"}} processor = SedProcessor( config=config, folder_config={}, @@ -170,13 +183,13 @@ def test_attributes_setters(): assert "test" in processor_metadata.keys() processor.add_attribute({"key2": 5}, name="test2") assert processor.attributes["test2"]["key2"] == 5 - assert processor.config["core"]["loader"] == "generic" + assert processor.config["core"]["loader"] == "mpes" assert len(processor.files) == 2 def test_copy_tool(): """Test the copy tool functionality in the processor""" - config = {"core": {"loader": "generic", "use_copy_tool": True}} + config = {"core": {"loader": "mpes", "use_copy_tool": True}} processor = SedProcessor( config=config, folder_config={}, @@ -186,7 +199,7 @@ def test_copy_tool(): assert processor.use_copy_tool is False config = { "core": { - "loader": "generic", + "loader": "mpes", "use_copy_tool": True, "copy_tool_source": source_folder, "copy_tool_dest": dest_folder, @@ -246,7 +259,7 @@ def test_copy_tool(): def test_momentum_correction_workflow(features: np.ndarray): """Test for the momentum correction workflow""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, folder_config={}, user_config={}, system_config={}, @@ -301,7 +314,7 @@ def test_momentum_correction_workflow(features: np.ndarray): def test_pose_adjustment(): """Test for the pose correction and application of momentum correction workflow""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, folder_config={}, user_config={}, system_config={}, @@ -351,7 +364,7 @@ def test_pose_adjustment(): def test_momentum_calibration_workflow(): """Test the calibration of the momentum axes""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, folder_config={}, user_config={}, system_config={}, @@ -404,7 +417,7 @@ def test_momentum_calibration_workflow(): def test_energy_correction(): """Test energy correction workflow.""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, folder_config={}, user_config={}, system_config={}, @@ -471,7 +484,7 @@ def test_energy_calibration_workflow(energy_scale: str, calibration_method: str) system_config={}, ) processor = SedProcessor( - folder=df_folder + "../mpes/", + folder=df_folder, config=config, folder_config={}, user_config={}, @@ -480,9 +493,9 @@ def test_energy_calibration_workflow(energy_scale: str, calibration_method: str) with pytest.raises(ValueError): processor.load_bias_series() with pytest.raises(ValueError): - processor.load_bias_series(data_files=glob.glob(df_folder + "../mpes/*.h5"), normalize=True) + processor.load_bias_series(data_files=files, normalize=True) processor.load_bias_series( - data_files=glob.glob(df_folder + "../mpes/*.h5"), + data_files=files, normalize=True, bias_key="@KTOF:Lens:Sample:V", ) @@ -570,7 +583,7 @@ def test_delay_calibration_workflow(): system_config={}, ) processor = SedProcessor( - folder=df_folder + "../mpes/", + folder=df_folder, config=config, folder_config={}, user_config={}, @@ -593,7 +606,7 @@ def test_delay_calibration_workflow(): def test_add_jitter(): """Test the jittering function""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, 
folder_config={}, user_config={}, system_config={}, @@ -620,7 +633,7 @@ def test_add_jitter(): def test_event_histogram(): """Test histogram plotting function""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, folder_config={}, user_config={}, system_config={}, @@ -640,7 +653,7 @@ def test_event_histogram(): def test_compute(): """Test binning of final result""" config = parse_config( - config={"core": {"loader": "generic"}}, + config={"core": {"loader": "mpes"}}, folder_config={}, user_config={}, system_config={}, @@ -669,7 +682,7 @@ def test_compute_with_normalization(): system_config={}, ) processor = SedProcessor( - folder=mpes_folder, + folder=df_folder, config=config, folder_config={}, user_config={}, @@ -704,7 +717,7 @@ def test_get_normalization_histogram(): system_config={}, ) processor = SedProcessor( - folder=mpes_folder, + folder=df_folder, config=config, folder_config={}, user_config={}, @@ -767,7 +780,7 @@ def test_save(): system_config={}, ) processor = SedProcessor( - folder=df_folder + "../mpes/", + folder=df_folder, config=config, folder_config={}, user_config={}, From fc40a36f45a0ff66356f94bd4932b219f7aaaf63 Mon Sep 17 00:00:00 2001 From: rettigl Date: Fri, 3 Nov 2023 23:40:58 +0100 Subject: [PATCH 11/13] add timed dataframe to new processor functions --- sed/core/processor.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/sed/core/processor.py b/sed/core/processor.py index bd96f7de..15efc245 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -1208,11 +1208,7 @@ def save_energy_calibration( "Energy calibration parameters not found, need to generate parameters first!", ) from exc - config = { - "energy": { - "calibration": calibration, - }, - } + config = {"energy": {"calibration": calibration}} if isinstance(self.ec.offset, dict): config["energy"]["offset"] = self.ec.offset save_config(config, filename, overwrite) @@ -1289,12 +1285,12 @@ def add_energy_offset( ValueError: If the energy column is not in the dataframe. """ energy_column = self._config["dataframe"]["energy_column"] - if energy_column not in self._dataframe.columns: - raise ValueError( - f"Energy column {energy_column} not found in dataframe! " - "Run `append energy axis` first.", - ) if self.dataframe is not None: + if energy_column not in self._dataframe.columns: + raise ValueError( + f"Energy column {energy_column} not found in dataframe! 
" + "Run `append energy axis` first.", + ) df, metadata = self.ec.add_offsets( df=self._dataframe, constant=constant, @@ -1304,6 +1300,17 @@ def add_energy_offset( reductions=reductions, preserve_mean=preserve_mean, ) + if self._timed_dataframe is not None: + if energy_column in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.ec.add_offsets( + df=self._timed_dataframe, + constant=constant, + columns=columns, + energy_column=energy_column, + signs=signs, + reductions=reductions, + preserve_mean=preserve_mean, + ) self._attributes.add( metadata, "add_energy_offset", @@ -1336,6 +1343,12 @@ def append_tof_ns_axis( df=self._dataframe, **kwargs, ) + if self._timed_dataframe is not None: + if self._config["dataframe"]["tof_column"] in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.ec.append_tof_ns_axis( + df=self._timed_dataframe, + **kwargs, + ) self._attributes.add( metadata, "tof_ns_conversion", @@ -1357,6 +1370,13 @@ def align_dld_sectors(self, sector_delays: np.ndarray = None, **kwargs): sector_delays=sector_delays, **kwargs, ) + if self._timed_dataframe is not None: + if self._config["dataframe"]["tof_column"] in self._timed_dataframe.columns: + self._timed_dataframe, _ = self.ec.align_dld_sectors( + df=self._timed_dataframe, + sector_delays=sector_delays, + **kwargs, + ) self._attributes.add( metadata, "dld_sector_alignment", From 157a07cc211ade6d60ac77cc6dc4a929807113f3 Mon Sep 17 00:00:00 2001 From: rettigl Date: Sun, 5 Nov 2023 11:03:41 +0100 Subject: [PATCH 12/13] fix bugs, and add tests for processor functions --- sed/calibrator/energy.py | 6 +- tests/calibrator/test_energy.py | 4 +- tests/data/loader/flash/config.yaml | 6 +- tests/test_processor.py | 87 ++++++++++++++++++++++++++--- 4 files changed, 88 insertions(+), 15 deletions(-) diff --git a/sed/calibrator/energy.py b/sed/calibrator/energy.py index f63cd5c5..9548af3b 100644 --- a/sed/calibrator/energy.py +++ b/sed/calibrator/energy.py @@ -1442,8 +1442,10 @@ def align_dld_sectors( dask.dataframe.DataFrame: Dataframe with the new columns. dict: Metadata dictionary. 
""" - sector_delays = sector_delays or self.sector_delays - sector_id_column = sector_id_column or self.sector_id_column + if sector_delays is None: + sector_delays = self.sector_delays + if sector_id_column is None: + sector_id_column = self.sector_id_column if sector_delays is None or sector_id_column is None: raise ValueError( diff --git a/tests/calibrator/test_energy.py b/tests/calibrator/test_energy.py index 042cc4a4..5f9cd56b 100644 --- a/tests/calibrator/test_energy.py +++ b/tests/calibrator/test_energy.py @@ -287,14 +287,14 @@ def test_append_tof_ns_axis(): loader = get_loader(loader_name="mpes", config=config) # from kwds - df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) df, _ = ec.append_tof_ns_axis(df, binwidth=2e-9, binning=1) assert config["dataframe"]["tof_ns_column"] in df.columns np.testing.assert_allclose(df[ec.tof_column], df[ec.tof_ns_column] / 4) # from config - df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) df, _ = ec.append_tof_ns_axis(df) assert config["dataframe"]["tof_ns_column"] in df.columns diff --git a/tests/data/loader/flash/config.yaml b/tests/data/loader/flash/config.yaml index d7d09f63..f1161dcb 100644 --- a/tests/data/loader/flash/config.yaml +++ b/tests/data/loader/flash/config.yaml @@ -26,9 +26,9 @@ dataframe: # the number of iterations to fill the pulseId forward. forward_fill_iterations: 2 - # if true, removes the 3 bits reserved for dldSectorID from the dldTimeandSector column - unravel_8s_detector_time_channel: True - + # if true, removes the 3 bits reserved for dldSectorID from the dldTimeSteps column + split_sector_id_from_dld_time: True + sector_id_reserved_bits: 3 # dataframe column containing x coordinates x_column: dldPosX # dataframe column containing corrected x coordinates diff --git a/tests/test_processor.py b/tests/test_processor.py index 9371ca35..e0787368 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -27,7 +27,8 @@ df_folder_generic = package_dir + "/../tests/data/loader/generic/" folder = package_dir + "/../tests/data/calibrator/" files = glob.glob(df_folder + "*.h5") -runs = ["43878", "43878"] +runs = ["30", "50"] +runs_flash = ["43878", "43878"] loader = get_loader(loader_name="mpes") source_folder = package_dir + "/../" dest_folder = tempfile.mkdtemp() @@ -107,20 +108,23 @@ def test_processor_from_folders(): def test_processor_from_runs(): """Test generation of the processor from runs""" - config = df_folder + "../flash/config.yaml" + config = {"core": {"loader": "mpes"}} + dataframe, timed_dataframe, _ = loader.read_dataframe(files=files) processor = SedProcessor( - folder=df_folder + "../flash/", + folder=df_folder, config=config, runs=runs, folder_config={}, user_config={}, system_config={}, ) - assert "dldPosX" in processor.dataframe.columns - # cleanup flash inermediaries - _, parquet_data_dir = processor.loader.initialize_paths() - for file in os.listdir(Path(parquet_data_dir, "buffer")): - os.remove(Path(parquet_data_dir, "buffer", file)) + assert processor.loader.runs == runs + for column in dataframe.columns: + assert (dataframe[column].compute() == processor.dataframe[column].compute()).all() + for column in timed_dataframe.columns: + assert ( + timed_dataframe[column].compute() == 
processor.timed_dataframe[column].compute() + ).all() def test_additional_parameter_to_loader(): @@ -568,11 +572,78 @@ def test_energy_calibration_workflow(energy_scale: str, calibration_method: str) user_config={}, system_config={}, ) + with pytest.raises(ValueError): + processor.add_energy_offset(constant=1) processor.append_energy_axis(preview=True) assert "energy" in processor.dataframe.columns assert processor.attributes["energy_calibration"]["calibration"]["energy_scale"] == energy_scale os.remove(f"sed_config_energy_calibration_{energy_scale}-{calibration_method}.yaml") + energy1 = processor.dataframe["energy"].compute().values + processor.add_energy_offset(constant=1) + energy2 = processor.dataframe["energy"].compute().values + np.testing.assert_allclose(energy1, energy2 + (1 if energy_scale == "binding" else -1)) + + +def test_align_dld_sectors(): + """Test alignment of DLD sectors for flash detector""" + config = df_folder + "../flash/config.yaml" + processor = SedProcessor( + folder=df_folder + "../flash/", + config=config, + runs=runs_flash, + folder_config={}, + user_config={}, + system_config={}, + ) + assert "dldTimeSteps" in processor.dataframe.columns + assert "dldSectorID" in processor.dataframe.columns + + sector_delays = np.asarray([10, -10, 20, -20, 30, -30, 40, -40]) + + tof_ref = [] + for i in range(len(sector_delays)): + tof_ref.append( + processor.dataframe[processor.dataframe["dldSectorID"] == i]["dldTimeSteps"] + .compute() + .values.astype("float"), + ) + tof_ref_array = np.zeros([len(tof_ref), len(max(tof_ref, key=len))]) + tof_ref_array[:] = np.nan + for i, val in enumerate(tof_ref): + tof_ref_array[i][0 : len(val)] = val + processor.align_dld_sectors(sector_delays=sector_delays) + tof_aligned = [] + for i in range(len(sector_delays)): + tof_aligned.append( + processor.dataframe[processor.dataframe["dldSectorID"] == i]["dldTimeSteps"] + .compute() + .values, + ) + tof_aligned_array = np.zeros([len(tof_aligned), len(max(tof_aligned, key=len))]) + tof_aligned_array[:] = np.nan + for i, val in enumerate(tof_aligned): + tof_aligned_array[i][0 : len(val)] = val + np.testing.assert_allclose(tof_ref_array, tof_aligned_array + sector_delays[:, np.newaxis]) + + # cleanup flash inermediaries + _, parquet_data_dir = processor.loader.initialize_paths() + for file in os.listdir(Path(parquet_data_dir, "buffer")): + os.remove(Path(parquet_data_dir, "buffer", file)) + + +def test_append_tof_ns_axis(): + """Test the append_tof_ns_axis function""" + processor = SedProcessor( + folder=df_folder, + config={"core": {"loader": "mpes"}}, + folder_config={}, + user_config={}, + system_config={}, + ) + processor.append_tof_ns_axis() + assert processor.config["dataframe"]["tof_ns_column"] in processor.dataframe + def test_delay_calibration_workflow(): """Test the delay calibration workflow""" From 3dc7c5f74f86422590584843ba9210b3d88952c5 Mon Sep 17 00:00:00 2001 From: rettigl Date: Sun, 5 Nov 2023 11:56:56 +0100 Subject: [PATCH 13/13] try fix for test failures --- tests/test_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_processor.py b/tests/test_processor.py index e0787368..dcf8cd0a 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -90,7 +90,7 @@ def test_processor_from_files(): def test_processor_from_folders(): """Test generation of the processor from a folder""" config = {"core": {"loader": "mpes"}} - dataframe, timed_dataframe, _ = loader.read_dataframe(files=files) + dataframe, timed_dataframe, _ = 
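# Illustrative sketch (not part of the patch): the assertion in test_align_dld_sectors
# above implies that the alignment subtracts each sector's delay from its
# time-of-flight steps, i.e. reference == aligned + delay for every sector.
import numpy as np

sector_delays = np.asarray([10.0, -10.0, 20.0, -20.0])
sector_id = np.array([0, 1, 2, 3, 0, 2])
tof = np.array([1000.0, 1020.0, 1040.0, 1060.0, 1010.0, 1050.0])

tof_aligned = tof - sector_delays[sector_id]
np.testing.assert_allclose(tof, tof_aligned + sector_delays[sector_id])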
loader.read_dataframe(folders=df_folder) processor = SedProcessor( folder=df_folder, config=config, @@ -109,7 +109,7 @@ def test_processor_from_folders(): def test_processor_from_runs(): """Test generation of the processor from runs""" config = {"core": {"loader": "mpes"}} - dataframe, timed_dataframe, _ = loader.read_dataframe(files=files) + dataframe, timed_dataframe, _ = loader.read_dataframe(folders=df_folder, runs=runs) processor = SedProcessor( folder=df_folder, config=config,
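Taken together, the patches in this series change the loader contract to return a (dataframe, timed_dataframe, metadata) triple and let the processor normalize binned data to acquisition time. A minimal usage sketch, assuming the repository's mpes test data and that SedProcessor is importable from the top-level sed package:

import numpy as np
from sed import SedProcessor

processor = SedProcessor(
    folder="tests/data/loader/mpes/",
    config={"core": {"loader": "mpes"}},
    folder_config={},
    user_config={},
    system_config={},
)

bins = [10, 10, 10, 5]
axes = ["X", "Y", "t", "ADC"]
ranges = [[0, 2048], [0, 2048], [0, 200000], [650, 655]]

# Bin the electron dataframe and normalize to the acquisition time per ADC bin,
# which is derived from the timed dataframe.
result = processor.compute(
    bins=bins,
    axes=axes,
    ranges=ranges,
    df_partitions=5,
    normalize_to_acquisition_time="ADC",
)

# The normalized result times the normalization histogram recovers the raw counts.
np.testing.assert_allclose(
    processor.binned.data,
    (processor.normalized * processor.normalization_histogram).data,
)

# A normalization histogram can also be requested explicitly:
histogram = processor.get_normalization_histogram(axis="ADC", df_partitions=1)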