Refactor sxploader #331

Closed · wants to merge 41 commits

Commits (41):
7192dda  major refactor to flash code (zain-sohail, Nov 25, 2023)
3d64bd0  update dataframe class to be able to use index and dataset keys (zain-sohail, Nov 25, 2023)
ca17804  minor changes introduced (zain-sohail, Nov 29, 2023)
3695b2c  change majorly the class with a new initialize method. now save parqu… (zain-sohail, Nov 29, 2023)
3ac71f9  now uses a simpler notation and save_parquet method after loading dat… (zain-sohail, Nov 29, 2023)
d3fcf03  methods made more consistent and fixing the get_index_dataset_key (zain-sohail, Nov 29, 2023)
15fe4b0  include steinn's proposed solution to pulse_id channel being empty (zain-sohail, Nov 29, 2023)
96917bf  include unit tests and fixtures. still many to be done. needs to move… (zain-sohail, Nov 29, 2023)
4615a99  add more tests, simplify logic on dataframe class (zain-sohail, Dec 1, 2023)
e425825  remove the gmdTunnel channel because the datafile is not correct. Rep… (zain-sohail, Dec 3, 2023)
a3783f3  major structure changes (zain-sohail, Dec 11, 2023)
4846d24  docstrings etc (zain-sohail, Dec 11, 2023)
5a0195e  updated buffer creation etc. tests won't work currently (zain-sohail, Dec 12, 2023)
835de8e  fix linting errors and comment out tests for now (zain-sohail, Dec 12, 2023)
7439c91  fix the error of getting wrong attribute in loader, and fix parquet l… (zain-sohail, Dec 13, 2023)
6531d49  fix lint error (zain-sohail, Dec 13, 2023)
63d1cd7  cleaning up the classes (zain-sohail, Jan 6, 2024)
c4c7b1c  add back easy access apis (zain-sohail, Jan 6, 2024)
808df96  small fix (zain-sohail, Jan 6, 2024)
4ea65d0  small fix (zain-sohail, Jan 6, 2024)
a511f02  small fix (zain-sohail, Jan 6, 2024)
0831827  fix error with pickling (zain-sohail, Jan 6, 2024)
2abbd57  use old cfg (zain-sohail, Jan 6, 2024)
b811c48  docstrings fixes (zain-sohail, Jan 7, 2024)
b9ef280  fix tests (zain-sohail, Jan 7, 2024)
2c04f49  fix certain problems with df_electron and add comprehensive tests fo… (zain-sohail, Jan 8, 2024)
c4d5a2c  add tests (zain-sohail, Jan 8, 2024)
205257b  buffer handler tests (zain-sohail, Jan 8, 2024)
5720782  pre commit (zain-sohail, Jan 8, 2024)
13cb95c  ruff formatted (zain-sohail, Jan 8, 2024)
095b86b  add parquethandler tests (zain-sohail, Jan 8, 2024)
fdb9363  further tests (zain-sohail, Jan 8, 2024)
aca2664  fixes (zain-sohail, Jan 8, 2024)
1ab55ab  fix the lint error (zain-sohail, Jan 8, 2024)
83fe432  adding pydantic model for flash config (zain-sohail, Jan 14, 2024)
084603d  update loader to work with config model (zain-sohail, Jan 14, 2024)
2850040  update all tests (zain-sohail, Jan 14, 2024)
e4c75dc  add sxp loader, update tests and config model (zain-sohail, Jan 15, 2024)
0806cf8  fix lint and test errors (zain-sohail, Jan 16, 2024)
5be3cbf  add file so dir exists (zain-sohail, Jan 16, 2024)
7ed5c23  try another way of creating the dir beforehand (zain-sohail, Jan 16, 2024)
3 changes: 3 additions & 0 deletions .gitignore
@@ -140,3 +140,6 @@ dmypy.json

# IDE stuff
\.vscode

# Mac stuff
.DS_Store
11 changes: 11 additions & 0 deletions sed/loader/fel/__init__.py
@@ -0,0 +1,11 @@
"""sed.loader.fel module easy access APIs
"""
from .buffer import BufferHandler
from .dataframe import DataFrameCreator
from .parquet import ParquetHandler

__all__ = [
    "BufferHandler",
    "DataFrameCreator",
    "ParquetHandler",
]
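
These re-exports provide the easy-access API mentioned in the commit history. A minimal sketch of the intended import style (equivalent to importing from the submodules directly):

```python
# Instead of sed.loader.fel.buffer, .dataframe, and .parquet:
from sed.loader.fel import BufferHandler, DataFrameCreator, ParquetHandler
```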
249 changes: 249 additions & 0 deletions sed/loader/fel/buffer.py
@@ -0,0 +1,249 @@
"""
The BufferFileHandler uses the DataFrameCreator class and uses the ParquetHandler class to
manage buffer files. It provides methods for initializing paths, checking the schema,
determining the list of files to read, serializing and parallelizing the creation, and reading
all files into one Dask DataFrame.

After initialization, the electron and timed dataframes can be accessed as:

buffer_handler = BufferFileHandler(config, h5_paths, folder)

buffer_handler.electron_dataframe
buffer_handler.pulse_dataframe

Force_recreate flag forces recreation of buffer files. Useful when the schema has changed.
Debug mode serializes the creation of buffer files.
"""
from __future__ import annotations

from itertools import compress
from pathlib import Path

import dask.dataframe as ddf
import h5py
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel

from sed.core.dfops import forward_fill_lazy
from sed.loader.fel.config_model import DataFrameConfig
from sed.loader.fel.dataframe import DataFrameCreator
from sed.loader.fel.parquet import ParquetHandler
from sed.loader.fel.utils import get_channels
from sed.loader.utils import split_dld_time_from_sector_id


class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator
    and ParquetHandler.
    """

    def __init__(
        self,
        df_creator: type[DataFrameCreator],
        config: DataFrameConfig,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        prefix: str = "",
        suffix: str = "",
        debug: bool = False,
        auto: bool = True,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            df_creator (type[DataFrameCreator]): Class derived from DataFrameCreator.
            config (DataFrameConfig): The dataframe section of the config model.
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            force_recreate (bool): Flag to force recreation of buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode, which serializes buffer file creation.
            auto (bool): Flag to automatically create buffer files and fill the dataframe.
        """
        self.df_creator = df_creator
        self._config = config

        self.buffer_paths: list[Path] = []
        self.h5_to_create: list[Path] = []
        self.buffer_to_create: list[Path] = []

        self.dataframe_electron: ddf.DataFrame = None
        self.dataframe_pulse: ddf.DataFrame = None

        # In auto mode, these methods are called automatically
        if auto:
            self.get_files_to_read(h5_paths, folder, prefix, suffix, force_recreate)

            if not force_recreate:
                self.schema_check()

            self.create_buffer_files(debug)

            self.get_filled_dataframe()
    def schema_check(self) -> None:
        """
        Checks the schema of the Parquet files.

        Raises:
            ValueError: If the schema of the Parquet files does not match the configuration.
        """
        existing_parquet_filenames = [file for file in self.buffer_paths if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
        config_schema = set(
            get_channels(self._config.channels, formats="all", index=True, extend_aux=True),
        )

        for i, schema in enumerate(parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != config_schema:
                missing_in_parquet = config_schema - schema_set
                missing_in_config = schema_set - config_schema

                missing_in_parquet_str = (
                    f"Missing in parquet: {missing_in_parquet}" if missing_in_parquet else ""
                )
                missing_in_config_str = (
                    f"Missing in config: {missing_in_config}" if missing_in_config else ""
                )

                raise ValueError(
                    "The available channels do not match the schema of file",
                    f"{existing_parquet_filenames[i]}",
                    f"{missing_in_parquet_str}",
                    f"{missing_in_config_str}",
                    "Please check the configuration file or set force_recreate to True.",
                )

    def get_files_to_read(
        self,
        h5_paths: list[Path],
        folder: Path,
        prefix: str,
        suffix: str,
        force_recreate: bool,
    ) -> None:
        """
        Determines the list of files to read and the corresponding buffer files to create.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            force_recreate (bool): Flag to force recreation of buffer files.
        """
        # Get the paths of the buffer files, using a "buffer" subfolder and no extension
        pq_handler = ParquetHandler(
            [Path(h5_path).stem for h5_path in h5_paths],
            folder,
            "buffer",
            prefix,
            suffix,
            extension="",
        )
        self.buffer_paths = pq_handler.parquet_paths
        # Read only the files that do not exist yet, or all files if force_recreate is True
        files_to_read = [
            force_recreate or not parquet_path.exists() for parquet_path in self.buffer_paths
        ]

        # Get the list of H5 files to read and the corresponding buffer files to create
        self.h5_to_create = list(compress(h5_paths, files_to_read))
        self.buffer_to_create = list(compress(self.buffer_paths, files_to_read))

        self.num_files = len(self.h5_to_create)

        print(f"Reading files: {self.num_files} new files of {len(h5_paths)} total.")

    def _create_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Creates a single buffer file. Useful because h5py.File cannot be pickled if left open.

        Args:
            h5_path (Path): Path to the H5 file.
            parquet_path (Path): Path to the buffer file.
        """
        # Open the h5 file in read mode
        h5_file = h5py.File(h5_path, "r")

        # Create a DataFrameCreator instance with the configuration and the h5 file
        dfc = self.df_creator(self._config, h5_file)

        # Get the DataFrame from the DataFrameCreator instance
        df = dfc.df

        # Close the h5 file
        h5_file.close()

        # Reset the index of the DataFrame and save it as a parquet file
        df.reset_index().to_parquet(parquet_path)

    def create_buffer_files(self, debug: bool) -> None:
        """
        Creates the buffer files.

        Args:
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        if self.num_files > 0:
            if debug:
                for h5_path, parquet_path in zip(self.h5_to_create, self.buffer_to_create):
                    self._create_buffer_file(h5_path, parquet_path)
            else:
                Parallel(n_jobs=self.num_files, verbose=10)(
                    delayed(self._create_buffer_file)(h5_path, parquet_path)
                    for h5_path, parquet_path in zip(self.h5_to_create, self.buffer_to_create)
                )

    def get_filled_dataframe(self) -> None:
        """
        Reads all parquet files into one dataframe using dask and fills NaN values.
        """
        dataframe = ddf.read_parquet(self.buffer_paths, calculate_divisions=True)
        metadata = [pq.read_metadata(file) for file in self.buffer_paths]

        channels: list[str] = get_channels(
            self._config.channels,
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        index: list[str] = get_channels(index=True)
        overlap = min(file.num_rows for file in metadata)

        print("Filling nan values...")
        dataframe = forward_fill_lazy(
            df=dataframe,
            columns=channels,
            before=overlap,
            iterations=self._config.forward_fill_iterations,
        )

        # Drop rows with nan values in the tof column
        dataframe_electron = dataframe.dropna(subset=self._config.tof_column)

        # Set the dtypes of the channels here as there should be no null values
        ch_names = get_channels(self._config.channels, "all")
        cfg_ch = self._config.channels
        dtypes = {
            channel: cfg_ch[channel].dtype
            for channel in ch_names
            if cfg_ch[channel].dtype is not None
        }

        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.split_sector_id_from_dld_time:
            dataframe_electron = split_dld_time_from_sector_id(
                dataframe_electron,
                self._config.tof_column,
                self._config.sector_id_column,
                self._config.sector_id_reserved_bits,
            )
        self.dataframe_electron = dataframe_electron.astype(dtypes)
        self.dataframe_pulse = dataframe[index + channels]
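
To make the intended workflow concrete, here is a minimal usage sketch based on the module docstring above. The config object, HDF5 file names, and output folder are placeholders; constructing the `DataFrameConfig` is assumed to happen elsewhere (e.g. via the pydantic config model added in this PR):

```python
from pathlib import Path

from sed.loader.fel import BufferHandler, DataFrameCreator

# Placeholders: a DataFrameConfig instance and raw-data paths are assumed to exist.
config = ...  # dataframe section of the config model
h5_paths = [Path("run_001.h5"), Path("run_002.h5")]  # hypothetical file names
folder = Path("processed")

# With auto=True (the default), this resolves buffer paths, checks the schema,
# creates any missing buffer files in parallel, and fills the dataframes.
buffer_handler = BufferHandler(DataFrameCreator, config, h5_paths, folder)

df_electron = buffer_handler.dataframe_electron  # per-electron Dask DataFrame
df_pulse = buffer_handler.dataframe_pulse        # per-pulse/per-train channels
```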
Loading
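The final step of `get_filled_dataframe` relies on `split_dld_time_from_sector_id` to undo the reserved-bit encoding. As a conceptual illustration only (not the sed implementation), splitting a value whose lowest `reserved_bits` bits carry the sector ID looks like this:

```python
import numpy as np

def split_reserved_bits(raw: np.ndarray, reserved_bits: int = 3):
    """Illustrative only: separate the lowest `reserved_bits` bits (sector ID)
    from the remaining high bits (the actual time value)."""
    sector_id = raw & ((1 << reserved_bits) - 1)  # e.g. the lowest 3 bits
    time_value = raw >> reserved_bits             # drop the reserved bits
    return sector_id, time_value

# Example: raw value 0b101011 with 3 reserved bits -> sector 0b011, time 0b101
sector, tof = split_reserved_bits(np.array([0b101011]), reserved_bits=3)
```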