add consistency check and force recreation of buffer files
zain-sohail committed Nov 4, 2023
1 parent 77b272b commit 6dfdf16
Showing 1 changed file with 36 additions and 16 deletions.
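
As a usage sketch of what this commit enables (the config path, the parse_config import location, and the run number below are illustrative assumptions, not taken from this diff): the new force_recreate keyword travels through read_dataframe's **kwds into parquet_handler and forces all parquet buffer files to be rebuilt from the h5 sources.

# Hypothetical usage sketch: force_recreate=True skips the schema consistency check
# and recreates every parquet buffer file.
from sed.core.config import parse_config
from sed.loader.flash.loader import FlashLoader

config = parse_config("flash_config.yaml")   # placeholder configuration file
loader = FlashLoader(config=config)
dataframe, metadata = loader.read_dataframe(
    runs=[44762],            # illustrative run number
    force_recreate=True,     # rebuild the buffer files instead of reusing them
)
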
52 changes: 36 additions & 16 deletions sed/loader/flash/loader.py
@@ -18,7 +18,7 @@
import dask.dataframe as dd
import h5py
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel
from natsort import natsorted
@@ -129,7 +129,6 @@ def get_files_from_run_id(
extension (str, optional): The file extension. Defaults to "h5".
kwds: Keyword arguments:
- daq (str): The data acquisition identifier.
Defaults to config["dataframe"]["daq"].
Returns:
List[str]: A list of path strings representing the collected file names.
@@ -625,17 +624,18 @@ def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> Union[bool, E
return exc
return None

def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
def buffer_file_handler(self, data_parquet_dir: Path, detector: str, force_recreate: bool):
"""
Handles the conversion of buffer files (h5 to parquet) and returns the filenames.
Args:
data_parquet_dir (Path): Directory where the parquet files will be stored.
detector (str): Detector name.
force_recreate (bool): Forces recreation of the buffer files.
Returns:
Tuple[List[Path], List[pa.parquet.FileMetaData]]: Two lists, one for
parquet file paths and one for parquet metadata.
Tuple[List[Path], List, List]: Three lists, one for parquet file paths,
one for parquet metadata, and one for parquet schemas.
Raises:
FileNotFoundError: If the conversion fails for any files or no data is available.
@@ -650,16 +650,33 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
parquet_filenames = [
buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
]
existing_parquet_filenames = [file for file in parquet_filenames if file.exists()]

# Raise a value error if no data is available after the conversion
if len(h5_filenames) == 0:
raise ValueError("No data available. Probably failed reading all h5 files")

# read parquet metadata and schema
metadata = [pq.read_metadata(file) for file in existing_parquet_filenames]
schema = [pq.read_schema(file) for file in existing_parquet_filenames]

# check that the available channels match the schema of each existing buffer file
available_channels_set = set(self.available_channels)
for i in range(len(schema)):
    schema_set = set(schema[i].names)
    # the buffer files carry an additional pulseId index column
    if not force_recreate and schema_set != available_channels_set.union({"pulseId"}):
        raise ValueError(
            "The available channels do not match the schema of file "
            f"{existing_parquet_filenames[i]}. "
            "Please check the configuration file or set force_recreate to True.",
        )

# Choose files to read
files_to_read = [
(h5_path, parquet_path)
for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
if not parquet_path.exists()
if force_recreate or not parquet_path.exists()
]

print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")
@@ -685,9 +702,7 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str):

print("All files converted successfully!")

parquet_metadata = [pa.parquet.read_metadata(file) for file in parquet_filenames]

return parquet_filenames, parquet_metadata
return parquet_filenames, metadata, schema

def parquet_handler(
self,
@@ -697,6 +712,7 @@ def parquet_handler(
converted: bool = False,
load_parquet: bool = False,
save_parquet: bool = False,
force_recreate: bool = False,
):
"""
Handles loading and saving of parquet files based on the provided parameters.
@@ -710,7 +726,7 @@ def parquet_handler(
externally and saved into converted folder.
load_parquet (bool, optional): Loads the entire parquet into the dd dataframe.
save_parquet (bool, optional): Saves the entire dataframe into a parquet.
force_recreate (bool, optional): Forces recreation of buffer file.
Returns:
dataframe: Dataframe containing the loaded or processed data.
@@ -739,19 +755,23 @@ def parquet_handler(
) from exc

else:
# Obtain the filenames from the method which handles buffer file creation/reading
parquet_filenames, parquet_metadata = self.buffer_file_handler(
# Obtain the parquet filenames, metadata and schema from the method
# which handles buffer file creation/reading
filenames, metadata, schema = self.buffer_file_handler(
data_parquet_dir,
detector,
force_recreate,
)

# Read all parquet files into one dataframe using dask
dataframe = dd.read_parquet(parquet_filenames, calculate_divisions=True)
dataframe = dd.read_parquet(filenames, calculate_divisions=True)

# Channels to fill NaN values
print("Filling nan values...")
channels: List[str] = self.get_channels_by_format(["per_pulse", "per_train"])

overlap = min(file.num_rows for file in parquet_metadata)
overlap = min(file.num_rows for file in metadata)

print("Filling nan values...")
dataframe = dfops.forward_fill_lazy(
df=dataframe,
columns=channels,
@@ -859,7 +879,7 @@ def read_dataframe(
dataframe = self.parquet_handler(data_parquet_dir, **kwds)

metadata = self.parse_metadata() if collect_metadata else {}
print(f"loading complete in {time.time() - t0:.2f} s")
print(f"loading complete in {time.time() - t0: .2f} s")

return dataframe, metadata

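The heart of the new consistency check is a comparison of each existing buffer file's parquet schema against the channel names from the configuration. A standalone sketch of the same idea, assuming placeholder channel names and a placeholder buffer directory:

from pathlib import Path

import pyarrow.parquet as pq

expected_channels = {"dldPosX", "dldPosY", "dldTimeSteps"}   # placeholder channel names
buffer_dir = Path("processed/parquet/buffer")                # placeholder buffer location

for parquet_file in sorted(buffer_dir.glob("*")):
    schema_names = set(pq.read_schema(parquet_file).names)
    # The buffer files carry an extra pulseId index column, so include it before comparing.
    if schema_names != expected_channels | {"pulseId"}:
        raise ValueError(
            f"The available channels do not match the schema of file {parquet_file}. "
            "Please check the configuration file or set force_recreate to True.",
        )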

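The parquet metadata returned alongside the schemas is used in parquet_handler to take the minimum per-file row count; judging from the (truncated) forward_fill_lazy call, this bounds the overlap across partition boundaries when filling per-pulse and per-train channels. A minimal sketch under the same placeholder assumptions as above:

from pathlib import Path

import pyarrow.parquet as pq

buffer_dir = Path("processed/parquet/buffer")  # placeholder buffer location
metadata = [pq.read_metadata(parquet_file) for parquet_file in sorted(buffer_dir.glob("*"))]

# The smallest buffer file bounds how many rows a forward fill may need to look
# back across a partition boundary.
overlap = min(file_metadata.num_rows for file_metadata in metadata)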