diff --git a/.github/workflows/e2e_testing.yml b/.github/workflows/e2e_testing.yml
index 0a6c083a..7198e790 100644
--- a/.github/workflows/e2e_testing.yml
+++ b/.github/workflows/e2e_testing.yml
@@ -15,7 +15,7 @@ jobs:
     strategy:
       matrix:
         # test case name as defined in e2e_test_cases.yaml
-        test_case: [ "basic", "synchropasef", "astral", ]
+        test_case: [ "basic", "synchropasef", "astral", "astral_automatic_calibration", ]
     env:
       RUN_NAME: alphadia-${{github.sha}}-${{github.run_id}}-${{github.run_attempt}}
       BRANCH_NAME: ${{ github.head_ref || github.ref_name }}
diff --git a/alphadia/constants/default.yaml b/alphadia/constants/default.yaml
index d936fd5f..365c1b96 100644
--- a/alphadia/constants/default.yaml
+++ b/alphadia/constants/default.yaml
@@ -95,17 +95,20 @@ search_advanced:
 calibration:
-  # minimum number of times (epochs) the updated calibration target has to been passed
-  min_epochs: 3
+  # minimum number of steps taken during the optimization lock (during which the elution groups used for optimization are extracted)
+  optimization_lock_min_steps: 0
   # Number of precursors searched and scored per batch
   batch_size: 8000
-  # recalibration target for the first epoch. For subsequent epochs, the target will increase by this amount.
-  recalibration_target: 200
+  # minimum number of precursors to be found before search parameter optimization begins
+  optimization_lock_target: 200
-  # TODO: remove as not relevant anymore
-  max_epochs: 20
+  # the maximum number of steps that a given optimizer is permitted to take
+  max_steps: 20
+
+  # the minimum number of steps that a given optimizer must take before it is permitted to converge
+  min_steps: 2
   # TODO: remove this parameter
   final_full_calibration: False
diff --git a/alphadia/workflow/base.py b/alphadia/workflow/base.py
index b06fc5d7..3268fd5d 100644
--- a/alphadia/workflow/base.py
+++ b/alphadia/workflow/base.py
@@ -95,8 +95,22 @@ def load(
         self._calibration_manager.disable_mobility_calibration()
         # initialize the optimization manager
+        optimization_manager_config = {
+            "ms1_error": self.config["search_initial"]["initial_ms1_tolerance"],
+            "ms2_error": self.config["search_initial"]["initial_ms2_tolerance"],
+            "rt_error": self.config["search_initial"]["initial_rt_tolerance"],
+            "mobility_error": self.config["search_initial"][
+                "initial_mobility_tolerance"
+            ],
+            "column_type": "library",
+            "num_candidates": self.config["search_initial"]["initial_num_candidates"],
+            "classifier_version": -1,
+            "fwhm_rt": self.config["optimization_manager"]["fwhm_rt"],
+            "fwhm_mobility": self.config["optimization_manager"]["fwhm_mobility"],
+            "score_cutoff": self.config["optimization_manager"]["score_cutoff"],
+        }
         self._optimization_manager = manager.OptimizationManager(
-            self.config["optimization_manager"],
+            optimization_manager_config,
             path=os.path.join(self.path, self.OPTIMIZATION_MANAGER_PATH),
             load_from_file=self.config["general"]["reuse_calibration"],
             figure_path=os.path.join(self.path, self.FIGURE_PATH),
diff --git a/alphadia/workflow/optimization.py b/alphadia/workflow/optimization.py
new file mode 100644
index 00000000..16e2e2b6
--- /dev/null
+++ b/alphadia/workflow/optimization.py
@@ -0,0 +1,581 @@
+# native imports
+from abc import ABC, abstractmethod
+
+import matplotlib.pyplot as plt
+
+# alpha family imports
+# third party imports
+import pandas as pd
+import seaborn as sns
+
+# alphadia imports
+from alphadia.workflow import reporting
+
+
+class BaseOptimizer(ABC):
+    def __init__(
+        self,
+        workflow,
+        reporter: None | reporting.Pipeline | reporting.Backend = None,
+    ):
+        
"""This class serves as a base class for the search parameter optimization process, which defines the parameters used for search. + + Parameters + ---------- + + workflow: peptidecentric.PeptideCentricWorkflow + The workflow object, which includes the calibration, calibration_optimization and FDR managers which are used as part of optimization. + + reporter: None | reporting.Pipeline | reporting.Backend + The reporter object used to log information about the optimization process. If None, a new LogBackend object is created. + + """ + self.optimal_parameter = None + self.workflow = workflow + self.reporter = reporting.LogBackend() if reporter is None else reporter + + @abstractmethod + def step(self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame): + """This method evaluates the progress of the optimization, and either concludes the optimization if it has converged or continues the optimization if it has not. + This method includes the update rule for the optimization. + + Parameters + ---------- + + precursors_df: pd.DataFrame + The precursor dataframe for the search + + fragments_df: pd.DataFrame + The fragment dataframe for the search + + + """ + + pass + + @abstractmethod + def plot(self): + """This method plots relevant information about optimization of the search parameter. This can be overwritten with an empty function if there is nothing to plot.""" + + pass + + +class AutomaticOptimizer(BaseOptimizer): + def __init__( + self, + initial_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """This class automatically optimizes the search parameter and stores the progres of optimization in a dataframe, history_df. + + Parameters + ---------- + initial_parameter: float + The parameter used for search in the first round of optimization. + + See base class for other parameters. + + """ + super().__init__(workflow, reporter) + self.history_df = pd.DataFrame() + self.workflow.optimization_manager.fit({self.parameter_name: initial_parameter}) + self.has_converged = False + + def step( + self, + precursors_df: pd.DataFrame, + fragments_df: pd.DataFrame, + current_step: int = -1, + ): + """See base class. The feature is used to track the progres of the optimization (stored in .feature) and determine whether it has converged.""" + if self.has_converged: + self.reporter.log_string( + f"✅ {self.parameter_name:<15}: optimization complete. 
Optimal parameter {self.workflow.optimization_manager.__dict__[self.parameter_name]} found after {len(self.history_df)} searches.", + verbosity="progress", + ) + return + + new_row = pd.DataFrame( + [ + { + "parameter": self.workflow.optimization_manager.__dict__[ + self.parameter_name + ], + self.feature_name: self._get_feature_value( + precursors_df, fragments_df + ), + "classifier_version": self.workflow.fdr_manager.current_version, + "score_cutoff": self.workflow.optimization_manager.score_cutoff, + "fwhm_rt": self.workflow.optimization_manager.fwhm_rt, + "fwhm_mobility": self.workflow.optimization_manager.fwhm_mobility, + } + ] + ) + self.history_df = pd.concat([self.history_df, new_row], ignore_index=True) + just_converged = self._check_convergence(current_step) + + if just_converged: + self.has_converged = True + + index_of_optimum = self.history_df[self.feature_name].idxmax() + + optimal_parameter = self.history_df["parameter"].loc[index_of_optimum] + classifier_version_at_optimum = self.history_df["classifier_version"].loc[ + index_of_optimum + ] + score_cutoff_at_optimum = self.history_df["score_cutoff"].loc[ + index_of_optimum + ] + fwhm_rt_at_optimum = self.history_df["fwhm_rt"].loc[index_of_optimum] + fwhm_mobility_at_optimum = self.history_df["fwhm_mobility"].loc[ + index_of_optimum + ] + + self.workflow.optimization_manager.fit( + {self.parameter_name: optimal_parameter} + ) + self.workflow.optimization_manager.fit( + {"classifier_version": classifier_version_at_optimum} + ) + self.workflow.optimization_manager.fit( + {"score_cutoff": score_cutoff_at_optimum} + ) + self.workflow.optimization_manager.fit({"fwhm_rt": fwhm_rt_at_optimum}) + self.workflow.optimization_manager.fit( + {"fwhm_mobility": fwhm_mobility_at_optimum} + ) + + self.reporter.log_string( + f"✅ {self.parameter_name:<15}: optimization complete. Optimal parameter {self.workflow.optimization_manager.__dict__[self.parameter_name]:.4f} found after {len(self.history_df)} searches.", + verbosity="progress", + ) + + else: + new_parameter = self._propose_new_parameter( + precursors_df + if self.estimator_group_name == "precursor" + else fragments_df + ) + + self.workflow.optimization_manager.fit({self.parameter_name: new_parameter}) + + self.reporter.log_string( + f"❌ {self.parameter_name:<15}: optimization incomplete after {len(self.history_df)} search(es). Will search with parameter {self.workflow.optimization_manager.__dict__[self.parameter_name]:.4f}.", + verbosity="progress", + ) + + def plot(self): + """Plot the value of the feature used to assess optimization progress against the parameter value, for each value tested.""" + fig, ax = plt.subplots() + + ax.vlines( + x=self.workflow.optimization_manager.__dict__[self.parameter_name], + ymin=0, + ymax=self.history_df[self.feature_name].max(), + color="red", + zorder=0, + label=f"Optimal {self.parameter_name}", + ) + + sns.lineplot( + data=self.history_df, + x="parameter", + y=self.feature_name, + ax=ax, + ) + sns.scatterplot( + data=self.history_df, + x="parameter", + y=self.feature_name, + ax=ax, + ) + + ax.set_xlabel(self.parameter_name) + ax.xaxis.set_inverted(True) + ax.set_ylim(bottom=0, top=self.history_df[self.feature_name].max() * 1.1) + ax.legend(loc="upper left") + + plt.show() + + @abstractmethod + def _propose_new_parameter(self, df): + """This method specifies the rule according to which the search parameter is updated between rounds of optimization. The rule is specific to the parameter being optimized. 
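+        As a concrete illustration (this simply summarizes the subclasses defined later in this module):
+        the automatic optimizers propose 1.1 times the width of the central 99% deviation interval reported
+        by the relevant calibration estimator's ci method, while the targeted optimizers take the maximum of
+        the 95% interval and the user-specified target.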
+ + Parameters + ---------- + + df: pd.DataFrame + The dataframe used to update the parameter. This could be the precursor or fragment dataframe, depending on the search parameter being optimized. + + Returns + ------- + float + The proposed new value for the search parameter. + + + """ + pass + + def _check_convergence(self, current_step: int = -1): + """Optimization should stop if continued narrowing of the parameter is not improving the feature value. + This function checks if the previous rounds of optimization have led to a meaningful improvement in the feature value. + If so, it continues optimization and appends the proposed new parameter to the list of parameters. If not, it stops optimization and sets the optimal parameter attribute. + + Notes + ----- + Because the check for an increase in feature value requires two previous rounds, the function will also initialize for another round of optimization if there have been fewer than 3 rounds. + + Parameters + ---------- + current_step: int + The current step in the optimization process. By default it is set to -1, which prevents the optimizer from converging unless min_steps has been set to 0. + + """ + + min_steps_reached = ( + current_step >= self.workflow.config["calibration"]["min_steps"] - 1 + ) + return ( + min_steps_reached + and len(self.history_df) > 2 + and self.history_df[self.feature_name].iloc[-1] + < 1.1 * self.history_df[self.feature_name].iloc[-2] + and self.history_df[self.feature_name].iloc[-1] + < 1.1 * self.history_df[self.feature_name].iloc[-3] + ) + + @abstractmethod + def _get_feature_value( + self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame + ): + """Each parameter is optimized according to a particular feature. This method gets the value of that feature for a given round of optimization. + + Parameters + ---------- + + precursors_df: pd.DataFrame + The precursor dataframe for the search + + fragments_df: pd.DataFrame + The fragment dataframe for the search + + + """ + pass + + +class TargetedOptimizer(BaseOptimizer): + def __init__( + self, + initial_parameter: float, + target_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """This class optimizes the search parameter until it reaches a user-specified target value. + + Parameters + ---------- + + initial_parameter: float + The parameter used for search in the first round of optimization. + + target_parameter: float + Optimization will stop when this parameter is reached. + + See base class for other parameters. + + """ + super().__init__(workflow, reporter) + self.workflow.optimization_manager.fit({self.parameter_name: initial_parameter}) + self.target_parameter = target_parameter + self.has_converged = False + + def _check_convergence(self, proposed_parameter: float, current_step: int = -1): + """The optimization has converged if the proposed parameter is equal to or less than the target parameter and the a sufficient number of steps has been taken. + + Parameters + ---------- + proposed_parameter: float + The proposed parameter for the next round of optimization. + + current_step: int + The current step in the optimization process. By default it is set to -1, which prevents the optimizer from converging unless min_steps has been set to 0. + + Returns + ------- + bool + True if proposed parameter less than target and the current step is greater than the minimum required, False otherwise. 
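+
+        Examples
+        --------
+        Purely illustrative values (not taken from any real config): with target_parameter = 10 and
+        calibration.min_steps = 2, a proposed parameter of 8 converges at current_step = 1
+        (since 8 <= 10 and 1 >= 2 - 1), but not at current_step = 0.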
+ + + """ + + return ( + proposed_parameter <= self.target_parameter + and current_step >= self.workflow.config["calibration"]["min_steps"] - 1 + ) + + def _propose_new_parameter(self, df: pd.DataFrame): + """See base class. The update rule is + 1) calculate the deviation of the predicted mz values from the observed mz values, + 2) take the mean of the endpoints of the central 95% of these deviations, and + 3) take the maximum of this value and the target parameter. + This is implemented by the ci method for the estimator. + """ + return max( + self.workflow.calibration_manager.get_estimator( + self.estimator_group_name, self.estimator_name + ).ci(df, 0.95), + self.target_parameter, + ) + + def step( + self, + precursors_df: pd.DataFrame, + fragments_df: pd.DataFrame, + current_step: int = -1, + ): + """See base class.""" + if self.has_converged: + self.reporter.log_string( + f"✅ {self.parameter_name:<15}: {self.workflow.optimization_manager.__dict__[self.parameter_name]:.4f} <= {self.target_parameter:.4f}", + verbosity="progress", + ) + return + + new_parameter = self._propose_new_parameter( + precursors_df if self.estimator_group_name == "precursor" else fragments_df + ) + just_converged = self._check_convergence(new_parameter, current_step) + self.workflow.optimization_manager.fit({self.parameter_name: new_parameter}) + self.workflow.optimization_manager.fit( + {"classifier_version": self.workflow.fdr_manager.current_version} + ) + + if just_converged: + self.has_converged = True + self.reporter.log_string( + f"✅ {self.parameter_name:<15}: {self.workflow.optimization_manager.__dict__[self.parameter_name]:.4f} <= {self.target_parameter:.4f}", + verbosity="progress", + ) + + else: + self.reporter.log_string( + f"❌ {self.parameter_name:<15}: {self.workflow.optimization_manager.__dict__[self.parameter_name]:.4f} > {self.target_parameter:.4f} or insufficient steps taken.", + verbosity="progress", + ) + + def plot(self): + """See base class. There is nothing of interest to plot here.""" + pass + + +class AutomaticRTOptimizer(AutomaticOptimizer): + def __init__( + self, + initial_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class. Optimizes retention time error.""" + self.parameter_name = "rt_error" + self.estimator_group_name = "precursor" + self.estimator_name = "rt" + self.feature_name = "precursor_count" + super().__init__(initial_parameter, workflow, reporter) + + def _propose_new_parameter(self, df: pd.DataFrame): + """See base class. The update rule is + 1) calculate the deviation of the predicted mz values from the observed mz values, + 2) take the mean of the endpoints of the central 99% of these deviations, and + 3) multiply this value by 1.1. + This is implemented by the ci method for the estimator. + + Returns + ------- + float + The proposed new value for the search parameter. + + """ + return 1.1 * self.workflow.calibration_manager.get_estimator( + self.estimator_group_name, self.estimator_name + ).ci(df, 0.99) + + def _get_feature_value( + self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame + ): + return len(precursors_df) + + +class AutomaticMS2Optimizer(AutomaticOptimizer): + def __init__( + self, + initial_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class. 
This class automatically optimizes the MS2 tolerance parameter by tracking the number of precursor identifications and stopping when further changes do not increase this number.""" + self.parameter_name = "ms2_error" + self.estimator_group_name = "fragment" + self.estimator_name = "mz" + self.feature_name = "precursor_count" + super().__init__(initial_parameter, workflow, reporter) + + def _propose_new_parameter(self, df: pd.DataFrame): + """See base class. The update rule is + 1) calculate the deviation of the predicted mz values from the observed mz values, + 2) take the mean of the endpoints of the central 99% of these deviations, and + 3) multiply this value by 1.1. + This is implemented by the ci method for the estimator. + + Returns + ------- + float + The proposed new value for the search parameter. + + """ + return 1.1 * self.workflow.calibration_manager.get_estimator( + self.estimator_group_name, self.estimator_name + ).ci(df, 0.99) + + def _get_feature_value( + self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame + ): + return len(precursors_df) + + +class AutomaticMS1Optimizer(AutomaticOptimizer): + def __init__( + self, + initial_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class. Optimizes MS1 error.""" + self.parameter_name = "ms1_error" + self.estimator_group_name = "precursor" + self.estimator_name = "mz" + self.feature_name = "mean_isotope_intensity_correlation" + super().__init__(initial_parameter, workflow, reporter) + + def _propose_new_parameter(self, df: pd.DataFrame): + """See base class. The update rule is + 1) calculate the deviation of the predicted mz values from the observed mz values, + 2) take the mean of the endpoints of the central 99% of these deviations, and + 3) multiply this value by 1.1. + This is implemented by the ci method for the estimator. + + + Returns + ------- + float + The proposed new value for the search parameter. + + """ + return 1.1 * self.workflow.calibration_manager.get_estimator( + self.estimator_group_name, self.estimator_name + ).ci(df, 0.99) + + def _get_feature_value( + self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame + ): + return precursors_df.isotope_intensity_correlation.mean() + + +class AutomaticMobilityOptimizer(AutomaticOptimizer): + def __init__( + self, + initial_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class. Optimizes mobility error.""" + self.parameter_name = "mobility_error" + self.estimator_group_name = "precursor" + self.estimator_name = "mobility" + self.feature_name = "precursor_count" + super().__init__(initial_parameter, workflow, reporter) + + def _propose_new_parameter(self, df: pd.DataFrame): + """See base class. The update rule is + 1) calculate the deviation of the predicted mz values from the observed mz values, + 2) take the mean of the endpoints of the central 99% of these deviations, and + 3) multiply this value by 1.1. + This is implemented by the ci method for the estimator. + + Returns + ------- + float + The proposed new value for the search parameter. 
+ + """ + + return 1.1 * self.workflow.calibration_manager.get_estimator( + self.estimator_group_name, self.estimator_name + ).ci(df, 0.99) + + def _get_feature_value( + self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame + ): + return len(precursors_df) + + +class TargetedRTOptimizer(TargetedOptimizer): + def __init__( + self, + initial_parameter: float, + target_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class.""" + self.parameter_name = "rt_error" + self.estimator_group_name = "precursor" + self.estimator_name = "rt" + super().__init__(initial_parameter, target_parameter, workflow, reporter) + + +class TargetedMS2Optimizer(TargetedOptimizer): + def __init__( + self, + initial_parameter: float, + target_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class.""" + self.parameter_name = "ms2_error" + self.estimator_group_name = "fragment" + self.estimator_name = "mz" + super().__init__(initial_parameter, target_parameter, workflow, reporter) + + +class TargetedMS1Optimizer(TargetedOptimizer): + def __init__( + self, + initial_parameter: float, + target_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class.""" + self.parameter_name = "ms1_error" + self.estimator_group_name = "precursor" + self.estimator_name = "mz" + super().__init__(initial_parameter, target_parameter, workflow, reporter) + + +class TargetedMobilityOptimizer(TargetedOptimizer): + def __init__( + self, + initial_parameter: float, + target_parameter: float, + workflow, + reporter: None | reporting.Pipeline | reporting.Backend = None, + ): + """See base class.""" + self.parameter_name = "mobility_error" + self.estimator_group_name = "precursor" + self.estimator_name = "mobility" + super().__init__(initial_parameter, target_parameter, workflow, reporter) diff --git a/alphadia/workflow/peptidecentric.py b/alphadia/workflow/peptidecentric.py index e2be1a81..cb85d4e1 100644 --- a/alphadia/workflow/peptidecentric.py +++ b/alphadia/workflow/peptidecentric.py @@ -15,9 +15,8 @@ # alphadia imports from alphadia import fragcomp, plexscoring, utils -from alphadia.exceptions import NoRecalibrationTargetError from alphadia.peakgroup import search -from alphadia.workflow import base, manager +from alphadia.workflow import base, manager, optimization logger = logging.getLogger() @@ -124,46 +123,9 @@ def load( f"Initializing workflow {self.instance_name}", verbosity="progress" ) - self.init_calibration_optimization_manager() self.init_fdr_manager() self.init_spectral_library() - @property - def calibration_optimization_manager(self): - """Is used during the iterative optimization of the calibration parameters. - Should not be stored on disk. 
- """ - return self._calibration_optimization_manager - - @property - def com(self): - """alias for calibration_optimization_manager""" - return self.calibration_optimization_manager - - def init_calibration_optimization_manager(self): - self._calibration_optimization_manager = manager.OptimizationManager( - { - "current_epoch": 0, - "current_step": 0, - "ms1_error": self.config["search_initial"]["initial_ms1_tolerance"], - "ms2_error": self.config["search_initial"]["initial_ms2_tolerance"], - "rt_error": self.config["search_initial"]["initial_rt_tolerance"], - "mobility_error": self.config["search_initial"][ - "initial_mobility_tolerance" - ], - "column_type": "library", - "num_candidates": self.config["search_initial"][ - "initial_num_candidates" - ], - "recalibration_target": self.config["calibration"][ - "recalibration_target" - ], - "accumulated_precursors": 0, - "accumulated_precursors_01FDR": 0, - "accumulated_precursors_001FDR": 0, - } - ) - def init_fdr_manager(self): self.fdr_manager = manager.FDRManager( feature_columns=feature_columns, @@ -302,6 +264,7 @@ def get_exponential_batches(self, step): return int(2**step) def get_batch_plan(self): + """Gets an exponential batch plan based on the batch_size value in the config.""" n_eg = self.spectral_library._precursor_df["elution_group_idx"].nunique() plan = [] @@ -319,121 +282,237 @@ def get_batch_plan(self): return plan - def start_of_calibration(self): - self.batch_plan = self.get_batch_plan() + def get_optimization_lock(self): + """Search parameter optimization (i.e. refinement of tolerances for RT, MS2, etc.) is performed on a subset of the elution groups in the spectral library. + This subset is termed the optimization lock. + The number of elution groups which must be searched to get a sufficiently large number for robust calibration varies depending the library used and the data. + This function searches an increasing number of elution groups until a sufficient number of precursors are identified at 1% FDR and a sufficient number of steps have been taken. + The values deemed sufficient are specified in by "optimization_lock_target" and "optmization_lock_min_steps" in the config. - def start_of_epoch(self, current_epoch): - self.com.current_epoch = current_epoch + Returns + ------- + eg_idxes_for_calibration : np.ndarray + The indices (in .spectral_library._precursor_df) of the precursors which will be used for calibration. - # if self.neptune is not None: - # self.neptune["eval/epoch"].log(current_epoch) + precursor_df : pd.DataFrame + Dataframe of all precursors accumulated during the optimization lock, including q-values from FDR correction. + + fragments_df : pd.DataFrame + Dataframe of all fragments accumulated during the optimization lock, including q-values from FDR correction. 
+ + """ self.elution_group_order = self.spectral_library.precursor_df[ "elution_group_idx" ].unique() np.random.shuffle(self.elution_group_order) - self.calibration_manager.predict( - self.spectral_library._precursor_df, "precursor" - ) - self.calibration_manager.predict(self.spectral_library._fragment_df, "fragment") - - # make updates to the progress dict depending on the epoch - if self.com.current_epoch > 0: - self.com.recalibration_target = self.config["calibration"][ - "recalibration_target" - ] * (1 + current_epoch) + batch_plan = self.get_batch_plan() - def start_of_step(self, current_step, start_index, stop_index): - self.com.current_step = current_step - # if self.neptune is not None: - # self.neptune["eval/step"].log(current_step) + features = [] + fragments = [] + for current_step, (start_index, stop_index) in enumerate(batch_plan): + self.reporter.log_string( + f"=== Step {current_step}, extracting elution groups {start_index} to {stop_index} ===", + verbosity="progress", + ) - # for key, value in self.com.__dict__.items(): - # self.neptune[f"eval/{key}"].log(value) + eg_idxes = self.elution_group_order[start_index:stop_index] + batch_df = self.spectral_library._precursor_df[ + self.spectral_library._precursor_df["elution_group_idx"].isin(eg_idxes) + ] - self.reporter.log_string( - f"=== Epoch {self.com.current_epoch}, step {current_step}, extracting elution groups {start_index} to {stop_index} ===", - verbosity="progress", - ) + feature_df, fragment_df = self.extract_batch(batch_df) + features += [feature_df] + fragments += [fragment_df] + features_df = pd.concat(features) + fragments_df = pd.concat(fragments) - def check_epoch_conditions(self): - continue_calibration = False + self.reporter.log_string( + f"=== Step {current_step}, extracted {len(feature_df)} precursors and {len(fragment_df)} fragments ===", + verbosity="progress", + ) - self.reporter.log_string( - "=== checking if epoch conditions were reached ===", verbosity="info" - ) - if self.dia_data.has_ms1: - if self.com.ms1_error > self.config["search"]["target_ms1_tolerance"]: - self.reporter.log_string( - f"❌ {'ms1_error':<15}: {self.com.ms1_error:.4f} > {self.config['search']['target_ms1_tolerance']}", - verbosity="info", - ) - continue_calibration = True - else: - self.reporter.log_string( - f"✅ {'ms1_error':<15}: {self.com.ms1_error:.4f} <= {self.config['search']['target_ms1_tolerance']}", - verbosity="info", - ) + precursor_df = self.fdr_correction( + features_df, fragments_df, self.optimization_manager.classifier_version + ) - if self.com.ms2_error > self.config["search"]["target_ms2_tolerance"]: self.reporter.log_string( - f"❌ {'ms2_error':<15}: {self.com.ms2_error:.4f} > {self.config['search']['target_ms2_tolerance']}", + f"=== FDR correction performed with classifier version {self.optimization_manager.classifier_version} ===", verbosity="info", ) - continue_calibration = True - else: + + num_precursors_at_01FDR = len(precursor_df[precursor_df["qval"] < 0.01]) + self.reporter.log_string( - f"✅ {'ms2_error':<15}: {self.com.ms2_error:.4f} <= {self.config['search']['target_ms2_tolerance']}", - verbosity="info", + f"=== Checking if minimum number of precursors for optimization found yet; minimum number is {self.config['calibration']['optimization_lock_target']} ===", + verbosity="progress", ) - if self.com.rt_error > self.config["search"]["target_rt_tolerance"]: + self.log_precursor_df(precursor_df) + self.reporter.log_string( - f"❌ {'rt_error':<15}: {self.com.rt_error:.4f} > 
{self.config['search']['target_rt_tolerance']}", - verbosity="info", + f"=== Classifier has been trained for {self.fdr_manager.current_version + 1} iteration(s); minimum number is {self.config['calibration']['optimization_lock_min_steps']} ===", + verbosity="progress", + ) + + if ( + num_precursors_at_01FDR + > self.config["calibration"]["optimization_lock_target"] + and current_step + >= self.config["calibration"]["optimization_lock_min_steps"] - 1 + ): + final_stop_index = stop_index # final_stop_index is the number of elution groups that will be included in the calibration data + break + + eg_idxes_for_calibration = self.elution_group_order[:final_stop_index] + return eg_idxes_for_calibration, precursor_df, fragments_df + + # self.eg_idxes_for_calibration = self.elution_group_order[:final_stop_index] + # self.optimization_manager.fit({"classifier_version": self.fdr_manager.current_version}) + + def get_ordered_optimizers(self): + """Select appropriate optimizers. Targeted optimization is used if a valid target value (i.e. a number greater than 0) is specified in the config; + if a value less than or equal to 0 is supplied, automatic optimization is used. + Targeted optimizers are run simultaneously; automatic optimizers are run separately in the order MS2, RT, MS1, mobility. + This order is built into the structure of the returned list of lists, ordered_optimizers. + For MS1 and mobility, the relevant optimizer will be excluded from the returned list of lists if it is not present in the data. + + Returns + ------- + ordered_optimizers : list + List of lists of optimizers + + """ + config_search_initial = self.config["search_initial"] + config_search = self.config["search"] + + if config_search["target_ms2_tolerance"] > 0: + ms2_optimizer = optimization.TargetedMS2Optimizer( + config_search_initial["initial_ms2_tolerance"], + config_search["target_ms2_tolerance"], + self, ) - continue_calibration = True else: - self.reporter.log_string( - f"✅ {'rt_error':<15}: {self.com.rt_error:.4f} <= {self.config['search']['target_rt_tolerance']}", - verbosity="info", + ms2_optimizer = optimization.AutomaticMS2Optimizer( + config_search_initial["initial_ms2_tolerance"], + self, ) + if config_search["target_rt_tolerance"] > 0: + rt_optimizer = optimization.TargetedRTOptimizer( + config_search_initial["initial_rt_tolerance"], + config_search["target_rt_tolerance"], + self, + ) + else: + rt_optimizer = optimization.AutomaticRTOptimizer( + config_search_initial["initial_rt_tolerance"], + self, + ) + if self.dia_data.has_ms1: + if config_search["target_ms1_tolerance"] > 0: + ms1_optimizer = optimization.TargetedMS1Optimizer( + config_search_initial["initial_ms1_tolerance"], + config_search["target_ms1_tolerance"], + self, + ) + else: + ms1_optimizer = optimization.AutomaticMS1Optimizer( + config_search_initial["initial_ms1_tolerance"], + self, + ) + else: + ms1_optimizer = None if self.dia_data.has_mobility: - if ( - self.com.mobility_error - > self.config["search"]["target_mobility_tolerance"] - ): - self.reporter.log_string( - f"❌ {'mobility_error':<15}: {self.com.mobility_error:.4f} > {self.config['search']['target_mobility_tolerance']}", - verbosity="info", + if config_search["target_mobility_tolerance"] > 0: + mobility_optimizer = optimization.TargetedMobilityOptimizer( + config_search_initial["initial_mobility_tolerance"], + config_search["target_mobility_tolerance"], + self, ) - continue_calibration = True else: - self.reporter.log_string( - f"✅ {'mobility_error':<15}: {self.com.mobility_error:.4f} 
<= {self.config['search']['target_mobility_tolerance']}", - verbosity="info", + mobility_optimizer = optimization.AutomaticMobilityOptimizer( + config_search_initial["initial_mobility_tolerance"], + self.calibration_manager, + self.optimization_manager, + self.fdr_manager, ) - - if self.com.current_epoch < self.config["calibration"]["min_epochs"] - 1: - self.reporter.log_string( - f"❌ {'current_epoch':<15}: {self.com.current_epoch} < {self.config['calibration']['min_epochs']}", - verbosity="info", - ) - continue_calibration = True else: - self.reporter.log_string( - f"✅ {'current_epoch':<15}: {self.com.current_epoch} >= {self.config['calibration']['min_epochs']}", - verbosity="info", - ) + mobility_optimizer = None + + optimizers = [ + ms2_optimizer, + rt_optimizer, + ms1_optimizer, + mobility_optimizer, + ] + targeted_optimizers = [ + [ + optimizer + for optimizer in optimizers + if isinstance(optimizer, optimization.TargetedOptimizer) + ] + ] + automatic_optimizers = [ + [optimizer] + for optimizer in optimizers + if isinstance(optimizer, optimization.AutomaticOptimizer) + ] + + ordered_optimizers = ( + targeted_optimizers + automatic_optimizers + if any( + targeted_optimizers + ) # This line is required so no empty list is added to the ordered_optimizers list + else automatic_optimizers + ) + + return ordered_optimizers + + def first_recalibration_and_optimization( + self, + precursor_df: pd.DataFrame, + fragments_df: pd.DataFrame, + ordered_optimizers: list, + ): + """Performs the first recalibration and optimization step. + + Parameters + ---------- + precursor_df : pd.DataFrame + Precursor dataframe from optimization lock + + fragments_df : pd.DataFrame + Fragment dataframe from optimization lock + + ordered_optimizers : list + List of lists of optimizers in correct order + """ + precursor_df_filtered, fragments_df_filtered = self.filter_dfs( + precursor_df, fragments_df + ) + + self.recalibration(precursor_df_filtered, fragments_df_filtered) self.reporter.log_string( - "==============================================", verbosity="info" + "=== Performing initial optimization on extracted data. ===", + verbosity="info", ) - return continue_calibration + + for optimizers in ordered_optimizers: + for optimizer in optimizers: + optimizer.step(precursor_df_filtered, fragments_df_filtered) def calibration(self): + """Performs optimization of the search parameters. This occurs in two stages: + 1) Optimization lock: the data are searched to acquire a locked set of precursors which is used for search parameter optimization. The classifier is also trained during this stage. + 2) Optimization loop: the search parameters are optimized iteratively using the locked set of precursors. + In each iteration, the data are searched with the locked library from stage 1, and the properties -- m/z for both precursors and fragments (i.e. MS1 and MS2), RT and mobility -- are recalibrated. + The optimization loop is repeated for each list of optimizers in ordered_optimizers. + + """ + # First check to see if the calibration has already been performed. Return if so. 
if ( self.calibration_manager.is_fitted and self.calibration_manager.is_loaded_from_file @@ -444,89 +523,147 @@ def calibration(self): ) return - self.start_of_calibration() - for current_epoch in range(self.config["calibration"]["max_epochs"]): - if self.check_epoch_conditions(): - pass - else: - break + # Get the order of optimization + ordered_optimizers = self.get_ordered_optimizers() + + self.reporter.log_string( + "Starting initial classifier training and precursor identification.", + verbosity="progress", + ) - self.start_of_epoch(current_epoch) + # Get the optimization lock + eg_idxes_for_calibration, precursor_df, fragments_df = ( + self.get_optimization_lock() + ) - features = [] - fragments = [] - for current_step, (start_index, stop_index) in enumerate(self.batch_plan): - self.start_of_step(current_step, start_index, stop_index) + self.optimization_manager.fit( + {"classifier_version": self.fdr_manager.current_version} + ) - eg_idxes = self.elution_group_order[start_index:stop_index] + self.reporter.log_string( + "Required number of precursors found and required number of training iterations performed. Starting search parameter optimization.", + verbosity="progress", + ) + + # Perform a first recalibration on the optimization lock. + self.first_recalibration_and_optimization( + precursor_df, fragments_df, ordered_optimizers + ) + + # Start of optimization/recalibration loop + for optimizers in ordered_optimizers: + for current_step in range(self.config["calibration"]["max_steps"]): + if np.all([optimizer.has_converged for optimizer in optimizers]): + self.reporter.log_string( + f"Optimization finished for {', '.join([optimizer.parameter_name for optimizer in optimizers])}.", + verbosity="progress", + ) + + for optimizer in optimizers: + optimizer.plot() + + break batch_df = self.spectral_library._precursor_df[ self.spectral_library._precursor_df["elution_group_idx"].isin( - eg_idxes + eg_idxes_for_calibration ) ] - feature_df, fragment_df = self.extract_batch(batch_df) - features += [feature_df] - fragments += [fragment_df] - features_df = pd.concat(features) - fragments_df = pd.concat(fragments) + features_df, fragments_df = self.extract_batch(batch_df) self.reporter.log_string( - f"=== Epoch {self.com.current_epoch}, step {current_step}, extracted {len(feature_df)} precursors and {len(fragment_df)} fragments ===", + f"=== Step {current_step}, extracted {len(features_df)} precursors and {len(fragments_df)} fragments ===", verbosity="progress", ) - precursor_df = self.fdr_correction(features_df, fragments_df) - if self.check_recalibration(precursor_df): - self.recalibration(precursor_df, fragments_df) - break - else: - # check if last step has been reached - if current_step == len(self.batch_plan) - 1: - raise NoRecalibrationTargetError() + precursor_df = self.fdr_correction( + features_df, + fragments_df, + self.optimization_manager.classifier_version, + ) - self.end_of_epoch() + self.reporter.log_string( + f"=== FDR correction performed with classifier version {self.optimization_manager.classifier_version} ===", + verbosity="info", + ) - if self.config["calibration"].get("final_full_calibration", False): - self.reporter.log_string( - "Performing final calibration with all precursors", - verbosity="progress", - ) - features_df, fragments_df = self.extract_batch( - self.spectral_library._precursor_df - ) - precursor_df = self.fdr_correction(features_df, fragments_df) - self.recalibration(precursor_df, fragments_df) + self.log_precursor_df(precursor_df) + + 
precursor_df_filtered, fragments_df_filtered = self.filter_dfs( + precursor_df, fragments_df + ) - self.end_of_calibration() + self.recalibration(precursor_df_filtered, fragments_df_filtered) - def end_of_epoch(self): - pass + self.reporter.log_string( + "=== checking if optimization conditions were reached ===", + verbosity="info", + ) - def end_of_calibration(self): - # self.calibration_manager.predict(self.spectral_library._precursor_df, 'precursor') - # self.calibration_manager.predict(self.spectral_library._fragment_df, 'fragment') - self.calibration_manager.save() - pass + for optimizer in optimizers: + optimizer.step( + precursor_df_filtered, fragments_df_filtered, current_step + ) + + self.reporter.log_string( + "==============================================", verbosity="info" + ) + + self.reporter.log_string( + f"=== Optimization has been performed for {current_step + 1} step(s); minimum number is {self.config['calibration']['min_steps']} ===", + verbosity="progress", + ) - def recalibration(self, precursor_df, fragments_df): + else: + self.reporter.log_string( + "Optimization did not converge within the maximum number of steps, which is {self.config['calibration']['max_steps']}.", + verbosity="warning", + ) + + self.reporter.log_string( + "Search parameter optimization finished. Values taken forward for search are:", + verbosity="progress", + ) + self.reporter.log_string( + "==============================================", verbosity="progress" + ) + for optimizers in ordered_optimizers: + for optimizer in optimizers: + self.reporter.log_string( + f"{optimizer.parameter_name:<15}: {self.optimization_manager.__dict__[optimizer.parameter_name]:.4f}", + verbosity="progress", + ) + self.reporter.log_string( + "==============================================", verbosity="progress" + ) + + def filter_dfs(self, precursor_df, fragments_df): + """Filters precursor and fragment dataframes to extract the most reliable examples for calibration. + + Parameters + ---------- + precursor_df : pd.DataFrame + Precursor dataframe after FDR correction. + + fragments_df : pd.DataFrame + Fragment dataframe. + + Returns + ------- + precursor_df_filtered : pd.DataFrame + Filtered precursor dataframe. Decoy precursors and those found at worse than 1% FDR are removed from the precursor dataframe. + + fragments_df_filtered : pd.DataFrame + Filtered fragment dataframe. Retained fragments must either: + 1) have a correlation greater than 0.7 and belong to the top 5000 fragments sorted by correlation, if there are more than 500 with a correlation greater than 0.7, or + 2) belong to the top 500 fragments sorted by correlation otherwise. 
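+
+        Notes
+        -----
+        A minimal sketch of the fragment selection rule described above, assuming the fragment correlation is
+        stored in a numeric column (the column name "correlation" is illustrative and may differ in the
+        actual dataframe):
+
+            n_above_cutoff = (fragments_df_filtered["correlation"] > 0.7).sum()
+            stop_rank = min(max(n_above_cutoff, 500), 5000)
+            fragments_df_filtered = fragments_df_filtered.sort_values(
+                "correlation", ascending=False
+            ).iloc[:stop_rank]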
+ + """ precursor_df_filtered = precursor_df[precursor_df["qval"] < 0.01] precursor_df_filtered = precursor_df_filtered[ precursor_df_filtered["decoy"] == 0 ] - self.calibration_manager.fit( - precursor_df_filtered, - "precursor", - plot=True, - skip=["mz"] if not self.dia_data.has_ms1 else [], - # neptune_run = self.neptune - ) - - rt_99 = self.calibration_manager.get_estimator("precursor", "rt").ci( - precursor_df_filtered, 0.95 - ) - fragments_df_filtered = fragments_df[ fragments_df["precursor_idx"].isin(precursor_df_filtered["precursor_idx"]) ] @@ -548,7 +685,32 @@ def recalibration(self, precursor_df, fragments_df): ) fragments_df_filtered = fragments_df_filtered.iloc[:stop_rank] - print(f"fragments_df_filtered: {len(fragments_df_filtered)}") + self.reporter.log_string( + f"fragments_df_filtered: {len(fragments_df_filtered)}", verbosity="info" + ) + + return precursor_df_filtered, fragments_df_filtered + + def recalibration(self, precursor_df_filtered, fragments_df_filtered): + """Performs recalibration of the the MS1, MS2, RT and mobility properties. Also fits the convolution kernel and the score cutoff. + The calibration manager is used to fit the data and predict the calibrated values. + + Parameters + ---------- + precursor_df_filtered : pd.DataFrame + Filtered precursor dataframe (see filter_dfs) + + fragments_df_filtered : pd.DataFrame + Filtered fragment dataframe (see filter_dfs) + + """ + self.calibration_manager.fit( + precursor_df_filtered, + "precursor", + plot=True, + skip=["mz"] if not self.dia_data.has_ms1 else [], + # neptune_run = self.neptune + ) self.calibration_manager.fit( fragments_df_filtered, @@ -557,49 +719,21 @@ def recalibration(self, precursor_df, fragments_df): # neptune_run = self.neptune ) - m2_99 = self.calibration_manager.get_estimator("fragment", "mz").ci( - fragments_df_filtered, 0.95 + self.calibration_manager.predict( + self.spectral_library._precursor_df, + "precursor", ) - self.com.fit( + self.calibration_manager.predict(self.spectral_library._fragment_df, "fragment") + + self.optimization_manager.fit( { - "ms2_error": max(m2_99, self.config["search"]["target_ms2_tolerance"]), - "rt_error": max(rt_99, self.config["search"]["target_rt_tolerance"]), "column_type": "calibrated", "num_candidates": self.config["search"]["target_num_candidates"], } ) - if self.dia_data.has_ms1: - m1_99 = self.calibration_manager.get_estimator("precursor", "mz").ci( - precursor_df_filtered, 0.95 - ) - self.com.fit( - { - "ms1_error": max( - m1_99, self.config["search"]["target_ms1_tolerance"] - ), - } - ) - - if self.dia_data.has_mobility: - mobility_99 = self.calibration_manager.get_estimator( - "precursor", "mobility" - ).ci(precursor_df_filtered, 0.95) - self.com.fit( - { - "mobility_error": max( - mobility_99, self.config["search"]["target_mobility_tolerance"] - ), - } - ) - - # if self.neptune is not None: - # self.neptune['eval/99_mobility_error'].log(mobility_99) - percentile_001 = np.percentile(precursor_df_filtered["score"], 0.1) - print("score cutoff", percentile_001) - self.optimization_manager.fit( { "fwhm_rt": precursor_df_filtered["cycle_fwhm"].median(), @@ -608,34 +742,7 @@ def recalibration(self, precursor_df, fragments_df): } ) - # if self.neptune is not None: - # precursor_df_fdr = precursor_df_filtered[precursor_df_filtered['qval'] < 0.01] - # self.neptune["eval/precursors"].log(len(precursor_df_fdr)) - # self.neptune['eval/99_ms1_error'].log(m1_99) - # self.neptune['eval/99_ms2_error'].log(m2_99) - # self.neptune['eval/99_rt_error'].log(rt_99) - - 
def check_recalibration(self, precursor_df): - self.com.accumulated_precursors = len(precursor_df) - self.com.accumulated_precursors_01FDR = len( - precursor_df[precursor_df["qval"] < 0.01] - ) - - self.reporter.log_string( - f"=== checking if recalibration conditions were reached, target {self.com.recalibration_target} precursors ===", - verbosity="progress", - ) - - self.log_precursor_df(precursor_df) - - perform_recalibration = False - - if self.com.accumulated_precursors_01FDR > self.com.recalibration_target: - perform_recalibration = True - - return perform_recalibration - - def fdr_correction(self, features_df, df_fragments): + def fdr_correction(self, features_df, df_fragments, version=-1): return self.fdr_manager.fit_predict( features_df, decoy_strategy="precursor_channel_wise" @@ -646,6 +753,7 @@ def fdr_correction(self, features_df, df_fragments): if self.config["search"]["compete_for_fragments"] else None, dia_cycle=self.dia_data.cycle, + version=version, # neptune_run=self.neptune ) @@ -659,11 +767,11 @@ def extract_batch(self, batch_df, apply_cutoff=False): config.update( { "top_k_fragments": self.config["search_advanced"]["top_k_fragments"], - "rt_tolerance": self.com.rt_error, - "mobility_tolerance": self.com.mobility_error, - "candidate_count": self.com.num_candidates, - "precursor_mz_tolerance": self.com.ms1_error, - "fragment_mz_tolerance": self.com.ms2_error, + "rt_tolerance": self.optimization_manager.rt_error, + "mobility_tolerance": self.optimization_manager.mobility_error, + "candidate_count": self.optimization_manager.num_candidates, + "precursor_mz_tolerance": self.optimization_manager.ms1_error, + "fragment_mz_tolerance": self.optimization_manager.ms2_error, "exclude_shared_ions": self.config["search"]["exclude_shared_ions"], "min_size_rt": self.config["search"]["quant_window"], } @@ -674,14 +782,14 @@ def extract_batch(self, batch_df, apply_cutoff=False): batch_df, self.spectral_library.fragment_df, config.jitclass(), - rt_column=f"rt_{self.com.column_type}", - mobility_column=f"mobility_{self.com.column_type}" + rt_column=f"rt_{self.optimization_manager.column_type}", + mobility_column=f"mobility_{self.optimization_manager.column_type}" if self.dia_data.has_mobility else "mobility_library", - precursor_mz_column=f"mz_{self.com.column_type}" + precursor_mz_column=f"mz_{self.optimization_manager.column_type}" if self.dia_data.has_ms1 else "mz_library", - fragment_mz_column=f"mz_{self.com.column_type}", + fragment_mz_column=f"mz_{self.optimization_manager.column_type}", fwhm_rt=self.optimization_manager.fwhm_rt, fwhm_mobility=self.optimization_manager.fwhm_mobility, ) @@ -710,8 +818,8 @@ def extract_batch(self, batch_df, apply_cutoff=False): config.update( { "top_k_fragments": self.config["search_advanced"]["top_k_fragments"], - "precursor_mz_tolerance": self.com.ms1_error, - "fragment_mz_tolerance": self.com.ms2_error, + "precursor_mz_tolerance": self.optimization_manager.ms1_error, + "fragment_mz_tolerance": self.optimization_manager.ms2_error, "exclude_shared_ions": self.config["search"]["exclude_shared_ions"], "quant_window": self.config["search"]["quant_window"], "quant_all": self.config["search"]["quant_all"], @@ -723,14 +831,14 @@ def extract_batch(self, batch_df, apply_cutoff=False): self.spectral_library._precursor_df, self.spectral_library._fragment_df, config=config, - rt_column=f"rt_{self.com.column_type}", - mobility_column=f"mobility_{self.com.column_type}" + rt_column=f"rt_{self.optimization_manager.column_type}", + 
mobility_column=f"mobility_{self.optimization_manager.column_type}" if self.dia_data.has_mobility else "mobility_library", - precursor_mz_column=f"mz_{self.com.column_type}" + precursor_mz_column=f"mz_{self.optimization_manager.column_type}" if self.dia_data.has_ms1 else "mz_library", - fragment_mz_column=f"mz_{self.com.column_type}", + fragment_mz_column=f"mz_{self.optimization_manager.column_type}", ) features_df, fragments_df = candidate_scoring( @@ -742,13 +850,9 @@ def extract_batch(self, batch_df, apply_cutoff=False): return features_df, fragments_df def extraction(self): - self.com.fit( + self.optimization_manager.fit( { "num_candidates": self.config["search"]["target_num_candidates"], - "ms1_error": self.config["search"]["target_ms1_tolerance"], - "ms2_error": self.config["search"]["target_ms2_tolerance"], - "rt_error": self.config["search"]["target_rt_tolerance"], - "mobility_error": self.config["search"]["target_mobility_tolerance"], "column_type": "calibrated", } ) @@ -763,7 +867,14 @@ def extraction(self): apply_cutoff=True, ) - precursor_df = self.fdr_correction(features_df, fragments_df) + self.reporter.log_string( + f"=== FDR correction performed with classifier version {self.optimization_manager.classifier_version} ===", + verbosity="info", + ) + + precursor_df = self.fdr_correction( + features_df, fragments_df, self.optimization_manager.classifier_version + ) precursor_df = precursor_df[precursor_df["qval"] <= self.config["fdr"]["fdr"]] diff --git a/alphadia/workflow/searchoptimization.py b/alphadia/workflow/searchoptimization.py deleted file mode 100644 index f4ba0489..00000000 --- a/alphadia/workflow/searchoptimization.py +++ /dev/null @@ -1,368 +0,0 @@ -# native imports -from abc import ABC, abstractmethod - -import numpy as np - -# alpha family imports -# third party imports -import pandas as pd - -# alphadia imports -from alphadia.workflow import manager, reporting - - -class BaseOptimizer(ABC): - def __init__( - self, - calibration_manager: manager.CalibrationManager, - optimization_manager: manager.OptimizationManager, - fdr_manager: manager.FDRManager, - reporter: None | reporting.Pipeline | reporting.Backend = None, - ): - """This class serves as a base class for organizing the search parameter optimization process, which defines the parameters used for search. - - Parameters - ---------- - - calibration_manager: manager.CalibrationManager - The calibration manager for the workflow, which is needed to update the search parameter between rounds of optimization - - optimization_manager: manager.OptimizationManager - The optimization manager for the workflow, which is needed so the optimal parameter and classifier version can be saved to the manager - - fdr_manager: manager.FDRManager - The FDR manager for the workflow, which is needed to update the optimal classifier version in the optimization manager - - """ - self.optimal_parameter = None - self.calibration_manager = calibration_manager - self.optimization_manager = optimization_manager - self.fdr_manager = fdr_manager - self.reporter = reporting.LogBackend() if reporter is None else reporter - - @abstractmethod - def step(self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame): - """This method evaluates the progress of the optimization, and either concludes the optimization if it has converged or continues the optimization if it has not. - This method includes the update rule for the optimization. 
- - Parameters - ---------- - - precursors_df: pd.DataFrame - The precursor dataframe for the search - - fragments_df: pd.DataFrame - The fragment dataframe for the search - - - """ - pass - - @abstractmethod - def _update_parameter(self, df): - """This method specifies the rule according to which the search parameter is updated between rounds of optimization. The rule is specific to the parameter being optimized. - - Parameters - ---------- - - df: pd.DataFrame - The dataframe used to update the parameter. This could be the precursor or fragment dataframe, depending on the search parameter being optimized. - - - """ - pass - - @abstractmethod - def _check_convergence(self): - """This method checks if the optimization has converged according to parameter-specific conditions and, if it has, sets the optimal parameter attribute and updates the optimization manager.""" - pass - - def has_converged(self): - """If the optimal parameter has been set, the optimization must have converged and the method returns True. Otherwise, it returns False.""" - return self.optimal_parameter is not None - - -class RTOptimizer(BaseOptimizer): - """TODO Finish this optimizer""" - - def __init__( - self, - initial_parameter: float, - calibration_manager: manager.CalibrationManager, - optimization_manager: manager.OptimizationManager, - fdr_manager: manager.FDRManager, - **kwargs, - ): - """See base class. - - Parameters - ---------- - - initial_parameter: float - The parameter used for search in the first round of optimization. - - """ - super().__init__( - calibration_manager, optimization_manager, fdr_manager, **kwargs - ) - self.parameters = [initial_parameter] - self.feature = [] - - def _check_convergence(self): - """Optimization should stop if continued narrowing of the TODO parameter is not improving the TODO feature value. - This function checks if the previous rounds of optimization have led to a meaningful improvement in the TODO feature value. - If so, it continues optimization and appends the proposed new parameter to the list of parameters. If not, it stops optimization and sets the optimal parameter attribute. - - Notes - ----- - Because the check for an increase in TODO feature value requires two previous rounds, the function will also initialize for another round of optimization if there have been fewer than 3 rounds. - - - """ - - if ( - len(self.feature) > 2 - and self.feature[-1] < 1.1 * self.feature[-2] - and self.feature[-1] < 1.1 * self.feature[-3] - ): - self.optimal_parameter = self.parameters[np.argmax(self.feature)] - - self.optimization_manager.fit({"rt_error": self.optimal_parameter}) - - def _update_parameter(self, df: pd.DataFrame): - """See base class. The update rule is - 1) calculate the deviation of the predicted mz values from the observed mz values, - 2) take the mean of the endpoints of the central 99% of these deviations, and - 3) multiply this value by 1.1. - This is implemented by the ci method for the estimator. - - - """ - proposed_parameter = 1.1 * self.calibration_manager.get_estimator( - "precursor", "rt" - ).ci(df, 0.99) - - return proposed_parameter - - def step(self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame): - """See base class. 
The number of precursor identifications is used to track the progres of the optimization (stored in .feature) and determine whether it has converged.""" - if not self.has_converged(): - self.feature.append(len(precursors_df)) - self._check_convergence() - - if self.has_converged(): # Note this may change from the above statement since .optimal_parameter may be set in ._check_convergence - self.reporter.log_string( - f"✅ RT: optimization complete. Optimal parameter {self.optimal_parameter} found after {len(self.parameters)} searches.", - verbosity="progress", - ) - - else: - proposed_parameter = self._update_parameter(precursors_df) - - self.reporter.log_string( - f"❌ RT: optimization incomplete after {len(self.parameters)} search(es). Will search with parameter {proposed_parameter}.", - verbosity="progress", - ) - - self.parameters.append(proposed_parameter) - self.optimization_manager.fit({"rt_error": proposed_parameter}) - - -class MS2Optimizer(BaseOptimizer): - def __init__( - self, - initial_parameter: float, - calibration_manager: manager.CalibrationManager, - optimization_manager: manager.OptimizationManager, - fdr_manager: manager.FDRManager, - **kwargs, - ): - """See base class. - - Parameters - ---------- - initial_parameter: float - The parameter used for search in the first round of optimization. - - - """ - super().__init__( - calibration_manager, optimization_manager, fdr_manager, **kwargs - ) - self.history_df = pd.DataFrame( - columns=["parameter", "precursor_ids", "classifier_version"] - ) - self.proposed_parameter = initial_parameter - - def _check_convergence(self): - """Optimization should stop if continued narrowing of the MS2 parameter is not improving the number of precursor identifications. - This function checks if the previous rounds of optimization have led to a meaningful improvement in the number of identifications. - If so, it continues optimization and appends the proposed new parameter to the list of parameters. If not, it stops optimization and sets the optimal parameter attribute. - - Notes - ----- - Because the check for an increase in identifications requires two previous rounds, the function will also initialize for another round of optimization if there have been fewer than 3 rounds. - - - """ - - if ( - len(self.history_df) > 2 - and self.history_df["precursor_ids"].iloc[-1] - < 1.1 * self.history_df["precursor_ids"].iloc[-2] - and self.history_df["precursor_ids"].iloc[-1] - < 1.1 * self.history_df["precursor_ids"].iloc[-3] - ): - self.optimal_parameter = self.history_df.loc[ - self.history_df["precursor_ids"].idxmax(), "parameter" - ] - self.optimization_manager.fit({"ms2_error": self.optimal_parameter}) - self.optimization_manager.fit( - { - "classifier_version": self.history_df.loc[ - self.history_df["precursor_ids"].idxmax(), "classifier_version" - ] - } - ) - - def _update_parameter(self, df: pd.DataFrame): - """See base class. The update rule is - 1) calculate the deviation of the predicted mz values from the observed mz values, - 2) take the mean of the endpoints of the central 99% of these deviations, and - 3) multiply this value by 1.1. - This is implemented by the ci method for the estimator. - - - """ - proposed_parameter = 1.1 * self.calibration_manager.get_estimator( - "fragment", "mz" - ).ci(df, 0.99) - - return proposed_parameter - - def step(self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame): - """See base class. 
The number of precursor identifications is used to track the progres of the optimization (stored in .precursor_ids) and determine whether it has converged.""" - if not self.has_converged(): - new_row = pd.DataFrame( - [ - { - "parameter": float( - self.proposed_parameter - ), # Ensure float dtype - "precursor_ids": int(len(precursors_df)), # Ensure int dtype - "classifier_version": int( - self.fdr_manager.current_version - ), # Ensure int dtype - } - ] - ) - self.history_df = pd.concat([self.history_df, new_row], ignore_index=True) - self._check_convergence() - - if self.has_converged(): # Note this may change from the above statement since .optimal_parameter may be set in ._check_convergence - self.reporter.log_string( - f"✅ MS2: optimization complete. Optimal parameter {self.optimal_parameter} found after {len(self.history_df['parameter'])} searches.", - verbosity="progress", - ) - - else: - self.proposed_parameter = self._update_parameter(fragments_df) - - self.reporter.log_string( - f"❌ MS2: optimization incomplete after {len(self.history_df['parameter'])} search(es). Will search with parameter {self.proposed_parameter}.", - verbosity="progress", - ) - - self.optimization_manager.fit({"ms2_error": self.proposed_parameter}) - - -class MS1Optimizer(BaseOptimizer): - """TODO Finish this optimizer""" - - def __init__( - self, - initial_parameter: float, - calibration_manager: manager.CalibrationManager, - optimization_manager: manager.OptimizationManager, - fdr_manager: manager.FDRManager, - **kwargs, - ): - """See base class. - - Parameters - ---------- - - initial_parameter: float - The parameter used for search in the first round of optimization. - - """ - super().__init__( - calibration_manager, optimization_manager, fdr_manager, **kwargs - ) - self.parameters = [initial_parameter] - self.feature = [] - - def _check_convergence(self): - """Optimization should stop if continued narrowing of the TODO parameter is not improving the TODO feature value. - This function checks if the previous rounds of optimization have led to a meaningful improvement in the TODO feature value. - If so, it continues optimization and appends the proposed new parameter to the list of parameters. If not, it stops optimization and sets the optimal parameter attribute. - - Notes - ----- - Because the check for an increase in TODO feature value requires two previous rounds, the function will also initialize for another round of optimization if there have been fewer than 3 rounds. - - - """ - - if ( - len(self.feature) > 2 - and self.feature[-1] < 1.1 * self.feature[-2] - and self.feature[-1] < 1.1 * self.feature[-3] - ): - self.optimal_parameter = self.parameters[np.argmax(self.feature)] - - self.optimization_manager.fit({"ms1_error": self.optimal_parameter}) - - def _update_parameter(self, df: pd.DataFrame): - """See base class. The update rule is - 1) calculate the deviation of the predicted mz values from the observed mz values, - 2) take the mean of the endpoints of the central 99% of these deviations, and - 3) multiply this value by 1.1. - This is implemented by the ci method for the estimator. - - - """ - proposed_parameter = 1.1 * self.calibration_manager.get_estimator( - "precursor", "mz" - ).ci(df, 0.99) - - return proposed_parameter - - def step(self, precursors_df: pd.DataFrame, fragments_df: pd.DataFrame): - """See base class. 
The number of precursor identifications is used to track the progres of the optimization (stored in .feature) and determine whether it has converged.""" - if not self.has_converged(): - self.feature.append(len(precursors_df)) - self._check_convergence() - - if self.has_converged(): # Note this may change from the above statement since .optimal_parameter may be set in ._check_convergence - self.reporter.log_string( - f"✅ MS1: optimization complete. Optimal parameter {self.optimal_parameter} found after {len(self.parameters)} searches.", - verbosity="progress", - ) - - else: - proposed_parameter = self._update_parameter(precursors_df) - - self.reporter.log_string( - f"❌ MS1: optimization incomplete after {len(self.parameters)} search(es). Will search with parameter {proposed_parameter}.", - verbosity="progress", - ) - - self.parameters.append(proposed_parameter) - self.optimization_manager.fit({"ms1_error": proposed_parameter}) - - -class MobilityOptimizer(BaseOptimizer): - """TODO: Implement this class. It will be used to optimize the mobility parameter for the search.""" - - pass diff --git a/tests/e2e_tests/e2e_test_cases.yaml b/tests/e2e_tests/e2e_test_cases.yaml index 6d6ddfc8..4b286aa9 100644 --- a/tests/e2e_tests/e2e_test_cases.yaml +++ b/tests/e2e_tests/e2e_test_cases.yaml @@ -99,6 +99,41 @@ test_cases: - BasicStats + - name: astral_automatic_calibration + config: + library_prediction: + predict: true + fixed_modifications: 'Carbamidomethyl@C' + variable_modifications: 'Oxidation@M;Acetyl@Protein N-term' + max_var_mod_num: 2 + missed_cleavages: 1 + precursor_mz: + - 380 + - 980 + nce: 25 + instrument: Lumos + search: + target_num_candidates: 3 + target_ms1_tolerance: -1 + target_ms2_tolerance: -1 + target_rt_tolerance: -1 + search_initial: + initial_num_candidates: 1 + initial_ms1_tolerance: 100 + initial_ms2_tolerance: 100 + initial_rt_tolerance: 600 + search_output: + peptide_level_lfq: true + precursor_level_lfq: true + fasta: + - source_url: https://datashare.biochem.mpg.de/s/WTu3rFZHNeb3uG2/download?files=2024_01_12_human.fasta + raw_data: + - source_url: https://datashare.biochem.mpg.de/s/WTu3rFZHNeb3uG2/download?files=20231024_OA3_TiHe_ADIAMA_HeLa_200ng_Evo01_21min_F-40_iO_before_01.raw + - source_url: https://datashare.biochem.mpg.de/s/WTu3rFZHNeb3uG2/download?files=20231024_OA3_TiHe_ADIAMA_HeLa_200ng_Evo01_21min_F-40_iO_before_01.raw + - source_url: https://datashare.biochem.mpg.de/s/WTu3rFZHNeb3uG2/download?files=20231024_OA3_TiHe_ADIAMA_HeLa_200ng_Evo01_21min_F-40_iO_before_02.raw + metrics: + - BasicStats + # - name: astral_mixed_species # config: # library_prediction: diff --git a/tests/unit_tests/test_workflow.py b/tests/unit_tests/test_workflow.py index 1729a165..b62737a9 100644 --- a/tests/unit_tests/test_workflow.py +++ b/tests/unit_tests/test_workflow.py @@ -1,7 +1,6 @@ import os import shutil import tempfile -from collections import defaultdict from pathlib import Path import numpy as np @@ -13,7 +12,8 @@ from alphadia.calibration.models import LOESSRegression from alphadia.calibration.property import Calibration from alphadia.fdrexperimental import BinaryClassifierLegacyNewBatching -from alphadia.workflow import base, manager, searchoptimization +from alphadia.workflow import base, manager, optimization, peptidecentric, reporting +from alphadia.workflow.config import Config def test_base_manager(): @@ -131,12 +131,24 @@ def calibration_testdata(): rt_library + np.random.normal(0, 0.5, 1000) + np.sin(rt_library * 0.05) ) + mobility_library = np.linspace(0, 100, 
1000) + mobility_observed = ( + mobility_library + + np.random.normal(0, 0.5, 1000) + + np.sin(mobility_library * 0.05) + ) + + isotope_intensity_correlation = np.linspace(0, 100, 1000) + return pd.DataFrame( { "mz_library": mz_library, "mz_observed": mz_observed, "rt_library": rt_library, "rt_observed": rt_observed, + "mobility_library": mobility_library, + "mobility_observed": mobility_observed, + "isotope_intensity_correlation": isotope_intensity_correlation, } ).copy() @@ -372,48 +384,388 @@ def test_fdr_manager_fit_predict(): os.remove(temp_path) -def ms2_optimizer_test(): - temp_path = os.path.join(tempfile.tempdir, "calibration_manager.pkl") - calibration_manager = manager.CalibrationManager( - TEST_CONFIG, path=temp_path, load_from_file=False +def create_workflow_instance(): + config_base_path = os.path.join( + Path(__file__).parents[2], "alphadia", "constants", "default.yaml" ) - temp_path = os.path.join(tempfile.tempdir, "optimization_manager.pkl") + config = Config() + config.from_yaml(config_base_path) + config["output"] = tempfile.mkdtemp() + workflow = peptidecentric.PeptideCentricWorkflow( + "test", + config, + ) + workflow.reporter = reporting.Pipeline( + backends=[ + reporting.LogBackend(), + reporting.JSONLBackend(path=workflow.path), + reporting.FigureBackend(path=workflow.path), + ] + ) + workflow._calibration_manager = manager.CalibrationManager( + workflow.config["calibration_manager"], + path=os.path.join(workflow.path, workflow.CALIBRATION_MANAGER_PATH), + load_from_file=workflow.config["general"]["reuse_calibration"], + reporter=workflow.reporter, + ) - optimization_manager = manager.OptimizationManager( - OPTIMIZATION_TEST_DATA, path=temp_path, load_from_file=False + optimization_manager_config = { + "ms1_error": workflow.config["search_initial"]["initial_ms1_tolerance"], + "ms2_error": workflow.config["search_initial"]["initial_ms2_tolerance"], + "rt_error": workflow.config["search_initial"]["initial_rt_tolerance"], + "mobility_error": workflow.config["search_initial"][ + "initial_mobility_tolerance" + ], + "column_type": "library", + "num_candidates": workflow.config["search_initial"]["initial_num_candidates"], + "classifier_version": -1, + "fwhm_rt": workflow.config["optimization_manager"]["fwhm_rt"], + "fwhm_mobility": workflow.config["optimization_manager"]["fwhm_mobility"], + "score_cutoff": workflow.config["optimization_manager"]["score_cutoff"], + } + + workflow._optimization_manager = manager.OptimizationManager( + optimization_manager_config, + path=os.path.join(workflow.path, workflow.OPTIMIZATION_MANAGER_PATH), + load_from_file=workflow.config["general"]["reuse_calibration"], + figure_path=os.path.join(workflow.path, workflow.FIGURE_PATH), + reporter=workflow.reporter, ) - test_fragment_df = calibration_testdata() - calibration_manager.fit(test_fragment_df, "fragment", plot=False) + workflow.init_fdr_manager() - fdr_manager = manager.FDRManager(FDR_TEST_FEATURES, FDR_TEST_BASE_CLASSIFIER) - fdr_manager._num_classifiers = 1 + return workflow + + +def test_automatic_ms2_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df2, "fragment", plot=False) + + ms2_optimizer = optimization.AutomaticMS2Optimizer( + 100, + workflow, + ) + + assert ms2_optimizer.has_converged is False + assert ms2_optimizer.parameter_name == "ms2_error" + + workflow.fdr_manager._current_version += 1 + 
ms2_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=0) + + assert len(ms2_optimizer.history_df) == 1 + + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 + + assert workflow.optimization_manager.classifier_version == -1 + + ms2_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=1) + + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 + + assert workflow.optimization_manager.classifier_version == -1 + + ms2_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=2) + + assert ms2_optimizer.has_converged is True + assert ( + ms2_optimizer.history_df.precursor_count == pd.Series([1000, 1001, 1002]) + ).all() + assert ( + workflow.optimization_manager.ms2_error + == ms2_optimizer.history_df.parameter[ + ms2_optimizer.history_df.precursor_count.idxmax() + ] + ) + assert workflow.optimization_manager.classifier_version == 2 + + +def test_automatic_rt_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + rt_optimizer = optimization.AutomaticRTOptimizer( + 100, + workflow, + ) + + assert rt_optimizer.has_converged is False + assert rt_optimizer.parameter_name == "rt_error" + + workflow.fdr_manager._current_version += 1 + rt_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=0) + + assert len(rt_optimizer.history_df) == 1 + + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 + + assert workflow.optimization_manager.classifier_version == -1 + + rt_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=1) + + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 + + assert workflow.optimization_manager.classifier_version == -1 + + rt_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=2) + + assert rt_optimizer.has_converged is True + assert ( + rt_optimizer.history_df.precursor_count == pd.Series([1000, 1001, 1002]) + ).all() + assert ( + workflow.optimization_manager.rt_error + == rt_optimizer.history_df.parameter[ + rt_optimizer.history_df.precursor_count.idxmax() + ] + ) + assert workflow.optimization_manager.classifier_version == 2 + + +def test_automatic_ms1_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + ms1_optimizer = optimization.AutomaticMS1Optimizer( + 100, + workflow, + ) + + assert ms1_optimizer.has_converged is False + assert ms1_optimizer.parameter_name == "ms1_error" + + workflow.fdr_manager._current_version += 1 + ms1_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=0) + + assert len(ms1_optimizer.history_df) == 1 - test_dict = defaultdict(list) - test_dict["var"] = list(range(100)) + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], 
+ ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 + + assert workflow.optimization_manager.classifier_version == -1 + + ms1_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=1) - ms2_optimizer = searchoptimization.MS2Optimizer( - 100, calibration_manager, optimization_manager, fdr_manager + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, ) + workflow.fdr_manager._current_version += 1 - assert ms2_optimizer.optimal_parameter is None + assert workflow.optimization_manager.classifier_version == -1 - ms2_optimizer.step(pd.DataFrame(test_dict), test_fragment_df) + ms1_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=2) - assert len(ms2_optimizer.parameters) == 2 + assert ms1_optimizer.has_converged is True + assert ( + workflow.optimization_manager.ms1_error + == ms1_optimizer.history_df.parameter[ + ms1_optimizer.history_df.mean_isotope_intensity_correlation.idxmax() + ] + ) + assert workflow.optimization_manager.classifier_version == 0 - test_dict["var"].append(1) - ms2_optimizer.step(pd.DataFrame(test_dict), test_fragment_df) - test_dict["var"].append(1) - ms2_optimizer.step(pd.DataFrame(test_dict), test_fragment_df) +def test_automatic_mobility_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + mobility_optimizer = optimization.AutomaticMobilityOptimizer( + 100, + workflow, + ) + + assert mobility_optimizer.has_converged is False + assert mobility_optimizer.parameter_name == "mobility_error" + + workflow.fdr_manager._current_version += 1 + mobility_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=0) + + assert len(mobility_optimizer.history_df) == 1 + + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 + + assert workflow.optimization_manager.classifier_version == -1 + mobility_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=1) + + calibration_test_df1 = pd.concat( + [calibration_test_df1, pd.DataFrame(calibration_test_df1.loc[0]).T], + ignore_index=True, + ) + workflow.fdr_manager._current_version += 1 - assert ms2_optimizer.optimal_parameter is not None - assert ms2_optimizer.precursor_ids == [100, 101, 102] + assert workflow.optimization_manager.classifier_version == -1 + + mobility_optimizer.step(calibration_test_df1, calibration_test_df2, current_step=2) + + assert mobility_optimizer.has_converged is True assert ( - ms2_optimizer.optimal_parameter - == ms2_optimizer.parameters[np.argmax(ms2_optimizer.precursor_ids)] + mobility_optimizer.history_df.precursor_count == pd.Series([1000, 1001, 1002]) + ).all() + assert ( + workflow.optimization_manager.mobility_error + == mobility_optimizer.history_df.parameter[ + mobility_optimizer.history_df.precursor_count.idxmax() + ] + ) + assert workflow.optimization_manager.classifier_version == 2 + + +def test_targeted_ms2_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + optimizer = optimization.TargetedMS2Optimizer( + 100, + 7, + workflow, ) - assert 
optimization_manager.ms2_error == ms2_optimizer.optimal_parameter - assert optimization_manager.classifier_version == 0 + + assert optimizer.parameter_name == "ms2_error" + + for current_step in range(workflow.config["calibration"]["min_steps"]): + assert optimizer.has_converged is False + + workflow.fdr_manager._current_version += 1 + optimizer.step( + calibration_test_df1, calibration_test_df2, current_step=current_step + ) + + assert workflow.optimization_manager.classifier_version == current_step + + assert optimizer.has_converged is True + assert workflow.optimization_manager.ms2_error == optimizer.target_parameter + + +def test_targeted_rt_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + optimizer = optimization.TargetedRTOptimizer( + 100, + 7, + workflow, + ) + + assert optimizer.parameter_name == "rt_error" + + for current_step in range(workflow.config["calibration"]["min_steps"]): + assert optimizer.has_converged is False + + workflow.fdr_manager._current_version += 1 + optimizer.step( + calibration_test_df1, calibration_test_df2, current_step=current_step + ) + + assert workflow.optimization_manager.classifier_version == current_step + + assert optimizer.has_converged is True + assert workflow.optimization_manager.rt_error == optimizer.target_parameter + + +def test_targeted_ms1_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + optimizer = optimization.TargetedMS1Optimizer( + 100, + 7, + workflow, + ) + + assert optimizer.parameter_name == "ms1_error" + + for current_step in range(workflow.config["calibration"]["min_steps"]): + assert optimizer.has_converged is False + + workflow.fdr_manager._current_version += 1 + optimizer.step( + calibration_test_df1, calibration_test_df2, current_step=current_step + ) + + assert workflow.optimization_manager.classifier_version == current_step + + assert optimizer.has_converged is True + assert workflow.optimization_manager.ms1_error == optimizer.target_parameter + + +def test_targeted_mobility_optimizer(): + workflow = create_workflow_instance() + + calibration_test_df1 = calibration_testdata() + calibration_test_df2 = calibration_testdata() + + workflow.calibration_manager.fit(calibration_test_df1, "precursor", plot=False) + + optimizer = optimization.TargetedMobilityOptimizer( + 100, + 7, + workflow, + ) + + assert optimizer.parameter_name == "mobility_error" + + for current_step in range(workflow.config["calibration"]["min_steps"]): + assert optimizer.has_converged is False + + workflow.fdr_manager._current_version += 1 + optimizer.step( + calibration_test_df1, calibration_test_df2, current_step=current_step + ) + + assert workflow.optimization_manager.classifier_version == current_step + + assert optimizer.has_converged is True + + assert workflow.optimization_manager.mobility_error == optimizer.target_parameter
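
The optimizer classes removed in this patch decide convergence with a simple plateau rule: once at least three searches have been run and the latest precursor count is below 1.1 times both of the two preceding counts, optimization stops and the parameter that produced the highest count is kept. The standalone sketch below restates that rule outside the class; the function name and the sample numbers are illustrative, not taken from alphadia.

    import numpy as np

    def has_plateaued(precursor_counts: list[int]) -> bool:
        # converged once the latest count no longer beats 1.1x either of the two previous counts
        return (
            len(precursor_counts) > 2
            and precursor_counts[-1] < 1.1 * precursor_counts[-2]
            and precursor_counts[-1] < 1.1 * precursor_counts[-3]
        )

    parameters = [100.0, 37.2, 18.5]          # tolerances searched so far (hypothetical)
    precursor_counts = [1000, 1001, 1002]     # identifications per search (hypothetical)

    if has_plateaued(precursor_counts):
        optimal_parameter = parameters[int(np.argmax(precursor_counts))]
        print(f"converged after {len(parameters)} searches, optimal parameter {optimal_parameter}")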
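
The deleted _update_parameter methods all shrink the tolerance the same way: compute the deviations of predicted from observed values, take the central 99% interval of those deviations, and propose 1.1 times that width as the next search tolerance, delegating the interval computation to the calibration estimator's ci() method. The numpy version below only approximates that calculation (the exact definition lives in the estimator), and the sample deviations are synthetic.

    import numpy as np

    rng = np.random.default_rng(0)
    deviations = rng.normal(0, 3.0, 1000)  # synthetic (observed - predicted) fragment mz errors, in ppm

    # endpoints of the central 99% of the deviations
    lower, upper = np.percentile(deviations, [0.5, 99.5])
    proposed_tolerance = 1.1 * np.mean([abs(lower), abs(upper)])

    print(f"next ms2_error to search with: {proposed_tolerance:.2f} ppm")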
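
The extended calibration_testdata() fixture builds every synthetic column from one pattern: the observed value is the library value plus Gaussian noise plus a slow sinusoidal drift, which gives the calibration models a smooth deviation to learn, and the new isotope_intensity_correlation column is a plain linear ramp. A self-contained sketch of the same pattern, with column names mirroring the fixture:

    import numpy as np
    import pandas as pd

    n = 1000
    mobility_library = np.linspace(0, 100, n)
    mobility_observed = (
        mobility_library
        + np.random.normal(0, 0.5, n)
        + np.sin(mobility_library * 0.05)
    )

    test_df = pd.DataFrame(
        {
            "mobility_library": mobility_library,
            "mobility_observed": mobility_observed,
            "isotope_intensity_correlation": np.linspace(0, 100, n),
        }
    )
    print(test_df.head())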
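
The new automatic-optimizer tests all check the same bookkeeping: every step appends the searched parameter, the precursor count and the current FDR classifier version to history_df, and on convergence the parameter and classifier version belonging to the best step are written to the optimization manager. A small pandas sketch of that promotion step, using hypothetical history values:

    import pandas as pd

    history_df = pd.DataFrame(
        {
            "parameter": [100.0, 42.0, 19.0],        # hypothetical tolerances searched
            "precursor_count": [1000, 1001, 1002],   # counts the tests expect to accumulate
            "classifier_version": [0, 1, 2],
        }
    )

    best = history_df["precursor_count"].idxmax()
    promoted = {
        "ms2_error": history_df.loc[best, "parameter"],
        "classifier_version": int(history_df.loc[best, "classifier_version"]),
    }
    print(promoted)  # {'ms2_error': 19.0, 'classifier_version': 2}

The MS1 test is the one exception: it selects the best step by the highest mean_isotope_intensity_correlation rather than by precursor count, and with this synthetic data the first step scores best, which is why that test expects classifier_version == 0.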
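
The targeted-optimizer tests express a contract rather than an update rule: a targeted optimizer is constructed with an initial value and a fixed target (for example TargetedMS2Optimizer(100, 7, workflow)), it must not report convergence before calibration.min_steps searches have been run, and once it converges the optimization manager must hold exactly the target value. The toy class below is a reduced stand-in for that contract, assuming a min_steps of 2 purely for illustration; it is not the real implementation.

    class ToyTargetedOptimizer:
        """Reduced stand-in for the behavior the targeted-optimizer tests assert."""

        def __init__(self, initial_parameter: float, target_parameter: float, min_steps: int = 2):
            self.parameter = initial_parameter
            self.target_parameter = target_parameter
            self.min_steps = min_steps
            self.has_converged = False

        def step(self, current_step: int) -> None:
            # jump straight to the target; how the real optimizer gets there is outside this sketch
            self.parameter = self.target_parameter
            if current_step + 1 >= self.min_steps:
                self.has_converged = True

    optimizer = ToyTargetedOptimizer(100.0, 7.0, min_steps=2)
    for current_step in range(2):
        assert optimizer.has_converged is False
        optimizer.step(current_step)
    assert optimizer.has_converged is True
    assert optimizer.parameter == optimizer.target_parameter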