BTHofmann2023 additions #305

Draft
wants to merge 15 commits into base: main
3 changes: 2 additions & 1 deletion sed/binning/binning.py
@@ -293,7 +293,8 @@ def bin_dataframe(
xarray object, combining the data with the axes (bin centers).
"""
bins, axes, ranges = simplify_binning_arguments(bins, axes, ranges)

# filter dataframe to use only the columns needed for the binning
df = df[axes]
Comment on lines +296 to +297 (Member):

I saw this before in your commits. I don't think this is helpful, as it introduces another graph layer. Did you check whether this really improves computation time? In my tests, it slowed things down. Otherwise, why did you introduce it in the first place?
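
As an aside, a minimal sketch of how this could be checked, using a synthetic dask dataframe; column names, sizes, and the downstream reduction are made up for illustration and not taken from sed:

import time

import dask.dataframe as dd
import numpy as np
import pandas as pd

pdf = pd.DataFrame(np.random.rand(1_000_000, 4), columns=["X", "Y", "t", "ADC"])
df = dd.from_pandas(pdf, npartitions=8)

axes = ["X", "Y", "t"]
filtered = df[axes]  # the pre-selection under discussion

# Compare the task counts of the two graphs; the pre-selection generally
# adds a per-partition getitem step (details depend on the dask version).
print(len(dict(df.__dask_graph__())), len(dict(filtered.__dask_graph__())))

# Time an identical downstream reduction built on each frame.
for label, frame in (("no pre-selection", df), ("with df[axes]", filtered)):
    start = time.perf_counter()
    frame["X"].mean().compute()
    print(f"{label}: {time.perf_counter() - start:.3f} s")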

# create the coordinate axes for the xarray output
# if provided as array, they are interpreted as bin centers
if isinstance(bins[0], np.ndarray):
76 changes: 72 additions & 4 deletions sed/core/processor.py
@@ -16,6 +16,7 @@
import pandas as pd
import psutil
import xarray as xr
from dask.diagnostics import ProgressBar

from sed.binning import bin_dataframe
from sed.binning.binning import normalization_histogram_from_timed_dataframe
@@ -25,8 +26,8 @@
from sed.calibrator import MomentumCorrector
from sed.core.config import parse_config
from sed.core.config import save_config
from sed.core.dfops import apply_filter
from sed.core.dfops import add_time_stamped_data
from sed.core.dfops import apply_filter
from sed.core.dfops import apply_jitter
from sed.core.metadata import MetaHandler
from sed.diagnostics import grid_histogram
@@ -164,14 +165,81 @@ def __init__(
)

def __repr__(self):
info = self.get_run_info()
if self._dataframe is None:
df_str = "Data Frame: No Data loaded"
else:
df_str = self._dataframe.__repr__()
attributes_str = f"Metadata: {self._attributes.metadata}"
pretty_str = df_str + "\n" + attributes_str
df_str = f"Data Frame: {len(info['dataframe']['columns'])} columns.\n"
df_str += f"{' '*11} {info['dataframe']['num_electrons']:,.0f} electrons.\n"
df_str += f"{' '*11} {info['dataframe']['num_trains']:,.0f} trains.\n"
df_str += f"{' '*11} {info['dataframe']['electrons_per_train']:,.1f} electrons/train.\n"
if "num_pulses" in info["dataframe"]:
df_str += f"{' '*11} {info['dataframe']['num_pulses']:,.0f} pulses.\n"
df_str += (
f"{' '*11} {info['dataframe']['electrons_per_pulse']} " "electrons/pulse.\n"
)
df_str += f"{' '*11} {info['dataframe']['timestamp_duration']:,.0f} seconds.\n"
df_str += f"{' '*11} {info['dataframe']['duration']}.\n"
df_str += (
f"{' '*11} {info['dataframe']['start_time']} to {info['dataframe']['end_time']}.\n"
)

# df_str = self._dataframe.__repr__()
# attributes_str = f"Metadata: {self._attributes.metadata}"
pretty_str = df_str # + "\n" + attributes_str
return pretty_str

def get_run_info(self, compute=False) -> dict:
"""Function to return a dict of information about the loaded data.

TODO: add dtypes from dataframe. add columns per pulse/per electron/per train

        Args:
            compute (bool, optional): Whether to compute the electron and pulse
                counts from the dataframe if the loader does not provide them.
                Defaults to False.

        Returns:
            dict: Dictionary with information about the loaded data.
        """
info: Dict[str, Any] = {}
head = self.dataframe.head(1)
tail = self.dataframe.tail(1)
info["dataframe"] = {}
info["dataframe"]["columns"] = self.dataframe.columns
if hasattr(self.loader, "num_electrons"):
n_el: int = self.loader.num_electrons
else:
n_el = None
if n_el is None and compute:
with ProgressBar():
print("computing number of electrons")
n_el = len(self.dataframe)
info["dataframe"]["num_electrons"] = n_el
if hasattr(self.loader, "num_pulses"):
n_pulses: int = self.loader.num_pulses
else:
n_pulses = None
if n_pulses is None and compute:
with ProgressBar():
print("computing number of pulses")
n_pulses = len(self.dataframe[self.dataframe["electronId"] == 0])
train_range: tuple = int(head["trainId"]), int(tail["trainId"])
n_trains = train_range[1] - train_range[0]
info["dataframe"]["trainId_min"] = train_range[0]
info["dataframe"]["trainId_max"] = train_range[1]
info["dataframe"]["num_trains"] = n_trains
        if n_pulses is not None:
            info["dataframe"]["num_pulses"] = n_pulses
            info["dataframe"]["electrons_per_pulse"] = n_el / n_pulses
if n_trains is not None:
info["dataframe"]["electrons_per_train"] = n_el / n_trains
tsr = float(head["timeStamp"]), float(tail["timeStamp"])
info["dataframe"]["timestamp_min"] = tsr[0]
info["dataframe"]["timestamp_max"] = tsr[1]
info["dataframe"]["timestamp_duration"] = tsr[1] - tsr[0]
info["dataframe"]["start_time"] = pd.to_datetime(tsr[0], unit="s")
info["dataframe"]["end_time"] = pd.to_datetime(tsr[1], unit="s")
info["dataframe"]["duration"] = pd.to_timedelta(tsr[1] - tsr[0], unit="s")

info["metadata"] = self._attributes.metadata
info["config"] = self._config
return info
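
For reference, a hedged usage sketch of the new run-info summary, assuming a SedProcessor instance sp with a run already loaded (construction and loading are omitted; the dictionary keys follow the code above):

# `sp` is assumed to be a SedProcessor with data already loaded.
info = sp.get_run_info(compute=True)  # compute=True counts rows if the loader has no totals

print(info["dataframe"]["num_electrons"])        # total electron count
print(info["dataframe"]["electrons_per_train"])  # average electrons per train
print(info["dataframe"]["start_time"], info["dataframe"]["end_time"])

print(sp)  # __repr__ now renders the same numbers as a short text summary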

@property
def dataframe(self) -> Union[pd.DataFrame, ddf.DataFrame]:
"""Accessor to the underlying dataframe.
52 changes: 43 additions & 9 deletions sed/loader/flash/loader.py
@@ -10,6 +10,7 @@
import time
from functools import reduce
from pathlib import Path
from typing import Any
from typing import List
from typing import Sequence
from typing import Tuple
@@ -25,6 +26,7 @@
from pandas import DataFrame
from pandas import MultiIndex
from pandas import Series
from tqdm.auto import tqdm

from sed.core import dfops
from sed.loader.base.loader import BaseLoader
@@ -52,6 +54,12 @@ def __init__(self, config: dict) -> None:
self.index_per_pulse: MultiIndex = None
self.failed_files_error: List[str] = []

self.prq_metadata: List[Any] = None
self.num_electrons: int = None
self.num_electrons_per_part: List[int] = None
self.num_pulses: int = None
self.parallel_loader: bool = True

def initialize_paths(self) -> Tuple[List[Path], Path]:
"""
Initializes the paths based on the configuration.
@@ -431,7 +439,15 @@ def create_dataframe_per_pulse(
# Macrobunch resolved data is exploded to a DataFrame and the MultiIndex is set

# Creates the index_per_pulse for the given channel
self.create_multi_index_per_pulse(train_id, np_array)
if np_array.ndim != 2:
np_array = np.empty((train_id.size, 0))
np_array[:, :] = np.nan
        try:
            self.create_multi_index_per_pulse(train_id, np_array)
        except IndexError as exc:
            raise IndexError(
                f"Channel {channel} seems to be empty.",
            ) from exc
data = (
Series((np_array[i] for i in train_id.index), name=channel)
.explode()
@@ -520,6 +536,8 @@ def create_dataframe_per_channel(
# Pulse resolved data is treated here
elif channel_dict["format"] == "per_pulse":
# Create a DataFrame for pulse-resolved data
if np_array.ndim != 2:
np_array = np_array.reshape((np_array.size, 1))
data = self.create_dataframe_per_pulse(
np_array,
train_id,
@@ -646,6 +664,7 @@ def buffer_file_handler(
data_parquet_dir: Path,
detector: str,
force_recreate: bool,
        parallel_loader: bool = True,
) -> Tuple[List[Path], List, List]:
"""
Handles the conversion of buffer files (h5 to parquet) and returns the filenames.
@@ -720,12 +739,23 @@

# Convert the remaining h5 files to parquet in parallel if there are any
if len(files_to_read) > 0:
error = Parallel(n_jobs=len(files_to_read), verbose=10)(
delayed(self.create_buffer_file)(h5_path, parquet_path)
for h5_path, parquet_path in files_to_read
)
if any(error):
raise RuntimeError(f"Conversion failed for some files. {error}")
if parallel_loader:
error = Parallel(n_jobs=len(files_to_read), verbose=10)(
delayed(self.create_buffer_file)(h5_path, parquet_path)
for h5_path, parquet_path in files_to_read
)
                if any(error):
                    raise RuntimeError(
                        f"Conversion failed for some files. {error}",
                    ) from next(err for err in error if err)
else:
for h5_path, parquet_path in tqdm(
files_to_read,
desc="Converting h5 files to parquet",
):
error = self.create_buffer_file(h5_path, parquet_path)
if error:
raise RuntimeError(
f"Conversion failed for some file {h5_path}.\n {error}",
) from error

# Raise an error if the conversion failed for any files
        # TODO: merge this and the previous error tracking
@@ -737,10 +767,12 @@
print("All files converted successfully!")

# read all parquet metadata and schema
metadata = [pq.read_metadata(file) for file in parquet_filenames]
self.prq_metadata = [pq.read_metadata(file) for file in parquet_filenames]
self.num_electrons_per_part = [metadata.num_rows for metadata in self.prq_metadata]
self.num_electrons = sum(self.num_electrons_per_part)
schema = [pq.read_schema(file) for file in parquet_filenames]

return parquet_filenames, metadata, schema
return parquet_filenames, self.prq_metadata, schema
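
The electron counts above are derived from the parquet footers only, which is cheap compared to reading the data; a small standalone sketch of the same idea, with hypothetical file paths:

import pyarrow.parquet as pq

# Hypothetical buffer files; only the footers are read, not the column data.
parquet_filenames = ["buffer/part_000.parquet", "buffer/part_001.parquet"]

prq_metadata = [pq.read_metadata(path) for path in parquet_filenames]
num_electrons_per_part = [metadata.num_rows for metadata in prq_metadata]
num_electrons = sum(num_electrons_per_part)

print(num_electrons_per_part, num_electrons)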

def parquet_handler(
self,
@@ -751,6 +783,7 @@
load_parquet: bool = False,
save_parquet: bool = False,
force_recreate: bool = False,
parallel_loader: bool = True,
) -> Tuple[dd.DataFrame, dd.DataFrame]:
"""
Handles loading and saving of parquet files based on the provided parameters.
@@ -801,6 +834,7 @@
data_parquet_dir,
detector,
force_recreate,
parallel_loader,
)

# Read all parquet files into one dataframe using dask
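
For reference, the parallel_loader switch introduced above follows a common joblib-versus-serial-tqdm pattern; a self-contained sketch of that pattern, with a dummy convert function standing in for create_buffer_file (which, per the code above, appears to return an error value on failure and nothing on success) and hypothetical file names:

from typing import Optional

from joblib import Parallel, delayed
from tqdm.auto import tqdm


def convert(h5_path: str, parquet_path: str) -> Optional[Exception]:
    """Dummy worker: return None on success, the exception on failure."""
    try:
        # ... the actual h5 -> parquet conversion would happen here ...
        return None
    except Exception as exc:  # noqa: BLE001
        return exc


files_to_read = [("run1.h5", "run1.parquet"), ("run2.h5", "run2.parquet")]
parallel_loader = True

if parallel_loader:
    errors = Parallel(n_jobs=len(files_to_read), verbose=10)(
        delayed(convert)(h5_path, parquet_path)
        for h5_path, parquet_path in files_to_read
    )
else:
    errors = [
        convert(h5_path, parquet_path)
        for h5_path, parquet_path in tqdm(files_to_read, desc="Converting h5 files to parquet")
    ]

failed = [err for err in errors if err is not None]
if failed:
    raise RuntimeError(f"Conversion failed for {len(failed)} file(s): {failed}")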