Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPES loader: process files from memory #532

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 60 additions & 32 deletions sed/loader/mpes/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import datetime
import glob
import io
import json
import os
from typing import Dict
Expand All @@ -27,6 +28,29 @@
from sed.loader.base.loader import BaseLoader


def load_h5_in_memory(file_path):
    """
    Load an HDF5 file entirely into memory and open it with h5py.

    Reading the whole file into a ``BytesIO`` buffer up front means the
    returned handle never touches the filesystem again, which avoids
    holding OS-level file handles open (the motivation for this loader
    change) and keeps subsequent dataset reads purely in-memory.

    Args:
        file_path (str): Path to the .h5 file.

    Returns:
        h5py.File: An h5py File object backed by the in-memory copy of
            the file, opened read-only.
    """
    # Slurp the file once; the with-block releases the OS handle immediately.
    with open(file_path, "rb") as f:
        file_buffer = io.BytesIO(f.read())

    # h5py accepts a file-like object directly; "r" keeps it read-only.
    return h5py.File(file_buffer, "r")


def hdf5_to_dataframe(
files: Sequence[str],
group_names: Sequence[str] = None,
Expand Down Expand Up @@ -67,7 +91,7 @@ def hdf5_to_dataframe(

# Read a file to parse the file structure
test_fid = kwds.pop("test_fid", 0)
test_proc = h5py.File(files[test_fid])
test_proc = load_h5_in_memory(files[test_fid])
if group_names == []:
group_names, alias_dict = get_groups_and_aliases(
h5file=test_proc,
Expand All @@ -80,7 +104,7 @@ def hdf5_to_dataframe(
column_names.append(time_stamp_alias)

test_array = hdf5_to_array(
h5file=test_proc,
h5filename=files[test_fid],
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand All @@ -94,7 +118,7 @@ def hdf5_to_dataframe(
arrays.append(
da.from_delayed(
dask.delayed(hdf5_to_array)(
h5file=h5py.File(f),
h5filename=f,
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand All @@ -111,6 +135,8 @@ def hdf5_to_dataframe(

array_stack = da.concatenate(arrays, axis=1).T

test_proc.close()

return ddf.from_dask_array(array_stack, columns=column_names)


Expand Down Expand Up @@ -155,7 +181,7 @@ def hdf5_to_timed_dataframe(

# Read a file to parse the file structure
test_fid = kwds.pop("test_fid", 0)
test_proc = h5py.File(files[test_fid])
test_proc = load_h5_in_memory(files[test_fid])
if group_names == []:
group_names, alias_dict = get_groups_and_aliases(
h5file=test_proc,
Expand All @@ -168,7 +194,7 @@ def hdf5_to_timed_dataframe(
column_names.append(time_stamp_alias)

test_array = hdf5_to_timed_array(
h5file=test_proc,
h5filename=files[test_fid],
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand All @@ -182,7 +208,7 @@ def hdf5_to_timed_dataframe(
arrays.append(
da.from_delayed(
dask.delayed(hdf5_to_timed_array)(
h5file=h5py.File(f),
h5filename=f,
group_names=group_names,
time_stamps=time_stamps,
ms_markers_group=ms_markers_group,
Expand All @@ -198,6 +224,8 @@ def hdf5_to_timed_dataframe(

array_stack = da.concatenate(arrays, axis=1).T

test_proc.close()

return ddf.from_dask_array(array_stack, columns=column_names)


Expand Down Expand Up @@ -237,7 +265,7 @@ def get_groups_and_aliases(


def hdf5_to_array(
h5file: h5py.File,
h5filename: str,
group_names: Sequence[str],
data_type: str = "float32",
time_stamps=False,
Expand All @@ -248,14 +276,10 @@ def hdf5_to_array(
2-dimensional array with the corresponding values.

Args:
h5file (h5py.File):
hdf5 file handle to read from
group_names (str):
group names to read
data_type (str, optional):
Data type of the output data. Defaults to "float32".
time_stamps (bool, optional):
Option to calculate time stamps. Defaults to False.
h5filename (str): hdf5 file name to read from
group_names (str): group names to read
data_type (str, optional): Data type of the output data. Defaults to "float32".
time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
ms_markers_group (str): h5 column containing timestamp information.
Defaults to "msMarkers".
first_event_time_stamp_key (str): h5 attribute containing the start
Expand All @@ -267,6 +291,8 @@ def hdf5_to_array(

# Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

h5file = load_h5_in_memory(h5filename)

# Read out groups:
data_list = []
for group in group_names:
Expand All @@ -293,7 +319,7 @@ def hdf5_to_array(
except KeyError:
# get the start time of the file from its modification date if the key
# does not exist (old files)
start_time = os.path.getmtime(h5file.filename) # convert to ms
start_time = os.path.getmtime(h5filename) # convert to ms
# the modification time points to the time when the file was finished, so we
# need to correct for the time it took to write the file
start_time -= len(ms_marker) / 1000
Expand All @@ -316,11 +342,13 @@ def hdf5_to_array(

data_list.append(time_stamp_data)

h5file.close()

return np.asarray(data_list)


def hdf5_to_timed_array(
h5file: h5py.File,
h5filename: str,
group_names: Sequence[str],
data_type: str = "float32",
time_stamps=False,
Expand All @@ -331,14 +359,10 @@ def hdf5_to_timed_array(
timed version of a 2-dimensional array with the corresponding values.

Args:
h5file (h5py.File):
hdf5 file handle to read from
group_names (str):
group names to read
data_type (str, optional):
Data type of the output data. Defaults to "float32".
time_stamps (bool, optional):
Option to calculate time stamps. Defaults to False.
h5filename (str): hdf5 file name to read from
group_names (str): group names to read
data_type (str, optional): Data type of the output data. Defaults to "float32".
time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
ms_markers_group (str): h5 column containing timestamp information.
Defaults to "msMarkers".
first_event_time_stamp_key (str): h5 attribute containing the start
Expand All @@ -351,6 +375,8 @@ def hdf5_to_timed_array(

# Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

h5file = load_h5_in_memory(h5filename)

# Read out groups:
data_list = []
ms_marker = np.asarray(h5file[ms_markers_group])
Expand All @@ -377,7 +403,7 @@ def hdf5_to_timed_array(
except KeyError:
# get the start time of the file from its modification date if the key
# does not exist (old files)
start_time = os.path.getmtime(h5file.filename) # convert to ms
start_time = os.path.getmtime(h5filename) # convert to ms
# the modification time points to the time when the file was finished, so we
# need to correct for the time it took to write the file
start_time -= len(ms_marker) / 1000
Expand All @@ -386,6 +412,8 @@ def hdf5_to_timed_array(

data_list.append(time_stamp_data)

h5file.close()

return np.asarray(data_list)


Expand Down Expand Up @@ -692,16 +720,16 @@ def get_start_and_end_time(self) -> Tuple[float, float]:
Returns:
Tuple[float, float]: A tuple containing the start and end time stamps
"""
h5file = h5py.File(self.files[0])
h5filename = self.files[0]
timestamps = hdf5_to_array(
h5file,
h5filename=h5filename,
group_names=self._config["dataframe"]["hdf5_groupnames"],
time_stamps=True,
)
ts_from = timestamps[-1][1]
h5file = h5py.File(self.files[-1])
h5filename = self.files[-1]
timestamps = hdf5_to_array(
h5file,
h5filename=h5filename,
group_names=self._config["dataframe"]["hdf5_groupnames"],
time_stamps=True,
)
Expand Down Expand Up @@ -929,7 +957,7 @@ def get_count_rate(
for fid in fids:
try:
count_rate_, secs_ = get_count_rate(
h5py.File(self.files[fid]),
load_h5_in_memory(self.files[fid]),
ms_markers_group=ms_markers_group,
)
secs_list.append((accumulated_time + secs_).T)
Expand Down Expand Up @@ -974,7 +1002,7 @@ def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float:
for fid in fids:
try:
secs += get_elapsed_time(
h5py.File(self.files[fid]),
load_h5_in_memory(self.files[fid]),
ms_markers_group=ms_markers_group,
)
except OSError as exc:
Expand Down
Loading