diff --git a/sed/loader/flash/loader.py b/sed/loader/flash/loader.py
index e91b340c..b0beefd6 100644
--- a/sed/loader/flash/loader.py
+++ b/sed/loader/flash/loader.py
@@ -18,7 +18,7 @@
 import dask.dataframe as dd
 import h5py
 import numpy as np
-import pyarrow as pa
+import pyarrow.parquet as pq
 from joblib import delayed
 from joblib import Parallel
 from natsort import natsorted
@@ -129,7 +129,6 @@ def get_files_from_run_id(
             extension (str, optional): The file extension. Defaults to "h5".
             kwds: Keyword arguments:
                 - daq (str): The data acquisition identifier.
-                    Defaults to config["dataframe"]["daq"].
 
         Returns:
             List[str]: A list of path strings representing the collected file names.
@@ -625,17 +624,18 @@ def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> Union[bool, E
             return exc
         return None
 
-    def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
+    def buffer_file_handler(self, data_parquet_dir: Path, detector: str, force_recreate: bool):
         """
         Handles the conversion of buffer files (h5 to parquet) and returns the filenames.
 
         Args:
             data_parquet_dir (Path): Directory where the parquet files will be stored.
             detector (str): Detector name.
+            force_recreate (bool): Forces recreation of buffer files
 
         Returns:
-            Tuple[List[Path], List[pa.parquet.FileMetaData]]: Two lists, one for
-                parquet file paths and one for parquet metadata.
+            Tuple[List[Path], List, List]: Three lists, one for
+                parquet file paths, one for metadata and one for schema.
 
         Raises:
             FileNotFoundError: If the conversion fails for any files or no data is available.
@@ -650,16 +650,33 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
         parquet_filenames = [
             buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
         ]
+        existing_parquet_filenames = [file for file in parquet_filenames if file.exists()]
 
         # Raise a value error if no data is available after the conversion
         if len(h5_filenames) == 0:
             raise ValueError("No data available. Probably failed reading all h5 files")
 
+        # read parquet metadata and schema
+        metadata = [pq.read_metadata(file) for file in existing_parquet_filenames]
+        schema = [pq.read_schema(file) for file in existing_parquet_filenames]
+
+        # check if available_channels are same as schema
+        available_channels_set = set(self.available_channels)
+        for i in range(len(schema)):
+            schema_set = set(schema[i].names)
+            # Check if available_channels are the same as schema including pulseId
+            if not force_recreate and schema_set != available_channels_set.union({"pulseId"}):
+                raise ValueError(
+                    "The available channels do not match the schema of file "
+                    f"{existing_parquet_filenames[i]}. "
+                    "Please check the configuration file or set force_recreate to True.",
+                )
+
         # Choose files to read
         files_to_read = [
             (h5_path, parquet_path)
             for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
-            if not parquet_path.exists()
+            if force_recreate or not parquet_path.exists()
         ]
 
         print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")
@@ -685,9 +702,7 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
 
         print("All files converted successfully!")
 
-        parquet_metadata = [pa.parquet.read_metadata(file) for file in parquet_filenames]
-
-        return parquet_filenames, parquet_metadata
+        return parquet_filenames, metadata, schema
 
     def parquet_handler(
         self,
@@ -697,6 +712,7 @@ def parquet_handler(
         converted: bool = False,
         load_parquet: bool = False,
         save_parquet: bool = False,
+        force_recreate: bool = False,
     ):
         """
         Handles loading and saving of parquet files based on the provided parameters.
@@ -710,7 +726,7 @@ def parquet_handler(
                 externally and saved into converted folder.
             load_parquet (bool, optional): Loads the entire parquet into the dd dataframe.
             save_parquet (bool, optional): Saves the entire dataframe into a parquet.
-
+            force_recreate (bool, optional): Forces recreation of buffer files.
         Returns:
             dataframe: Dataframe containing the loaded or processed data.
 
@@ -739,19 +755,23 @@ def parquet_handler(
                 ) from exc
 
         else:
-            # Obtain the filenames from the method which handles buffer file creation/reading
-            parquet_filenames, parquet_metadata = self.buffer_file_handler(
+            # Obtain the parquet filenames, metadata and schema from the method
+            # which handles buffer file creation/reading
+            filenames, metadata, schema = self.buffer_file_handler(
                 data_parquet_dir,
                 detector,
+                force_recreate,
             )
+
             # Read all parquet files into one dataframe using dask
-            dataframe = dd.read_parquet(parquet_filenames, calculate_divisions=True)
+            dataframe = dd.read_parquet(filenames, calculate_divisions=True)
+
             # Channels to fill NaN values
-            print("Filling nan values...")
             channels: List[str] = self.get_channels_by_format(["per_pulse", "per_train"])
 
-            overlap = min(file.num_rows for file in parquet_metadata)
+            overlap = min(file.num_rows for file in metadata)
+            print("Filling nan values...")
 
             dataframe = dfops.forward_fill_lazy(
                 df=dataframe,
                 columns=channels,
@@ -859,7 +879,7 @@ def read_dataframe(
             dataframe = self.parquet_handler(data_parquet_dir, **kwds)
 
         metadata = self.parse_metadata() if collect_metadata else {}
-        print(f"loading complete in {time.time() - t0:.2f} s")
+        print(f"loading complete in {time.time() - t0: .2f} s")
 
         return dataframe, metadata
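
Usage sketch (illustrative, not part of the patch): read_dataframe forwards unconsumed keyword arguments to parquet_handler, so the new force_recreate flag can be passed straight through to rebuild the h5-to-parquet buffer files even when they already exist, bypassing the schema check against available_channels. The config object and run id below are placeholders; parameter names other than force_recreate and collect_metadata follow the loader's existing read_dataframe signature.

    from sed.loader.flash.loader import FlashLoader

    loader = FlashLoader(config=config)  # config assumed to be loaded elsewhere

    # force_recreate travels via **kwds: read_dataframe -> parquet_handler -> buffer_file_handler
    dataframe, metadata = loader.read_dataframe(
        runs=["44826"],          # placeholder run id
        collect_metadata=False,
        force_recreate=True,     # rebuild buffer parquet files unconditionally
    )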