diff --git a/sed/loader/flash/loader.py b/sed/loader/flash/loader.py
index fea8979b..91fe79e1 100644
--- a/sed/loader/flash/loader.py
+++ b/sed/loader/flash/loader.py
@@ -39,7 +39,7 @@ class FlashLoader(BaseLoader):
 
     __name__ = "flash"
 
-    supported_file_types = ["h5", "parquet"]
+    supported_file_types = ["h5"]
 
     def __init__(self, config: dict) -> None:
@@ -626,37 +626,35 @@ def buffer_file_handler(self, data_parquet_dir, detector):
             detector (str): Detector name.
 
         Returns:
-            Dict[Path, Path]: Dictionary mapping h5 file paths to corresponding parquet file paths.
+            Tuple[List[Path], List[Path]]: Two lists, one for h5 file paths and one for
+                corresponding parquet file paths.
 
         Raises:
             FileNotFoundError: If the conversion fails for any files or no data is available.
-
         """
         # Create the directory for buffer parquet files
         buffer_file_dir = data_parquet_dir.joinpath("buffer")
         buffer_file_dir.mkdir(parents=True, exist_ok=True)
 
-        # Create a dictionary to store the filenames of h5 and parquet files
-        filenames = {
-            Path(file): buffer_file_dir.joinpath(
-                Path(file).stem + detector,
-            )
-            for file in self.files
-        }
+        # Create two separate lists for h5 and parquet file paths
+        h5_filenames = [Path(file) for file in self.files]
+        parquet_filenames = [
+            buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
+        ]
 
         # Raise a value error if no data is available after the conversion
-        if len(filenames) == 0:
-            raise ValueError(
-                "No data available. Probably failed reading all h5 files",
-            )
+        if len(h5_filenames) == 0:
+            raise ValueError("No data available. Probably failed reading all h5 files")
 
         # Choose files to read
-        files_to_read = {h_5: parquet for h_5, parquet in filenames.items() if not parquet.exists()}
+        files_to_read = [
+            (h5_path, parquet_path)
+            for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
+            if not parquet_path.exists()
+        ]
 
-        print(
-            f"Reading files: {len(files_to_read)} new files of {len(filenames)} total.",
-        )
+        print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")
 
         # Initialize the indices for create_buffer_file conversion
         self.reset_multi_index()
@@ -665,19 +663,18 @@ def buffer_file_handler(self, data_parquet_dir, detector):
         if len(files_to_read) > 0:
             Parallel(n_jobs=len(files_to_read), verbose=10)(
                 delayed(self.create_buffer_file)(h5_path, parquet_path)
-                for h5_path, parquet_path in files_to_read.items()
+                for h5_path, parquet_path in files_to_read
             )
 
         # Raise an error if the conversion failed for any files
         if self.failed_files_error:
            raise FileNotFoundError(
-                "Conversion failed for the following files: \n"
-                + "\n".join(self.failed_files_error),
+                "Conversion failed for the following files:\n" + "\n".join(self.failed_files_error),
             )
 
         print("All files converted successfully!")
 
-        return filenames
+        return h5_filenames, parquet_filenames
 
     def fill_na(
         self,
@@ -782,14 +779,14 @@ def parquet_handler(self, data_parquet_dir, **kwds):
         else:
             # Obtain the filenames from the method which handles buffer file creation/reading
-            filenames = self.buffer_file_handler(
+            h5_filenames, parquet_filenames = self.buffer_file_handler(
                 data_parquet_dir,
                 detector,
             )
 
             # Read all parquet files using dask and concatenate into one dataframe after filling
             dataframe = self.fill_na(
-                [dd.read_parquet(file) for file in filenames.values()],
+                [dd.read_parquet(file) for file in parquet_filenames],
             )
             dataframe = dataframe.dropna(
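
Note for reviewers: below is a minimal standalone sketch of the new return contract (two parallel lists instead of a Dict[Path, Path]). The helper name make_buffer_paths and the sample filenames are illustrative only, not part of the sed API; the point is that pairing is preserved by index, so zip() recovers the same (h5, parquet) association the old dict provided.

    # Illustrative sketch only -- not code from this PR.
    from pathlib import Path
    from typing import List, Tuple

    def make_buffer_paths(
        files: List[str],
        buffer_dir: Path,
        detector: str,
    ) -> Tuple[List[Path], List[Path]]:
        """Return parallel lists: index i of each list refers to the same run."""
        h5_paths = [Path(f) for f in files]
        parquet_paths = [buffer_dir.joinpath(p.stem + detector) for p in h5_paths]
        return h5_paths, parquet_paths

    # zip() restores the (h5, parquet) pairing without relying on a dict,
    # matching how files_to_read is built in the diff above:
    h5s, parquets = make_buffer_paths(["run_001.h5", "run_002.h5"], Path("buffer"), "_det1")
    to_convert = [(h, p) for h, p in zip(h5s, parquets) if not p.exists()]
    print(to_convert)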