Refactor flashloader #329

Merged Jun 30, 2024

Changes from all commits (80 commits)
58dbcd7
major refactor to flash code
zain-sohail Nov 25, 2023
f376387
update dataframe class to be able to use index and dataset keys
zain-sohail Nov 25, 2023
08e8d9f
minor changes introduced
zain-sohail Nov 29, 2023
5c9a04c
change majorly the class with a new initialize method. now save parqu…
zain-sohail Nov 29, 2023
ff5dd07
now uses a simpler notation and save_parquet method after loading dat…
zain-sohail Nov 29, 2023
7852aaf
methods made more consistent and fixing the get_index_dataset_key
zain-sohail Nov 29, 2023
ac9abea
include steinn's proposed solution to pulse_id channel being empty
zain-sohail Nov 29, 2023
41fd70d
include unit tests and fixtures. still many to be done. needs to move…
zain-sohail Nov 29, 2023
da00635
add more tests, simplify logic on dataframe class
zain-sohail Dec 1, 2023
8b39bdb
remove the gmdTunnel channel because the datafile is not correct. Rep…
zain-sohail Dec 3, 2023
e1b9a9f
major structure changes
zain-sohail Dec 11, 2023
cd85dfd
docstrings etc
zain-sohail Dec 11, 2023
f6ca14e
updated buffer creation etc. tests won't work currently
zain-sohail Dec 12, 2023
c9f1fcc
fix linting errors and comment out tests for now
zain-sohail Dec 12, 2023
1398bf2
fix the error of getting wrong attribute in loader, and fix parquet l…
zain-sohail Dec 13, 2023
eb72230
fix lint error
zain-sohail Dec 13, 2023
4d950db
cleaning up the classes
zain-sohail Jan 6, 2024
b8bfdf0
add back easy access apis
zain-sohail Jan 6, 2024
1f95408
small fix
zain-sohail Jan 6, 2024
8f551d0
small fix
zain-sohail Jan 6, 2024
c85fdec
small fix
zain-sohail Jan 6, 2024
0a7e836
fix error with pickling
zain-sohail Jan 6, 2024
4a787eb
use old cfg
zain-sohail Jan 6, 2024
084f407
docstrings fixes
zain-sohail Jan 7, 2024
73802fa
fix tests
zain-sohail Jan 7, 2024
70a3c5b
fix certain problems with df_electron and add comphrehensive tests fo…
zain-sohail Jan 8, 2024
77bf46b
add tests
zain-sohail Jan 8, 2024
d8cc6f6
buffer handler tests
zain-sohail Jan 8, 2024
09cffec
ruff formated
zain-sohail Jan 8, 2024
0f23ddb
add parquethandler tests
zain-sohail Jan 8, 2024
ac4f8cd
further tests
zain-sohail Jan 8, 2024
1519752
fixes
zain-sohail Jan 8, 2024
d31e6b1
fix the lint error
zain-sohail Jan 8, 2024
ed18a5c
fix parse_metadata
zain-sohail Mar 27, 2024
ce8134f
put everything in one file
zain-sohail Mar 27, 2024
08a2adc
reoder
zain-sohail Mar 27, 2024
74b41dc
update interface from suggestions
zain-sohail Mar 27, 2024
b937db8
limit the cores used
zain-sohail Mar 27, 2024
9dc69aa
change interface of parquethandler to suggested
zain-sohail Mar 27, 2024
09a93d3
fix bug for df indexing
zain-sohail Mar 28, 2024
55cfa0c
merge main branch
zain-sohail Apr 18, 2024
4b3e6f7
Merge branch 'main' into refactor-flashloader
zain-sohail Apr 27, 2024
d316137
lint fix
zain-sohail Apr 27, 2024
c00207d
update dataframe saving and loading from parquet behavior
zain-sohail Apr 27, 2024
89130b0
remove saving/loading of parquets
zain-sohail May 19, 2024
dbef804
add instrument option
zain-sohail May 19, 2024
afd9772
fix tests
zain-sohail May 19, 2024
b9fce76
fix tests
zain-sohail May 19, 2024
6400878
fix tests
zain-sohail May 19, 2024
7129f57
fix tests
zain-sohail May 19, 2024
bc53214
Merge branch 'main' into refactor-flashloader
zain-sohail Jun 5, 2024
02aee6e
- added retrocompabtibility for older buffer files that have sectorID…
zain-sohail Jun 5, 2024
2142c11
fix ruff settings
zain-sohail Jun 5, 2024
79922ef
update tests
zain-sohail Jun 5, 2024
02ae74e
make small change to check actions status
zain-sohail Jun 5, 2024
f520310
bring back types
zain-sohail Jun 5, 2024
4c7d069
fix small error
zain-sohail Jun 5, 2024
9f6a31b
move utility func test to utility tests
zain-sohail Jun 6, 2024
f4a30e0
seperate to different modules
zain-sohail Jun 10, 2024
08f8f13
add time_elapsed method
zain-sohail Jun 10, 2024
fa68746
fix test issues
zain-sohail Jun 10, 2024
cb884dd
add tests for elapsed time
zain-sohail Jun 10, 2024
ae26555
fix main loader tests
zain-sohail Jun 10, 2024
04714bc
fix sxp loader tests
zain-sohail Jun 10, 2024
6589595
fix tests
zain-sohail Jun 10, 2024
1b73b76
fix minor issue with repr html
zain-sohail Jun 12, 2024
852a867
add available runs property
zain-sohail Jun 12, 2024
f010a2e
Merge branch 'main' into refactor-flashloader
zain-sohail Jun 13, 2024
8dd5e6a
Merge branch 'main' into refactor-flashloader
zain-sohail Jun 18, 2024
cd6fbf0
Merge branch 'v1_feature_branch' into refactor-flashloader
zain-sohail Jun 24, 2024
147e913
add back annotations
zain-sohail Jun 24, 2024
f2a26b9
use index and dataset keys
zain-sohail Jun 24, 2024
ebd2b32
Merge remote-tracking branch 'origin/v1_feature_branch' into refactor…
rettigl Jun 26, 2024
d131fe4
remove nans from all electron channels
zain-sohail Jun 28, 2024
194c874
use pd import, load h5 file inside df creator
zain-sohail Jun 28, 2024
af33740
update comments to explain the code
zain-sohail Jun 28, 2024
50f7ee1
make review changes
zain-sohail Jun 28, 2024
65d909d
fix tests with review comments
zain-sohail Jun 28, 2024
b7537a8
fix dropna
zain-sohail Jun 28, 2024
b0b090d
fix minor stuff and add test to see if exception handling works in pa…
zain-sohail Jun 30, 2024
40 changes: 28 additions & 12 deletions sed/config/flash_example_config.yaml
@@ -9,6 +9,8 @@ core:
  beamtime_id: 11013410
  # the year of the beamtime
  year: 2023
+  # the instrument used
+  instrument: hextof # hextof, wespe, etc

  # The paths to the raw and parquet data directories. If these are not
  # provided, the loader will try to find the data based on year, beamtime ID, etc.
@@ -52,18 +54,20 @@ dataframe:
  tof_ns_column: dldTime
  # dataframe column containing corrected time-of-flight data
  corrected_tof_column: "tm"
+  # the time stamp column
+  time_stamp_alias: timeStamp
  # time length of a base time-of-flight bin in seconds
  tof_binwidth: 2.0576131995767355E-11
  # binning parameter for time-of-flight data. 2**tof_binning bins per base bin
  tof_binning: 3 # power of 2, 3 means 8 bins per step
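  # e.g. tof_binning: 3 -> 2**3 = 8 sub-bins per base bin of ~2.06e-11 s, i.e. ~2.57e-12 s each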
  # dataframe column containing sector ID. obtained from dldTimeSteps column
  sector_id_column: dldSectorID

  sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.]
  # the delay stage column
  delay_column: delayStage
  # the corrected pump-probe time axis
  corrected_delay_column: pumpProbeTime
  # the columns to be used for jitter correction
  jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"]

  units:
@@ -95,39 +99,45 @@ dataframe:
    # The timestamp
    timeStamp:
      format: per_train
-      group_name: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/"
+      index_key: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/index"
+      dataset_key: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/time"

    # pulse ID is a necessary channel for using the loader.
    pulseId:
      format: per_electron
-      group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
+      index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
+      dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
      slice: 2

    # detector x position
    dldPosX:
      format: per_electron
-      group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
+      index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
+      dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
      slice: 1

    # detector y position
    dldPosY:
      format: per_electron
-      group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
+      index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
+      dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
      slice: 0

    # Detector time-of-flight channel
    # if split_sector_id_from_dld_time is set to True, this will also generate
    # the dldSectorID channel
    dldTimeSteps:
      format: per_electron
-      group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
+      index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
+      dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
      slice: 3

    # The auxiliary channel has a special structure where the group further contains
    # a multidimensional structure so further aliases are defined below
    dldAux:
      format: per_pulse
-      group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
+      index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
+      dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
      slice: 4
      dldAuxChannels:
        sampleBias: 0
@@ -141,29 +151,35 @@ dataframe:
    # ADC containing the pulser sign (1: value approx. 35000, 0: 33000)
    pulserSignAdc:
      format: per_pulse
-      group_name: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/"
+      index_key: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/index"
+      dataset_key: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/value"

    # the energy of the monochromatized beam. This is a quasi-static value.
    # there is a better channel which still needs implementation.
    monochromatorPhotonEnergy:
      format: per_train
-      group_name: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/"
+      index_key: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/index"
+      dataset_key: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/value"

    # The GMDs cannot be read yet...
    gmdBda:
      format: per_train
-      group_name: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/"
+      index_key: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/index"
+      dataset_key: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/value"

    # Beam Arrival Monitor, vital for pump-probe experiments as it can compensate SASE
    # timing fluctuations.
    # Here we use the DBC2 BAM as the "normal" one is broken.
    bam:
      format: per_pulse
-      group_name: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/"
+      index_key: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/index"
+      dataset_key: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/value"

    # The delay stage position, encoding the pump-probe delay
    delayStage:
      format: per_train
-      group_name: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/"
+      index_key: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/index"
+      dataset_key: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/value"

  # The prefixes of the stream names for different DAQ systems for parsing filenames
  # (Not to be changed by user)
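With this change, each channel points directly at its HDF5 index and value datasets via index_key/dataset_key instead of a parent group_name. As a rough illustration of the pattern (a hedged sketch using h5py; the file name, variable names, and axis layout are assumptions, and the actual reading logic lives in DataFrameCreator):

import h5py

# Channel definition taken from the config above (dldPosX)
channel = {
    "index_key": "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index",
    "dataset_key": "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value",
    "slice": 1,
}

with h5py.File("some_run_file.h5", "r") as h5_file:  # placeholder file name
    train_index = h5_file[channel["index_key"]][()]  # train IDs aligning the entries
    values = h5_file[channel["dataset_key"]][()]     # stacked multi-channel array
    dld_pos_x = values[:, channel["slice"]]          # assumed axis layout for the slice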
2 changes: 2 additions & 0 deletions sed/core/metadata.py
@@ -57,6 +57,8 @@ def _format_attributes(self, attributes: dict, indent: int = 0) -> str:
        INDENT_FACTOR = 20
        html = ""
        for key, value in attributes.items():
+            # Ensure the key is a string
+            key = str(key)
            # Format key
            formatted_key = key.replace("_", " ").title()
            formatted_key = f"<b>{formatted_key}</b>"
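Without the cast, a non-string key would crash the HTML formatting, since .replace is a str method. A minimal illustration of the failure mode (the values are made up):

attributes = {11013410: "beamtime"}  # e.g. an integer key in parsed metadata
for key, value in attributes.items():
    # key.replace("_", " ")  # AttributeError: 'int' object has no attribute 'replace'
    key = str(key)  # the added cast avoids this
    print(key.replace("_", " ").title())  # -> 11013410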
238 changes: 238 additions & 0 deletions sed/loader/flash/buffer_handler.py
@@ -0,0 +1,238 @@
from __future__ import annotations

import os
from itertools import compress
from pathlib import Path

import dask.dataframe as dd
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel

from sed.core.dfops import forward_fill_lazy
from sed.loader.flash.dataframe import DataFrameCreator
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import initialize_paths
from sed.loader.utils import get_parquet_metadata
from sed.loader.utils import split_dld_time_from_sector_id


class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config = config["dataframe"]
        self.n_cores = config["core"].get("num_cores", os.cpu_count() - 1)

        self.buffer_paths: list[Path] = []
        self.missing_h5_files: list[Path] = []
        self.save_paths: list[Path] = []

        self.df_electron: dd.DataFrame = None
        self.df_pulse: dd.DataFrame = None
        self.metadata: dict = {}

    def _schema_check(self) -> None:
        """
        Checks the schema of the Parquet files.

        Raises:
            ValueError: If the schema of the Parquet files does not match the configuration.
        """
        existing_parquet_filenames = [file for file in self.buffer_paths if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
        config_schema_set = set(
            get_channels(self._config["channels"], formats="all", index=True, extend_aux=True),
        )

        for filename, schema in zip(existing_parquet_filenames, parquet_schemas):
            # for retro compatibility when sectorID was also saved in buffer
            if self._config["sector_id_column"] in schema.names:
                config_schema_set.add(
                    self._config["sector_id_column"],
                )
            schema_set = set(schema.names)
            if schema_set != config_schema_set:
                missing_in_parquet = config_schema_set - schema_set
                missing_in_config = schema_set - config_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _get_files_to_read(
        self,
        h5_paths: list[Path],
        folder: Path,
        prefix: str,
        suffix: str,
        force_recreate: bool,
    ) -> None:
        """
        Determines the list of files to read and the corresponding buffer files to create.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            force_recreate (bool): Flag to force recreation of buffer files.
        """
        # Getting the paths of the buffer files, with subfolder as buffer and no extension
        self.buffer_paths = initialize_paths(
            filenames=[h5_path.stem for h5_path in h5_paths],
            folder=folder,
            subfolder="buffer",
            prefix=prefix,
            suffix=suffix,
            extension="",
        )
        # read only the files that do not exist or if force_recreate is True
        files_to_read = [
            force_recreate or not parquet_path.exists() for parquet_path in self.buffer_paths
        ]

        # Get the list of H5 files to read and the corresponding buffer files to create
        self.missing_h5_files = list(compress(h5_paths, files_to_read))
        self.save_paths = list(compress(self.buffer_paths, files_to_read))

        print(f"Reading files: {len(self.missing_h5_files)} new files of {len(h5_paths)} total.")

    def _save_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Creates a single buffer file.

        Args:
            h5_path (Path): Path to the H5 file.
            parquet_path (Path): Path to the buffer file.
        """
        # Create a DataFrameCreator instance and get the dataframe for the h5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=h5_path).df

        # Reset the index of the DataFrame and save it as a parquet file
        df.reset_index().to_parquet(parquet_path)

    def _save_buffer_files(self, debug: bool) -> None:
        """
        Creates the buffer files.

        Args:
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        n_cores = min(len(self.missing_h5_files), self.n_cores)
        paths = zip(self.missing_h5_files, self.save_paths)
        if n_cores > 0:
            if debug:
                for h5_path, parquet_path in paths:
                    self._save_buffer_file(h5_path, parquet_path)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(h5_path, parquet_path)
                    for h5_path, parquet_path in paths
                )

def _fill_dataframes(self):
"""
Reads all parquet files into one dataframe using dask and fills NaN values.
"""
dataframe = dd.read_parquet(self.buffer_paths, calculate_divisions=True)
file_metadata = get_parquet_metadata(
self.buffer_paths,
time_stamp_col=self._config.get("time_stamp_alias", "timeStamp"),
)
self.metadata["file_statistics"] = file_metadata

fill_channels: list[str] = get_channels(
self._config["channels"],
["per_pulse", "per_train"],
extend_aux=True,
)
index: list[str] = get_channels(index=True)
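        # Use the smallest file's row count as the forward-fill overlap, so a fill
        # can always reach back across a partition boundary.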
        overlap = min(file["num_rows"] for file in file_metadata.values())

        dataframe = forward_fill_lazy(
            df=dataframe,
            columns=fill_channels,
            before=overlap,
            iterations=self._config.get("forward_fill_iterations", 2),
        )
        self.metadata["forward_fill"] = {
            "columns": fill_channels,
            "overlap": overlap,
            "iterations": self._config.get("forward_fill_iterations", 2),
        }

        # Drop rows with nan values in electron channels
        df_electron = dataframe.dropna(
            subset=get_channels(self._config["channels"], ["per_electron"]),
        )

        # Set the dtypes of the channels here as there should be no null values
        channel_dtypes = get_channels(self._config["channels"], "all")
        config_channels = self._config["channels"]
        dtypes = {
            channel: config_channels[channel].get("dtype")
            for channel in channel_dtypes
            if config_channels[channel].get("dtype") is not None
        }

        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.get("split_sector_id_from_dld_time", False):
            df_electron, meta = split_dld_time_from_sector_id(
                df_electron,
                config=self._config,
            )
            self.metadata.update(meta)

        self.df_electron = df_electron.astype(dtypes)
        self.df_pulse = dataframe[index + fill_channels]

def run(
self,
h5_paths: list[Path],
folder: Path,
force_recreate: bool = False,
prefix: str = "",
suffix: str = "",
debug: bool = False,
) -> None:
"""
Runs the buffer file creation process.

Args:
h5_paths (List[Path]): List of paths to H5 files.
folder (Path): Path to the folder for buffer files.
force_recreate (bool): Flag to force recreation of buffer files.
prefix (str): Prefix for buffer file names.
suffix (str): Suffix for buffer file names.
debug (bool): Flag to enable debug mode.):
"""

self._get_files_to_read(h5_paths, folder, prefix, suffix, force_recreate)

if not force_recreate:
self._schema_check()

self._save_buffer_files(debug)

self._fill_dataframes()
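For orientation (not part of the diff), a hedged sketch of how BufferHandler is meant to be driven; the paths and config contents are placeholders, and a real dataframe section must carry the full channel schema shown in flash_example_config.yaml above:

from pathlib import Path

from sed.loader.flash.buffer_handler import BufferHandler

config = {
    "core": {"num_cores": 4},
    "dataframe": {},  # channels, sector_id_column, etc. go here (elided)
}

bh = BufferHandler(config=config)
bh.run(
    h5_paths=[Path("raw/some_run_file.h5")],  # placeholder path
    folder=Path("processed"),                 # buffer parquet files land in processed/buffer
    force_recreate=False,
    debug=True,                               # serializes creation for easier debugging
)
df_electron, df_pulse = bh.df_electron, bh.df_pulse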