diff --git a/sed/calibrator/energy.py b/sed/calibrator/energy.py index 7c5bbfa4..6feae0dc 100644 --- a/sed/calibrator/energy.py +++ b/sed/calibrator/energy.py @@ -251,7 +251,7 @@ def bin_data( if bias_key is None: bias_key = self._config.get("energy", {}).get("bias_key", "") - dataframe, _ = self.loader.read_dataframe( + dataframe, _, _ = self.loader.read_dataframe( files=data_files, collect_metadata=False, ) diff --git a/sed/core/processor.py b/sed/core/processor.py index a7e3d8fe..c35a6b70 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -18,6 +18,7 @@ import xarray as xr from sed.binning import bin_dataframe +from sed.binning.utils import bin_centers_to_bin_edges from sed.calibrator import DelayCalibrator from sed.calibrator import EnergyCalibrator from sed.calibrator import MomentumCorrector @@ -87,6 +88,7 @@ def __init__( self._config["binning"]["num_cores"] = num_cores self._dataframe: Union[pd.DataFrame, ddf.DataFrame] = None + self._timed_dataframe: Union[pd.DataFrame, ddf.DataFrame] = None self._files: List[str] = [] self._binned: xr.DataArray = None @@ -281,23 +283,26 @@ def load( metadata = {} if dataframe is not None: self._dataframe = dataframe + self._timed_dataframe = None elif folder is not None: - dataframe, metadata = self.loader.read_dataframe( + dataframe, timed_dataframe, metadata = self.loader.read_dataframe( folder=cast(str, self.cpy(folder)), metadata=metadata, collect_metadata=collect_metadata, **kwds, ) self._dataframe = dataframe + self._timed_dataframe = timed_dataframe self._files = self.loader.files elif files is not None: - dataframe, metadata = self.loader.read_dataframe( + dataframe, timed_dataframe, metadata = self.loader.read_dataframe( files=cast(List[str], self.cpy(files)), metadata=metadata, collect_metadata=collect_metadata, **kwds, ) self._dataframe = dataframe + self._timed_dataframe = timed_dataframe self._files = self.loader.files else: raise ValueError( @@ -503,6 +508,10 @@ def apply_momentum_correction( self._dataframe, metadata = self.mc.apply_corrections( df=self._dataframe, ) + if self._timed_dataframe is not None: + self._timed_dataframe, _ = self.mc.apply_corrections( + self._timed_dataframe, + ) # Add Metadata self._attributes.add( metadata, @@ -592,6 +601,11 @@ class and the momentum calibration to the dataframe. 
             df=self._dataframe,
             calibration=calibration,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.mc.append_k_axis(
+                df=self._timed_dataframe,
+                calibration=calibration,
+            )
 
         # Add Metadata
         self._attributes.add(
@@ -677,6 +691,12 @@ def apply_energy_correction(
             correction=correction,
             **kwds,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.ec.apply_energy_correction(
+                df=self._timed_dataframe,
+                correction=correction,
+                **kwds,
+            )
 
         # Add Metadata
         self._attributes.add(
@@ -929,6 +949,12 @@ def append_energy_axis(
             calibration=calibration,
             **kwds,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.ec.append_energy_axis(
+                df=self._timed_dataframe,
+                calibration=calibration,
+                **kwds,
+            )
 
         # Add Metadata
         self._attributes.add(
@@ -969,6 +995,12 @@ def calibrate_delay_axis(
                 delay_range=delay_range,
                 **kwds,
             )
+            if self._timed_dataframe is not None:
+                self._timed_dataframe, _ = self.dc.append_delay_axis(
+                    self._timed_dataframe,
+                    delay_range=delay_range,
+                    **kwds,
+                )
         else:
             if datafile is None:
                 try:
@@ -985,6 +1017,12 @@ def calibrate_delay_axis(
                 datafile=datafile,
                 **kwds,
             )
+            if self._timed_dataframe is not None:
+                self._timed_dataframe, _ = self.dc.append_delay_axis(
+                    self._timed_dataframe,
+                    datafile=datafile,
+                    **kwds,
+                )
 
         # Add Metadata
         self._attributes.add(
@@ -1192,6 +1230,57 @@ def compute(
 
         return self._binned
 
+    def get_normalization_histogram(
+        self,
+        axis: str = "delay",
+        **kwds,
+    ) -> np.ndarray:
+        """Generates a normalization histogram from the timed dataframe, counting
+        the time spent in each bin of the given axis.
+
+        Args:
+            axis (str, optional): The axis for which to compute the histogram.
+                Defaults to "delay".
+            **kwds: Keyword arguments:
+
+                - df_partitions (int, optional): Number of dataframe partitions to use.
+                  Defaults to all.
+
+        Raises:
+            ValueError: Raised if no data are binned.
+            ValueError: Raised if 'axis' not in binned coordinates.
+
+        Returns:
+            np.ndarray: The computed normalization histogram (in seconds of
+            acquisition time per bin).
+        """
+
+        if self._binned is None:
+            raise ValueError("Need to bin data first!")
+        if axis not in self._binned.coords:
+            raise ValueError(f"Axis '{axis}' not found in binned data!")
+
+        df_partitions = kwds.pop("df_partitions", None)
+        if df_partitions is not None:
+            timed_dataframe = self._timed_dataframe.partitions[
+                0 : min(df_partitions, self._timed_dataframe.npartitions)
+            ]
+        else:
+            timed_dataframe = self._timed_dataframe
+
+        bins = timed_dataframe[axis].map_partitions(
+            pd.cut,
+            bins=bin_centers_to_bin_edges(self._binned.coords[axis].values),
+        )
+
+        histogram = (
+            timed_dataframe[axis].groupby([bins]).count().compute().values
+        ) / 1000
+
+        return histogram
+
     def view_event_histogram(
         self,
         dfpid: int,
diff --git a/sed/loader/base/loader.py b/sed/loader/base/loader.py
index 7f6e842d..dd2b8df0 100644
--- a/sed/loader/base/loader.py
+++ b/sed/loader/base/loader.py
@@ -51,7 +51,7 @@ def read_dataframe(
         metadata: dict = None,
         collect_metadata: bool = False,
         **kwds,
-    ) -> Tuple[ddf.DataFrame, dict]:
+    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
         """Reads data from given files or folder and returns a dask dataframe
         and corresponding metadata.
@@ -70,8 +70,8 @@ def read_dataframe(
             **kwds: keyword arguments. Se describtion in respective loader.
 
         Returns:
-            Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from
-            specified files.
+            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed dataframe
+            and metadata read from specified files.
         """
         if metadata is None:
@@ -100,7 +100,7 @@ def read_dataframe(
         if not files:
             raise FileNotFoundError("No valid files found!")
 
-        return None, None
+        return None, None, None
 
     @abstractmethod
     def get_count_rate(
diff --git a/sed/loader/generic/loader.py b/sed/loader/generic/loader.py
index 548aedf9..fdd4288b 100644
--- a/sed/loader/generic/loader.py
+++ b/sed/loader/generic/loader.py
@@ -33,7 +33,7 @@ def read_dataframe(
         metadata: dict = None,
         collect_metadata: bool = False,
         **kwds,
-    ) -> Tuple[ddf.DataFrame, dict]:
+    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
         """Read stored files from a folder into a dataframe.
 
         Args:
@@ -57,8 +57,8 @@ def read_dataframe(
             ValueError: Raised if the file type is not supported.
 
         Returns:
-            Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from specified
-            files.
+            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed dataframe
+            and metadata read from specified files.
         """
         # pylint: disable=duplicate-code
         super().read_dataframe(
@@ -76,16 +76,16 @@ def read_dataframe(
         self.metadata = self.metadata
 
         if ftype == "parquet":
-            return (ddf.read_parquet(self.files, **kwds), self.metadata)
+            return (ddf.read_parquet(self.files, **kwds), None, self.metadata)
 
         if ftype == "json":
-            return (ddf.read_json(self.files, **kwds), self.metadata)
+            return (ddf.read_json(self.files, **kwds), None, self.metadata)
 
         if ftype == "csv":
-            return (ddf.read_csv(self.files, **kwds), self.metadata)
+            return (ddf.read_csv(self.files, **kwds), None, self.metadata)
 
         try:
-            return (ddf.read_table(self.files, **kwds), self.metadata)
+            return (ddf.read_table(self.files, **kwds), None, self.metadata)
         except (TypeError, ValueError, NotImplementedError) as exc:
             raise ValueError(
                 "The file format cannot be understood!",
diff --git a/sed/loader/mpes/loader.py b/sed/loader/mpes/loader.py
index 21b94a56..f1b7a915 100644
--- a/sed/loader/mpes/loader.py
+++ b/sed/loader/mpes/loader.py
@@ -102,6 +102,87 @@ def hdf5_to_dataframe(
     return ddf.from_dask_array(array_stack, columns=column_names)
 
 
+def hdf5_to_timed_dataframe(
+    files: Sequence[str],
+    group_names: Sequence[str] = None,
+    alias_dict: Dict[str, str] = None,
+    time_stamps: bool = False,
+    time_stamp_alias: str = "timeStamps",
+    ms_markers_group: str = "msMarkers",
+    first_event_time_stamp_key: str = "FirstEventTimeStamp",
+    **kwds,
+) -> ddf.DataFrame:
+    """Function to read a selection of hdf5-files, and generate a delayed dask
+    dataframe from provided groups in the files. Optionally, aliases can be defined.
+    Returns a dataframe for evenly spaced time intervals.
+
+    Args:
+        files (List[str]): A list of the file paths to load.
+        group_names (List[str], optional): hdf5 group names to load. Defaults to load
+            all groups containing "Stream"
+        alias_dict (Dict[str, str], optional): Dictionary of aliases for the dataframe
+            columns. Keys are the hdf5 groupnames, and values the aliases. If an alias
+            is not found, its group name is used. Defaults to read the attribute
+            "Name" from each group.
+        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
+            False.
+        time_stamp_alias (str): Alias name for the timestamp column.
+            Defaults to "timeStamps".
+        ms_markers_group (str): h5 column containing timestamp information.
+            Defaults to "msMarkers".
+        first_event_time_stamp_key (str): h5 attribute containing the start
+            timestamp of a file. Defaults to "FirstEventTimeStamp".
+
+    Returns:
+        ddf.DataFrame: The delayed Dask DataFrame
+    """
+    if group_names is None:
+        group_names = []
+    if alias_dict is None:
+        alias_dict = {}
+
+    # Read a file to parse the file structure
+    test_fid = kwds.pop("test_fid", 0)
+    test_proc = h5py.File(files[test_fid])
+    if group_names == []:
+        group_names, alias_dict = get_groups_and_aliases(
+            h5file=test_proc,
+            seach_pattern="Stream",
+        )
+
+    column_names = [alias_dict.get(group, group) for group in group_names]
+
+    if time_stamps:
+        column_names.append(time_stamp_alias)
+
+    test_array = hdf5_to_timed_array(
+        h5file=test_proc,
+        group_names=group_names,
+        time_stamps=time_stamps,
+        ms_markers_group=ms_markers_group,
+        first_event_time_stamp_key=first_event_time_stamp_key,
+    )
+
+    # Delay-read all files
+    arrays = [
+        da.from_delayed(
+            dask.delayed(hdf5_to_timed_array)(
+                h5file=h5py.File(f),
+                group_names=group_names,
+                time_stamps=time_stamps,
+                ms_markers_group=ms_markers_group,
+                first_event_time_stamp_key=first_event_time_stamp_key,
+            ),
+            dtype=test_array.dtype,
+            shape=(test_array.shape[0], np.nan),
+        )
+        for f in files
+    ]
+    array_stack = da.concatenate(arrays, axis=1).T
+
+    return ddf.from_dask_array(array_stack, columns=column_names)
+
+
 def get_groups_and_aliases(
     h5file: h5py.File,
     seach_pattern: str = None,
@@ -225,6 +306,81 @@ def hdf5_to_array(
     return np.asarray(data_list)
 
 
+def hdf5_to_timed_array(
+    h5file: h5py.File,
+    group_names: Sequence[str],
+    data_type: str = "float32",
+    time_stamps=False,
+    ms_markers_group: str = "msMarkers",
+    first_event_time_stamp_key: str = "FirstEventTimeStamp",
+) -> np.ndarray:
+    """Reads the content of the given groups in an hdf5 file, and returns a
+    timed version of a 2-dimensional array with the corresponding values.
+
+    Args:
+        h5file (h5py.File):
+            hdf5 file handle to read from
+        group_names (str):
+            group names to read
+        data_type (str, optional):
+            Data type of the output data. Defaults to "float32".
+        time_stamps (bool, optional):
+            Option to calculate time stamps. Defaults to False.
+        ms_markers_group (str): h5 column containing timestamp information.
+            Defaults to "msMarkers".
+        first_event_time_stamp_key (str): h5 attribute containing the start
+            timestamp of a file. Defaults to "FirstEventTimeStamp".
+
+    Returns:
+        np.ndarray: the array of the values at evenly spaced times obtained from
+        the ms_markers.
+    """
+
+    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
+
+    # Read out groups:
+    data_list = []
+    ms_marker = np.asarray(h5file[ms_markers_group])
+    for group in group_names:
+
+        g_dataset = np.asarray(h5file[group])
+        if bool(data_type):
+            g_dataset = g_dataset.astype(data_type)
+
+        timed_dataset = np.zeros_like(ms_marker)
+        for i, point in enumerate(ms_marker):
+            timed_dataset[i] = (
+                g_dataset[point]
+                if point < len(g_dataset)
+                else g_dataset[len(g_dataset) - 1]
+            )
+
+        data_list.append(timed_dataset)
+
+    # calculate time stamps
+    if time_stamps:
+        # try to get start timestamp from "FirstEventTimeStamp" attribute
+        try:
+            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
+            start_time = datetime.datetime.strptime(
+                start_time_str,
+                "%Y-%m-%dT%H:%M:%S.%f%z",
+            ).timestamp()
+        except KeyError:
+            # get the start time of the file from its modification date if the key
+            # does not exist (old files)
+            start_time = os.path.getmtime(h5file.filename)  # in seconds
+            # the modification time points to the time when the file was finished, so we
+            # need to correct for the time it took to write the file
+            start_time -= len(ms_marker) / 1000
+
+        time_stamp_data = start_time + ms_marker / 1000
+
+        data_list.append(time_stamp_data)
+
+    return np.asarray(data_list)
+
+
 def get_attribute(h5group: h5py.Group, attribute: str) -> str:
     """Reads, decodes and returns an attrubute from an hdf5 group
 
@@ -323,7 +479,7 @@ def read_dataframe(
         collect_metadata: bool = False,
         time_stamps: bool = False,
         **kwds,
-    ) -> Tuple[ddf.DataFrame, dict]:
+    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
         """Read stored hdf5 files from a list or from folder and returns a dask
         dataframe and corresponding metadata.
@@ -357,8 +513,8 @@ def read_dataframe(
             FileNotFoundError: Raised if a file or folder is not found.
 
         Returns:
-            Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from specified
-            files.
+            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed Dask
+            dataframe and metadata read from specified files.
""" # pylint: disable=duplicate-code super().read_dataframe( @@ -407,6 +563,16 @@ def read_dataframe( first_event_time_stamp_key=first_event_time_stamp_key, **kwds, ) + timed_df = hdf5_to_timed_dataframe( + files=self.files, + group_names=hdf5_groupnames, + alias_dict=hdf5_aliases, + time_stamps=time_stamps, + time_stamp_alias=time_stamp_alias, + ms_markers_group=ms_markers_group, + first_event_time_stamp_key=first_event_time_stamp_key, + **kwds, + ) if collect_metadata: metadata = self.gather_metadata( @@ -416,7 +582,7 @@ def read_dataframe( else: metadata = self.metadata - return df, metadata + return df, timed_df, metadata def gather_metadata( self, diff --git a/tests/calibrator/test_delay.py b/tests/calibrator/test_delay.py index 56c4fb40..945ba615 100644 --- a/tests/calibrator/test_delay.py +++ b/tests/calibrator/test_delay.py @@ -16,7 +16,7 @@ def test_delay_parameters_from_file(): """Test the option to extract the delay parameters from a file""" - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -33,7 +33,7 @@ def test_delay_parameters_from_file(): def test_delay_parameters_from_delay_range(): """Test the option to extract the delay parameters from a delay range""" # from keywords - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -44,7 +44,7 @@ def test_delay_parameters_from_delay_range(): assert "adc_range" in metadata["calibration"] # from calibration - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -60,7 +60,7 @@ def test_delay_parameters_from_delay_range(): def test_delay_parameters_from_delay_range_mm(): """Test the option to extract the delay parameters from a mm range + t0""" # from keywords - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) @@ -75,7 +75,7 @@ def test_delay_parameters_from_delay_range_mm(): assert "delay_range_mm" in metadata["calibration"] # from dict - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( files=[file], collect_metadata=False, ) diff --git a/tests/calibrator/test_energy.py b/tests/calibrator/test_energy.py index 73b2d346..1276454e 100644 --- a/tests/calibrator/test_energy.py +++ b/tests/calibrator/test_energy.py @@ -109,7 +109,7 @@ def test_calibrate_append(energy_scale: str, calibration_method: str): calibration_method (str): method used for ralibration """ loader = get_loader(loader_name="mpes", config=config) - df, _ = loader.read_dataframe(folder=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folder=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) ec.load_data(biases=biases, traces=traces, tof=tof) ec.normalize() @@ -159,7 +159,7 @@ def test_apply_correction_from_dict_kwds(calib_type: str, calib_dict: dict): """ loader = get_loader(loader_name="mpes", config=config) # from dict - df, _ = loader.read_dataframe(folder=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folder=df_folder, collect_metadata=False) ec = 
EnergyCalibrator(config=config, loader=loader) df, metadata = ec.append_energy_axis(df, calibration=calib_dict) assert config["dataframe"]["energy_column"] in df.columns @@ -169,7 +169,7 @@ def test_apply_correction_from_dict_kwds(calib_type: str, calib_dict: dict): assert metadata["calibration"]["calib_type"] == calib_type # from kwds - df, _ = loader.read_dataframe(folder=df_folder, collect_metadata=False) + df, _, _ = loader.read_dataframe(folder=df_folder, collect_metadata=False) ec = EnergyCalibrator(config=config, loader=loader) df, metadata = ec.append_energy_axis(df, **calib_dict) assert config["dataframe"]["energy_column"] in df.columns diff --git a/tests/calibrator/test_momentum.py b/tests/calibrator/test_momentum.py index 73da57c5..e3d4283c 100644 --- a/tests/calibrator/test_momentum.py +++ b/tests/calibrator/test_momentum.py @@ -107,7 +107,7 @@ def test_pose_correction(): def test_apply_correction(): """Test the application of the distortion correction to the dataframe.""" - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folder=df_folder, collect_metadata=False, ) @@ -214,7 +214,7 @@ def test_apply_registration( depends_on: Dict[Any, Any], ): """Test the application of the distortion correction to the dataframe.""" - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folder=df_folder, collect_metadata=False, ) @@ -263,7 +263,7 @@ def test_momentum_calibration_equiscale(): """Test the calibration using one point and the k-distance, and application to the dataframe. """ - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folder=df_folder, collect_metadata=False, ) @@ -288,7 +288,7 @@ def test_momentum_calibration_equiscale(): def test_momentum_calibration_two_points(): """Test the calibration using two k-points, and application to the dataframe.""" - df, _ = get_loader(loader_name="mpes", config=config).read_dataframe( + df, _, _ = get_loader(loader_name="mpes", config=config).read_dataframe( folder=df_folder, collect_metadata=False, ) diff --git a/tests/loader/test_loaders.py b/tests/loader/test_loaders.py index 8782a077..7497dcdf 100644 --- a/tests/loader/test_loaders.py +++ b/tests/loader/test_loaders.py @@ -70,13 +70,21 @@ def test_has_correct_read_dataframe_func(loader): extension=supported_file_type, ) if read_type == "folder": - loaded_dataframe, loaded_metadata = loader.read_dataframe( + ( + loaded_dataframe, + _, + loaded_metadata, + ) = loader.read_dataframe( folder=input_folder, ftype=supported_file_type, collect_metadata=False, ) else: - loaded_dataframe, loaded_metadata = loader.read_dataframe( + ( + loaded_dataframe, + _, + loaded_metadata, + ) = loader.read_dataframe( files=list(input_files), ftype=supported_file_type, collect_metadata=False, @@ -92,7 +100,7 @@ def test_mpes_timestamps(): loader_name = "mpes" loader = get_loader(loader_name) input_folder = os.path.join(test_data_dir, "loader", loader_name) - df, _ = loader.read_dataframe( + df, _, _ = loader.read_dataframe( folder=input_folder, collect_metadata=False, time_stamps=True, diff --git a/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb b/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb index 3e8dd4c2..014f9c0b 100644 --- a/tutorial/3 - Conversion Pipeline for example 
time-resolved ARPES data.ipynb
+++ b/tutorial/3 - Conversion Pipeline for example time-resolved ARPES data.ipynb
@@ -483,7 +483,7 @@
     "axes = ['kx', 'ky', 'energy', 'delay']\n",
     "bins = [100, 100, 200, 50]\n",
     "ranges = [[-2, 2], [-2, 2], [-4, 2], [-600, 1600]]\n",
-    "res = sp.compute(bins=bins, axes=axes, ranges=ranges)"
+    "res = sp.compute(bins=bins, axes=axes, ranges=ranges, df_partitions=10)"
    ]
   },
   {
@@ -515,6 +515,29 @@
    "id": "3073b9ba",
    "metadata": {},
    "outputs": [],
+   "source": [
+    "histogram = sp.get_normalization_histogram()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "596a3217",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fig, ax = plt.subplots(1,1)\n",
+    "plt.plot(histogram)\n",
+    "plt.plot(sp._binned.sum(axis=(0,1,2))/10000)\n",
+    "plt.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0664d479",
+   "metadata": {},
+   "outputs": [],
    "source": []
   },
   {
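A minimal usage sketch (not part of the patch) of how the new normalization histogram can be combined with the binned data, mirroring the tutorial cells above. It assumes a SedProcessor instance `sp` that has already been loaded and calibrated as in the notebook; the binning parameters are copied from the tutorial cell, and the final normalization step is illustrative only.

    import matplotlib.pyplot as plt

    # Bin the calibrated dataframe on the first 10 partitions, as in the tutorial.
    res = sp.compute(
        bins=[100, 100, 200, 50],
        axes=["kx", "ky", "energy", "delay"],
        ranges=[[-2, 2], [-2, 2], [-4, 2], [-600, 1600]],
        df_partitions=10,
    )

    # Acquisition time spent in each delay bin (in seconds), computed from the timed
    # dataframe that read_dataframe() now returns alongside the event dataframe.
    # Restrict to the same partitions that were binned.
    histogram = sp.get_normalization_histogram(axis="delay", df_partitions=10)

    # Illustrative normalization: divide the delay-axis profile by the time per bin.
    delay_profile = res.sum(axis=(0, 1, 2)) / histogram

    plt.plot(histogram)
    plt.plot(delay_profile / delay_profile.max())
    plt.show()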