
Commit

separate dict into two lists
zain-sohail committed Sep 28, 2023
1 parent 04030de commit 787b3df
Showing 1 changed file with 21 additions and 24 deletions.
45 changes: 21 additions & 24 deletions sed/loader/flash/loader.py
@@ -39,7 +39,7 @@ class FlashLoader(BaseLoader):

__name__ = "flash"

supported_file_types = ["h5", "parquet"]
supported_file_types = ["h5"]

def __init__(self, config: dict) -> None:

@@ -626,37 +626,35 @@ def buffer_file_handler(self, data_parquet_dir, detector):
detector (str): Detector name.
Returns:
-            Dict[Path, Path]: Dictionary mapping h5 file paths to corresponding parquet file paths.
+            Tuple[List[Path], List[Path]]: Two lists, one for h5 file paths and one for
+                corresponding parquet file paths.
Raises:
FileNotFoundError: If the conversion fails for any files or no data is available.
"""

# Create the directory for buffer parquet files
buffer_file_dir = data_parquet_dir.joinpath("buffer")
buffer_file_dir.mkdir(parents=True, exist_ok=True)

-        # Create a dictionary to store the filenames of h5 and parquet files
-        filenames = {
-            Path(file): buffer_file_dir.joinpath(
-                Path(file).stem + detector,
-            )
-            for file in self.files
-        }
+        # Create two separate lists for h5 and parquet file paths
+        h5_filenames = [Path(file) for file in self.files]
+        parquet_filenames = [
+            buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
+        ]

# Raise a value error if no data is available after the conversion
-        if len(filenames) == 0:
-            raise ValueError(
-                "No data available. Probably failed reading all h5 files",
-            )
+        if len(h5_filenames) == 0:
+            raise ValueError("No data available. Probably failed reading all h5 files")

# Choose files to read
-        files_to_read = {h_5: parquet for h_5, parquet in filenames.items() if not parquet.exists()}
+        files_to_read = [
+            (h5_path, parquet_path)
+            for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
+            if not parquet_path.exists()
+        ]

-        print(
-            f"Reading files: {len(files_to_read)} new files of {len(filenames)} total.",
-        )
+        print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")

# Initialize the indices for create_buffer_file conversion
self.reset_multi_index()
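
The hunk above replaces the single h5-to-parquet dict with two index-aligned lists that zip re-pairs for filtering. A minimal standalone sketch of that pattern, with hypothetical file names:

    from pathlib import Path

    # Two index-aligned lists: position i of one list corresponds to
    # position i of the other (hypothetical inputs).
    h5_filenames = [Path("run1.h5"), Path("run2.h5")]
    parquet_filenames = [Path("buffer/run1"), Path("buffer/run2")]

    # zip() re-pairs the lists; keep only pairs whose parquet target is missing.
    files_to_read = [
        (h5_path, parquet_path)
        for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
        if not parquet_path.exists()
    ]
    print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")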
@@ -665,19 +663,18 @@ def buffer_file_handler(self, data_parquet_dir, detector):
if len(files_to_read) > 0:
Parallel(n_jobs=len(files_to_read), verbose=10)(
delayed(self.create_buffer_file)(h5_path, parquet_path)
-                for h5_path, parquet_path in files_to_read.items()
+                for h5_path, parquet_path in files_to_read
)

# Raise an error if the conversion failed for any files
if self.failed_files_error:
raise FileNotFoundError(
"Conversion failed for the following files: \n"
+ "\n".join(self.failed_files_error),
"Conversion failed for the following files:\n" + "\n".join(self.failed_files_error),
)

print("All files converted successfully!")

-        return filenames
+        return h5_filenames, parquet_filenames

def fill_na(
self,
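
The conversion fans out over the (h5, parquet) pairs with joblib, which is why a list of tuples iterates more directly than a dict. A self-contained sketch of that Parallel/delayed pattern; create_buffer_file here is a hypothetical stand-in for the loader's converter:

    from pathlib import Path
    from joblib import Parallel, delayed

    def create_buffer_file(h5_path: Path, parquet_path: Path) -> None:
        # Stand-in for the real h5-to-parquet conversion.
        print(f"converting {h5_path} -> {parquet_path}")

    files_to_read = [
        (Path("run1.h5"), Path("buffer/run1")),
        (Path("run2.h5"), Path("buffer/run2")),
    ]

    # One worker per file pair; verbose=10 prints progress, as in the diff.
    Parallel(n_jobs=len(files_to_read), verbose=10)(
        delayed(create_buffer_file)(h5_path, parquet_path)
        for h5_path, parquet_path in files_to_read
    )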
@@ -782,14 +779,14 @@ def parquet_handler(self, data_parquet_dir, **kwds):

else:
# Obtain the filenames from the method which handles buffer file creation/reading
-            filenames = self.buffer_file_handler(
+            h5_filenames, parquet_filenames = self.buffer_file_handler(
data_parquet_dir,
detector,
)

# Read all parquet files using dask and concatenate into one dataframe after filling
dataframe = self.fill_na(
-            [dd.read_parquet(file) for file in filenames.values()],
+            [dd.read_parquet(file) for file in parquet_filenames],
)

dataframe = dataframe.dropna(
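
On the consumer side, each buffer file becomes one lazy dask dataframe before everything is combined. A rough sketch of that read-and-concatenate step, with hypothetical paths; the fill logic inside fill_na is outside this diff:

    import dask.dataframe as dd

    # Hypothetical buffer files written by the conversion step.
    parquet_filenames = ["buffer/run1", "buffer/run2"]

    # One lazy dask dataframe per buffer file ...
    dataframes = [dd.read_parquet(file) for file in parquet_filenames]

    # ... concatenated row-wise into a single frame; fill_na additionally
    # handles missing values before the result is returned.
    dataframe = dd.concat(dataframes)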

