Commit c9b8a17

Adds support for a timed dataframe, and histogram calculation from this dataframe

rettigl committed May 15, 2023
1 parent 462bbeb commit c9b8a17

Showing 10 changed files with 320 additions and 34 deletions.
sed/calibrator/energy.py (2 changes: 1 addition & 1 deletion)
@@ -251,7 +251,7 @@ def bin_data(
         if bias_key is None:
             bias_key = self._config.get("energy", {}).get("bias_key", "")
 
-        dataframe, _ = self.loader.read_dataframe(
+        dataframe, _, _ = self.loader.read_dataframe(
             files=data_files,
             collect_metadata=False,
         )
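Note for callers: read_dataframe now returns three values everywhere, so every call site must unpack the extra slot. A minimal sketch of the two unpacking patterns (variable names illustrative, not from this commit):

    # Discard the timed dataframe where it is not needed, as bin_data does:
    dataframe, _, _ = loader.read_dataframe(files=data_files, collect_metadata=False)
    # Keep it where it is used, as SedProcessor.load does below:
    dataframe, timed_dataframe, metadata = loader.read_dataframe(folder=folder)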
sed/core/processor.py (93 changes: 91 additions & 2 deletions)
@@ -18,6 +18,7 @@
 import xarray as xr
 
 from sed.binning import bin_dataframe
+from sed.binning.utils import bin_centers_to_bin_edges
 from sed.calibrator import DelayCalibrator
 from sed.calibrator import EnergyCalibrator
 from sed.calibrator import MomentumCorrector
@@ -87,6 +88,7 @@ def __init__(
         self._config["binning"]["num_cores"] = num_cores
 
         self._dataframe: Union[pd.DataFrame, ddf.DataFrame] = None
+        self._timed_dataframe: Union[pd.DataFrame, ddf.DataFrame] = None
         self._files: List[str] = []
 
         self._binned: xr.DataArray = None
@@ -281,23 +283,26 @@ def load(
         metadata = {}
         if dataframe is not None:
             self._dataframe = dataframe
+            self._timed_dataframe = None
         elif folder is not None:
-            dataframe, metadata = self.loader.read_dataframe(
+            dataframe, timed_dataframe, metadata = self.loader.read_dataframe(
                 folder=cast(str, self.cpy(folder)),
                 metadata=metadata,
                 collect_metadata=collect_metadata,
                 **kwds,
             )
             self._dataframe = dataframe
+            self._timed_dataframe = timed_dataframe
             self._files = self.loader.files
         elif files is not None:
-            dataframe, metadata = self.loader.read_dataframe(
+            dataframe, timed_dataframe, metadata = self.loader.read_dataframe(
                 files=cast(List[str], self.cpy(files)),
                 metadata=metadata,
                 collect_metadata=collect_metadata,
                 **kwds,
             )
             self._dataframe = dataframe
+            self._timed_dataframe = timed_dataframe
             self._files = self.loader.files
         else:
             raise ValueError(
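From the user's perspective nothing changes: load keeps the timed dataframe alongside the event dataframe automatically. A hedged usage sketch (the config file and data folder are placeholders; SedProcessor is assumed to be the package's main entry point):

    from sed import SedProcessor

    sp = SedProcessor(config="config.yaml")  # placeholder config
    sp.load(folder="/path/to/run")  # populates _dataframe and _timed_dataframe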
@@ -503,6 +508,10 @@ def apply_momentum_correction(
         self._dataframe, metadata = self.mc.apply_corrections(
             df=self._dataframe,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.mc.apply_corrections(
+                self._timed_dataframe,
+            )
         # Add Metadata
         self._attributes.add(
             metadata,
@@ -592,6 +601,11 @@ class and the momentum calibration to the dataframe.
             df=self._dataframe,
             calibration=calibration,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.mc.append_k_axis(
+                df=self._timed_dataframe,
+                calibration=calibration,
+            )
 
         # Add Metadata
         self._attributes.add(
@@ -677,6 +691,12 @@ def apply_energy_correction(
             correction=correction,
             **kwds,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.ec.apply_energy_correction(
+                df=self._timed_dataframe,
+                correction=correction,
+                **kwds,
+            )
 
         # Add Metadata
         self._attributes.add(
@@ -929,6 +949,12 @@ def append_energy_axis(
             calibration=calibration,
             **kwds,
         )
+        if self._timed_dataframe is not None:
+            self._timed_dataframe, _ = self.ec.append_energy_axis(
+                df=self._timed_dataframe,
+                calibration=calibration,
+                **kwds,
+            )
 
         # Add Metadata
         self._attributes.add(
@@ -969,6 +995,12 @@ def calibrate_delay_axis(
                 delay_range=delay_range,
                 **kwds,
             )
+            if self._timed_dataframe is not None:
+                self._timed_dataframe, _ = self.dc.append_delay_axis(
+                    self._timed_dataframe,
+                    delay_range=delay_range,
+                    **kwds,
+                )
         else:
             if datafile is None:
                 try:
@@ -985,6 +1017,12 @@
                 datafile=datafile,
                 **kwds,
             )
+            if self._timed_dataframe is not None:
+                self._timed_dataframe, _ = self.dc.append_delay_axis(
+                    self._timed_dataframe,
+                    datafile=datafile,
+                    **kwds,
+                )
 
         # Add Metadata
         self._attributes.add(
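All workflow steps above follow the same pattern: each transformation applied to the event dataframe is mirrored onto the timed dataframe, guarded by a None check. The guard matters because a dataframe passed directly to load has no timed counterpart; the mirrored calls are then skipped (sketch; my_dask_dataframe is hypothetical):

    sp = SedProcessor(config="config.yaml")  # placeholder config
    sp.load(dataframe=my_dask_dataframe)  # _timed_dataframe is set to None
    # Subsequent corrections and calibrations then act on the event dataframe only.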
@@ -1192,6 +1230,57 @@ def compute(
 
         return self._binned
 
+    def get_normalization_histogram(
+        self,
+        axis: str = "delay",
+        **kwds,
+    ) -> np.ndarray:
+        """Generates a normalization histogram from the timed dataframe.
+
+        Args:
+            axis (str, optional): The axis for which to compute the histogram.
+                Defaults to "delay".
+            **kwds: Keyword arguments:
+
+                - df_partitions (int, optional): Number of dataframe partitions
+                  to use. Defaults to all.
+
+        Raises:
+            ValueError: Raised if no data have been binned yet.
+            ValueError: Raised if 'axis' is not among the binned coordinates.
+
+        Returns:
+            np.ndarray: The computed normalization histogram (in time-stamp
+            units per bin).
+        """
+        if self._binned is None:
+            raise ValueError("Need to bin data first!")
+        if axis not in self._binned.coords:
+            raise ValueError(f"Axis '{axis}' not found in binned data!")
+
+        df_partitions = kwds.pop("df_partitions", None)
+        if df_partitions is not None:
+            timed_dataframe = self._timed_dataframe.partitions[
+                0 : min(df_partitions, self._timed_dataframe.npartitions)
+            ]
+        else:
+            timed_dataframe = self._timed_dataframe
+
+        bins = timed_dataframe[axis].map_partitions(
+            pd.cut,
+            bins=bin_centers_to_bin_edges(self._binned.coords[axis].values),
+        )
+
+        histogram = (
+            timed_dataframe[axis].groupby([bins]).count().compute().values
+        ) / 1000
+
+        return histogram
+
     def view_event_histogram(
         self,
         dfpid: int,
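Together with compute, this enables normalizing binned data by the acquisition time spent in each bin. A sketch of the intended workflow, assuming data were loaded and calibrated as above (bin count, axis name, and range are placeholders):

    res = sp.compute(bins=[100], axes=["delay"], ranges=[(-500, 1500)])
    hist = sp.get_normalization_histogram(axis="delay")
    normalized = res / hist  # element-wise division for this 1D result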
sed/loader/base/loader.py (8 changes: 4 additions & 4 deletions)
@@ -51,7 +51,7 @@ def read_dataframe(
         metadata: dict = None,
         collect_metadata: bool = False,
         **kwds,
-    ) -> Tuple[ddf.DataFrame, dict]:
+    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
         """Reads data from given files or folder and returns a dask dataframe
         and corresponding metadata.
@@ -70,8 +70,8 @@
             **kwds: keyword arguments. See description in respective loader.
 
         Returns:
-            Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from
-                specified files.
+            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed
+                dataframe, and metadata read from specified files.
         """
 
         if metadata is None:
@@ -100,7 +100,7 @@
         if not files:
             raise FileNotFoundError("No valid files found!")
 
-        return None, None
+        return None, None, None
 
     @abstractmethod
     def get_count_rate(
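Loader subclasses must now honor the three-element return contract even when they cannot supply timing information. A hypothetical minimal implementation (class name and file handling invented for illustration; the full signature and the remaining abstract methods are omitted):

    import dask.dataframe as ddf
    from sed.loader.base.loader import BaseLoader  # assumed base class name

    class MyParquetLoader(BaseLoader):  # hypothetical loader
        __name__ = "my_parquet"

        def read_dataframe(self, files=None, folder=None, metadata=None,
                           collect_metadata=False, **kwds):
            # Without time-resolved data, fill the timed-dataframe slot with None.
            df = ddf.read_parquet(files, **kwds)
            return df, None, metadata or {}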
sed/loader/generic/loader.py (14 changes: 7 additions & 7 deletions)
@@ -33,7 +33,7 @@ def read_dataframe(
         metadata: dict = None,
         collect_metadata: bool = False,
         **kwds,
-    ) -> Tuple[ddf.DataFrame, dict]:
+    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
         """Read stored files from a folder into a dataframe.
 
         Args:
@@ -57,8 +57,8 @@
             ValueError: Raised if the file type is not supported.
 
         Returns:
-            Tuple[ddf.DataFrame, dict]: Dask dataframe and metadata read from specified
-                files.
+            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed
+                dataframe, and metadata read from specified files.
         """
         # pylint: disable=duplicate-code
         super().read_dataframe(
@@ -76,16 +76,16 @@
         self.metadata = self.metadata
 
         if ftype == "parquet":
-            return (ddf.read_parquet(self.files, **kwds), self.metadata)
+            return (ddf.read_parquet(self.files, **kwds), None, self.metadata)
 
         if ftype == "json":
-            return (ddf.read_json(self.files, **kwds), self.metadata)
+            return (ddf.read_json(self.files, **kwds), None, self.metadata)
 
         if ftype == "csv":
-            return (ddf.read_csv(self.files, **kwds), self.metadata)
+            return (ddf.read_csv(self.files, **kwds), None, self.metadata)
 
         try:
-            return (ddf.read_table(self.files, **kwds), self.metadata)
+            return (ddf.read_table(self.files, **kwds), None, self.metadata)
         except (TypeError, ValueError, NotImplementedError) as exc:
             raise ValueError(
                 "The file format cannot be understood!",
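Since the generic file formats carry no timing information, the timed-dataframe slot is always None here; the None guards in processor.py above are what keep the workflow functional in that case. Sketch (constructor arguments and file name are placeholders):

    from sed.loader.generic.loader import GenericLoader

    loader = GenericLoader(config={})
    df, timed_df, meta = loader.read_dataframe(files=["events.csv"], ftype="csv")
    assert timed_df is None  # the generic loader provides no timed dataframe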
(Diffs for the remaining 6 changed files were not loaded.)