Sxp fixes for new daq #510

Merged: 11 commits, merged on Nov 11, 2024
Changes from 8 commits
2 changes: 2 additions & 0 deletions .cspell/custom-dictionary.txt
@@ -84,6 +84,7 @@ dfpart
dfpid
dictionarized
dictmerge
DOOCS
dpkg
dropna
dset
@@ -381,6 +382,7 @@ xarray
xaxis
xcirc
xdata
XFEL
xind
Xinstrument
xlabel
4 changes: 2 additions & 2 deletions .github/workflows/documentation.yml
@@ -63,7 +63,6 @@ jobs:
run: |
cp -r $GITHUB_WORKSPACE/tutorial $GITHUB_WORKSPACE/docs/
cp -r $GITHUB_WORKSPACE/sed/config $GITHUB_WORKSPACE/docs/sed
rm $GITHUB_WORKSPACE/docs/tutorial/5_sxp_workflow.ipynb

# To be included later
# - name: Cache docs build
@@ -79,10 +78,11 @@ jobs:
cd $GITHUB_WORKSPACE/docs
poetry run python scripts/download_data.py

- name: build Flash parquet files
- name: build parquet files
run: |
cd $GITHUB_WORKSPACE/docs
poetry run python scripts/build_flash_parquets.py
poetry run python scripts/build_sxp_parquets.py

- name: build Sphinx docs
run: poetry run sphinx-build -b html $GITHUB_WORKSPACE/docs $GITHUB_WORKSPACE/_build
1 change: 1 addition & 0 deletions docs/index.rst
@@ -10,6 +10,7 @@ Single-Event DataFrame (SED) documentation
tutorial/2_conversion_pipeline_for_example_time-resolved_ARPES_data
tutorial/3_metadata_collection_and_export_to_NeXus
tutorial/4_hextof_workflow.ipynb
tutorial/5_sxp_workflow.ipynb
tutorial/6_binning_with_time-stamped_data
tutorial/7_correcting_orthorhombic_symmetry
tutorial/8_jittering_tutorial
45 changes: 45 additions & 0 deletions docs/scripts/build_sxp_parquets.py
@@ -0,0 +1,45 @@
from pathlib import Path

import sed
from sed import SedProcessor
from sed.dataset import dataset

config_file = Path(sed.__file__).parent / "config/sxp_example_config.yaml"

dataset.get("Au_Mica", root_dir="./tutorial")
data_path = dataset.dir


config_override = {
"core": {
"paths": {
"data_raw_dir": data_path,
"data_parquet_dir": data_path + "/processed/",
},
},
}

runs = [
"0058",
"0059",
"0060",
"0061",
"0074",
"0073",
"0072",
"0071",
"0070",
"0064",
"0065",
"0066",
"0067",
"0068",
"0069",
]
for run in runs:
sp = SedProcessor(
runs=run,
config=config_override,
system_config=config_file,
collect_metadata=False,
)
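
Note: the script above passes both a config dict and the packaged example config as system_config; sed presumably layers the user config over the system config so that only the data paths are overridden. A toy sketch of such an override under that assumption (the deep_merge helper and the paths are illustrative, not sed's actual implementation):

# Illustrative sketch only: layering a user config over a system config.
# deep_merge is a hypothetical stand-in for sed's config handling.
def deep_merge(base: dict, override: dict) -> dict:
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


system_config = {"core": {"paths": {"data_raw_dir": "/default", "data_parquet_dir": "/default/processed"}}}
user_config = {"core": {"paths": {"data_raw_dir": "./tutorial/datasets/Au_Mica"}}}

print(deep_merge(system_config, user_config))
# {'core': {'paths': {'data_raw_dir': './tutorial/datasets/Au_Mica',
#                     'data_parquet_dir': '/default/processed'}}}
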
1 change: 1 addition & 0 deletions docs/scripts/download_data.py
@@ -5,3 +5,4 @@
dataset.get("WSe2", remove_zip=True, root_dir=root_dir)
dataset.get("Gd_W110", remove_zip=True, root_dir=root_dir)
dataset.get("TaS2", remove_zip=True, root_dir=root_dir)
dataset.get("Au_Mica", remove_zip=True, root_dir=root_dir)
16 changes: 16 additions & 0 deletions tutorial/sxp_config.yaml → sed/config/sxp_example_config.yaml
@@ -17,6 +17,7 @@ dataframe:
daq: DA03
forward_fill_iterations: 2
num_trains: 10
# num_pulses: 400 # only needed for data from new DAQ
x_column: dldPosX
corrected_x_column: "X"
kx_column: "kx"
@@ -27,6 +28,7 @@ dataframe:
tof_ns_column: dldTime
corrected_tof_column: "tm"
bias_column: "sampleBias"
delay_column: "delayStage"
tof_binwidth: 6.875E-12 # in seconds
tof_binning: 0
jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"]
@@ -84,6 +86,11 @@ dataframe:
format: per_train
dataset_key: "/CONTROL/SCS_ILH_LAS/MDL/OPTICALDELAY_PP800/actualPosition/value"
index_key: "/INDEX/trainId"
# test:
# daq: DA02 # change DAQ for a channel
# format: per_pulse
# dataset_key: "/INSTRUMENT/SA3_XTD10_XGM/XGM/DOOCS:output/data/intensitySa3TD"
# index_key: "/INSTRUMENT/SA3_XTD10_XGM/XGM/DOOCS:output/data/trainId"

stream_name_prefixes:
DA03: "RAW-R"
@@ -92,3 +99,12 @@ dataframe:

beamtime_dir:
sxp: "/gpfs/exfel/exp/SXP/"

histogram:
# number of bins used for histogram visualization
bins: [80, 80, 80, 80]
# default axes to use for histogram visualization.
# Axes names starting with "@" refer to keys in the "dataframe" section
axes: ["@x_column", "@y_column", "@tof_column", "@delay_column"]
# default ranges to use for histogram visualization (in unbinned detector coordinates)
ranges: [[0, 4000], [0, 4000], [1000, 28000], [-1000, 1000]]
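
Note: the histogram axes prefixed with "@" point back at keys in the dataframe section, so the visualization defaults follow whatever column names the loader config defines. A minimal sketch of how such a reference could be resolved (column names are illustrative, and this is not sed's actual resolver):

# Minimal sketch, assuming "@name" in histogram.axes refers to dataframe.name.
config = {
    "dataframe": {
        "x_column": "dldPosX",
        "y_column": "dldPosY",
        "tof_column": "dldTimeSteps",
        "delay_column": "delayStage",
    },
    "histogram": {"axes": ["@x_column", "@y_column", "@tof_column", "@delay_column"]},
}

resolved_axes = [
    config["dataframe"][axis.lstrip("@")] if axis.startswith("@") else axis
    for axis in config["histogram"]["axes"]
]
print(resolved_axes)  # ['dldPosX', 'dldPosY', 'dldTimeSteps', 'delayStage']
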
3 changes: 3 additions & 0 deletions sed/dataset/datasets.json
@@ -21,6 +21,9 @@
"energycal_2020_07_20"
]
},
"Au_Mica": {
"url": "https://zenodo.org/records/13952965/files/Au_Mica_SXP.zip"
},
"Test": {
"url": "http://test.com/files/file.zip",
"subdirs": [
44 changes: 27 additions & 17 deletions sed/loader/sxp/loader.py
@@ -256,6 +256,7 @@ def create_multi_index_per_electron(self, h5_file: h5py.File) -> None:
for i in train_id.index:
# removing broken trailing hit copies
num_trains = self._config["dataframe"].get("num_trains", 0)
num_pulses = self._config["dataframe"].get("num_pulses", 0)
if num_trains:
try:
num_valid_hits = np.where(np.diff(mib_array[i].astype(np.int32)) < 0)[0][
@@ -270,7 +271,10 @@ def create_multi_index_per_electron(self, h5_file: h5py.File) -> None:
index = 0
for train, train_end in enumerate(train_ends):
macrobunch_index.append(train_id[i] + np.uint(train))
microbunch_ids.append(mib_array[i, index:train_end])
if num_pulses:
microbunch_ids.append(mib_array[i, index:train_end] % num_pulses)
else:
microbunch_ids.append(mib_array[i, index:train_end])
indices.append(slice(index, train_end))
index = train_end + 1
macrobunch_indices.append(indices)
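
Note: with the new DAQ the per-train pulse IDs apparently keep counting past the pulse pattern, so the loader now optionally wraps them via the num_pulses setting. A small numpy illustration of the wrap (the raw IDs are made up; num_pulses = 400 matches the commented example in the config):

import numpy as np

# Hypothetical microbunch IDs as a new-DAQ train might report them,
# counting past the per-train pulse pattern.
num_pulses = 400
raw_ids = np.array([0, 1, 399, 400, 401, 799, 800])

# The modulo folds them back into per-train microbunch IDs in [0, num_pulses).
microbunch_ids = raw_ids % num_pulses
print(microbunch_ids)  # -> 0, 1, 399, 0, 1, 399, 0
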
@@ -509,7 +513,7 @@ def create_dataframe_per_train(

def create_dataframe_per_channel(
self,
h5_file: h5py.File,
file_path: Path,
channel: str,
) -> Union[Series, DataFrame]:
"""
@@ -520,7 +524,7 @@ def create_dataframe_per_channel(
DataFrame depends on the channel's format specified in the configuration.

Args:
h5_file (h5py.File): The h5py.File object representing the HDF5 file.
file_path (Path): The path to the main HDF5 file.
channel (str): The name of the channel.

Returns:
@@ -530,11 +534,16 @@ def create_dataframe_per_channel(
ValueError: If the channel has an undefined format.

"""
channel_dict = self._config["dataframe"]["channels"][channel] # channel parameters
main_daq = self._config["dataframe"]["daq"]
channel_daq = self._config["dataframe"]["channels"][channel].get("daq", main_daq)
# load file corresponding to daq
h5_file = h5py.File(Path(str(file_path).replace(main_daq, channel_daq)))

[train_id, np_array] = self.create_numpy_array_per_channel(
h5_file,
channel,
) # numpy Array created
channel_dict = self._config["dataframe"]["channels"][channel] # channel parameters

# If np_array is size zero, fill with NaNs
if np_array.size == 0:
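
Note: create_dataframe_per_channel (and concatenate_channels below) now take the file path instead of an open h5py.File, so each channel can be read from the file of its own DAQ: the main DAQ name embedded in the path is replaced by the channel's "daq" entry. A path-only sketch of that lookup (the file name is made up; real prefixes come from stream_name_prefixes in the config):

from pathlib import Path

# Path-only sketch of the per-channel DAQ lookup; the file name is illustrative.
main_daq = "DA03"
channel_daq = "DA02"  # e.g. the commented "test" channel in the example config

file_path = Path("/gpfs/exfel/exp/SXP/raw/r0058/RAW-R0058-DA03-S00000.h5")
channel_path = Path(str(file_path).replace(main_daq, channel_daq))

print(channel_path)  # /gpfs/exfel/exp/SXP/raw/r0058/RAW-R0058-DA02-S00000.h5
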
@@ -585,7 +594,7 @@ def create_dataframe_per_channel(

def concatenate_channels(
self,
h5_file: h5py.File,
file_path: Path,
) -> DataFrame:
"""
Concatenates the channels from the provided h5py.File into a pandas DataFrame.
@@ -595,7 +604,7 @@
available channels specified in the configuration.

Args:
h5_file (h5py.File): The h5py.File object representing the HDF5 file.
file_path (Path): The path to the main HDF5 file.

Returns:
DataFrame: A concatenated pandas DataFrame containing the channels.
@@ -604,11 +613,13 @@
ValueError: If the group_name for any channel does not exist in the file.

"""
all_keys = parse_h5_keys(h5_file) # Parses all channels present

# Check for if the provided dataset_keys and index_keys actually exists in the file
for channel in self._config["dataframe"]["channels"]:
dataset_key = self._config["dataframe"]["channels"][channel]["dataset_key"]
daq = self._config["dataframe"]["channels"][channel].get("daq", "DA03")
# load file corresponding to daq
h5_file = h5py.File(Path(str(file_path).replace("DA03", daq)))
all_keys = parse_h5_keys(h5_file) # Parses all channels present
if dataset_key not in all_keys:
raise ValueError(
f"The dataset_key for channel {channel} does not exist.",
@@ -621,7 +632,7 @@

# Create a generator expression to generate data frames for each channel
data_frames = (
self.create_dataframe_per_channel(h5_file, each) for each in self.available_channels
self.create_dataframe_per_channel(file_path, each) for each in self.available_channels
)

# Use the reduce function to join the data frames into a single DataFrame
Expand Down Expand Up @@ -649,14 +660,13 @@ def create_dataframe_per_file(

"""
# Loads h5 file and creates a dataframe
with h5py.File(file_path, "r") as h5_file:
self.reset_multi_index() # Reset MultiIndexes for next file
df = self.concatenate_channels(h5_file)
df = df.dropna(subset=self._config["dataframe"].get("tof_column", "dldTimeSteps"))
# correct the 3 bit shift which encodes the detector ID in the 8s time
if self._config["dataframe"].get("split_sector_id_from_dld_time", False):
df = split_dld_time_from_sector_id(df, config=self._config)
return df
self.reset_multi_index() # Reset MultiIndexes for next file
df = self.concatenate_channels(file_path)
df = df.dropna(subset=self._config["dataframe"].get("tof_column", "dldTimeSteps"))
# correct the 3 bit shift which encodes the detector ID in the 8s time
if self._config["dataframe"].get("split_sector_id_from_dld_time", False):
df = split_dld_time_from_sector_id(df, config=self._config)
return df

def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> Union[bool, Exception]:
"""