Skip to content

Commit

Permalink
load metadata and schema after creating all buffer files
Browse files (browse the repository at this point in the history)
  • Loading branch information
zain-sohail committed Nov 4, 2023
1 parent fd5c13a commit 328d4a4
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions sed/loader/flash/loader.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -656,22 +656,20 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str, force_recre
if len(h5_filenames) == 0:
raise ValueError("No data available. Probably failed reading all h5 files")

# read parquet metadata and schema
metadatas = [pq.read_metadata(file) for file in existing_parquet_filenames]
schemas = [pq.read_schema(file) for file in existing_parquet_filenames]

# check if available_channels are same as schema
available_channels_set = set(self.available_channels)

for i, schema in enumerate(schemas):
schema_set = set(schema.names)
# Check if available_channels are the same as schema including pulseId
if not force_recreate and schema_set != available_channels_set.union({"pulseId"}):
raise ValueError(
"The available channels do not match the schema of file "
f"{existing_parquet_filenames[i]}"
"Please check the configuration file or set force_recreate to True.",
)
if not force_recreate:
# Check if the available channels match the schema of the existing parquet files
schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
available_channels_set = set(self.available_channels)

for i, schema in enumerate(schemas):
schema_set = set(schema.names)
# Check if available_channels are the same as schema including pulseId
if schema_set != available_channels_set.union({"pulseId"}):
raise ValueError(
"The available channels do not match the schema of file "
f"{existing_parquet_filenames[i]}"
"Please check the configuration file or set force_recreate to True.",
)

# Choose files to read
files_to_read = [
Expand Down Expand Up @@ -703,7 +701,11 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str, force_recre

print("All files converted successfully!")

return parquet_filenames, metadatas, schemas
# read all parquet metadata and schema
metadata = [pq.read_metadata(file) for file in parquet_filenames]
schema = [pq.read_schema(file) for file in parquet_filenames]

return parquet_filenames, metadata, schema

def parquet_handler(
self,
Expand Down

0 comments on commit 328d4a4

Please sign in to comment.