diff --git a/sed/calibrator/energy.py b/sed/calibrator/energy.py
index 318ae75a..d9427cb0 100644
--- a/sed/calibrator/energy.py
+++ b/sed/calibrator/energy.py
@@ -911,6 +911,10 @@ def append_tof_ns_axis(
                 time-of-flight steps. Defaults to config["dataframe"]["tof_column"].
             tof_ns_column (str, optional): Name of the column to store the time-of-flight
                 in nanoseconds. Defaults to config["dataframe"]["tof_ns_column"].
+            binwidth (float, optional): Time-of-flight binwidth in seconds.
+                Defaults to config["energy"]["binwidth"].
+            binning (int, optional): Time-of-flight binning factor.
+                Defaults to config["energy"]["binning"].

         Returns:
             dask.dataframe.DataFrame: Dataframe with the new columns.
@@ -926,8 +930,6 @@ def append_tof_ns_axis(
         if tof_ns_column is None:
             tof_ns_column = self.tof_ns_column
-        if tof_ns_column is None:
-            raise AttributeError("tof_ns_column not set!")

         df[tof_ns_column] = tof2ns(
             binwidth,
@@ -1423,26 +1425,34 @@ def gather_correction_metadata(self, correction: dict = None) -> dict:
     def align_dld_sectors(
         self,
         df: Union[pd.DataFrame, dask.dataframe.DataFrame],
-        **kwds,
+        tof_column: str = None,
+        sector_id_column: str = None,
+        sector_delays: np.ndarray = None,
     ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
         """Aligns the time-of-flight axis of the different sectors of a detector.

         Args:
             df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
+            tof_column (str, optional): Name of the column containing the time-of-flight
+                values. Defaults to config["dataframe"]["tof_column"].
+            sector_id_column (str, optional): Name of the column containing the sector id
+                values. Defaults to config["dataframe"]["sector_id_column"].
+            sector_delays (np.ndarray, optional): Array containing the sector delays.
+                Defaults to config["dataframe"]["sector_delays"].

-        Returns: 
+        Returns:
             dask.dataframe.DataFrame: Dataframe with the new columns.
             dict: Metadata dictionary.
         """
-        sector_delays = kwds.pop("sector_delays", self.sector_delays)
-        sector_id_column = kwds.pop("sector_id_column", self.sector_id_column)
+        sector_delays = sector_delays or self.sector_delays
+        sector_id_column = sector_id_column or self.sector_id_column
         if sector_delays is None or sector_id_column is None:
             raise ValueError(
                 "No value for sector_delays or sector_id_column found in config."
-                "config file is not properly configured for dld sector correction.",
+                " Config file is not properly configured for dld sector correction.",
             )
-        tof_column = kwds.pop("tof_column", self.tof_column)
+        tof_column = tof_column or self.tof_column

         # align the 8s sectors
         sector_delays_arr = dask.array.from_array(sector_delays)
@@ -1467,8 +1477,8 @@ def apply_energy_offset(
         subtract_mean: Union[bool, Sequence[bool]] = None,
         energy_column: str = None,
         reductions: Union[str, Sequence[str]] = None,
-    ) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
-        """Apply an energy shift to the given column(s).
+    ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
+        """Apply an offset to the energy column by the values of the provided columns.

         If no parameter is passed to this function, the offset is applied as defined in the
         config file. If parameters are passed, they are used to generate a new offset dictionary
@@ -1479,19 +1489,16 @@ def apply_energy_offset(
         Args:
             df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
             constant (float, optional): The constant to shift the energy axis by.
-            columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift to.
+            columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift from.
             signs (Union[int, Sequence[int]]): Sign of the shift to apply. (+1 or -1)
                 A positive sign shifts the energy axis to higher kinetic energies.
                 Defaults to +1.
             energy_column (str, optional): Name of the column containing the energy values.
-            reductions (str): The reduction to apply to the column. If "rolled" it searches for
-                columns with suffix "_rolled", e.g. "sampleBias_rolled", as those generated by the
-                ``SedProcessor.smooth_columns()`` function. Otherwise should be an available method
+            reductions (str): The reduction to apply to the column. Should be an available method
                 of dask.dataframe.Series. For example "mean". In this case the function is applied
                 to the column to generate a single value for the whole dataset. If None, the shift
                 is applied per-dataframe-row. Defaults to None.
             subtract_mean (bool): Whether to subtract the mean of the column before applying the
                 shift. Defaults to False.
-            **kwargs: Additional arguments for the rolling average function.
         """
         if energy_column is None:
             energy_column = self.energy_column
@@ -1506,16 +1513,12 @@ def apply_energy_offset(
                     constant = v
                     print(f"Applying constant offset of {constant} to energy axis.")
                 else:
-                    assert k in df.columns, f"Column {k} not found in dataframe."
+                    if k not in df.columns:
+                        raise KeyError(f"Column {k} not found in dataframe.")
                     columns.append(k)
                     signs.append(v.get("sign", 1))
                     subtract_mean.append(v.get("subtract_mean", False))
                     reductions.append(v.get("reduction", None))
-                    s = "+" if signs[-1] > 0 else "-"
-                    msg = f"Shifting {energy_column} by {s} {k}"
-                    if subtract_mean[-1]:
-                        msg += " and subtracting mean"
-                    print(msg)
         else:  # use passed parameters
             if columns is not None and (signs is None or subtract_mean is None):
@@ -1539,34 +1542,16 @@ def apply_energy_offset(
         elif energy_scale == "kinetic":
             pass
         elif energy_scale is None:
-            raise ValueError("Energy scale not set. Please run `set_energy_scale` first.")
-        # check if columns have been smoothed
-        columns_: List[str] = []
-        reductions_: List[str] = []
-        to_roll: List[str] = []
-        for c, r in zip(columns, reductions):
-            if r == "rolled":
-                cname = c + "_rolled"
-                if cname not in df.columns:
-                    to_roll.append(cname)
-                else:
-                    columns_.append(cname)
-                    reductions_.append(None)
-            else:
-                columns_.append(c)
-                reductions_.append(r)
-        if len(to_roll) > 0:
-            raise RuntimeError(
-                f"Columns {to_roll} have not been smoothed. please run `smooth_column`",
-            )
+            raise ValueError("Energy scale not set. Cannot interpret the sign of the offset.")
+        # apply offset
         df = dfops.apply_offset_from_columns(
             df=df,
             target_column=energy_column,
-            offset_columns=columns_,
+            offset_columns=columns,
             signs=signs,
             subtract_mean=subtract_mean,
-            reductions=reductions_,
+            reductions=reductions,
             inplace=True,
         )
         # apply constant
@@ -2062,6 +2047,12 @@ def fit_energy_calibation(
               - **'kinetic'**: increasing energy with decreasing TOF.
               - **'binding'**: increasing energy with increasing TOF.

+        t0 (float, optional): constraints and initial values for the fit parameter t0,
+            corresponding to the time-of-flight offset. Defaults to 1e-6.
+        E0 (float, optional): constraints and initial values for the fit parameter E0,
+            corresponding to the energy offset. Defaults to min(vals).
+        d (float, optional): constraints and initial values for the fit parameter d,
+            corresponding to the drift distance. Defaults to 1.

     Returns:
         dict: A dictionary of fitting parameters including the following,
@@ -2337,7 +2328,7 @@ def tof2ns(
     binwidth: float,
     binning: int,
     t: float,
-) -> Union[List[float], np.ndarray]:
+) -> float:
     """Converts the time-of-flight steps to time-of-flight in nanoseconds.

     designed for use with dask.dataframe.DataFrame.map_partitions.
@@ -2349,5 +2340,5 @@ def tof2ns(
     Returns:
         float: Converted time in nanoseconds.
     """
-    val = t * 1e9 * binwidth * 2**binning
+    val = t * 1e9 * binwidth * 2.0**binning
     return val
diff --git a/sed/config/default.yaml b/sed/config/default.yaml
index dead1293..0450faae 100644
--- a/sed/config/default.yaml
+++ b/sed/config/default.yaml
@@ -9,6 +9,8 @@ dataframe:
   y_column: "Y"
   # dataframe column containing time-of-flight data
   tof_column: "t"
+  # dataframe column containing time-of-flight data in nanoseconds
+  tof_ns_column: "t_ns"
   # dataframe column containing analog-to-digital data
   adc_column: "ADC"
   # dataframe column containing bias voltage data
diff --git a/sed/config/flash_example_config.yaml b/sed/config/flash_example_config.yaml
index e321d295..54c6eec6 100644
--- a/sed/config/flash_example_config.yaml
+++ b/sed/config/flash_example_config.yaml
@@ -36,22 +36,19 @@ dataframe:
   # dataframe column containing kx coordinates
   kx_column: "kx"
   # dataframe column containing y coordinates
-  y_column: dldPosY
   # dataframe column containing corrected y coordinates
   corrected_y_column: "Y"
   # dataframe column containing ky coordinates
   ky_column: "ky"
   # dataframe column containing time-of-flight data
-  tof_column: dldTimeSteps
   # dataframe column containing time-of-flight data in ns
   tof_ns_column: dldTime
   # dataframe column containing corrected time-of-flight data
   corrected_tof_column: "tm"
-
-  # time length of a base time-of-flight bin in ns
-  tof_binwidth: 0.020576131995767355 # 0.16460905596613884
+  # time length of a base time-of-flight bin in seconds
+  tof_binwidth: 2.0576131995767355E-11
   # binning parameter for time-of-flight data. 2**tof_binning bins per base bin
   tof_binning: 3 # power of 2, 3 means 8 bins per step
   # dataframe column containing sector ID. obtained from dldTimeSteps column
diff --git a/sed/core/dfops.py b/sed/core/dfops.py
index 9e1532b4..49692879 100644
--- a/sed/core/dfops.py
+++ b/sed/core/dfops.py
@@ -265,66 +265,6 @@ def backward_fill_partition(df):
     return df


-def rolling_average_on_acquisition_time(
-    df: Union[pd.DataFrame, dask.dataframe.DataFrame],
-    rolling_group_channel: str = None,
-    columns: Union[str, Sequence[str]] = None,
-    window: float = None,
-    sigma: float = 2,
-    config: dict = None,
-) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
-    """Perform a rolling average with a gaussian weighted window.
-
-    The rolling average is performed on the acquisition time instead of the index.
-    This can be a time-stamp or similar, such as the trainID at FLASH.
-    This is necessary first when considering the recorded electrons do not come at a regular time
-    interval, but even more importantly when loading multiple datasets with gaps in the acquisition.
-
-    In order to preserve the number of points, the first and last "window"
-    number of points are substituted with the original signal.
-    # TODO: this is currently very slow, and could do with a remake.
-
-    Args:
-        df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
-        group_channel: (str): Name of the column on which to group the data
-        cols (str): Name of the column on which to perform the rolling average
-        window (float): Size of the rolling average window
-        sigma (float): number of standard deviations for the gaussian weighting of the window.
-            a value of 2 corresponds to a gaussian with sigma equal to half the window size.
-            Smaller values reduce the weighting in the window frame.
-
-    Returns:
-        Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the new columns.
-    """
-    if rolling_group_channel is None:
-        if config is None:
-            raise ValueError("Either group_channel or config must be given.")
-        rolling_group_channel = config["dataframe"]["rolling_group_channel"]
-    if isinstance(columns, str):
-        columns = [columns]
-    s = f"rolling average over {rolling_group_channel} on "
-    for c in columns:
-        s += f"{c}, "
-    print(s)
-    with ProgressBar():
-        df_ = df.groupby(rolling_group_channel).agg({c: "mean" for c in columns}).compute()
-    df_["dt"] = pd.to_datetime(df_.index, unit="s")
-    df_["ts"] = df_.index
-    for c in columns:
-        df_[c + "_rolled"] = (
-            df_[c]
-            .interpolate(method="nearest")
-            .rolling(window, center=True, win_type="gaussian")
-            .mean(std=window / sigma)
-            .fillna(df_[c])
-        )
-        df_ = df_.drop(c, axis=1)
-        if c + "_rolled" in df.columns:
-            df = df.drop(c + "_rolled", axis=1)
-    return df.merge(df_, left_on="timeStamp", right_on="ts").drop(["ts", "dt"], axis=1)
-
-
 def apply_offset_from_columns(
     df: Union[pd.DataFrame, dask.dataframe.DataFrame],
     target_column: str,
@@ -342,7 +282,9 @@ def apply_offset_from_columns(
         offset_columns (str): Name of the column(s) to use for the offset.
         signs (int): Sign of the offset. Defaults to 1.
         reductions (str): Reduction function to use for the offset. Defaults to "mean".
-
+        subtract_mean (bool): Whether to subtract the mean of the offset column. Defaults to
+            False. If a list is given, it must have the same length as offset_columns.
+            Otherwise the value passed is used for all columns.
     Returns:
         Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the new column.
     """
@@ -351,8 +293,7 @@ def apply_offset_from_columns(
     if not inplace:
         df[target_column + "_offset"] = df[target_column]
         target_column = target_column + "_offset"
-    if reductions is None:
-        reductions = "mean"
+
     if isinstance(reductions, str):
         reductions = [reductions] * len(offset_columns)
     if isinstance(signs, int):
@@ -363,11 +304,17 @@ def apply_offset_from_columns(
         subtract_mean = [subtract_mean] * len(offset_columns)

     for col, sign, red, submean in zip(offset_columns, signs, reductions, subtract_mean):
-        assert col in df.columns, f"{col} not in dataframe!"
+        if col not in df.columns:
+            raise KeyError(f"{col} not in dataframe!")
         if red is not None:
             df[target_column] = df[target_column] + sign * df[col].agg(red)
         else:
             df[target_column] = df[target_column] + sign * df[col]
         if submean:
             df[target_column] = df[target_column] - sign * df[col].mean()
+        s = "+" if sign > 0 else "-"
+        msg = f"Shifting {target_column} by {s} {col}"
+        if submean:
+            msg += " and subtracting mean"
+        print(msg)
     return df
diff --git a/sed/core/processor.py b/sed/core/processor.py
index ddbb01bd..3a528de5 100644
--- a/sed/core/processor.py
+++ b/sed/core/processor.py
@@ -6,7 +6,6 @@
 from typing import cast
 from typing import Dict
 from typing import List
-from typing import Literal
 from typing import Sequence
 from typing import Tuple
 from typing import Union
@@ -25,7 +24,6 @@
 from sed.core.config import parse_config
 from sed.core.config import save_config
 from sed.core.dfops import apply_jitter
-from sed.core.dfops import rolling_average_on_acquisition_time
 from sed.core.metadata import MetaHandler
 from sed.diagnostics import grid_histogram
 from sed.io import to_h5
@@ -1190,9 +1188,8 @@ def apply_energy_offset(
         if energy_column not in self._dataframe.columns:
             raise ValueError(
                 f"Energy column {energy_column} not found in dataframe! "
-                "Run energy calibration first",
+                "Run `append_energy_axis` first.",
             )
-        metadata = {}
         self._dataframe, metadata = self.ec.apply_energy_offset(
             df=self._dataframe,
             constant=constant,
@@ -1237,12 +1234,21 @@ def append_tof_ns_axis(
             duplicate_policy="append",
         )

-    def align_dld_sectors(self, **kwargs):
-        """Align the 8s sectors of the HEXTOF endstation."""
+    def align_dld_sectors(self, sector_delays: np.ndarray = None, **kwargs):
+        """Align the 8s sectors of the HEXTOF endstation.
+
+        Args:
+            sector_delays (np.ndarray, optional): Array containing the sector delays.
+                Defaults to config["dataframe"]["sector_delays"].
+        """
         if self._dataframe is not None:
             print("Aligning 8s sectors of dataframe")
             # TODO assert order of execution through metadata
-            self._dataframe, metadata = self.ec.align_dld_sectors(df=self._dataframe, **kwargs)
+            self._dataframe, metadata = self.ec.align_dld_sectors(
+                df=self._dataframe,
+                sector_delays=sector_delays,
+                **kwargs,
+            )
             self._attributes.add(
                 metadata,
                 "dld_sector_alignment",
@@ -1343,43 +1349,6 @@ def add_jitter(
             metadata.append(col)
         self._attributes.add(metadata, "jittering", duplicate_policy="append")

-    def smooth_columns(
-        self,
-        columns: Union[str, Sequence[str]] = None,
-        method: Literal["rolling"] = "rolling",
-        **kwargs,
-    ) -> None:
-        """Apply a filter along one or more columns of the dataframe.
-
-        Currently only supports rolling average on acquisition time.
-
-        Args:
-            columns (Union[str,Sequence[str]]): The colums onto which to apply the filter.
-            method (Literal['rolling'], optional): The filter method. Defaults to 'rolling'.
-            **kwargs: Keyword arguments passed to the filter method.
-        """
-        if isinstance(columns, str):
-            columns = [columns]
-        for column in columns:
-            if column not in self._dataframe.columns:
-                raise ValueError(f"Cannot smooth {column}. Column not in dataframe!")
-        kwargs = {**self._config["smooth"], **kwargs}
-        if method == "rolling":
-            self._dataframe = rolling_average_on_acquisition_time(
-                df=self._dataframe,
-                rolling_group_channel=kwargs.get("rolling_group_channel", None),
-                columns=columns or kwargs.get("columns", None),
-                window=kwargs.get("window", None),
-                sigma=kwargs.get("sigma", None),
-            )
-        else:
-            raise ValueError(f"Method {method} not supported!")
-        self._attributes.add(
-            columns,
-            "smooth",
-            duplicate_policy="append",
-        )
-
     def pre_binning(
         self,
         df_partitions: int = 100,
diff --git a/tests/data/loader/flash/config.yaml b/tests/data/loader/flash/config.yaml
index fa5d38f6..d7d09f63 100644
--- a/tests/data/loader/flash/config.yaml
+++ b/tests/data/loader/flash/config.yaml
@@ -11,10 +11,10 @@ core:
   paths:
     data_raw_dir: "tests/data/loader/flash/"
     data_parquet_dir: "tests/data/loader/flash/parquet"
-
+
   # These can be replaced by beamtime_id and year to automatically
   # find the folders on the desy cluster
-
+
   # beamtime_id: xxxxxxxx
   # year: 20xx

@@ -24,8 +24,8 @@ dataframe:
   # The offset correction to the pulseId
   ubid_offset: 5

-  # the number of iterations to fill the pulseId forward. 
-  forward_fill_iterations: 2 
+  # the number of iterations to fill the pulseId forward.
+  forward_fill_iterations: 2
   # if true, removes the 3 bits reserved for dldSectorID from the dldTimeandSector column
   unravel_8s_detector_time_channel: True

@@ -50,8 +50,8 @@ dataframe:
   # dataframe column containing corrected time-of-flight data
   corrected_tof_column: "tm"

-  # time length of a base time-of-flight bin in ns
-  tof_binwidth: 0.020576131995767355 # 0.16460905596613884
+  # time length of a base time-of-flight bin in seconds
+  tof_binwidth: 2.0576131995767355E-11
   # binning parameter for time-of-flight data. 2**tof_binning bins per base bin
   tof_binning: 3 # power of 2, 3 means 8 bins per step
   # dataframe column containing sector ID. obtained from dldTimeSteps column
@@ -88,17 +88,17 @@ dataframe:
       format: per_electron
       group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
       slice: 1
-
+
     dldPosY:
       format: per_electron
       group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
       slice: 0
-
+
     dldTimeSteps:
      format: per_electron
      group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
      slice: 3
-
+
    # The auxiliary channel has a special structure where the group further contains
    # a multidim structure so further aliases are defined below
    dldAux:
@@ -113,7 +113,7 @@ dataframe:
      cryoTemperature: 4
      sampleTemperature: 5
      dldTimeBinSize: 15
-
+
  # The prefixes of the stream names for different DAQ systems for parsing filenames
  # (Not to be changed by user)
  stream_name_prefixes:
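
A consistency check on the tof_binwidth unit change above: the configs now store the base binwidth in seconds, and tof2ns multiplies by 1e9, so one time-of-flight step still comes out at the value quoted in the old config comment. A minimal standalone sketch (the helper is re-declared here rather than imported from sed.calibrator.energy):

    def tof2ns(binwidth: float, binning: int, t: float) -> float:
        """Convert time-of-flight steps to nanoseconds, mirroring the patched helper."""
        return t * 1e9 * binwidth * 2.0**binning

    # Values from the FLASH example config above.
    tof_binwidth = 2.0576131995767355e-11  # seconds per base bin
    tof_binning = 3  # power of 2: 2**3 = 8 base bins per step

    # One step corresponds to ~0.1646 ns, the figure from the old config comment.
    print(tof2ns(tof_binwidth, tof_binning, 1.0))  # 0.16460905596613884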
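The reworked apply_offset_from_columns can be exercised on a plain pandas frame. A sketch assuming the patched sed.core.dfops is importable; the column names "energy" and "sampleBias" are invented for illustration:

    import pandas as pd

    from sed.core.dfops import apply_offset_from_columns

    # Toy frame: a per-row bias value that should shift the energy axis.
    df = pd.DataFrame({"energy": [1.0, 2.0, 3.0], "sampleBias": [0.5, 0.5, 1.0]})

    df = apply_offset_from_columns(
        df=df,
        target_column="energy",
        offset_columns=["sampleBias"],
        signs=[1],             # +1 shifts towards higher values
        reductions=[None],     # None applies the offset per row
        subtract_mean=[True],  # remove the column mean, keeping only fluctuations
        inplace=True,
    )
    print(df["energy"])  # each row shifted by sampleBias minus its mean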
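At the processor level, the new explicit keyword arguments read roughly as below. A hedged usage sketch, not a runnable recipe: construction, data loading, and energy calibration are elided, and the zero sector delays and the sampleBias column are placeholders rather than values taken from this patch:

    import numpy as np

    from sed import SedProcessor  # assumes a sed install with this patch applied

    processor = SedProcessor(config="sed/config/flash_example_config.yaml")
    # ... data loading and energy calibration (append_energy_axis) elided ...

    # Align the 8 detector sectors; real delays come from
    # config["dataframe"]["sector_delays"], zeros here are placeholders.
    processor.align_dld_sectors(sector_delays=np.zeros(8))

    # Convert time-of-flight steps to nanoseconds (binwidth/binning from config).
    processor.append_tof_ns_axis()

    # Offset the energy axis by the (hypothetical) sampleBias column,
    # subtracting its mean so only the fluctuations are corrected.
    processor.apply_energy_offset(
        columns=["sampleBias"],
        signs=[1],
        subtract_mean=[True],
    )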