From be623c2e7a54f0500af83ce2891334f0707c2ea2 Mon Sep 17 00:00:00 2001 From: rettigl Date: Wed, 11 Oct 2023 00:11:15 +0200 Subject: [PATCH] add tests for timed dataframes, histogram generation and related processor function add accessor functions for binned and normalized histograms and normalization histograms --- sed/core/processor.py | 63 ++++++++++++++++++++++++++++ tests/loader/test_loaders.py | 23 +++++++++++ tests/test_binning.py | 28 +++++++++++++ tests/test_processor.py | 79 +++++++++++++++++++++++++++++++++++- 4 files changed, 192 insertions(+), 1 deletion(-) diff --git a/sed/core/processor.py b/sed/core/processor.py index fb9b5e76..66ece055 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -189,6 +189,34 @@ def dataframe(self, dataframe: Union[pd.DataFrame, ddf.DataFrame]): ) self._dataframe = dataframe + @property + def timed_dataframe(self) -> Union[pd.DataFrame, ddf.DataFrame]: + """Accessor to the underlying timed_dataframe. + + Returns: + Union[pd.DataFrame, ddf.DataFrame]: Timed Dataframe object. + """ + return self._timed_dataframe + + @timed_dataframe.setter + def timed_dataframe(self, timed_dataframe: Union[pd.DataFrame, ddf.DataFrame]): + """Setter for the underlying timed dataframe. + + Args: + timed_dataframe (Union[pd.DataFrame, ddf.DataFrame]): The timed dataframe object to set + """ + if not isinstance(timed_dataframe, (pd.DataFrame, ddf.DataFrame)) or not isinstance( + timed_dataframe, + self._timed_dataframe.__class__, + ): + raise ValueError( + "'timed_dataframe' has to be a Pandas or Dask dataframe and has to be of the same " + "kind as the dataframe loaded into the SedProcessor!.\n" + f"Loaded type: {self._timed_dataframe.__class__}, " + f"provided type: {timed_dataframe}.", + ) + self._timed_dataframe = timed_dataframe + @property def attributes(self) -> dict: """Accessor to the metadata dict. 
@@ -229,6 +257,41 @@ def files(self) -> List[str]: """ return self._files + @property + def binned(self) -> xr.DataArray: + """Getter attribute for the binned data array + + Returns: + xr.DataArray: The binned data array + """ + if self._binned is None: + raise ValueError("No binned data available, need to compute histogram first!") + return self._binned + + @property + def normalized(self) -> xr.DataArray: + """Getter attribute for the normalized data array + + Returns: + xr.DataArray: The normalized data array + """ + if self._normalized is None: + raise ValueError( + "No normalized data available, compute data with normalization enabled!", + ) + return self._normalized + + @property + def normalization_histogram(self) -> xr.DataArray: + """Getter attribute for the normalization histogram + + Returns: + xr.DataArray: The normalization histogram + """ + if self._normalization_histogram is None: + raise ValueError("No normalization histogram available, generate histogram first!") + return self._normalization_histogram + def cpy(self, path: Union[str, List[str]]) -> Union[str, List[str]]: """Function to mirror a list of files or a folder from a network drive to a local storage. 
Returns either the original or the copied path to the given diff --git a/tests/loader/test_loaders.py b/tests/loader/test_loaders.py index 2beb5804..b3bbbe79 100644 --- a/tests/loader/test_loaders.py +++ b/tests/loader/test_loaders.py @@ -158,6 +158,29 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str): os.remove(Path(parquet_data_dir, "buffer", file)) +@pytest.mark.parametrize("loader", get_all_loaders()) +def test_timed_dataframe(loader: BaseLoader): + """Test if the loaders return a correct timed dataframe + + Args: + loader (BaseLoader): the loader object to test + """ + if loader.__name__ != "BaseLoader": + loader_name = get_loader_name_from_loader_object(loader) + input_folder = os.path.join(test_data_dir, "loader", loader_name) + for supported_file_type in loader.supported_file_types: + loaded_dataframe, loaded_timed_dataframe, _ = loader.read_dataframe( + folders=input_folder, + ftype=supported_file_type, + collect_metadata=False, + ) + if loaded_timed_dataframe is None: + pytest.skip("Not implemented") + assert isinstance(loaded_timed_dataframe, ddf.DataFrame) + assert set(loaded_timed_dataframe.columns).issubset(set(loaded_dataframe.columns)) + assert loaded_timed_dataframe.npartitions == loaded_dataframe.npartitions + + @pytest.mark.parametrize("loader", get_all_loaders()) def test_get_count_rate(loader: BaseLoader): """Test the get_count_rate function diff --git a/tests/test_binning.py b/tests/test_binning.py index 871be1bc..482961b1 100644 --- a/tests/test_binning.py +++ b/tests/test_binning.py @@ -14,6 +14,8 @@ from sed.binning.binning import bin_dataframe from sed.binning.binning import bin_partition +from sed.binning.binning import normalization_histogram_from_timed_dataframe +from sed.binning.binning import normalization_histogram_from_timestamps from sed.binning.binning import numba_histogramdd from sed.binning.binning import simplify_binning_arguments from sed.binning.numba_bin import _hist_from_bin_range @@ -504,3 
+506,29 @@ def test_bin_dataframe(): np.testing.assert_allclose(res.values, res2.values) res2 = bin_dataframe(df=sample_ddf, bins=bins, axes=columns, ranges=ranges, mode="lean") np.testing.assert_allclose(res.values, res2.values) + + +def test_normalization_histogram_from_timestamps(): + """Test the function to generate the normalization histogram from timestamps""" + time_stamped_df = sample_ddf.copy() + time_stamped_df["timeStamps"] = time_stamped_df.index + res = bin_dataframe(df=sample_ddf, bins=[bins[0]], axes=[columns[0]], ranges=[ranges[0]]) + histogram = normalization_histogram_from_timestamps( + df=time_stamped_df, + axis=columns[0], + bin_centers=res.coords[columns[0]].values, + time_stamp_column="timeStamps", + ) + np.testing.assert_allclose(res / res.sum(), histogram / histogram.sum(), rtol=0.001) + + +def test_normalization_histogram_from_timed_dataframe(): + """Test the function to generate the normalization histogram from the timed dataframe""" + res = bin_dataframe(df=sample_ddf, bins=[bins[0]], axes=[columns[0]], ranges=[ranges[0]]) + histogram = normalization_histogram_from_timed_dataframe( + df=sample_ddf, + axis=columns[0], + bin_centers=res.coords[columns[0]].values, + time_unit=1, + ) + np.testing.assert_allclose(res / res.sum(), histogram / histogram.sum()) diff --git a/tests/test_processor.py b/tests/test_processor.py index 19bf02cc..62167db5 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -23,6 +23,7 @@ # pylint: disable=duplicate-code package_dir = os.path.dirname(find_spec("sed").origin) df_folder = package_dir + "/../tests/data/loader/generic/" +mpes_folder = package_dir + "/../tests/data/loader/mpes/" folder = package_dir + "/../tests/data/calibrator/" files = glob.glob(df_folder + "*.parquet") runs = ["43878", "43878"] @@ -627,10 +628,86 @@ def test_compute(): axes = ["X", "Y", "t", "ADC"] ranges = [[0, 2048], [0, 2048], [0, 200000], [0, 50000]] result = processor.compute(bins=bins, axes=axes, ranges=ranges, 
df_partitions=5) - assert result.data.shape == (10, 10, 10, 10) + assert result.data.shape == tuple(bins) assert result.data.sum(axis=(0, 1, 2, 3)) > 0 +def test_compute_with_normalization(): + """Test binning of final result with histogram normalization""" + config = parse_config( + config={"core": {"loader": "mpes"}}, + folder_config={}, + user_config={}, + system_config={}, + ) + processor = SedProcessor( + folder=mpes_folder, + config=config, + folder_config={}, + user_config={}, + system_config={}, + ) + bins = [10, 10, 10, 5] + axes = ["X", "Y", "t", "ADC"] + ranges = [[0, 2048], [0, 2048], [0, 200000], [650, 655]] + result = processor.compute( + bins=bins, + axes=axes, + ranges=ranges, + df_partitions=5, + normalize_to_acquisition_time="ADC", + ) + assert result.data.shape == tuple(bins) + assert result.data.sum(axis=(0, 1, 2, 3)) > 0 + assert processor.normalization_histogram is not None + assert processor.normalized is not None + np.testing.assert_allclose( + processor.binned.data, + (processor.normalized * processor.normalization_histogram).data, + ) + + +def test_get_normalization_histogram(): + """Test the generation function for the normalization histogram""" + config = parse_config( + config={"core": {"loader": "mpes"}, "dataframe": {"time_stamp_alias": "timeStamps"}}, + folder_config={}, + user_config={}, + system_config={}, + ) + processor = SedProcessor( + folder=mpes_folder, + config=config, + folder_config={}, + user_config={}, + system_config={}, + time_stamps=True, + ) + bins = [10, 10, 10, 5] + axes = ["X", "Y", "t", "ADC"] + ranges = [[0, 2048], [0, 2048], [0, 200000], [650, 655]] + with pytest.raises(ValueError): + processor.get_normalization_histogram(axis="ADC") + processor.compute(bins=bins, axes=axes, ranges=ranges, df_partitions=5) + with pytest.raises(ValueError): + processor.get_normalization_histogram(axis="Delay") + histogram1 = processor.get_normalization_histogram(axis="ADC", df_partitions=1) + histogram2 = 
processor.get_normalization_histogram( + axis="ADC", + use_time_stamps="True", + df_partitions=1, + ) + # TODO: Check why histograms are so different + np.testing.assert_allclose( + histogram1 / histogram1.sum(), + histogram2 / histogram2.sum(), + atol=0.02, + ) + # histogram1 = processor.get_normalization_histogram(axis="ADC") + # histogram2 = processor.get_normalization_histogram(axis="ADC", use_time_stamps="True") + # np.testing.assert_allclose(histogram1, histogram2) + + metadata: Dict[Any, Any] = {} metadata["entry_title"] = "Title" # User