diff --git a/sed/calibrator/energy.py b/sed/calibrator/energy.py index d09d5923..2c1f9a84 100644 --- a/sed/calibrator/energy.py +++ b/sed/calibrator/energy.py @@ -35,6 +35,7 @@ from scipy.sparse.linalg import lsqr from sed.binning import bin_dataframe +from sed.core import dfops from sed.loader.base.loader import BaseLoader @@ -95,6 +96,7 @@ def __init__( self.calibration: Dict[Any, Any] = {} self.tof_column = self._config["dataframe"]["tof_column"] + self.tof_ns_column = self._config["dataframe"].get("tof_ns_column", None) self.corrected_tof_column = self._config["dataframe"]["corrected_tof_column"] self.energy_column = self._config["dataframe"]["energy_column"] self.x_column = self._config["dataframe"]["x_column"] @@ -108,7 +110,9 @@ def __init__( ) / 2 ** (self.binning - 1) self.tof_fermi = self._config["energy"]["tof_fermi"] / 2 ** (self.binning - 1) self.color_clip = self._config["energy"]["color_clip"] - + self.sector_delays = self._config["dataframe"].get("sector_delays", None) + self.sector_id_column = self._config["dataframe"].get("sector_id_column", None) + self.offset: Dict[str, Any] = self._config["energy"].get("offset", {}) self.correction: Dict[Any, Any] = {} @property @@ -769,6 +773,26 @@ def view( # pylint: disable=dangerous-default-value pbk.show(fig) + def get_current_calibration(self) -> dict: + """Return the current calibration dictionary. + + if none is present, return the one from the config. If none is present there, + return an empty dictionary. + + Returns: + dict: Calibration dictionary. + """ + if self.calibration: + calibration = deepcopy(self.calibration) + else: + calibration = deepcopy( + self._config["energy"].get( + "calibration", + {}, + ), + ) + return calibration + def append_energy_axis( self, df: Union[pd.DataFrame, dask.dataframe.DataFrame], @@ -812,17 +836,8 @@ def append_energy_axis( binwidth = kwds.pop("binwidth", self.binwidth) binning = kwds.pop("binning", self.binning) - # pylint: disable=duplicate-code if calibration is None: - if self.calibration: - calibration = deepcopy(self.calibration) - else: - calibration = deepcopy( - self._config["energy"].get( - "calibration", - {}, - ), - ) + calibration = self.get_current_calibration() for key, value in kwds.items(): calibration[key] = value @@ -879,6 +894,53 @@ def append_energy_axis( return df, metadata + def append_tof_ns_axis( + self, + df: Union[pd.DataFrame, dask.dataframe.DataFrame], + tof_column: str = None, + tof_ns_column: str = None, + **kwds, + ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: + """Converts the time-of-flight time from steps to time in ns. + + Args: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to convert. + tof_column (str, optional): Name of the column containing the + time-of-flight steps. Defaults to config["dataframe"]["tof_column"]. + tof_ns_column (str, optional): Name of the column to store the + time-of-flight in nanoseconds. Defaults to config["dataframe"]["tof_ns_column"]. + binwidth (float, optional): Time-of-flight binwidth in ns. + Defaults to config["energy"]["tof_binwidth"]. + binning (int, optional): Time-of-flight binning factor. + Defaults to config["energy"]["tof_binning"]. + + Returns: + dask.dataframe.DataFrame: Dataframe with the new columns. + dict: Metadata dictionary. 
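+
+ Note:
+ The conversion applied (see ``tof2ns`` below) is
+ ``t_ns = t_steps * 1e9 * binwidth * 2**binning``. For example, with
+ ``binwidth=2e-9`` s and ``binning=1``, one time-of-flight step corresponds
+ to 4 ns, which is the relation checked in the unit tests.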
+ """ + binwidth = kwds.pop("binwidth", self.binwidth) + binning = kwds.pop("binning", self.binning) + if tof_column is None: + if self.corrected_tof_column in df.columns: + tof_column = self.corrected_tof_column + else: + tof_column = self.tof_column + + if tof_ns_column is None: + tof_ns_column = self.tof_ns_column + + df[tof_ns_column] = tof2ns( + binwidth, + binning, + df[tof_column].astype("float64"), + ) + metadata: Dict[str, Any] = { + "applied": True, + "binwidth": binwidth, + "binning": binning, + } + return df, metadata + def gather_calibration_metadata(self, calibration: dict = None) -> dict: """Collects metadata from the energy calibration @@ -1358,6 +1420,161 @@ def gather_correction_metadata(self, correction: dict = None) -> dict: return metadata + def align_dld_sectors( + self, + df: dask.dataframe.DataFrame, + tof_column: str = None, + sector_id_column: str = None, + sector_delays: np.ndarray = None, + ) -> Tuple[dask.dataframe.DataFrame, dict]: + """Aligns the time-of-flight axis of the different sections of a detector. + + Args: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use. + tof_column (str, optional): Name of the column containing the time-of-flight values. + Defaults to config["dataframe"]["tof_column"]. + sector_id_column (str, optional): Name of the column containing the sector id values. + Defaults to config["dataframe"]["sector_id_column"]. + sector_delays (np.ndarray, optional): Array containing the sector delays. Defaults to + config["dataframe"]["sector_delays"]. + + Returns: + dask.dataframe.DataFrame: Dataframe with the new columns. + dict: Metadata dictionary. + """ + sector_delays = sector_delays or self.sector_delays + sector_id_column = sector_id_column or self.sector_id_column + + if sector_delays is None or sector_id_column is None: + raise ValueError( + "No value for sector_delays or sector_id_column found in config." + "Config file is not properly configured for dld sector correction.", + ) + tof_column = tof_column or self.tof_column + + # align the 8s sectors + sector_delays_arr = dask.array.from_array(sector_delays) + + def align_sector(x): + val = x[tof_column] - sector_delays_arr[x[sector_id_column].values.astype(int)] + return val.astype(np.float32) + + df[tof_column] = df.map_partitions(align_sector, meta=(tof_column, np.float32)) + metadata: Dict[str, Any] = { + "applied": True, + "sector_delays": sector_delays, + } + return df, metadata + + def add_offsets( + self, + df: Union[pd.DataFrame, dask.dataframe.DataFrame] = None, + constant: float = None, + columns: Union[str, Sequence[str]] = None, + signs: Union[int, Sequence[int]] = None, + preserve_mean: Union[bool, Sequence[bool]] = False, + reductions: Union[str, Sequence[str]] = None, + energy_column: str = None, + ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]: + """Apply an offset to the energy column by the values of the provided columns. + + If no parameter is passed to this function, the offset is applied as defined in the + config file. If parameters are passed, they are used to generate a new offset dictionary + and the offset is applied using the ``dfops.apply_offset_from_columns()`` function. + + # TODO: This funcion can still be improved and needs testsing + + Args: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use. + constant (float, optional): The constant to shift the energy axis by. + columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift from. 
+ signs (Union[int, Sequence[int]]): Sign of the shift to apply. (+1 or -1) A positive + sign shifts the energy axis to higher kinetic energies. Defaults to +1. + preserve_mean (bool): Whether to subtract the mean of the column before applying the + shift. Defaults to False. + reductions (str): The reduction to apply to the column. Should be an available method + of dask.dataframe.Series. For example "mean". In this case the function is applied + to the column to generate a single value for the whole dataset. If None, the shift + is applied per-dataframe-row. Defaults to None. Currently only "mean" is supported. + energy_column (str, optional): Name of the column containing the energy values. + + Returns: + dask.dataframe.DataFrame: Dataframe with the new columns. + dict: Metadata dictionary. + """ + if energy_column is None: + energy_column = self.energy_column + + # if no parameters are passed, use config + if columns is None and constant is None: + # load from config + columns = [] + signs = [] + preserve_mean = [] + reductions = [] + for k, v in self.offset.items(): + if k == "constant": + constant = v + else: + columns.append(k) + try: + signs.append(v["sign"]) + except KeyError as exc: + raise KeyError(f"Missing sign for offset column {k} in config.") from exc + preserve_mean.append(v.get("preserve_mean", False)) + reductions.append(v.get("reduction", None)) + + # flip sign for binding energy scale + energy_scale = self.get_current_calibration().get("energy_scale", None) + if energy_scale is None: + raise ValueError("Energy scale not set. Cannot interpret the sign of the offset.") + if energy_scale not in ["binding", "kinetic"]: + raise ValueError(f"Invalid energy scale: {energy_scale}") + scale_sign = -1 if energy_scale == "binding" else 1 + # initialize metadata container + metadata: Dict[str, Any] = { + "applied": True, + } + # apply offset + if columns is not None: + # use passed parameters + if isinstance(signs, int): + signs = [signs] + elif not isinstance(signs, Sequence): + raise TypeError(f"Invalid type for signs: {type(signs)}") + if not all(isinstance(s, int) for s in signs): + raise TypeError(f"Invalid type for signs: {type(signs)}") + # flip signs if binding energy scale + signs = [s * scale_sign for s in signs] + + df = dfops.offset_by_other_columns( + df=df, + target_column=energy_column, + offset_columns=columns, + signs=signs, + preserve_mean=preserve_mean, + reductions=reductions, + inplace=True, + ) + metadata["energy_column"] = energy_column + metadata["columns"] = columns + metadata["signs"] = signs + metadata["preserve_mean"] = preserve_mean + metadata["reductions"] = reductions + + # apply constant + if isinstance(constant, (int, float, np.integer, np.floating)): + df[energy_column] = df.map_partitions( + # flip sign if binding energy scale + lambda x: x[energy_column] + constant * scale_sign, + meta=(energy_column, np.float64), + ) + metadata["constant"] = constant + elif constant is not None: + raise TypeError(f"Invalid type for constant: {type(constant)}") + + return df, metadata + def extract_bias(files: List[str], bias_key: str) -> np.ndarray: """Read bias values from hdf5 files @@ -1836,6 +2053,12 @@ def fit_energy_calibation( - **'kinetic'**: increasing energy with decreasing TOF. - **'binding'**: increasing energy with increasing TOF. + t0 (float, optional): constrains and initial values for the fit parameter t0, corresponding + to the time of flight offset. Defaults to 1e-6. 
+ E0 (float, optional): constrains and initial values for the fit parameter E0, corresponding + to the energy offset. Defaults to min(vals). + d (float, optional): constrains and initial values for the fit parameter d, corresponding + to the drift distance. Defaults to 1. Returns: dict: A dictionary of fitting parameters including the following, @@ -1868,13 +2091,33 @@ def residual(pars, time, data, binwidth, binning, energy_scale): return model - data pars = Parameters() - pars.add(name="d", value=kwds.pop("d_init", 1)) + d_pars = kwds.pop("d", {}) + pars.add( + name="d", + value=d_pars.get("value", 1), + min=d_pars.get("min", -np.inf), + max=d_pars.get("max", np.inf), + vary=d_pars.get("vary", True), + ) + t0_pars = kwds.pop("t0", {}) pars.add( name="t0", - value=kwds.pop("t0_init", 1e-6), - max=(min(pos) - 1) * binwidth * 2**binning, + value=t0_pars.get("value", 1e-6), + min=t0_pars.get("min", -np.inf), + max=t0_pars.get( + "max", + (min(pos) - 1) * binwidth * 2**binning, + ), + vary=t0_pars.get("vary", True), + ) + E0_pars = kwds.pop("E0", {}) # pylint: disable=invalid-name + pars.add( + name="E0", + value=E0_pars.get("value", min(vals)), + min=E0_pars.get("min", -np.inf), + max=E0_pars.get("max", np.inf), + vary=E0_pars.get("vary", True), ) - pars.add(name="E0", value=kwds.pop("E0_init", min(vals))) fit = Minimizer( residual, pars, @@ -2085,3 +2328,23 @@ def tof2evpoly( energy += energy_offset return energy + + +def tof2ns( + binwidth: float, + binning: int, + t: float, +) -> float: + """Converts the time-of-flight steps to time-of-flight in nanoseconds. + + designed for use with dask.dataframe.DataFrame.map_partitions. + + Args: + binwidth (float): Time step size in seconds. + binning (int): Binning of the time-of-flight steps. + t (float): TOF value in bin number. + Returns: + float: Converted time in nanoseconds. + """ + val = t * 1e9 * binwidth * 2.0**binning + return val diff --git a/sed/config/default.yaml b/sed/config/default.yaml index 10c362e6..0450faae 100644 --- a/sed/config/default.yaml +++ b/sed/config/default.yaml @@ -9,6 +9,8 @@ dataframe: y_column: "Y" # dataframe column containing time-of-flight data tof_column: "t" + # dataframe column containing time-of-flight data in nanoseconds + tof_ns_column: "t_ns" # dataframe column containing analog-to-digital data adc_column: "ADC" # dataframe column containing bias voltage data @@ -27,7 +29,7 @@ dataframe: energy_column: "energy" # dataframe column containing delay data delay_column: "delay" - # time length of a base time-of-flight bin in ns + # time length of a base time-of-flight bin in s tof_binwidth: 4.125e-12 # Binning factor of the tof_column-data compared to tof_binwidth (2^(tof_binning-1)) tof_binning: 1 diff --git a/sed/config/flash_example_config.yaml b/sed/config/flash_example_config.yaml index 2579b6c2..54c6eec6 100644 --- a/sed/config/flash_example_config.yaml +++ b/sed/config/flash_example_config.yaml @@ -11,20 +11,69 @@ core: paths: data_raw_dir: "tests/data/loader/flash/" data_parquet_dir: "tests/data/loader/flash/parquet" - + # These can be replaced by beamtime_id and year to automatically # find the folders on the desy cluster - + # beamtime_id: xxxxxxxx # year: 20xx dataframe: - # The offset correction to the pulseId - ubid_offset: 5 - # the number of iterations to fill the pulseId forward. - forward_fill_iterations: 2 # The name of the DAQ system to use. Necessary to resolve the filenames/paths. 
daq: fl1user3 + # The offset correction to the pulseId + ubid_offset: 5 + + # the number of iterations to fill the pulseId forward. + forward_fill_iterations: 2 + # if true, removes the 3 bits reserved for dldSectorID from the dldTimeSteps column + split_sector_id_from_dld_time: True + sector_id_reserved_bits: 3 + # dataframe column containing x coordinates + x_column: dldPosX + # dataframe column containing corrected x coordinates + corrected_x_column: "X" + # dataframe column containing kx coordinates + kx_column: "kx" + # dataframe column containing y coordinates + y_column: dldPosY + # dataframe column containing corrected y coordinates + corrected_y_column: "Y" + # dataframe column containing kx coordinates + ky_column: "ky" + # dataframe column containing time-of-flight data + tof_column: dldTimeSteps + # dataframe column containing time-of-flight data in ns + tof_ns_column: dldTime + # dataframe column containing corrected time-of-flight data + corrected_tof_column: "tm" + # time length of a base time-of-flight bin in seconds + tof_binwidth: 2.0576131995767355E-11 + # binning parameter for time-of-flight data. 2**tof_binning bins per base bin + tof_binning: 3 # power of 2, 3 means 8 bins per step + # dataframe column containing sector ID. obtained from dldTimeSteps column + sector_id_column: dldSectorID + + sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] + + jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"] + + units: + dldPosX: 'step' + dldPosY: 'step' + dldTimeSteps: 'step' + tof_voltage: 'V' + extractorVoltage: 'V' + extractorCurrent: 'A' + cryoTemperature: 'K' + sampleTemperature: 'K' + dldTime: 'ns' + # delay: 'ps' + timeStamp: 's' + # energy: 'eV' + # E: 'eV' + kx: '1/A' + ky: '1/A' # The channels to load. # channels have the following structure: @@ -32,7 +81,7 @@ dataframe: # format: per_pulse/per_electron/per_train # group_name: the hdf5 group path # slice: if the group contains multidim data, where to slice - + channels: # pulse ID is a necessary channel for using the loader. 
pulseId: @@ -44,17 +93,18 @@ dataframe: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 1 - + dldPosY: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 0 - - dldTime: + # This channel will actually create dldTimeSteps and dldSectorID, + # if unravel_8s_detector_time_channel is set to True + dldTimeSteps: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 3 - + # The auxillary channel has a special structure where the group further contains # a multidim structure so further aliases are defined below dldAux: @@ -69,7 +119,7 @@ dataframe: cryoTemperature: 4 sampleTemperature: 5 dldTimeBinSize: 15 - + # The prefixes of the stream names for different DAQ systems for parsing filenames # (Not to be changed by user) stream_name_prefixes: diff --git a/sed/core/dfops.py b/sed/core/dfops.py index b7dbd21e..aec8fed2 100644 --- a/sed/core/dfops.py +++ b/sed/core/dfops.py @@ -8,9 +8,9 @@ from typing import Union import dask.dataframe -from dask.diagnostics import ProgressBar import numpy as np import pandas as pd +from dask.diagnostics import ProgressBar def apply_jitter( @@ -144,11 +144,11 @@ def map_columns_2d( def forward_fill_lazy( - df: dask.dataframe.DataFrame, - channels: Sequence[str], - before: Union[str, int] = 'max', - compute_lengths: bool = False, - iterations: int = 2, + df: dask.dataframe.DataFrame, + columns: Sequence[str] = None, + before: Union[str, int] = "max", + compute_lengths: bool = False, + iterations: int = 2, ) -> dask.dataframe.DataFrame: """Forward fill the specified columns multiple times in a dask dataframe. @@ -160,25 +160,33 @@ def forward_fill_lazy( Args: df (dask.dataframe.DataFrame): The dataframe to forward fill. - channels (list): The columns to forward fill. + columns (list): The columns to forward fill. If None, fills all columns before (int, str, optional): The number of rows to include before the current partition. if 'max' it takes as much as possible from the previous partition, which is the size of the smallest partition in the dataframe. Defaults to 'max'. - after (int, optional): The number of rows to include after the current partition. - Defaults to 'part'. compute_lengths (bool, optional): Whether to compute the length of each partition iterations (int, optional): The number of times to forward fill the dataframe. Returns: dask.dataframe.DataFrame: The dataframe with the specified columns forward filled. """ + if columns is None: + columns = df.columns + elif isinstance(columns, str): + columns = [columns] + elif len(columns) == 0: + raise ValueError("columns must be a non-empty list of strings!") + for c in columns: + if c not in df.columns: + raise KeyError(f"{c} not in dataframe!") + # Define a custom function to forward fill specified columns def forward_fill_partition(df): - df[channels] = df[channels].ffill() + df[columns] = df[columns].ffill() return df # calculate the number of rows in each partition and choose least - if before == 'max': + if before == "max": nrows = df.map_partitions(len) if compute_lengths: with ProgressBar(): @@ -195,3 +203,190 @@ def forward_fill_partition(df): after=0, ) return df + + +def backward_fill_lazy( + df: dask.dataframe.DataFrame, + columns: Sequence[str] = None, + after: Union[str, int] = "max", + compute_lengths: bool = False, + iterations: int = 1, +) -> dask.dataframe.DataFrame: + """Forward fill the specified columns multiple times in a dask dataframe. + + Allows backward filling between partitions. 
Similar to forward fill, but backwards. + This helps to fill the initial values of a dataframe, which are often NaNs. + Use with care as the assumption of the values being the same in the past is often not true. + + Args: + df (dask.dataframe.DataFrame): The dataframe to forward fill. + columns (list): The columns to forward fill. If None, fills all columns + after (int, str, optional): The number of rows to include after the current partition. + if 'max' it takes as much as possible from the previous partition, which is + the size of the smallest partition in the dataframe. Defaults to 'max'. + compute_lengths (bool, optional): Whether to compute the length of each partition + iterations (int, optional): The number of times to backward fill the dataframe. + + Returns: + dask.dataframe.DataFrame: The dataframe with the specified columns backward filled. + """ + if columns is None: + columns = df.columns + elif isinstance(columns, str): + columns = [columns] + elif len(columns) == 0: + raise ValueError("columns must be a non-empty list of strings!") + for c in columns: + if c not in df.columns: + raise KeyError(f"{c} not in dataframe!") + + # Define a custom function to forward fill specified columns + def backward_fill_partition(df): + df[columns] = df[columns].bfill() + return df + + # calculate the number of rows in each partition and choose least + if after == "max": + nrows = df.map_partitions(len) + if compute_lengths: + with ProgressBar(): + print("Computing dataframe shape...") + nrows = nrows.compute() + after = min(nrows) + elif not isinstance(after, int): + raise TypeError('before must be an integer or "max"') + # Use map_overlap to apply forward_fill_partition + for _ in range(iterations): + df = df.map_overlap( + backward_fill_partition, + before=0, + after=after, + ) + return df + + +def offset_by_other_columns( + df: dask.dataframe.DataFrame, + target_column: str, + offset_columns: Union[str, Sequence[str]], + signs: Union[int, Sequence[int]], + reductions: Union[str, Sequence[str]] = None, + preserve_mean: Union[bool, Sequence[bool]] = False, + inplace: bool = True, + rename: str = None, +) -> dask.dataframe.DataFrame: + """Apply an offset to a column based on the values of other columns. + + Args: + df (dask.dataframe.DataFrame): Dataframe to use. Currently supports only dask dataframes. + target_column (str): Name of the column to apply the offset to. + offset_columns (str): Name of the column(s) to use for the offset. + signs (int): Sign of the offset. + reductions (str, optional): Reduction function to use for the offset. Defaults to "mean". + Currently, only mean is supported. + preserve_mean (bool, optional): Whether to subtract the mean of the offset column. + Defaults to False. If a list is given, it must have the same length as + offset_columns. Otherwise the value passed is used for all columns. + inplace (bool, optional): Whether to apply the offset inplace. + If false, the new column will have the name provided by rename, or has the same name as + target_column with the suffix _offset if that is None. Defaults to True. + rename (str, optional): Name of the new column if inplace is False. Defaults to None. + Returns: + dask.dataframe.DataFrame: Dataframe with the new column. 
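+
+ Example:
+ A minimal sketch of the per-row case (column names and values are invented)::
+
+ import dask.dataframe as dd
+ import pandas as pd
+ from sed.core.dfops import offset_by_other_columns
+
+ ddf = dd.from_pandas(
+ pd.DataFrame({"energy": [10.0, 20.0], "bias": [1.0, 2.0]}),
+ npartitions=1,
+ )
+ ddf = offset_by_other_columns(
+ ddf,
+ target_column="energy",
+ offset_columns=["bias"],
+ signs=[-1],
+ )
+ # ddf["energy"].compute() -> [9.0, 18.0]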
+ """ + if target_column not in df.columns: + raise KeyError(f"{target_column} not in dataframe!") + + if isinstance(offset_columns, str): + offset_columns = [offset_columns] + elif not isinstance(offset_columns, Sequence): + raise TypeError(f"Invalid type for columns: {type(offset_columns)}") + if any(c not in df.columns for c in offset_columns): + raise KeyError(f"{offset_columns} not in dataframe!") + + if isinstance(signs, int): + signs = [signs] + elif not isinstance(signs, Sequence): + raise TypeError(f"Invalid type for signs: {type(signs)}") + if len(signs) != len(offset_columns): + raise ValueError("signs and offset_columns must have the same length!") + signs_dict = dict(zip(offset_columns, signs)) + + if isinstance(reductions, str) or reductions is None: + reductions = [reductions] * len(offset_columns) + elif not isinstance(reductions, Sequence): + raise ValueError(f"reductions must be a string or list of strings! not {type(reductions)}") + if any(r not in ["mean", None] for r in reductions): + raise NotImplementedError("Only reductions currently supported is 'mean'!") + + if isinstance(preserve_mean, bool): + preserve_mean = [preserve_mean] * len(offset_columns) + elif not isinstance(preserve_mean, Sequence): + raise TypeError(f"Invalid type for preserve_mean: {type(preserve_mean)}") + elif any(not isinstance(p, bool) for p in preserve_mean): + raise TypeError(f"Invalid type for preserve_mean: {type(preserve_mean)}") + if len(preserve_mean) != len(offset_columns): + raise ValueError("preserve_mean and offset_columns must have the same length!") + + if not inplace: + if rename is None: + rename = target_column + "_offset" + df[rename] = df[target_column] + target_column = rename + + if isinstance(df, pd.DataFrame): + raise NotImplementedError( + "Offsetting by other columns is currently not supported for pandas dataframes! 
" + "Please open a request on GitHub if this feature is required.", + ) + + # calculate the mean of the columns to reduce + means = { + col: dask.delayed(df[col].mean()) + for col, red, pm in zip(offset_columns, reductions, preserve_mean) + if red or pm + } + + # define the functions to apply the offsets + def shift_by_mean(x, cols, signs, means, flip_signs=False): + """Shift the target column by the mean of the offset columns.""" + for col in cols: + s = -signs[col] if flip_signs else signs[col] + x[target_column] = x[target_column] + s * means[col] + return x[target_column] + + def shift_by_row(x, cols, signs): + """Apply the offsets to the target column.""" + for col in cols: + x[target_column] = x[target_column] + signs[col] * x[col] + return x[target_column] + + # apply offset from the reduced columns + df[target_column] = df.map_partitions( + shift_by_mean, + cols=[col for col, red in zip(offset_columns, reductions) if red], + signs=signs_dict, + means=means, + meta=df[target_column].dtype, + ) + + # apply offset from the offset columns + df[target_column] = df.map_partitions( + shift_by_row, + cols=[col for col, red in zip(offset_columns, reductions) if not red], + signs=signs_dict, + meta=df[target_column].dtype, + ) + + # compensate shift from the preserved mean columns + if any(preserve_mean): + df[target_column] = df.map_partitions( + shift_by_mean, + cols=[col for col, pmean in zip(offset_columns, preserve_mean) if pmean], + signs=signs_dict, + means=means, + flip_signs=True, + meta=df[target_column].dtype, + ) + + return df diff --git a/sed/core/processor.py b/sed/core/processor.py index 6866843f..9b90fe8b 100644 --- a/sed/core/processor.py +++ b/sed/core/processor.py @@ -1117,7 +1117,13 @@ def save_energy_calibration( "Energy calibration parameters not found, need to generate parameters first!", ) from exc - config = {"energy": {"calibration": calibration}} + config = { + "energy": { + "calibration": calibration, + }, + } + if isinstance(self.ec.offset, dict): + config["energy"]["offset"] = self.ec.offset save_config(config, filename, overwrite) # 4. Apply energy calibration to the dataframe @@ -1159,6 +1165,106 @@ def append_energy_axis( else: print(self._dataframe) + def add_energy_offset( + self, + constant: float = None, + columns: Union[str, Sequence[str]] = None, + signs: Union[int, Sequence[int]] = None, + reductions: Union[str, Sequence[str]] = None, + preserve_mean: Union[bool, Sequence[bool]] = None, + ) -> None: + """Shift the energy axis of the dataframe by a given amount. + + Args: + constant (float, optional): The constant to shift the energy axis by. + columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift from. + signs (Union[int, Sequence[int]]): Sign of the shift to apply. (+1 or -1) A positive + sign shifts the energy axis to higher kinetic energies. Defaults to +1. + preserve_mean (bool): Whether to subtract the mean of the column before applying the + shift. Defaults to False. + reductions (str): The reduction to apply to the column. Should be an available method + of dask.dataframe.Series. For example "mean". In this case the function is applied + to the column to generate a single value for the whole dataset. If None, the shift + is applied per-dataframe-row. Defaults to None. Currently only "mean" is supported. + + Raises: + ValueError: If the energy column is not in the dataframe. 
+ """ + energy_column = self._config["dataframe"]["energy_column"] + if energy_column not in self._dataframe.columns: + raise ValueError( + f"Energy column {energy_column} not found in dataframe! " + "Run `append energy axis` first.", + ) + if self.dataframe is not None: + df, metadata = self.ec.add_offsets( + df=self._dataframe, + constant=constant, + columns=columns, + energy_column=energy_column, + signs=signs, + reductions=reductions, + preserve_mean=preserve_mean, + ) + self._attributes.add( + metadata, + "add_energy_offset", + # TODO: allow only appending when no offset along this column(s) was applied + # TODO: clear memory of modifications if the energy axis is recalculated + duplicate_policy="append", + ) + self._dataframe = df + else: + raise ValueError("No dataframe loaded!") + + def append_tof_ns_axis( + self, + **kwargs, + ): + """Convert time-of-flight channel steps to nanoseconds. + + Args: + tof_ns_column (str, optional): Name of the generated column containing the + time-of-flight in nanosecond. + Defaults to config["dataframe"]["tof_ns_column"]. + kwargs: additional arguments are passed to ``energy.tof_step_to_ns``. + + """ + if self._dataframe is not None: + print("Adding time-of-flight column in nanoseconds to dataframe:") + # TODO assert order of execution through metadata + + self._dataframe, metadata = self.ec.append_tof_ns_axis( + df=self._dataframe, + **kwargs, + ) + self._attributes.add( + metadata, + "tof_ns_conversion", + duplicate_policy="append", + ) + + def align_dld_sectors(self, sector_delays: np.ndarray = None, **kwargs): + """Align the 8s sectors of the HEXTOF endstation. + + Args: + sector_delays (np.ndarray, optional): Array containing the sector delays. Defaults to + config["dataframe"]["sector_delays"]. + """ + if self._dataframe is not None: + print("Aligning 8s sectors of dataframe") + # TODO assert order of execution through metadata + self._dataframe, metadata = self.ec.align_dld_sectors( + df=self._dataframe, + sector_delays=sector_delays, + **kwargs, + ) + self._attributes.add( + metadata, + "dld_sector_alignment", + duplicate_policy="raise", + ) + # Delay calibration function def calibrate_delay_axis( self, diff --git a/sed/loader/flash/loader.py b/sed/loader/flash/loader.py index c1622c87..f66d73cc 100644 --- a/sed/loader/flash/loader.py +++ b/sed/loader/flash/loader.py @@ -29,6 +29,7 @@ from sed.core import dfops from sed.loader.base.loader import BaseLoader from sed.loader.flash.metadata import MetadataRetriever +from sed.loader.flash.utils import split_dld_time_from_sector_id from sed.loader.utils import parse_h5_keys @@ -591,9 +592,14 @@ def create_dataframe_per_file( # Loads h5 file and creates a dataframe with h5py.File(file_path, "r") as h5_file: self.reset_multi_index() # Reset MultiIndexes for next file - return self.concatenate_channels(h5_file) + df = self.concatenate_channels(h5_file) + df = df.dropna(subset=self._config["dataframe"].get("tof_column", "dldTimeSteps")) + # correct the 3 bit shift which encodes the detector ID in the 8s time + if self._config["dataframe"].get("split_sector_id_from_dld_time", False): + df = split_dld_time_from_sector_id(df, config=self._config) + return df - def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> None: + def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> Union[bool, Exception]: """ Converts an HDF5 file to Parquet format to create a buffer file. 
@@ -614,10 +620,10 @@ def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> None: .reset_index(level=self.multi_index) .to_parquet(parquet_path, index=False) ) - except ValueError as failed_string_error: - self.failed_files_error.append( - f"{parquet_path}: {failed_string_error}", - ) + except Exception as exc: # pylint: disable=broad-except + self.failed_files_error.append(f"{parquet_path}: {type(exc)} {exc}") + return exc + return None def buffer_file_handler(self, data_parquet_dir: Path, detector: str): """ @@ -663,12 +669,15 @@ def buffer_file_handler(self, data_parquet_dir: Path, detector: str): # Convert the remaining h5 files to parquet in parallel if there are any if len(files_to_read) > 0: - Parallel(n_jobs=len(files_to_read), verbose=10)( + error = Parallel(n_jobs=len(files_to_read), verbose=10)( delayed(self.create_buffer_file)(h5_path, parquet_path) for h5_path, parquet_path in files_to_read ) + if any(error): + raise RuntimeError(f"Conversion failed for some files. {error}") # Raise an error if the conversion failed for any files + # TODO: merge this and the previous error trackings if self.failed_files_error: raise FileNotFoundError( "Conversion failed for the following files:\n" + "\n".join(self.failed_files_error), @@ -743,7 +752,7 @@ def parquet_handler( dataframe = dfops.forward_fill_lazy( df=dataframe, - channels=channels, + columns=channels, before=overlap, iterations=self._config["dataframe"].get("forward_fill_iterations", 2), ) diff --git a/sed/loader/flash/utils.py b/sed/loader/flash/utils.py new file mode 100644 index 00000000..a2bc837a --- /dev/null +++ b/sed/loader/flash/utils.py @@ -0,0 +1,62 @@ +"""Helper functions for the flash loader.""" +from typing import Union + +import dask.dataframe +import numpy as np +import pandas as pd + +from ..utils import split_channel_bitwise + + +def split_dld_time_from_sector_id( + df: Union[pd.DataFrame, dask.dataframe.DataFrame], + tof_column: str = None, + sector_id_column: str = None, + sector_id_reserved_bits: int = None, + config: dict = None, +) -> Union[pd.DataFrame, dask.dataframe.DataFrame]: + """Converts the 8s time in steps to time in steps and sectorID. + + The 8s detector encodes the dldSectorID in the 3 least significant bits of the + dldTimeSteps channel. + + Args: + df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use. + tof_column (str, optional): Name of the column containing the + time-of-flight steps. Defaults to config["dataframe"]["tof_column"]. + sector_id_column (str, optional): Name of the column containing the + sectorID. Defaults to config["dataframe"]["sector_id_column"]. + sector_id_reserved_bits (int, optional): Number of bits reserved for the + config (dict, optional): Configuration dictionary. Defaults to None. + + Returns: + Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the new columns. 
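+
+ Example:
+ Illustrative numbers only: with 3 reserved bits, a raw ``dldTimeSteps`` value
+ of ``8 * 1000 + 5`` is split into ``dldSectorID = 5`` (``value % 2**3``) and
+ ``dldTimeSteps = 1000`` (``value // 2**3``), as implemented by
+ ``split_channel_bitwise``.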
+ """ + if tof_column is None: + if config is None: + raise ValueError("Either tof_column or config must be given.") + tof_column = config["dataframe"]["tof_column"] + if sector_id_column is None: + if config is None: + raise ValueError("Either sector_id_column or config must be given.") + sector_id_column = config["dataframe"]["sector_id_column"] + if sector_id_reserved_bits is None: + if config is None: + raise ValueError("Either sector_id_reserved_bits or config must be given.") + sector_id_reserved_bits = config["dataframe"].get("sector_id_reserved_bits", None) + if sector_id_reserved_bits is None: + raise ValueError('No value for "sector_id_reserved_bits" found in config.') + + if sector_id_column in df.columns: + raise ValueError( + f"Column {sector_id_column} already in dataframe. This function is not idempotent.", + ) + df = split_channel_bitwise( + df=df, + input_column=tof_column, + output_columns=[sector_id_column, tof_column], + bit_mask=sector_id_reserved_bits, + overwrite=True, + types=[np.int8, np.int32], + ) + return df diff --git a/sed/loader/utils.py b/sed/loader/utils.py index 6048891a..9a1579af 100644 --- a/sed/loader/utils.py +++ b/sed/loader/utils.py @@ -3,7 +3,10 @@ from glob import glob from typing import cast from typing import List +from typing import Sequence +import dask.dataframe +import numpy as np from h5py import File from h5py import Group from natsort import natsorted @@ -89,3 +92,50 @@ def parse_h5_keys(h5_file: File, prefix: str = "") -> List[str]: # Return the list of channels return file_channel_list + + +def split_channel_bitwise( + df: dask.dataframe.DataFrame, + input_column: str, + output_columns: Sequence[str], + bit_mask: int, + overwrite: bool = False, + types: Sequence[type] = None, +) -> dask.dataframe.DataFrame: + """Splits a channel into two channels bitwise. + + This function splits a channel into two channels by separating the first n bits from + the remaining bits. The first n bits are stored in the first output column, the + remaining bits are stored in the second output column. + + Args: + df (dask.dataframe.DataFrame): Dataframe to use. + input_column (str): Name of the column to split. + output_columns (Sequence[str]): Names of the columns to create. + bit_mask (int): Bit mask to use for splitting. + overwrite (bool, optional): Whether to overwrite existing columns. + Defaults to False. + types (Sequence[type], optional): Types of the new columns. + + Returns: + dask.dataframe.DataFrame: Dataframe with the new columns. + """ + if len(output_columns) != 2: + raise ValueError("Exactly two output columns must be given.") + if input_column not in df.columns: + raise KeyError(f"Column {input_column} not in dataframe.") + if output_columns[0] in df.columns and not overwrite: + raise KeyError(f"Column {output_columns[0]} already in dataframe.") + if output_columns[1] in df.columns and not overwrite: + raise KeyError(f"Column {output_columns[1]} already in dataframe.") + if bit_mask < 0 or not isinstance(bit_mask, int): + raise ValueError("bit_mask must be a positive. 
integer") + if types is None: + types = [np.int8 if bit_mask < 8 else np.int16, np.int32] + elif len(types) != 2: + raise ValueError("Exactly two types must be given.") + elif not all(isinstance(t, type) for t in types): + raise ValueError("types must be a sequence of types.") + df[output_columns[0]] = (df[input_column] % 2**bit_mask).astype(types[0]) + df[output_columns[1]] = (df[input_column] // 2**bit_mask).astype(types[1]) + return df diff --git a/tests/calibrator/test_energy.py b/tests/calibrator/test_energy.py index ba931cc2..212a64e0 100644 --- a/tests/calibrator/test_energy.py +++ b/tests/calibrator/test_energy.py @@ -4,10 +4,12 @@ import glob import itertools import os +from copy import deepcopy from importlib.util import find_spec from typing import Any from typing import Dict +import dask.dataframe import numpy as np import pandas as pd import pytest @@ -269,6 +271,36 @@ def test_append_energy_axis_raises(): ) +def test_append_tof_ns_axis(): + """Function to test if the tof_ns calibration is correctly applied. + TODO: add further tests once the discussion about units is done. + """ + cfg = { + "dataframe": { + "tof_column": "t", + "tof_ns_column": "t_ns", + "tof_binning": 1, + "tof_binwidth": 1e-9, + }, + } + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + loader = get_loader(loader_name="mpes", config=config) + + # from kwds + df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + ec = EnergyCalibrator(config=config, loader=loader) + df, _ = ec.append_tof_ns_axis(df, binwidth=2e-9, binning=1) + assert config["dataframe"]["tof_ns_column"] in df.columns + np.testing.assert_allclose(df[ec.tof_column], df[ec.tof_ns_column] / 4) + + # from config + df, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False) + ec = EnergyCalibrator(config=config, loader=loader) + df, _ = ec.append_tof_ns_axis(df) + assert config["dataframe"]["tof_ns_column"] in df.columns + np.testing.assert_allclose(df[ec.tof_column], df[ec.tof_ns_column] / 2) + + amplitude = 2.5 # pylint: disable=invalid-name center = (730, 730) sample = np.array( @@ -538,3 +570,187 @@ def test_apply_energy_correction_raises( correction=correction_dict, ) assert config["dataframe"]["corrected_tof_column"] in df.columns + + +def test_add_offsets_functionality(): + """test the add_offsets function""" + config = parse_config( + config={ + "energy": { + "calibration": { + "energy_scale": "kinetic", + }, + "offset": { + "constant": 1, + "off1": { + "sign": 1, + "preserve_mean": True, + }, + "off2": {"sign": -1, "preserve_mean": False}, + "off3": {"sign": 1, "preserve_mean": False, "reduction": "mean"}, + }, + }, + }, + folder_config={}, + user_config={}, + system_config={}, + ) + params = { + "constant": 1, + "energy_column": "energy", + "columns": ["off1", "off2", "off3"], + "signs": [1, -1, 1], + "preserve_mean": [True, False, False], + "reductions": [None, None, "mean"], + } + df = pd.DataFrame( + { + "energy": [10, 20, 30, 40, 50, 60], + "off1": [1, 2, 3, 4, 5, 6], + "off2": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], + "off3": [10.1, 10.2, 10.3, 10.4, 10.5, 10.6], + }, + ) + t_df = dask.dataframe.from_pandas(df.copy(), npartitions=2) + ec = EnergyCalibrator( + config=config, + loader=get_loader("flash", config=config), + ) + res, meta = ec.add_offsets(t_df) + exp_vals = df["energy"].copy() + 1 + exp_vals += df["off1"] - df["off1"].mean() + exp_vals -= df["off2"] + exp_vals += df["off3"].mean() + np.testing.assert_allclose(res["energy"].values, exp_vals.values) + exp_meta 
= params.copy() + exp_meta["applied"] = True + assert meta == exp_meta + # test with explicit params + ec = EnergyCalibrator( + config=config, + loader=get_loader("flash", config=config), + ) + t_df = dask.dataframe.from_pandas(df.copy(), npartitions=2) + res, meta = ec.add_offsets(t_df, **params) # pylint disable=unexpected-keyword-arg + np.testing.assert_allclose(res["energy"].values, exp_vals.values) + params["applied"] = True + assert meta == params + + # test with different energy scale + + +def test_add_offset_raises(): + """test if add_offset raises the correct errors""" + cfg_dict = { + "energy": { + "calibration": { + "energy_scale": "kinetic", + }, + "offset": { + "constant": 1, + "off1": {"sign": -1, "preserve_mean": True}, + "off2": {"sign": -1, "preserve_mean": False}, + "off3": {"sign": 1, "preserve_mean": False, "reduction": "mean"}, + }, + }, + } + + df = pd.DataFrame( + { + "energy": [10, 20, 30, 40, 50, 60], + "off1": [1, 2, 3, 4, 5, 6], + "off2": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], + "off3": [10.1, 10.2, 10.3, 10.4, 10.5, 10.6], + }, + ) + t_df = dask.dataframe.from_pandas(df.copy(), npartitions=2) + # no sign in config + with pytest.raises(KeyError): + cfg = deepcopy(cfg_dict) + cfg["energy"]["offset"]["off1"].pop("sign") + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=cfg, loader=get_loader("flash", config=config)) + _ = ec.add_offsets(t_df) + + # no energy scale + with pytest.raises(ValueError): + cfg = deepcopy(cfg_dict) + cfg["energy"]["calibration"].pop("energy_scale") + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=cfg, loader=get_loader("flash", config=config)) + _ = ec.add_offsets(t_df) + + # invalid energy scale + with pytest.raises(ValueError): + cfg = deepcopy(cfg_dict) + cfg["energy"]["calibration"]["energy_scale"] = "wrong_value" + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=cfg, loader=get_loader("flash", config=config)) + _ = ec.add_offsets(t_df) + + # invalid sign + with pytest.raises(TypeError): + cfg = deepcopy(cfg_dict) + cfg["energy"]["offset"]["off1"]["sign"] = "wrong_type" + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=cfg, loader=get_loader("flash", config=config)) + _ = ec.add_offsets(t_df) + + # invalid constant + with pytest.raises(TypeError): + cfg = deepcopy(cfg_dict) + cfg["energy"]["offset"]["constant"] = "wrong_type" + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=cfg, loader=get_loader("flash", config=config)) + _ = ec.add_offsets(t_df) + + +def test_align_dld_sectors(): + """test functionality and error handling of align_dld_sectors""" + cfg_dict = { + "dataframe": { + "tof_column": "dldTimeSteps", + "sector_id_column": "dldSectorId", + "sector_delays": [-0.35, -0.25, -0.15, -0.05, 0.05, 0.15, 0.25, 0.35], + }, + } + df = pd.DataFrame( + { + "dldTimeSteps": [1, 2, 3, 4, 5, 6, 7, 8], + "dldSectorId": [0, 1, 2, 3, 4, 5, 6, 7], + }, + ) + # from config + config = parse_config(config=cfg_dict, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=config, loader=get_loader("flash", config=config)) + t_df = dask.dataframe.from_pandas(df.copy(), npartitions=2) + res, meta = ec.align_dld_sectors(t_df) + assert meta["applied"] is True + assert meta["sector_delays"] == 
cfg_dict["dataframe"]["sector_delays"] + np.testing.assert_allclose( + res["dldTimeSteps"].values, + np.array([1, 2, 3, 4, 5, 6, 7, 8]) - np.array(cfg_dict["dataframe"]["sector_delays"]), + ) + + # from kwds + config = parse_config(config={}, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=cfg_dict, loader=get_loader("flash", config=config)) + t_df = dask.dataframe.from_pandas(df.copy(), npartitions=2) + res, meta = ec.align_dld_sectors( + t_df, + sector_delays=cfg_dict["dataframe"]["sector_delays"], + sector_id_column="dldSectorId", + ) + assert meta["applied"] is True + assert meta["sector_delays"] == cfg_dict["dataframe"]["sector_delays"] + np.testing.assert_allclose( + res["dldTimeSteps"].values, + np.array([1, 2, 3, 4, 5, 6, 7, 8]) - np.array(cfg_dict["dataframe"]["sector_delays"]), + ) + with pytest.raises(ValueError): + cfg = deepcopy(cfg_dict) + cfg["dataframe"].pop("sector_delays") + config = parse_config(config=cfg, folder_config={}, user_config={}, system_config={}) + ec = EnergyCalibrator(config=config, loader=get_loader("flash", config=config)) + t_df = dask.dataframe.from_pandas(df.copy(), npartitions=2) + res, meta = ec.align_dld_sectors(t_df) diff --git a/tests/data/loader/flash/config.yaml b/tests/data/loader/flash/config.yaml index 9a614975..d7d09f63 100644 --- a/tests/data/loader/flash/config.yaml +++ b/tests/data/loader/flash/config.yaml @@ -11,27 +11,72 @@ core: paths: data_raw_dir: "tests/data/loader/flash/" data_parquet_dir: "tests/data/loader/flash/parquet" - + # These can be replaced by beamtime_id and year to automatically # find the folders on the desy cluster - + # beamtime_id: xxxxxxxx # year: 20xx dataframe: + # The name of the DAQ system to use. Necessary to resolve the filenames/paths. + daq: fl1user3 # The offset correction to the pulseId ubid_offset: 5 - # The name of the DAQ system to use. Necessary to resolve the filenames/paths. - daq: fl1user3 + # the number of iterations to fill the pulseId forward. + forward_fill_iterations: 2 + # if true, removes the 3 bits reserved for dldSectorID from the dldTimeandSector column + unravel_8s_detector_time_channel: True + + # dataframe column containing x coordinates + x_column: dldPosX + # dataframe column containing corrected x coordinates + corrected_x_column: "X" + # dataframe column containing kx coordinates + kx_column: "kx" + # dataframe column containing y coordinates + + y_column: dldPosY + # dataframe column containing corrected y coordinates + corrected_y_column: "Y" + # dataframe column containing kx coordinates + ky_column: "ky" + # dataframe column containing time-of-flight data - # The channels to load. - # channels have the following structure: - # channelAlias: - # format: per_pulse/per_electron/per_train - # group_name: the hdf5 group path - # slice: if the group contains multidim data, where to slice - + tof_column: dldTimeSteps + # dataframe column containing time-of-flight data in ns + tof_ns_column: dldTime + # dataframe column containing corrected time-of-flight data + corrected_tof_column: "tm" + + # time length of a base time-of-flight bin in seconds + tof_binwidth: 2.0576131995767355E-11 + # binning parameter for time-of-flight data. 2**tof_binning bins per base bin + tof_binning: 3 # power of 2, 4 means 8 bins per step + # dataframe column containing sector ID. obtained from dldTimeSteps column + sector_id_column: dldSectorID + + sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] 
+ + jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"] + + units: + dldPosX: 'step' + dldPosY: 'step' + dldTimeSteps: 'step' + tof_voltage: 'V' + extractorVoltage: 'V' + extractorCurrent: 'A' + cryoTemperature: 'K' + sampleTemperature: 'K' + dldTime: 'ns' + # delay: 'ps' + timeStamp: 's' + # energy: 'eV' + # E: 'eV' + kx: '1/A' + ky: '1/A' channels: # pulse ID is a necessary channel for using the loader. pulseId: @@ -43,17 +88,17 @@ dataframe: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 1 - + dldPosY: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 0 - - dldTime: + + dldTimeSteps: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 3 - + # The auxillary channel has a special structure where the group further contains # a multidim structure so further aliases are defined below dldAux: @@ -68,7 +113,7 @@ dataframe: cryoTemperature: 4 sampleTemperature: 5 dldTimeBinSize: 15 - + # The prefixes of the stream names for different DAQ systems for parsing filenames # (Not to be changed by user) stream_name_prefixes: diff --git a/tests/loader/test_utils.py b/tests/loader/test_utils.py new file mode 100644 index 00000000..0ce2601c --- /dev/null +++ b/tests/loader/test_utils.py @@ -0,0 +1,92 @@ +"""Module tests.loader.test_utils, tests for the sed.load.utils file +""" +import dask.dataframe as dd +import numpy as np +import pandas as pd +import pytest + +from sed.loader.utils import split_channel_bitwise + +test_df = pd.DataFrame( + { + "a": [0, 1, 2, 3, 4, 5, 6, 7], + }, +) + + +def test_split_channel_bitwise(): + """Test split_channel_bitwise function""" + output_columns = ["b", "c"] + bit_mask = 2 + expected_output = pd.DataFrame( + { + "a": [0, 1, 2, 3, 4, 5, 6, 7], + "b": np.array([0, 1, 2, 3, 0, 1, 2, 3], dtype=np.int8), + "c": np.array([0, 0, 0, 0, 1, 1, 1, 1], dtype=np.int32), + }, + ) + df = dd.from_pandas(test_df, npartitions=2) + result = split_channel_bitwise(df, "a", output_columns, bit_mask) + pd.testing.assert_frame_equal(result.compute(), expected_output) + + +def test_split_channel_bitwise_raises(): + """Test split_channel_bitwise function raises""" + pytest.raises( + KeyError, + split_channel_bitwise, + test_df, + "wrong", + ["b", "c"], + 3, + False, + [np.int8, np.int16], + ) + pytest.raises( + ValueError, + split_channel_bitwise, + test_df, + "a", + ["b", "c", "wrong"], + 3, + False, + [np.int8, np.int16], + ) + pytest.raises( + ValueError, + split_channel_bitwise, + test_df, + "a", + ["b", "c"], + -1, + False, + [np.int8, np.int16], + ) + pytest.raises( + ValueError, + split_channel_bitwise, + test_df, + "a", + ["b", "c"], + 3, + False, + [np.int8, np.int16, np.int32], + ) + pytest.raises(ValueError, split_channel_bitwise, test_df, "a", ["b", "c"], 3, False, [np.int8]) + pytest.raises( + ValueError, + split_channel_bitwise, + test_df, + "a", + ["b", "c"], + 3, + False, + ["wrong", np.int16], + ) + other_df = pd.DataFrame( + { + "a": [0, 1, 2, 3, 4, 5, 6, 7], + "b": [0, 1, 2, 3, 4, 5, 6, 7], + }, + ) + pytest.raises(KeyError, split_channel_bitwise, other_df, "a", ["b", "c"], 3, False, None) diff --git a/tests/test_dfops.py b/tests/test_dfops.py index fabb9236..59d5cd1a 100644 --- a/tests/test_dfops.py +++ b/tests/test_dfops.py @@ -1,15 +1,17 @@ """This file contains code that performs several tests for the dfops functions """ +import dask.dataframe as ddf import numpy as np import pandas as pd -import dask.dataframe as ddf import pytest from sed.core.dfops import 
apply_filter from sed.core.dfops import apply_jitter +from sed.core.dfops import backward_fill_lazy from sed.core.dfops import drop_column -from sed.core.dfops import map_columns_2d from sed.core.dfops import forward_fill_lazy +from sed.core.dfops import map_columns_2d +from sed.core.dfops import offset_by_other_columns N_PTS = 100 @@ -77,62 +79,300 @@ def swap(x, y): def test_forward_fill_lazy_sparse_nans(): - """ test that a lazy forward fill works as expected with sparse nans""" + """test that a lazy forward fill works as expected with sparse nans""" t_df = df.copy() - t_df['energy'][::2] = np.nan + t_df["energy"][::2] = np.nan t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) - t_dask_df = forward_fill_lazy(t_dask_df, 'energy', before='max') + t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="max") t_df = t_df.ffill() pd.testing.assert_frame_equal(t_df, t_dask_df.compute()) def test_forward_fill_lazy_full_partition_nans(): - """ test that a lazy forward fill works as expected with a full partition of nans""" + """test that a lazy forward fill works as expected with a full partition of nans""" t_df = df.copy() - t_df['energy'][5:25] = np.nan + t_df["energy"][5:25] = np.nan t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) - t_dask_df = forward_fill_lazy(t_dask_df, 'energy', before='max') + t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="max") t_df = t_df.ffill() pd.testing.assert_frame_equal(t_df, t_dask_df.compute()) def test_forward_fill_lazy_consecutive_full_partition_nans(): - """ test that a lazy forward fill fails as expected on two consecutive partitions + """test that a lazy forward fill fails as expected on two consecutive partitions full of nans """ t_df = df.copy() - t_df['energy'][5:35] = np.nan + t_df["energy"][5:35] = np.nan t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) - t_dask_df = forward_fill_lazy(t_dask_df, 'energy', before='max') + t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="max") t_df = t_df.ffill() assert not t_df.equals(t_dask_df.compute()) def test_forward_fill_lazy_wrong_parameters(): - """ test that a lazy forward fill fails as expected on wrong parameters""" + """test that a lazy forward fill fails as expected on wrong parameters""" t_df = df.copy() - t_df['energy'][5:35] = np.nan + t_df["energy"][5:35] = np.nan t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) with pytest.raises(TypeError): - t_dask_df = forward_fill_lazy(t_dask_df, 'energy', before='wrong parameter') + t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="wrong parameter") def test_forward_fill_lazy_compute(): - """ test that a lazy forward fill works as expected with compute=True""" + """test that a lazy forward fill works as expected with compute=True""" t_df = df.copy() - t_df['energy'][5:35] = np.nan + t_df["energy"][5:35] = np.nan t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) - t_dask_df_comp = forward_fill_lazy(t_dask_df, 'energy', before='max', compute_lengths=True) - t_dask_df_nocomp = forward_fill_lazy(t_dask_df, 'energy', before='max', compute_lengths=False) + t_dask_df_comp = forward_fill_lazy(t_dask_df, "energy", before="max", compute_lengths=True) + t_dask_df_nocomp = forward_fill_lazy(t_dask_df, "energy", before="max", compute_lengths=False) pd.testing.assert_frame_equal(t_dask_df_comp.compute(), t_dask_df_nocomp.compute()) def test_forward_fill_lazy_keep_head_nans(): - """ test that a lazy forward fill works as expected with missing values at the + """test that a lazy forward fill 
works as expected with missing values at the beginning of the dataframe""" t_df = df.copy() - t_df['energy'][:5] = np.nan + t_df["energy"][:5] = np.nan + t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) + t_df = forward_fill_lazy(t_dask_df, "energy", before="max").compute() + assert np.all(np.isnan(t_df["energy"][:5])) + assert np.all(np.isfinite(t_df["energy"][5:])) + + +def test_forward_fill_lazy_no_channels(): + """test that a lazy forward fill raises an error when no channels are specified""" + t_df = df.copy() + t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) + with pytest.raises(ValueError): + t_dask_df = forward_fill_lazy(t_dask_df, []) + + +def test_forward_fill_lazy_wrong_channels(): + """test that a lazy forward fill raises an error when the specified channels do not exist""" + t_df = df.copy() + t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) + with pytest.raises(KeyError): + t_dask_df = forward_fill_lazy(t_dask_df, ["nonexistent_channel"]) + + +def test_forward_fill_lazy_multiple_iterations(): + """test that a lazy forward fill works as expected with multiple iterations""" + t_df = df.copy() + t_df["energy"][5:35] = np.nan t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS) - t_df = forward_fill_lazy(t_dask_df, 'energy', before='max').compute() - assert np.all(np.isnan(t_df['energy'][:5])) - assert np.all(np.isfinite(t_df['energy'][5:])) + t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="max", iterations=5) + t_df = t_df.ffill() + pd.testing.assert_frame_equal(t_df, t_dask_df.compute()) + + +def test_backward_fill_lazy(): + """Test backward fill function""" + t_df = pd.DataFrame( + { + "A": [1, 2, np.nan, np.nan, 5, np.nan], + "B": [1, np.nan, 3, np.nan, 5, np.nan], + "C": [np.nan, np.nan, np.nan, np.nan, np.nan, 6], + "D": [1, 2, 3, 4, 5, 6], + }, + ) + t_dask_df = ddf.from_pandas(t_df, npartitions=2) + t_dask_df = backward_fill_lazy(t_dask_df, ["A", "B", "C"], after=2, iterations=2) + t_df = t_df.bfill().bfill() + pd.testing.assert_frame_equal(t_df, t_dask_df.compute()) + + +def test_backward_fill_lazy_no_channels(): + """Test that an error is raised when no channels are specified""" + t_df = pd.DataFrame( + { + "A": [1, 2, np.nan, np.nan, 5, np.nan], + "B": [1, np.nan, 3, np.nan, 5, np.nan], + "C": [np.nan, np.nan, np.nan, np.nan, np.nan, 6], + "D": [1, 2, 3, 4, 5, 6], + }, + ) + t_dask_df = ddf.from_pandas(t_df, npartitions=2) + with pytest.raises(ValueError): + t_dask_df = backward_fill_lazy(t_dask_df, [], after=2, iterations=2) + + +def test_backward_fill_lazy_wrong_channels(): + """Test that an error is raised when the specified channels do not exist""" + t_df = pd.DataFrame( + { + "A": [1, 2, np.nan, np.nan, 5, np.nan], + "B": [1, np.nan, 3, np.nan, 5, np.nan], + "C": [np.nan, np.nan, np.nan, np.nan, np.nan, 6], + "D": [1, 2, 3, 4, 5, 6], + }, + ) + t_dask_df = ddf.from_pandas(t_df, npartitions=2) + with pytest.raises(KeyError): + t_dask_df = backward_fill_lazy(t_dask_df, ["nonexistent_channel"], after=2, iterations=2) + + +def test_backward_fill_lazy_wrong_after(): + """Test that an error is raised when the 'after' parameter is not an integer or 'max'""" + t_df = pd.DataFrame( + { + "A": [1, 2, np.nan, np.nan, 5, np.nan], + "B": [1, np.nan, 3, np.nan, 5, np.nan], + "C": [np.nan, np.nan, np.nan, np.nan, np.nan, 6], + "D": [1, 2, 3, 4, 5, 6], + }, + ) + t_dask_df = ddf.from_pandas(t_df, npartitions=2) + with pytest.raises(TypeError): + t_dask_df = backward_fill_lazy( + t_dask_df, + ["A", "B", "C"], + 
after="wrong_parameter", + iterations=2, + ) + + +def test_backward_fill_lazy_multiple_iterations(): + """Test that the function works with multiple iterations""" + t_df = pd.DataFrame( + { + "A": [1, 2, np.nan, np.nan, 5, np.nan], + "B": [1, np.nan, 3, np.nan, 5, np.nan], + "C": [np.nan, np.nan, np.nan, np.nan, np.nan, 6], + "D": [1, 2, 3, 4, 5, 6], + }, + ) + t_dask_df = ddf.from_pandas(t_df, npartitions=2) + t_dask_df = backward_fill_lazy(t_dask_df, ["A", "B", "C"], after=2, iterations=2) + t_df = t_df.bfill().bfill().bfill().bfill() + pd.testing.assert_frame_equal(t_df, t_dask_df.compute()) + + +def test_offset_by_other_columns_functionality(): + """test that the offset_by_other_columns function works as expected""" + pd_df = pd.DataFrame( + { + "target": [10, 20, 30, 40, 50, 60], + "off1": [1, 2, 3, 4, 5, 6], + "off2": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], + "off3": [9.75, 9.85, 9.95, 10.05, 10.15, 10.25], + }, + ) + t_df = ddf.from_pandas(pd_df, npartitions=2) + res = offset_by_other_columns( + df=t_df.copy(), + target_column="target", + offset_columns=["off1"], + signs=[1], + ) + expected = [11, 22, 33, 44, 55, 66] + np.testing.assert_allclose(res["target"].values, expected) + + res = offset_by_other_columns( + df=t_df.copy(), + target_column="target", + offset_columns=["off1", "off2"], + signs=[1, -1], + ) + expected = [10.9, 21.8, 32.7, 43.6, 54.5, 65.4] + np.testing.assert_allclose(res["target"].values, expected) + + res = offset_by_other_columns( + df=t_df.copy(), + target_column="target", + offset_columns=["off3"], + signs=[1], + preserve_mean=True, + ) + expected = [9.75, 19.85, 29.95, 40.05, 50.15, 60.25] + np.testing.assert_allclose(res["target"].values, expected) + + res = offset_by_other_columns( + df=t_df.copy(), + target_column="target", + offset_columns=["off3"], # off3 has mean of 10 + signs=[1], + reductions="mean", + ) + expected = [20, 30, 40, 50, 60, 70] + np.testing.assert_allclose(res["target"].values, expected) + + +def test_offset_by_other_columns_pandas_not_working(): + """test that the offset_by_other_columns function raises an error when + used with pandas + """ + pd_df = pd.DataFrame( + { + "target": [10, 20, 30, 40, 50, 60], + "off1": [1, 2, 3, 4, 5, 6], + "off2": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], + "off3": [9.75, 9.85, 9.95, 10.05, 10.15, 10.25], + }, + ) + with pytest.raises(NotImplementedError): + _ = offset_by_other_columns( + df=pd_df, + target_column="target", + offset_columns=["off1"], + signs=[1], + ) + + +def test_offset_by_other_columns_rises(): + """Test that the offset_by_other_columns function raises an error when + the specified columns do not exist + """ + t_df = ddf.from_pandas( + pd.DataFrame( + { + "target": [10, 20, 30, 40, 50, 60], + "off1": [1, 2, 3, 4, 5, 6], + "off2": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], + "off3": [10.1, 10.2, 10.3, 10.4, 10.5, 10.6], + }, + ), + npartitions=2, + ) + pytest.raises( + KeyError, + offset_by_other_columns, + df=t_df.copy(), + target_column="nonexistent_column", + offset_columns=["off1"], + signs=[1], + ) + pytest.raises( + KeyError, + offset_by_other_columns, + df=t_df.copy(), + target_column="target", + offset_columns=["off1", "nonexistent_column"], + signs=[1, 1], + ) + pytest.raises( + NotImplementedError, + offset_by_other_columns, + df=t_df.copy(), + target_column="target", + offset_columns=["off1"], + signs=[1], + reductions="not_mean", + ) + pytest.raises( + ValueError, + offset_by_other_columns, + df=t_df.copy(), + target_column="target", + offset_columns=["off1"], + signs=[1, 1], + ) + pytest.raises( + 
TypeError, + offset_by_other_columns, + df=t_df.copy(), + target_column="target", + offset_columns=["off1"], + signs=[1], + preserve_mean="asd", + ) diff --git a/tutorial/5 - hextof workflow.ipynb b/tutorial/5 - hextof workflow.ipynb new file mode 100644 index 00000000..e17a8a73 --- /dev/null +++ b/tutorial/5 - hextof workflow.ipynb @@ -0,0 +1,395 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "notebookRunGroups": { + "groupValue": "1" + } + }, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "from sed import SedProcessor\n", + "import sed\n", + "import numpy as np\n", + "\n", + "%matplotlib inline\n", + "# %matplotlib ipympl\n", + "import matplotlib.pyplot as plt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "notebookRunGroups": { + "groupValue": "1" + } + }, + "outputs": [], + "source": [ + "%matplotlib widget" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "notebookRunGroups": { + "groupValue": "1" + } + }, + "outputs": [], + "source": [ + "local_path = Path(sed.__file__).parent.parent/'tutorial/'\n", + "config_file = local_path/'hextof_config.yaml'\n", + "assert config_file.exists()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Loading Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "config={\"core\": {\"paths\": {\n", + " \"data_raw_dir\": \"/asap3/flash/gpfs/pg2/2023/data/11019101/raw/hdf/offline/fl1user3\", \n", + " \"data_parquet_dir\": \"/home/agustsss/temp/sed_parquet/\"\n", + "}}}\n", + "sp = SedProcessor(runs=[44797], config=config, user_config=config_file, system_config={}, collect_metadata=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.add_jitter()\n", + "sp.align_dld_sectors()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# time-of-flight spectrum" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.append_tof_ns_axis()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "axes = ['sampleBias','dldTime']\n", + "bins = [5, 250]\n", + "ranges = [[28,33], [650,800]]\n", + "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.figure()\n", + "res.plot.line(x='dldTime')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Energy Calibration" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## using lmfit" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "axes = ['sampleBias', 'dldTimeSteps']\n", + "bins = [5, 500]\n", + "ranges = [[28,33], [4000, 4800]]\n", + "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.load_bias_series(binned_data=res)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ranges=(4120, 4200)\n", + "ref_id=0\n", + "sp.find_bias_peaks(ranges=ranges, ref_id=ref_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ 
+ "sp.calibrate_energy_axis(\n", + " ref_id=2,\n", + " ref_energy=-1,\n", + " method=\"lmfit\",\n", + " energy_scale='kinetic',\n", + " d={'value':1.,'min': .2, 'max':1.0, 'vary':True},\n", + " t0={'value':5e-7, 'min': 1e-7, 'max': 1e-6, 'vary':True},\n", + " E0={'value': 0., 'min': -100, 'max': 100, 'vary': True},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.append_energy_axis()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.dataframe[['dldTime','dldTimeSteps','energy','dldSectorID']].head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "axes = ['sampleBias', 'energy']\n", + "bins = [5, 500]\n", + "ranges = [[28,33], [-10,10]]\n", + "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.figure()\n", + "res.mean('sampleBias').plot.line(x='energy',linewidth=3)\n", + "res.plot.line(x='energy',linewidth=1,alpha=.5);" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.add_energy_offset(\n", + " constant=0,\n", + " columns=['sampleBias','monochromatorPhotonEnergy','tofVoltage'],\n", + " signs=[1,-1,-1],\n", + " preserve_mean=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "axes = ['sampleBias', 'energy']\n", + "bins = [5, 500]\n", + "ranges = [[28,33], [-10,2]]\n", + "res_fit = sp.compute(bins=bins, axes=axes, ranges=ranges)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.figure()\n", + "ax = plt.subplot(111)\n", + "res_fit.energy.attrs['unit'] = 'eV'\n", + "res_fit.mean('sampleBias').plot.line(x='energy',linewidth=3, ax=ax)\n", + "res_fit.plot.line(x='energy',linewidth=1,alpha=.5,label='all',ax=ax);" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### save the calibration parameters\n", + "This is currently overwriting all other values in the config file you point at. Do not overwrite local settings! \n", + "This should be fixed in the future." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sp.save_energy_calibration(filename=local_path/'energy_calibration.yaml')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# load and process from config" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "notebookRunGroups": { + "groupValue": "2" + } + }, + "outputs": [], + "source": [ + "config_dict={\"core\": {\"paths\": {\n", + " \"data_raw_dir\": \"/asap3/flash/gpfs/pg2/2023/data/11019101/raw/hdf/offline/fl1user3\", \n", + " \"data_parquet_dir\": \"/home/agustsss/temp/sed_parquet/\"\n", + "}}}\n", + "# config = sed.core.config.parse_config(config=config_dict, folder_config=local_path/'energy_calibration.yaml', user_config={}, system_config={})\n", + "sp = SedProcessor(runs=[44797], config=config_dict, folder_config=local_path/'energy_calibration.yaml', user_config=config_file, collect_metadata=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "notebookRunGroups": { + "groupValue": "2" + } + }, + "outputs": [], + "source": [ + "sp.add_jitter()\n", + "sp.align_dld_sectors()\n", + "sp.append_tof_ns_axis()\n", + "sp.append_energy_axis()\n", + "sp.add_energy_offset()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "axes = ['sampleBias', 'energy']\n", + "bins = [5, 500]\n", + "ranges = [[28,33], [-20,5]]\n", + "res_fit = sp.compute(bins=bins, axes=axes, ranges=ranges)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.figure()\n", + "ax = plt.subplot(111)\n", + "res_fit.energy.attrs['unit'] = 'eV'\n", + "res_fit.mean('sampleBias').plot.line(x='energy',linewidth=3, ax=ax)\n", + "res_fit.plot.line(x='energy',linewidth=1,alpha=.5,label='all',ax=ax);" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "sed38", + "language": "python", + "name": "sed38" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.18" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/tutorial/Flash energy calibration.ipynb b/tutorial/Flash energy calibration.ipynb deleted file mode 100755 index dceacdbb..00000000 --- a/tutorial/Flash energy calibration.ipynb +++ /dev/null @@ -1,206 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "39b2e62a", - "metadata": {}, - "outputs": [], - "source": [ - "%load_ext autoreload\n", - "%autoreload 2\n", - "\n", - "from sed import SedProcessor\n", - "import sed\n", - "import numpy as np\n", - "\n", - "# %matplotlib inline\n", - "%matplotlib widget\n", - "import matplotlib.pyplot as plt" - ] - }, - { - "cell_type": "markdown", - "id": "4d78d236", - "metadata": {}, - "source": [ - "# Try to calibrate energy" - ] - }, - { - "cell_type": "markdown", - "id": "a62f084f", - "metadata": {}, - "source": [ - "## Spin-integrated branch, E_TOF=10eV\n", - "single scan, move sample bias manually every 2000 pulses." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7dabbe92", - "metadata": {}, - "outputs": [], - "source": [ - "sp = SedProcessor(runs=[44638], config=\"config_flash_energy_calib.yaml\", system_config={})" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "248a41a7", - "metadata": {}, - "outputs": [], - "source": [ - "sp.add_jitter()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "2b867e40", - "metadata": {}, - "outputs": [], - "source": [ - "axes = ['sampleBias', 'dldTime']\n", - "bins = [6, 500]\n", - "ranges = [[0,6], [40000, 55000]]\n", - "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "62081458", - "metadata": {}, - "outputs": [], - "source": [ - "sp.load_bias_series(binned_data=res)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "424af94e", - "metadata": {}, - "outputs": [], - "source": [ - "ranges=(44500, 46000)\n", - "ref_id=3\n", - "sp.find_bias_peaks(ranges=ranges, ref_id=ref_id)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "034eff42", - "metadata": {}, - "outputs": [], - "source": [ - "ref_id=3\n", - "ref_energy=-.3\n", - "sp.calibrate_energy_axis(ref_id=ref_id, ref_energy=ref_energy, method=\"lstsq\", order=3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bbbfe992", - "metadata": {}, - "outputs": [], - "source": [ - "ref_id=3\n", - "ref_energy=-.3\n", - "sp.calibrate_energy_axis(ref_id=ref_id, ref_energy=ref_energy, method=\"lmfit\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e14d6cef", - "metadata": {}, - "outputs": [], - "source": [ - "sp.append_energy_axis(preview=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "59c83544", - "metadata": {}, - "outputs": [], - "source": [ - "axes = ['sampleBias', 'energy']\n", - "bins = [6, 1000]\n", - "ranges = [[0,6], [-5, 5]]\n", - "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "addba4cb", - "metadata": {}, - "outputs": [], - "source": [ - "plt.figure()\n", - "res[3,:].plot()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1676ec57", - "metadata": {}, - "outputs": [], - "source": [ - "axes = ['sampleBias', 'energy', 'dldPosX']\n", - "bins = [6, 100, 480]\n", - "ranges = [[0,6], [-2, 1], [420,900]]\n", - "res = sp.compute(bins=bins, axes=axes, ranges=ranges)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ad199c40", - "metadata": {}, - "outputs": [], - "source": [ - "plt.figure()\n", - "res[3, :, :].plot()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3a4ae88c", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": ".pyenv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.12" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/tutorial/config_flash_energy_calib.yaml b/tutorial/hextof_config.yaml old mode 100755 new mode 100644 similarity index 63% rename from tutorial/config_flash_energy_calib.yaml rename to tutorial/hextof_config.yaml index 675aee1a..4e16ecca --- 
a/tutorial/config_flash_energy_calib.yaml +++ b/tutorial/hextof_config.yaml @@ -5,18 +5,56 @@ core: beamline: pg2 instrument: hextof paths: - data_raw_dir: "." - data_parquet_dir: "./parquet" + data_raw_dir: "/path/to/data" + # change this to a local directory where you want to store the parquet files + data_parquet_dir: "/path/to/parquet" + +binning: + num_cores: 10 dataframe: ubid_offset: 5 daq: fl1user3 - channels: + forward_fill_iterations: 2 + split_sector_id_from_dld_time: True + sector_id_reserved_bits: 3 + x_column: dldPosX + corrected_x_column: "X" + kx_column: "kx" + y_column: dldPosY + corrected_y_column: "Y" + ky_column: "ky" + tof_column: dldTimeSteps + tof_ns_column: dldTime + corrected_tof_column: "tm" + bias_column: "sampleBias" + tof_binwidth: 2.0576131995767355E-11 # in seconds + tof_binning: 3 + sector_id_column: dldSectorID + sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] + jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"] + units: + dldPosX: 'step' + dldPosY: 'step' + dldTimeSteps: 'step' + tof_voltage: 'V' + extractorVoltage: 'V' + extractorCurrent: 'A' + cryoTemperature: 'K' + sampleTemperature: 'K' + dldTime: 'ns' + # delay: 'ps' + timeStamp: 's' + # energy: 'eV' + # E: 'eV' + kx: '1/A' + ky: '1/A' + + channels: timeStamp: format: per_train group_name: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/" - pulseId: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" @@ -29,7 +67,7 @@ dataframe: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 0 - dldTime: + dldTimeSteps: format: per_electron group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/" slice: 3 @@ -46,43 +84,28 @@ dataframe: sampleTemperature: 5 crystalVoltage: 6 dldTimeBinSize: 15 - - - # ADC containing the pulser sign (1: value approx. 35000, 0: 33000) - pulserSignAdc: + pulserSignAdc: # ADC containing the pulser sign (1: value approx. 35000, 0: 33000) format: per_pulse group_name: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/" #slice: 0 - monochromatorPhotonEnergy: format: per_train group_name: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/" - - - # The GMDs can not be read yet... - gmdBda: + gmdBda: # The GMDs can not be read yet... format: per_train group_name: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/" # slice: ":" - - #gmdTunnel: + #gmdTunnel: # The GMDs can not be read yet... # format: per_pulse # group_name: "/FL1/Photon Diagnostic/GMD/Pulse resolved energy/energy tunnel/" # slice: ":" - - # Here we use the DBC2 BAM as the "normal" one is broken. - bam: + bam: # Here we use the DBC2 BAM as the "normal" one is broken. format: per_pulse group_name: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/" - delayStage: format: per_train group_name: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/" - tof_column: dldTime - bias_column: sampleBias - tof_binning: 3 - stream_name_prefixes: pbd: "GMD_DATA_gmd_data" pbd2: "FL2PhotDiag_pbd2_gmd_data" @@ -91,12 +114,8 @@ dataframe: fl1user3: "FLASH1_USER3_stream_2" fl2user1: "FLASH2_USER1_stream_2" fl2user2: "FLASH2_USER2_stream_2" + beamtime_dir: pg2: "/asap3/flash/gpfs/pg2/" hextof: "/asap3/fs-flash-o/gpfs/hextof/" wespe: "/asap3/fs-flash-o/gpfs/wespe/" - -nexus: - reader: "mpes" - definition: "NXmpes" - input_files: ["/home/kutnyakd/__beamtimes/Spin_2023/NXmpes_config_HEXTOF_light.json"]
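
For reference, the offset semantics asserted by the offset_by_other_columns tests above can be reproduced with plain pandas. The sketch below only illustrates the expected behaviour (signed column offsets, preserve_mean, and the "mean" reduction); it is not the dfops implementation, which operates on dask dataframes, and the helper name offset_sketch is made up for this example.

import numpy as np
import pandas as pd


def offset_sketch(df, target, columns, signs, preserve_mean=False, reduction=None):
    """Shift df[target] by the signed values of the given columns (illustration only)."""
    shifted = df[target].astype(float).copy()
    for col, sign in zip(columns, signs):
        offset = df[col].astype(float)
        if reduction == "mean":
            # collapse the offset column to a single value (its mean) before applying it
            offset = pd.Series(offset.mean(), index=df.index)
        if preserve_mean:
            # subtract the column mean so the average of the target column stays unchanged
            offset = offset - offset.mean()
        shifted = shifted + sign * offset
    out = df.copy()
    out[target] = shifted
    return out


df = pd.DataFrame({"target": [10, 20, 30, 40, 50, 60], "off1": [1, 2, 3, 4, 5, 6]})
res = offset_sketch(df, "target", ["off1"], signs=[1])
np.testing.assert_allclose(res["target"], [11, 22, 33, 44, 55, 66])  # first expectation above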