diff --git a/qiskit_experiments/curve_analysis/__init__.py b/qiskit_experiments/curve_analysis/__init__.py index 5053735859..b389e5d97e 100644 --- a/qiskit_experiments/curve_analysis/__init__.py +++ b/qiskit_experiments/curve_analysis/__init__.py @@ -28,6 +28,9 @@ CurveAnalysis SeriesDef CurveData + FitModel + SingleFitFunction + CompositeFitFunction FitData ParameterRepr FitOptions @@ -112,7 +115,18 @@ is_error_not_significant """ from .curve_analysis import CurveAnalysis, is_error_not_significant -from .curve_data import CurveData, SeriesDef, FitData, ParameterRepr, FitOptions +from .curve_data import ( + CurveData, + SeriesDef, + FitData, + ParameterRepr, + FitOptions, +) +from .fit_models import ( + FitModel, + SingleFitFunction, + CompositeFitFunction, +) from .curve_fit import ( curve_fit, multi_curve_fit, diff --git a/qiskit_experiments/curve_analysis/curve_analysis.py b/qiskit_experiments/curve_analysis/curve_analysis.py index 573db1b7ce..254ffef40b 100644 --- a/qiskit_experiments/curve_analysis/curve_analysis.py +++ b/qiskit_experiments/curve_analysis/curve_analysis.py @@ -16,9 +16,6 @@ # pylint: disable=invalid-name import copy -import dataclasses -import functools -import inspect import warnings from abc import ABC from typing import Any, Dict, List, Tuple, Callable, Union, Optional @@ -26,16 +23,16 @@ import numpy as np import uncertainties from uncertainties import unumpy as unp +from scipy import optimize as opt from qiskit.providers import Backend from qiskit_experiments.curve_analysis.curve_data import ( CurveData, - SeriesDef, FitData, ParameterRepr, FitOptions, ) -from qiskit_experiments.curve_analysis.curve_fit import multi_curve_fit +from qiskit_experiments.curve_analysis.fit_models import FitModel, CompositeFitFunction from qiskit_experiments.curve_analysis.data_processing import multi_mean_xy_data, data_sort from qiskit_experiments.curve_analysis.visualization import FitResultPlotters, PlotterStyle from qiskit_experiments.data_processing import DataProcessor @@ -234,6 +231,21 @@ class AnalysisExample(CurveAnalysis): #: List[SeriesDef]: List of mapping representing a data series __series__ = list() + # Automatically generated fitting functions of child class + _cls_fit_model = None + + def __init_subclass__(cls, **kwargs): + """Parse series definition of subclass and set fit function and signature. + + This initializes the function(s) to which the data is fit: + The fit model is created only once when the sub-class is initialized. + This removes the overhead of instantiating the same fit model object multiple times. + This may occur in, e.g., parallel experiments where the curve analysis subclass is + instantiated multiple times. + """ + super().__init_subclass__(**kwargs) + cls._cls_fit_model = FitModel.from_definitions(cls.__series__) + def __init__(self): """Initialize data fields that are privately accessed by methods.""" super().__init__() @@ -252,6 +264,14 @@ def __init__(self): p: self.options.get(p, None) for p in self.__fixed_parameters__ } + # Create fit model for this instance. Load parsed series data from the class. + # The model is copied because it can be modified, i.e. parameter binding. + self._fit_model = self._cls_fit_model.copy() + + # Assign class default fixed parameters + if self.options.fixed_parameters: + self._fit_model.bind_parameters(**self.options.fixed_parameters) + #: Dict[str, Any]: Experiment metadata self.__experiment_metadata = None @@ -261,42 +281,132 @@ def __init__(self): #: Backend: backend object used for experimentation self.__backend = None - @classmethod - def _fit_params(cls) -> List[str]: - """Return a list of fitting parameters. + @staticmethod + def curve_fit( + func: FitModel, + xdata: np.ndarray, + ydata: np.ndarray, + sigma: np.ndarray, + p0: Dict[str, float], + bounds: Dict[str, Tuple[float, float]], + **kwargs, + ) -> FitData: + """Perform curve fitting. + + This is the scipy curve fit wrapper to manage named fit parameters and + return outcomes as ufloat objects with parameter correlation computed based on the + covariance matrix obtained from the fitting. The result is returned as + :class:`~qiskit_experiments.curve_analysis.FitData` which is a special data container + for curve analysis. This method can perform multi-objective optimization with + multiple data series with related fit models. + + Args: + func: A fit model that may consist of multiple curves. + xdata: Numpy array representing X values. + ydata: Numpy array representing Y values. + sigma: Numpy array representing standard error of Y values. + p0: Dictionary of initial guesses for given fit function. + bounds: Dictionary of parameter boundary for given fit function. + **kwargs: Solver options. Returns: - A list of fit parameter names. + Fit result. Raises: - AnalysisError: When series definitions have inconsistent multi-objective fit function. - ValueError: When fixed parameter name is not used in the fit function. + AnalysisError: When invalid fit function is provided. + AnalysisError: When number of data points is too small. + AnalysisError: When curve fitting does not converge. """ - fsigs = set() - for series_def in cls.__series__: - fsigs.add(inspect.signature(series_def.fit_func)) - if len(fsigs) > 1: + if not isinstance(func, FitModel): + raise AnalysisError( + "CurveAnalysis subclass requires `func` of FitModel instance to perform fitting. " + "This SciPy fit wrapper requires .signature that returns a list of fit parameters " + "for name-based parameter mapping, which is not available in a standard callable." + ) + + lower = [bounds[p][0] for p in func.signature] + upper = [bounds[p][1] for p in func.signature] + scipy_bounds = (lower, upper) + scipy_p0 = list(p0.values()) + + dof = len(ydata) - len(func.signature) + if dof < 1: raise AnalysisError( - "Fit functions specified in the series definition have " - "different function signature. They should receive " - "the same parameter set for multi-objective function fit." + "The number of degrees of freedom of the fit data and model " + " (len(ydata) - len(p0)) is less than 1" + ) + + if np.any(np.nan_to_num(sigma) == 0): + # Sigma = 0 causes zero division error + sigma = None + else: + if "absolute_sigma" not in kwargs: + kwargs["absolute_sigma"] = True + + try: + # pylint: disable = unbalanced-tuple-unpacking + popt, pcov = opt.curve_fit( + func, + xdata, + ydata, + sigma=sigma, + p0=scipy_p0, + bounds=scipy_bounds, + **kwargs, ) + except Exception as ex: + raise AnalysisError( + "scipy.optimize.curve_fit failed with error: {}".format(str(ex)) + ) from ex + + # Compute outcome with errors correlation + if np.isfinite(pcov).all(): + # Keep parameter correlations in following analysis steps + fit_params = uncertainties.correlated_values(nom_values=popt, covariance_mat=pcov) + else: + # Ignore correlations, add standard error if finite. + fit_params = [ + uncertainties.ufloat(nominal_value=n, std_dev=s if np.isfinite(s) else np.nan) + for n, s in zip(popt, np.sqrt(np.diag(pcov))) + ] + + # Calculate the reduced chi-squared for fit + yfits = func(xdata, *popt) + residues = (yfits - ydata) ** 2 + if sigma is not None: + residues = residues / (sigma**2) + reduced_chisq = np.sum(residues) / dof + + # Compute data range for fit + xdata_range = np.min(xdata), np.max(xdata) + ydata_range = np.min(ydata), np.max(ydata) + + return FitData( + popt=list(fit_params), + popt_keys=func.signature, + pcov=pcov, + reduced_chisq=reduced_chisq, + dof=dof, + x_range=xdata_range, + y_range=ydata_range, + fit_model=func.fit_model, + ) - # remove the first function argument. this is usually x, i.e. not a fit parameter. - return list(list(fsigs)[0].parameters.keys())[1:] + @property + def fit_model(self) -> FitModel: + """Return a fit model for this analysis instance.""" + return self._fit_model @property def parameters(self) -> List[str]: """Return parameters of this curve analysis.""" - return [s for s in self._fit_params() if s not in self.options.fixed_parameters] + return self._fit_model.signature @classmethod def _default_options(cls) -> Options: """Return default analysis options. Analysis Options: - curve_fitter (Callable): A callback function to perform fitting with formatted data. - See :func:`~qiskit_experiments.analysis.multi_curve_fit` for example. data_processor (Callable): A callback function to format experiment data. This can be a :class:`~qiskit_experiments.data_processing.DataProcessor` instance that defines the `self.__call__` method. @@ -348,7 +458,6 @@ def _default_options(cls) -> Options: """ options = super()._default_options() - options.curve_fitter = multi_curve_fit options.data_processor = None options.normalization = False options.x_key = "xval" @@ -372,6 +481,27 @@ def _default_options(cls) -> Options: return options + def set_options(self, **fields): + """Set the analysis options for :meth:`run` method. + + Args: + fields: The fields to update the options + + Raises: + KeyError: When removed option ``curve_fitter`` is set. + """ + # TODO remove this in Qiskit Experiments v0.4 + if "curve_fitter" in fields: + raise KeyError( + "Option curve_fitter has been removed. Please directly override curve_fit method." + ) + + if "fixed_parameters" in fields: + # User can update fixed parameter via analysis options. + self._fit_model.bind_parameters(**fields["fixed_parameters"]) + + super().set_options(**fields) + def _generate_fit_guesses(self, user_opt: FitOptions) -> Union[FitOptions, List[FitOptions]]: """Create algorithmic guess with analysis options and curve data. @@ -764,30 +894,6 @@ def _data( def _run_analysis( self, experiment_data: ExperimentData ) -> Tuple[List[AnalysisResultData], List["pyplot.Figure"]]: - # - # 1. Parse arguments - # - - # Update all fit functions in the series definitions if fixed parameter is defined. - assigned_params = self.options.fixed_parameters - - if assigned_params: - # Check if all parameters are assigned. - if any(v is None for v in assigned_params.values()): - raise AnalysisError( - f"Unassigned fixed-value parameters for the fit " - f"function {self.__class__.__name__}." - f"All values of fixed-parameters, i.e. {assigned_params}, " - "must be provided by the analysis options to run this analysis." - ) - - # Override series definition with assigned fit functions. - assigned_series = [] - for series_def in self.__series__: - dict_def = dataclasses.asdict(series_def) - dict_def["fit_func"] = functools.partial(series_def.fit_func, **assigned_params) - assigned_series.append(SeriesDef(**dict_def)) - self.__series__ = assigned_series # get experiment metadata try: @@ -803,7 +909,7 @@ def _run_analysis( pass # - # 2. Setup data processor + # 1. Setup data processor # # If no data processor was provided at run-time we infer one from the job @@ -818,12 +924,12 @@ def _run_analysis( data_processor.train(data=experiment_data.data()) # - # 3. Extract curve entries from experiment data + # 2. Extract curve entries from experiment data # self._extract_curves(experiment_data=experiment_data, data_processor=data_processor) # - # 4. Run fitting + # 3. Run fitting # formatted_data = self._data(label="fit_ready") @@ -839,13 +945,16 @@ def _run_analysis( if isinstance(fit_options, FitOptions): fit_options = [fit_options] + # Assign data allocation if the model consists of multiple curves + if isinstance(self._fit_model, CompositeFitFunction): + self._fit_model.data_allocation = formatted_data.data_index + # Run fit for each configuration fit_results = [] for fit_opt in set(fit_options): try: - fit_result = self.options.curve_fitter( - funcs=[series_def.fit_func for series_def in self.__series__], - series=formatted_data.data_index, + fit_result = self.curve_fit( + func=self._fit_model, xdata=formatted_data.x, ydata=formatted_data.y, sigma=formatted_data.y_err, @@ -871,18 +980,13 @@ def _run_analysis( fit_result = self._post_process_fit_result(fit_result) # - # 5. Create database entry + # 4. Create database entry # analysis_results = [] if fit_result: # pylint: disable=assignment-from-none quality = self._evaluate_quality(fit_data=fit_result) - fit_models = { - series_def.name: series_def.model_description or "no description" - for series_def in self.__series__ - } - # overview entry analysis_results.append( AnalysisResultData( @@ -894,7 +998,7 @@ def _run_analysis( "popt_keys": fit_result.popt_keys, "dof": fit_result.dof, "covariance_mat": fit_result.pcov, - "fit_models": fit_models, + "fit_model": fit_result.fit_model, **self.options.extra, }, ) @@ -914,11 +1018,10 @@ def _run_analysis( unit = None fit_val = fit_result.fitval(p_name) + + metadata = copy.copy(self.options.extra) if unit: - metadata = copy.copy(self.options.extra) metadata["unit"] = unit - else: - metadata = self.options.extra result_entry = AnalysisResultData( name=p_repr, @@ -953,7 +1056,7 @@ def _run_analysis( analysis_results.append(raw_data_entry) # - # 6. Create figures + # 5. Create figures # if self.options.plot: fit_figure = FitResultPlotters[self.options.curve_plotter].value.draw( @@ -969,6 +1072,7 @@ def _run_analysis( "ylim": self.options.ylim, }, fit_data=fit_result, + fix_parameters=self._fit_model._fixed_params, result_entries=analysis_results, style=self.options.style, axis=self.options.axis, @@ -1008,6 +1112,25 @@ def from_config(cls, config: Union[AnalysisConfig, Dict]) -> "CurveAnalysis": return instance + def __getstate__(self): + """Remove entries from :code:`self.__dict__` that cannot be pickled. + + Instances of ``CurveAnalysis`` must be serializable. However, the fit model they contain, i.e. + `self._fit_model` has fit functions which are typically defined as lambda functions. These + lambda functions are not serializable. Therefore, to make them serializable with pickle we + remove them from the state dictionary. Deserialization reconstructs the fit model from + the class fit model and the fixed parameters in the options (see ``__setstate__``). + """ + state = self.__dict__.copy() + del state["_fit_model"] + return state + + def __setstate__(self, state): + """Reconstruct fit model from the class attribute.""" + self.__dict__.update(state) + self._fit_model = self._cls_fit_model.copy() + self._fit_model.bind_parameters(**self.options.fixed_parameters) + def is_error_not_significant( val: Union[float, uncertainties.UFloat], diff --git a/qiskit_experiments/curve_analysis/curve_data.py b/qiskit_experiments/curve_analysis/curve_data.py index 294a74f5ac..524fac33f6 100644 --- a/qiskit_experiments/curve_analysis/curve_data.py +++ b/qiskit_experiments/curve_analysis/curve_data.py @@ -15,6 +15,7 @@ """ import dataclasses +import inspect from typing import Any, Dict, Callable, Union, List, Tuple, Optional, Iterable import numpy as np @@ -47,6 +48,20 @@ class SeriesDef: # Index of canvas if the result figure is multi-panel canvas: Optional[int] = None + # Automatically extracted signature of the fit function + signature: List[str] = dataclasses.field(init=False) + + def __post_init__(self): + """Parse the fit function signature to extract the names of the variables. + + Fit functions take arguments F(x, p0, p1, p2, ...) thus the first value should be excluded. + """ + signature = list(inspect.signature(self.fit_func).parameters.keys()) + fitparams = signature[1:] + + # Note that this dataclass is frozen + object.__setattr__(self, "signature", fitparams) + @dataclasses.dataclass(frozen=True) class CurveData: @@ -99,6 +114,9 @@ class FitData: # Y data range y_range: Tuple[float, float] + # String representation of fit model + fit_model: str = "not defined" + def fitval(self, key: str) -> uncertainties.UFloat: """A helper method to get fit value object from parameter key name. diff --git a/qiskit_experiments/curve_analysis/fit_models.py b/qiskit_experiments/curve_analysis/fit_models.py new file mode 100644 index 0000000000..5ba70234d2 --- /dev/null +++ b/qiskit_experiments/curve_analysis/fit_models.py @@ -0,0 +1,339 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +"""Fit models that are used for curve fitting.""" + +from abc import ABC, abstractmethod +from typing import Callable, List, Optional, Union + +import numpy as np + +from qiskit_experiments.exceptions import AnalysisError +from qiskit_experiments.curve_analysis.curve_data import SeriesDef + + +class FitModel(ABC): + r"""Base class of fit models. + + This is a function-like object that implements a fit model as a ``__call__`` magic method, + thus it behaves like a Python function that the SciPy curve_fit solver accepts. + Note that the fit function there only accepts variadic arguments. + + This class ties together the fit function and associated parameter names to + perform correct parameter mapping among multiple objective functions with different signatures, + in which some parameters may be excluded from the fitting when their values are fixed. + + Examples: + + We assume a model with two functions :math:`F_1(x_1, p_0, p_1, p_2)` and + :math:`F_2(x_2, p_0, p_3)`. + During the fit, we assign :math:`p_1=2` and exclude it from the fitting. + The parameters of this model are described by the sets + + .. math:: + + \Theta_1 = \{ p_0, p_1, p_2 \}, \Theta_2 = \{p_0, p_3\}, \Theta_{\rm fix} = \{p_1\} + + The corresponding :class:`FitModel` subclass is instantiated with a list ``fit_functions`` + containing the :math:`F_1` and :math:`F_2` functions together with + a list ``signatures`` containing :math:`\Theta_1` and :math:`\Theta_2`. The parameters + with fixed values :math:`\Theta_{\rm fix}` are removed from the signature using the + :meth:`bind_parameters` method. The signature of new fit model instance will be + + .. math:: + + \Theta = (\Theta_1 \cup \Theta_2) - \Theta_{\rm fix} = \{ p_0, p_2, p_3\}. + + The fit function that this model provides is therefore + + .. math:: + + F(x, \Theta) = F_1(x_0, \Theta_1) \oplus F_2(x_1, \Theta_2) \ + = F(x_0 \oplus x_1, p_0, p_2, p_3). + + This function might be called from the scipy curve fit algorithm + which only takes variadic arguments (i.e. agnostic to parameter names). + + .. math:: + + F(x, {\rm *args}) = F(x,\bar{p}_0, \bar{p}_1, \bar{p}_2) + + The fit model internally maps :math:`\bar{p}_0 \rightarrow p_0`, + :math:`\bar{p}_1 \rightarrow p_2`, and :math:`\bar{p}_2 \rightarrow p_3` + while assigning :math:`p_1=2` when its called from the curve fitting algorithm. + Note that this mapping is performed in the ``__call__`` method. + The function signature :math:`\Theta` is provided with the property :attr:`signature`. + + Notes: + + This class is usually instantiated with the :class:`SeriesDef` in the + ``__init_subclass__`` method of :class:`CurveAnalysis` subclasses. + Users do not need to know how to initialize this class + unless they manually create the instance for debugging purposes. + """ + + def __init__( + self, + fit_functions: List[Callable], + signatures: List[List[str]], + fit_models: Optional[Union[List[str], str]] = None, + ): + """Create new fit model. + + Args: + fit_functions: List of callables that forms the fit model for a + particular curve analysis class. It may consist of multiple curves + which are defined in :attr:`CurveAnalysis.__series__`. + signatures: List of argument names that each fit function callable takes. + The length of the list should be identical to the ``fit_functions``. + fit_models: String representation of fit functions. + Because this is just a metadata, the format of input value doesn't matter. + It may be a single string description for the entire fit model, or + list of descriptions for each fit function. If not provided, + "not defined" is stored in the experiment result metadata. + + Raises: + AnalysisError: When ``fit_functions`` and ``signatures`` have a different length. + """ + if len(fit_functions) != len(signatures): + raise AnalysisError("Different numbers of fit_functions and signatures are given.") + + self._fit_functions = fit_functions + self._signatures = signatures + + # String representation of the fit model. This is stored as a list of string. + if not fit_models or isinstance(fit_models, str): + fit_models = [fit_models] + self._fit_models = fit_models + + # Create the signature of the fit model, i.e. the signature of ``__call__`` for scipy. + # The individual curves comprising this model may have different signatures. + # The signature of this fit model is the union of the parameters in all curves. + # This is order preserving since this affects the index of ``popt`` that scipy fitter + # returns, which appears as @Parameters entry of curve analysis as-is. + union_params = [] + for signature in signatures: + for parameter in signature: + if parameter not in union_params: + union_params.append(parameter) + self._union_params = union_params + + # This is set by users. + self._fixed_params = {} + + @abstractmethod + def __call__(self, x: np.ndarray, *params) -> np.ndarray: + """Compute values of fit functions. + + Args: + x: Input X values array. + *params: Variadic argument provided from the fitter. + + Returns: + Computed Y values array. + """ + pass + + @classmethod + def from_definitions( + cls, series_defs: List[SeriesDef] + ) -> Union["SingleFitFunction", "CompositeFitFunction"]: + """Create fit model from series definitions. + + Args: + series_defs: Series definitions that define a set of fit functions. + + Returns: + Fit model. + """ + fit_functions = [] + signatures = [] + fit_models = [] + for series_def in series_defs: + fit_functions.append(series_def.fit_func) + signatures.append(series_def.signature) + fit_models.append(series_def.model_description) + + if len(series_defs) == 1: + return SingleFitFunction(fit_functions, signatures, fit_models) + return CompositeFitFunction(fit_functions, signatures, fit_models) + + def bind_parameters(self, **kwparams): + """Assign values to the fixed parameters. + + Args: + kwparams: Dictionary of parameters that are excluded from the fitting. + Every parameter, i.e. dictionary key, should be defined in the fit model. + + Raises: + AnalysisError: When parameter name is not defined in the fit model. + """ + if any(k not in self._union_params for k in kwparams): + raise AnalysisError( + f"Fixed parameters {', '.join(kwparams.keys())} are not all defined in the " + f"fit model {', '.join(self._union_params)}." + ) + self._fixed_params = kwparams + + @property + def signature(self) -> List[str]: + """Return signature of this fit model.""" + return [p for p in self._union_params if p not in self._fixed_params] + + @property + def fit_model(self) -> str: + """Return fit models.""" + if any(f is None for f in self._fit_models): + return "not defined" + return ",".join(self._fit_models) + + def copy(self): + """Return copy of this function.""" + instance = self.__class__( + fit_functions=self._fit_functions, + signatures=self._signatures, + fit_models=self._fit_models, + ) + + if self._fixed_params: + instance.bind_parameters(**self._fixed_params.copy()) + + return instance + + def __repr__(self): + sigrepr = ", ".join(self.signature) + if self._fixed_params: + fixrepr = ", ".join(self._fixed_params.keys()) + return f"{self.__class__.__name__}(x, {sigrepr}; @ Fixed {fixrepr})" + return f"{self.__class__.__name__}(x, {sigrepr})" + + +class SingleFitFunction(FitModel): + r"""Fit model consisting of a single curve. + + This model is created when only a single curve exists in the fit model. + + .. math:: + + F(x, \Theta) = f(x, \vec{p}) + + The parameter :math:`\vec{p} = \Theta \cup \Theta_{\rm fix}` which is a union of + the fit parameters and the fixed parameters :math:`\Theta_{\rm fix}`. + The function :math:`f` is usually set by :attr:`SeriesDef.fit_func` which is + a standard python function. + + .. seealso:: + + Class :class:`FitModel`. + """ + + def __call__(self, x: np.ndarray, *params) -> np.ndarray: + kwparams = dict(zip(self.signature, params)) + kwparams.update(self._fixed_params) + + return self._fit_functions[0](x, **{p: kwparams[p] for p in self._signatures[0]}) + + +class CompositeFitFunction(FitModel): + r"""Fit model consisting of multiple curves sharing fit parameters. + + This model is created when multiple curves form a fit model. + + .. math:: + + F(X, \Theta) = f_0(\vec{x}_0, \vec{p}_0) \oplus f_1(\vec{x}_1, \vec{p}_1) \oplus ..., + + here the function :math:`f_i(\vec{x}_i, \vec{p}_i)` is applied to the data with the sequence + of x-values :math:`\vec{x}_i \in \Re^{N_i}`, which are provided by the :math:`i`-th subset + of experiments, :math:`E_i(\vec{x}_i) = \{E_i(x_0), E_i(x_1), ... E_i(x_{N_i-1})\}`, + together with the measured outcomes :math:`\vec{y}_i \in \Re^{N_i}` that are fit by this model. + The size of vector :math:`N_i` may depend on the configuration of experiment :math:`i`. + + The parameter :math:`\vec{p}_i = \theta_i \cup \Theta_{\rm fix}` is a union of the + fit parameter for the function :math:`f_i` and the fixed parameters :math:`\Theta_{\rm fix}`. + The composite function :math:`F` consists of multiple fit functions :math:`f_i` + taking independent data points :math:`\vec{x}_i` with + partly shared fit parameters :math:`\theta_i`, + where :math:`\Theta = \theta_0 \cup \theta_1 \cup ...`. + The composite scan values is :math:`X =\vec{x}_0 \oplus \vec{x}_1 \oplus ...` and + the corresponding outcome is :math:`Y =\vec{y}_0 \oplus \vec{y}_1 \oplus ...`. + + In the Qiskit Experiments, these data sources are represented by + a single 1D array ``vec[k]``, rather than a 2D array ``mat[i, j]``. + To keep the mapping of the datum at index :math:`k` to the original series :math:`i`, + an extra index vector ``data_allocation`` :math:`I` must be + provided with :math:`X` and :math:`Y`. + + For example, we assume following data are obtained with two experiments :math:`E_0, E_1`. + + .. code-block:: python3 + + # From E0 + x_0 = array([1, 2, 3]) + y_0 = array([4, 5, 6]) + + # From E1 + x_1 = array([4, 5]) + y_1 = array([7, 8]) + + The composite data :math:`(X, Y, I)` might take the form: + + .. code-block:: python3 + + X = array([1, 4, 2, 5, 3]) + Y = array([4, 7, 5, 8, 6]) + I = array([0, 1, 0, 1, 0]) + + With this data representation, we can reconstruct each subset as + + .. code-block:: python3 + + # To E0 subset + assert all(X[I == 0] == x_0) + assert all(Y[I == 0] == y_0) + + # To E1 subset + assert all(X[I == 1] == x_1) + assert all(Y[I == 1] == y_1) + + The caller of this model must set this data indices before calling the function. + With this data representation, we can reuse the fitting algorithm for the + single objective function, where only 1-D arrays are accepted, + for the multi-objective optimization model consisting of multiple data set. + + .. seealso:: + + Class :class:`FitModel`. + """ + + def __init__( + self, + fit_functions: List[Callable], + signatures: List[List[str]], + fit_models: Optional[List[str]] = None, + ): + super().__init__(fit_functions, signatures, fit_models) + + # This attribute is set by users or another function that calls this model. + # The existence of this value is not checked within the __call__ for performance, + # but one must guarantee this is assigned before the model is called. + self.data_allocation = None + + def __call__(self, x: np.ndarray, *params) -> np.ndarray: + kwparams = dict(zip(self.signature, params)) + kwparams.update(self._fixed_params) + + y = np.zeros(x.size, dtype=float) + for i, (func, sig) in enumerate(zip(self._fit_functions, self._signatures)): + inds = self.data_allocation == i + y[inds] = func(x[inds], **{p: kwparams[p] for p in sig}) + + return y diff --git a/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py b/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py index 537c3ed759..977962c875 100644 --- a/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py +++ b/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py @@ -22,6 +22,7 @@ """ from collections import defaultdict +import functools from typing import List, Dict, Optional import uncertainties @@ -47,6 +48,7 @@ def draw( fit_samples: List[CurveData], tick_labels: Dict[str, str], fit_data: FitData, + fix_parameters: Dict[str, float], result_entries: List[AnalysisResultData], style: Optional[PlotterStyle] = None, axis: Optional["matplotlib.axes.Axes"] = None, @@ -60,6 +62,7 @@ def draw( tick_labels: Dictionary of axis label information. Axis units and label for x and y value should be explained. fit_data: fit data generated by the analysis. + fix_parameters: parameter not being in fitting. result_entries: List of analysis result data entries. style: Optional. A configuration object to modify the appearance of the figure. axis: Optional. A matplotlib Axis object. @@ -84,6 +87,7 @@ def draw( raw_sample=raw_samp, fit_sample=fit_samp, fit_data=fit_data, + fix_parameters=fix_parameters, style=style, ) @@ -120,7 +124,7 @@ def draw( report_handler = axis.text( *style.fit_report_rpos, - report_str, + s=report_str, ha="center", va="top", size=style.fit_report_text_size, @@ -147,6 +151,7 @@ def draw( fit_samples: List[CurveData], tick_labels: Dict[str, str], fit_data: FitData, + fix_parameters: Dict[str, float], result_entries: List[AnalysisResultData], style: Optional[PlotterStyle] = None, axis: Optional["matplotlib.axes.Axes"] = None, @@ -160,6 +165,7 @@ def draw( tick_labels: Dictionary of axis label information. Axis units and label for x and y value should be explained. fit_data: fit data generated by the analysis. + fix_parameters: parameter not being in fitting. result_entries: List of analysis result data entries. style: Optional. A configuration object to modify the appearance of the figure. axis: Optional. A matplotlib Axis object. @@ -222,6 +228,7 @@ def draw( raw_sample=raw_samples[curve_ind], fit_sample=fit_samples[curve_ind], fit_data=fit_data, + fix_parameters=fix_parameters, style=style, ) @@ -272,7 +279,7 @@ def draw( report_handler = axis.text( *style.fit_report_rpos, - report_str, + s=report_str, ha="center", va="top", size=style.fit_report_text_size, @@ -294,6 +301,7 @@ def draw_single_curve_mpl( raw_sample: CurveData, fit_sample: CurveData, fit_data: FitData, + fix_parameters: Dict[str, float], style: PlotterStyle, ): """A function that draws a single curve on the given plotter canvas. @@ -304,6 +312,7 @@ def draw_single_curve_mpl( raw_sample: Raw sample data. fit_sample: Formatted sample data. fit_data: Fitting parameter collection. + fix_parameters: Parameters not being in the fitting. style: Style sheet for plotting. """ @@ -330,15 +339,19 @@ def draw_single_curve_mpl( ) # plot fit curve - if fit_data: - plot_curve_fit( - func=series_def.fit_func, - result=fit_data, - ax=axis, - color=series_def.plot_color, - zorder=2, - fit_uncertainty=style.plot_sigma, - ) + if fix_parameters: + fit_func = functools.partial(series_def.fit_func, **fix_parameters) + else: + fit_func = series_def.fit_func + + plot_curve_fit( + func=fit_func, + result=fit_data, + ax=axis, + color=series_def.plot_color, + zorder=2, + fit_uncertainty=style.plot_sigma, + ) def write_fit_report(result_entries: List[AnalysisResultData]) -> str: diff --git a/releasenotes/notes/add-fit-model-and-remove-curve_fitter-arg-4a0effb5f9b88ba9.yaml b/releasenotes/notes/add-fit-model-and-remove-curve_fitter-arg-4a0effb5f9b88ba9.yaml new file mode 100644 index 0000000000..ccd6f318fc --- /dev/null +++ b/releasenotes/notes/add-fit-model-and-remove-curve_fitter-arg-4a0effb5f9b88ba9.yaml @@ -0,0 +1,34 @@ +--- +features: + - | + :attr:`CurveAnalysis.fit_model` and :attr:`CurveAnalysis.parameters` have been + added to the curve analysis and its subclasses. These properties return + :class:`FitModel` instance that represents a fit function of the analysis + and parameters to be fit with the analysis, respectively. + - | + New classes :class:`SingleFitFunction` and :class:`CompositeFitFunction` have been + added to the curve analysis module. These fit function instance is created for + each curve analysis subclass, and user can access it via + the curve analysis property ``.fit_model``. + Note that the fit function instance can be called by itself, and user can manually test + the curve beeing computed against given x values and parameters. + In addition, these function implements pretty printing to describe the function itself. + These information have been hidden in the private member :attr:`CurveAnalysis.__series__`, + but now one can easily grasp the fit model details with these fit function object. +upgrade: + - | + Analysis option `curve_fitter` of the :class:`CurveAnalysis` has been removed. + The `curve_fitter` had been a python callable that performs curve fit with given numpy arrays + and fit functions, which defaulted to + :func:`~qiskit_experiments.curve_analysis.curve_fit.multi_curve_fit`. + Becasue of the strict specification of the input and output data format of the function, + this has been removed from the analysis options and + now it is integarated as a class method :meth:`CurveAnalysis.curve_fit`. + This integaration will avoid complicated serialization (no version control is performed + for functions) and will offer better reproducibility of analysis for future updates. +developer: + - | + :meth:`CurveAnalysis.curve_fit` has been added to the curve analysis and + its subclasses. Now you can directly access the core fitting code + with bare numpy arrays representing data to be fit. + This may help debugging of new fit functions. diff --git a/test/curve_analysis/test_curve_analysis_base_class.py b/test/curve_analysis/test_curve_analysis_base_class.py new file mode 100644 index 0000000000..5ea673b6ed --- /dev/null +++ b/test/curve_analysis/test_curve_analysis_base_class.py @@ -0,0 +1,865 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +# pylint: disable=invalid-name, missing-class-docstring, unsubscriptable-object + +"""Test curve fitting base class.""" +from test.base import QiskitExperimentsTestCase + +import numpy as np +from uncertainties import unumpy as unp + +from qiskit_experiments.curve_analysis import CurveAnalysis, fit_function +from qiskit_experiments.curve_analysis.curve_data import SeriesDef, FitData, ParameterRepr +from qiskit_experiments.curve_analysis.fit_models import SingleFitFunction, CompositeFitFunction +from qiskit_experiments.data_processing import DataProcessor, Probability +from qiskit_experiments.exceptions import AnalysisError +from qiskit_experiments.framework import ExperimentData, AnalysisResultData, CompositeAnalysis + + +class TestCompositeFunction(QiskitExperimentsTestCase): + """Test behavior of CompositeFunction which is a core object of CurveAnalysis. + + This is new fit function wrapper introduced in Qiskit Experiments 0.3. + This function-like object should manage parameter assignment and mapping to + manage multiple sub functions (curves) for multi-objective optimization. + """ + + def test_single_function(self): + """A simple testcase for having only single fit function.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + function = SingleFitFunction( + fit_functions=[child_function], + signatures=[["par0", "par1"]], + fit_models=["par0 x + par1"], + ) + + self.assertListEqual(function.signature, ["par0", "par1"]) + self.assertEqual(function.fit_model, "par0 x + par1") + self.assertEqual(repr(function), "SingleFitFunction(x, par0, par1)") + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + ref_y = child_function(x, par0, par1) + test_y = function(x, par0, par1) + + np.testing.assert_array_equal(ref_y, test_y) + + def test_single_function_parameter_fixed(self): + """Test when some parameters are fixed.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + + function = SingleFitFunction( + fit_functions=[child_function], + signatures=[["par0", "par1"]], + ) + function.bind_parameters(par0=par0) + + self.assertListEqual(function.signature, ["par1"]) + self.assertEqual(repr(function), "SingleFitFunction(x, par1; @ Fixed par0)") + + ref_y = child_function(x, par0, par1) + test_y = function(x, par1) + + np.testing.assert_array_equal(ref_y, test_y) + + def test_multiple_functions(self): + """Test with multiple functions.""" + + def child_function1(x, par0, par1): + return par0 * x + par1 + + def child_function2(x, par0, par2): + return par0 * x - par2 + + function = CompositeFitFunction( + fit_functions=[child_function1, child_function2], + signatures=[["par0", "par1"], ["par0", "par2"]], + ) + + self.assertListEqual(function.signature, ["par0", "par1", "par2"]) + + x1 = np.linspace(0, 1, 10) + x2 = np.linspace(2, 3, 10) + par0 = 1 + par1 = 2 + par2 = 3 + ref_y1 = child_function1(x1, par0, par1) + ref_y2 = child_function2(x2, par0, par2) + + data_index = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + ref_y = np.zeros(ref_y1.size + ref_y2.size) + ref_y[data_index == 0] = ref_y1 + ref_y[data_index == 1] = ref_y2 + + # Need to set data index + function.data_allocation = data_index + test_y = function(np.r_[x1, x2], par0, par1, par2) + + np.testing.assert_array_equal(ref_y, test_y) + + def test_multiple_functions_with_fixed_parameter(self): + """Test with multiple functions while some parameters are fixed.""" + + def child_function1(x, par0, par1): + return par0 * x + par1 + + def child_function2(x, par0, par2): + return par0 * x - par2 + + x1 = np.linspace(0, 1, 10) + x2 = np.linspace(2, 3, 10) + par0 = 1 + par1 = 2 + par2 = 3 + + function = CompositeFitFunction( + fit_functions=[child_function1, child_function2], + signatures=[["par0", "par1"], ["par0", "par2"]], + ) + function.bind_parameters(par1=par1) + + self.assertListEqual(function.signature, ["par0", "par2"]) + self.assertEqual(repr(function), "CompositeFitFunction(x, par0, par2; @ Fixed par1)") + + ref_y1 = child_function1(x1, par0, par1) + ref_y2 = child_function2(x2, par0, par2) + + data_index = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + ref_y = np.zeros(ref_y1.size + ref_y2.size) + ref_y[data_index == 0] = ref_y1 + ref_y[data_index == 1] = ref_y2 + + function.data_allocation = data_index + test_y = function(np.r_[x1, x2], par0, par2) + + np.testing.assert_array_equal(ref_y, test_y) + + +class TestCurveFit(QiskitExperimentsTestCase): + """Test core fitting functionality by bypassing analysis framework. + + CurveAnalysis can provide fit function and fit algorithm via its + instance property and static method, we can only unittest fitting part. + This test suite validate fitting function with various situation including + single curve, mutiple curves, parameter fixsing, etc... + """ + + def test_single_function(self): + """Test case for single curve entry.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + fake_outcome = child_function(x, par0, par1) + + fit_func = instance.fit_model + result = instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0.9, "par1": 2.1}, + bounds={"par0": (0, 2), "par1": (1, 3)}, + ) + self.assertIsInstance(result, FitData) + + self.assertEqual(result.fit_model, "not defined") + self.assertEqual(result.popt_keys, ["par0", "par1"]) + self.assertEqual(result.dof, 8) + np.testing.assert_array_almost_equal(unp.nominal_values(result.popt), [par0, par1]) + + # test if values are operable + par0_val = result.fitval("par0") + par1_val = result.fitval("par1") + + new_quantity = par0_val + par1_val + self.assertAlmostEqual(new_quantity.s, np.sqrt(par0_val.s**2 + par1_val.s**2)) + + def test_single_function_with_fixed_parameter(self): + """Test case for single curve entry and parameters are fixed.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + @classmethod + def _default_options(cls): + opts = super()._default_options() + opts.fixed_parameters = {"par1": 2} + + return opts + + instance = MyCurveFit() + + # parameter par1 is excluded + self.assertListEqual(instance.parameters, ["par0"]) + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + fake_outcome = child_function(x, par0, par1) + + fit_func = instance.fit_model + result = instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0.9}, + bounds={"par0": (0, 2)}, + ) + + self.assertEqual(result.popt_keys, ["par0"]) + self.assertAlmostEqual(result.popt[0], par0) + + def test_single_function_user_fix_parameters(self): + """Test case for single curve entry and user fixes parameter afterwards.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + # both par0 and par1 are included + self.assertListEqual(instance.parameters, ["par0", "par1"]) + + # par1 is excluded + instance.set_options(fixed_parameters={"par1": 2}) + self.assertListEqual(instance.parameters, ["par0"]) + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + fake_outcome = child_function(x, par0, par1) + + fit_func = instance.fit_model + result = instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0.9}, + bounds={"par0": (0, 2)}, + ) + + self.assertEqual(result.popt_keys, ["par0"]) + self.assertAlmostEqual(result.popt[0], par0) + + def test_multiple_functions(self): + """Test case for multiple curve entries.""" + + def child_function1(x, par0, par1): + return par0 * x + par1 + + def child_function2(x, par0, par2): + return par0 * x - par2 + + class MyCurveFit(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=child_function1, + model_description="par0 x + par1", + ), + SeriesDef( + fit_func=child_function2, + model_description="par0 x - par2", + ), + ] + + instance = MyCurveFit() + + x1 = np.linspace(0, 1, 10) + x2 = np.linspace(2, 3, 10) + par0 = 1 + par1 = 2 + par2 = 3 + ref_y1 = child_function1(x1, par0, par1) + ref_y2 = child_function2(x2, par0, par2) + + data_index = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + fake_outcome = np.zeros(ref_y1.size + ref_y2.size) + fake_outcome[data_index == 0] = ref_y1 + fake_outcome[data_index == 1] = ref_y2 + + fit_func = instance.fit_model + fit_func.data_allocation = data_index + + result = instance.curve_fit( + func=fit_func, + xdata=np.r_[x1, x2], + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0.9, "par1": 2.1, "par2": 2.9}, + bounds={"par0": (0, 2), "par1": (1, 3), "par2": (2, 4)}, + ) + + self.assertEqual(result.fit_model, "par0 x + par1,par0 x - par2") + self.assertEqual(result.popt_keys, ["par0", "par1", "par2"]) + np.testing.assert_array_almost_equal(unp.nominal_values(result.popt), [par0, par1, par2]) + + def test_assert_dof_error(self): + """Test raise an DOF error when input data size is too small.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.array([2]) # DOF = 0 + par0 = 1 + par1 = 2 + fake_outcome = child_function(x, par0, par1) + + fit_func = instance.fit_model + with self.assertRaises(AnalysisError): + instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0.9, "par1": 2.1}, + bounds={"par0": (0, 2), "par1": (1, 3)}, + ) + + def test_assert_invalid_fit(self): + """Test scipy solver error is converted into AnalysisError.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + fake_outcome = child_function(x, par0, par1) + + fit_func = instance.fit_model + with self.assertRaises(AnalysisError): + instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0, "par1": 2.1}, + bounds={"par0": (-1, 0), "par1": (-1, 0)}, # impossible to fit within this range + ) + + def test_assert_fit_with_bare_calback(self): + """Test raise error when normal callback is set.""" + + def child_function(x, par0, par1): + return par0 * x + par1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.linspace(0, 1, 10) + par0 = 1 + par1 = 2 + fake_outcome = child_function(x, par0, par1) + + with self.assertRaises(AnalysisError): + instance.curve_fit( + func=child_function, # cannot manage parameter mapping and metadata + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"par0": 0, "par1": 2.1}, + bounds={"par0": (-1, 0), "par1": (-1, 0)}, # impossible to fit within this range + ) + + def test_assert_invalid_fixed_parameter(self): + """Test we cannot create invalid analysis instance with wrong fixed value name.""" + + class InvalidAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0: x + par0, + ) + ] + + @classmethod + def _default_options(cls): + opts = super()._default_options() + opts.fixed_parameters = {"not_existing": 1} + + return opts + + with self.assertRaises(AnalysisError): + InvalidAnalysis() + + +class CurveAnalysisTestCase(QiskitExperimentsTestCase): + """A baseclass for CurveAnalysis unittest.""" + + seeds = 123 + + @classmethod + def single_sampler(cls, xvalues, yvalues, shots=10000, **metadata): + """A helper function to generate experiment data.""" + rng = np.random.default_rng(seed=cls.seeds) + counts = rng.binomial(shots, yvalues) + data = [ + { + "counts": {"0": shots - c, "1": c}, + "metadata": {"xval": xi, "qubits": (0,), **metadata}, + } + for xi, c in zip(xvalues, counts) + ] + + return data + + @classmethod + def parallel_sampler(cls, xvalues, yvalues1, yvalues2, shots=10000): + """A helper function to generate fake parallel experiment data.""" + rng = np.random.default_rng(seed=cls.seeds) + + data = [] + for xi, par1, par2 in zip(xvalues, yvalues1, yvalues2): + cs = rng.multinomial( + shots, [(1 - par1) * (1 - par2), par1 * (1 - par2), (1 - par1) * par2, par1 * par2] + ) + circ_data = { + "counts": {"00": cs[0], "01": cs[1], "10": cs[2], "11": cs[3]}, + "metadata": { + "composite_index": [0, 1], + "composite_metadata": [{"xval": xi}, {"xval": xi}], + "composite_qubits": [[0], [1]], + "composite_clbits": [[0], [1]], + }, + } + data.append(circ_data) + + return data + + +class TestCurveAnalysisUnit(CurveAnalysisTestCase): + """Unittest of CurveAnalysis functionality.""" + + def setUp(self): + super().setUp() + + # Description of test setting + # + # - This model contains three curves, namely, curve1, curve2, curve3 + # - Each curve can be represented by the same function + # - Parameter amp and baseline are shared among all curves + # - Each curve has unique lamb + # - In total 5 parameters in the fit, namely, par0, par1, par2, par3 + # + class MyAnalysis(CurveAnalysis): + """Test analysis""" + + # Note that series def function can take different argument now. + # The signature of composite function is generated on the fly. + __series__ = [ + SeriesDef( + name="curve1", + fit_func=lambda x, par0, par1, par4: fit_function.exponential_decay( + x, amp=par0, lamb=par1, baseline=par4 + ), + filter_kwargs={"type": 1, "valid": True}, + model_description=r"p_0 * \exp(p_1 x) + par4", + ), + SeriesDef( + name="curve2", + fit_func=lambda x, par0, par2, par4: fit_function.exponential_decay( + x, amp=par0, lamb=par2, baseline=par4 + ), + filter_kwargs={"type": 2, "valid": True}, + model_description=r"p_0 * \exp(p_2 x) + par4", + ), + SeriesDef( + name="curve3", + fit_func=lambda x, par0, par3, par4: fit_function.exponential_decay( + x, amp=par0, lamb=par3, baseline=par4 + ), + filter_kwargs={"type": 3, "valid": True}, + model_description=r"p_0 * \exp(p_3 x) + par4", + ), + ] + + self.analysis_cls = MyAnalysis + + def test_parsed_fit_params(self): + """Test parsed fit params.""" + instance = self.analysis_cls() + + # Note that parameters are ordered according to the following rule. + # 1. Take series[0] and add its fittting parameters + # 2. Take next series and its fitting parameters if not exist in the list + # 3. Repeat until the last series + self.assertListEqual(instance.parameters, ["par0", "par1", "par4", "par2", "par3"]) + + def test_data_extraction(self): + """Test data extraction method.""" + shots = 5000000 # something big for data generation unittest + + instance = self.analysis_cls() + instance.set_options(x_key="xval") + + def data_processor(datum): + count = datum["counts"].get("1", 0) + pmean = count / shots + return pmean, pmean * (1 - pmean) / shots + + x = np.linspace(1.0, 5.0, 10) + y1 = fit_function.exponential_decay(x, amp=1.0) + y2 = fit_function.exponential_decay(x, amp=0.5) + + test_data_y1 = self.single_sampler(xvalues=x, yvalues=y1, shots=shots, type=1, valid=True) + test_data_y2 = self.single_sampler(xvalues=x, yvalues=y2, shots=shots, type=2, valid=False) + + expdata = ExperimentData() + expdata.add_data(test_data_y1 + test_data_y2) + + instance._extract_curves(experiment_data=expdata, data_processor=data_processor) + raw_data = instance._data(label="raw_data") + + # check x value + xdata = raw_data.x + ref_x = np.r_[x, x] + np.testing.assert_array_equal(xdata, ref_x) + + # check y value + ydata = raw_data.y + ref_y = np.r_[y1, y2] + np.testing.assert_array_almost_equal(ydata, ref_y, decimal=3) + + # check sigma + sigma = raw_data.y_err + ref_sigma = np.r_[y1 * (1 - y1) / shots, y2 * (1 - y2) / shots] + np.testing.assert_array_almost_equal(sigma, ref_sigma, decimal=3) + + # check data index + index = raw_data.data_index + ref_index = np.r_[np.full(10, 0), np.full(10, -1)] # second value doesn't match; -1 + np.testing.assert_array_equal(index, ref_index) + + def test_get_subset(self): + """Test that get subset data from full data array.""" + instance = self.analysis_cls() + instance.set_options(x_key="xval") + + fake_data = [ + {"data": 1, "metadata": {"xval": 1, "type": 1, "valid": True}}, + {"data": 2, "metadata": {"xval": 2, "type": 2, "valid": True}}, + {"data": 3, "metadata": {"xval": 3, "type": 1, "valid": True}}, + {"data": 4, "metadata": {"xval": 4, "type": 3, "valid": True}}, + {"data": 5, "metadata": {"xval": 5, "type": 3, "valid": True}}, + {"data": 6, "metadata": {"xval": 6, "type": 4, "valid": True}}, # this if fake + ] + expdata = ExperimentData() + expdata.add_data(fake_data) + + def data_processor(datum): + return datum["data"], datum["data"] * 2 + + instance._extract_curves(expdata, data_processor=data_processor) + + filt_data = instance._data(series_name="curve1") + np.testing.assert_array_equal(filt_data.x, np.asarray([1, 3], dtype=float)) + np.testing.assert_array_equal(filt_data.y, np.asarray([1, 3], dtype=float)) + np.testing.assert_array_equal(filt_data.y_err, np.asarray([2, 6], dtype=float)) + + filt_data = instance._data(series_name="curve2") + np.testing.assert_array_equal(filt_data.x, np.asarray([2], dtype=float)) + np.testing.assert_array_equal(filt_data.y, np.asarray([2], dtype=float)) + np.testing.assert_array_equal(filt_data.y_err, np.asarray([4], dtype=float)) + + filt_data = instance._data(series_name="curve3") + np.testing.assert_array_equal(filt_data.x, np.asarray([4, 5], dtype=float)) + np.testing.assert_array_equal(filt_data.y, np.asarray([4, 5], dtype=float)) + np.testing.assert_array_equal(filt_data.y_err, np.asarray([8, 10], dtype=float)) + + +class TestCurveAnalysisIntegration(CurveAnalysisTestCase): + """Unittest of CurveAnalysis full functionality. + + Because parameter mapping and fitting feature is already tested in + TestCompositeFunction and TestCurveFit, + this test suite focuses on the entire workflow of curve analysis. + """ + + def test_single_function(self): + """Simple test case with a single curve.""" + par0 = 0.5 + par1 = 3 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues = fit_function.exponential_decay(xvalues, amp=par0, lamb=par1) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0, par1: fit_function.exponential_decay( + x, amp=par0, lamb=par1 + ) + ) + ] + + test_data = self.single_sampler(xvalues, yvalues) + expdata = ExperimentData() + expdata.add_data(test_data) + + init_guess = {"par0": 0.4, "par1": 2.9} + instance = MyAnalysis() + instance.set_options( + x_key="xval", + p0=init_guess, + result_parameters=[ParameterRepr("par0", "amp"), ParameterRepr("par1", "lamb")], + data_processor=data_processor, + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + all_parameters = run_expdata.analysis_results("@Parameters_MyAnalysis") + par0_analyzed = run_expdata.analysis_results("amp") + par1_analyzed = run_expdata.analysis_results("lamb") + + np.testing.assert_array_almost_equal(all_parameters.value, [par0, par1], decimal=2) + self.assertAlmostEqual(par0_analyzed.value.n, par0, delta=0.05) + self.assertAlmostEqual(par1_analyzed.value.n, par1, delta=0.05) + + def test_extra_entry(self): + """Simple test case analysis add new entry.""" + par0 = 0.5 + par1 = 3 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues = fit_function.exponential_decay(xvalues, amp=par0, lamb=par1) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0, par1: fit_function.exponential_decay( + x, amp=par0, lamb=par1 + ) + ) + ] + + def _extra_database_entry(self, fit_data): + return [ + AnalysisResultData( + name="new_value", + value=fit_data.fitval("par0") + fit_data.fitval("par1"), + ) + ] + + test_data = self.single_sampler(xvalues, yvalues) + expdata = ExperimentData() + expdata.add_data(test_data) + + init_guess = {"par0": 0.4, "par1": 2.9} + instance = MyAnalysis() + instance.set_options( + x_key="xval", + p0=init_guess, + data_processor=data_processor, + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + new_entry = run_expdata.analysis_results("new_value") + + self.assertAlmostEqual(new_entry.value.n, par0 + par1, delta=0.05) + + def test_evaluate_quality(self): + """Simple test case evaluating quality.""" + par0 = 0.5 + par1 = 3 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues = fit_function.exponential_decay(xvalues, amp=par0, lamb=par1) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0, par1: fit_function.exponential_decay( + x, amp=par0, lamb=par1 + ) + ) + ] + + def _evaluate_quality(self, fit_data): + return "evaluated!" + + test_data = self.single_sampler(xvalues, yvalues) + expdata = ExperimentData() + expdata.add_data(test_data) + + init_guess = {"par0": 0.4, "par1": 2.9} + instance = MyAnalysis() + instance.set_options( + x_key="xval", + p0=init_guess, + data_processor=data_processor, + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + entry = run_expdata.analysis_results(0) + self.assertEqual(entry.quality, "evaluated!") + + def test_curve_analysis_multi_thread(self): + """Test case for composite analyis. + + Check if analysis works properly when two instances are simultaneously operated + in the multiple threads. Note that composite function is a class attribute + thus it should not be modified during the fit. + """ + par00 = 0.5 + par10 = 3 + + par01 = 0.5 + par11 = 4 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues_a = fit_function.exponential_decay(xvalues, amp=par00, lamb=par10) + yvalues_b = fit_function.exponential_decay(xvalues, amp=par01, lamb=par11) + + comp_data = self.parallel_sampler(xvalues, yvalues_a, yvalues_b) + + subdata1 = ExperimentData() + subdata2 = ExperimentData() + + composite_expdata = ExperimentData() + composite_expdata.metadata["component_child_index"] = [0, 1] + composite_expdata.add_child_data(subdata1) + composite_expdata.add_child_data(subdata2) + composite_expdata.add_data(comp_data) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0, par1: fit_function.exponential_decay( + x, amp=par0, lamb=par1 + ) + ) + ] + + @classmethod + def _default_options(cls): + options = super()._default_options() + options.data_processor = data_processor + options.plot = False + options.result_parameters = ["par0"] + options.p0 = {"par0": 0.49} + options.bounds = {"par0": (0.4, 0.6)} + options.par1 = None + + return options + + # Override CompositeFitFunction with different fixed parameters + # Model attached to each instance should be independent object. + sub_analysis1 = MyAnalysis() + sub_analysis1.set_options(fixed_parameters={"par1": par10}) + sub_analysis2 = MyAnalysis() + sub_analysis2.set_options(fixed_parameters={"par1": par11}) + + instance = CompositeAnalysis([sub_analysis1, sub_analysis2]) + run_expdata = instance.run(composite_expdata, replace_results=False).block_for_results() + + par0_sub1 = run_expdata.child_data(0).analysis_results("par0") + self.assertAlmostEqual(par0_sub1.value.n, par00, delta=0.05) + + par0_sub2 = run_expdata.child_data(1).analysis_results("par0") + self.assertAlmostEqual(par0_sub2.value.n, par01, delta=0.05) + + +class TestBackwardCompatibility(QiskitExperimentsTestCase): + """Test case for backward compatibility.""" + + def test_old_fixed_param_attributes(self): + """Test if old class structure for fixed param is still supported.""" + + class _DeprecatedAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0, par1, par2, par3: fit_function.exponential_decay( + x, amp=par0, lamb=par1, x0=par2, baseline=par3 + ), + ) + ] + + __fixed_parameters__ = ["par1"] + + @classmethod + def _default_options(cls): + opts = super()._default_options() + opts.par1 = 2 + + return opts + + with self.assertWarns(DeprecationWarning): + instance = _DeprecatedAnalysis() + + self.assertDictEqual(instance.options.fixed_parameters, {"par1": 2}) + + def test_loading_data_with_deprecated_fixed_param(self): + """Test loading old data with fixed parameters as standalone options.""" + + class _DeprecatedAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, par0, par1, par2, par3: fit_function.exponential_decay( + x, amp=par0, lamb=par1, x0=par2, baseline=par3 + ), + ) + ] + + with self.assertWarns(DeprecationWarning): + # old option data structure, i.e. fixed param as a standalone option + # the analysis instance fixed parameters might be set via the experiment instance + instance = _DeprecatedAnalysis.from_config({"options": {"par1": 2}}) + + self.assertDictEqual(instance.options.fixed_parameters, {"par1": 2}) diff --git a/test/curve_analysis/test_curve_fit.py b/test/curve_analysis/test_curve_fit.py deleted file mode 100644 index ef5051b206..0000000000 --- a/test/curve_analysis/test_curve_fit.py +++ /dev/null @@ -1,796 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -# pylint: disable=invalid-name - -"""Test curve fitting base class.""" -from test.base import QiskitExperimentsTestCase -from test.fake_experiment import FakeExperiment -from typing import List - -import numpy as np -from qiskit.qobj.utils import MeasLevel -from uncertainties import correlated_values - -from qiskit_experiments.curve_analysis import CurveAnalysis, fit_function -from qiskit_experiments.curve_analysis.curve_data import ( - SeriesDef, - FitData, - ParameterRepr, - FitOptions, -) -from qiskit_experiments.curve_analysis.data_processing import probability -from qiskit_experiments.exceptions import AnalysisError -from qiskit_experiments.framework import ExperimentData - - -def simulate_output_data(func, xvals, param_dict, **metadata): - """Generate arbitrary fit data.""" - __shots = 100000 - - expected_probs = func(xvals, **param_dict) - counts = np.asarray(expected_probs * __shots, dtype=int) - - data = [ - { - "counts": {"0": __shots - count, "1": count}, - "metadata": dict(xval=xi, qubits=(0,), experiment_type="fake_experiment", **metadata), - } - for xi, count in zip(xvals, counts) - ] - - expdata = ExperimentData(experiment=FakeExperiment()) - for datum in data: - expdata.add_data(datum) - - expdata.metadata["job_metadata"] = [{"run_options": {"meas_level": MeasLevel.CLASSIFIED}}] - - return expdata - - -def create_new_analysis(series: List[SeriesDef], fixed_params: List[str] = None) -> CurveAnalysis: - """A helper function to create a mock analysis class instance.""" - - class TestAnalysis(CurveAnalysis): - """A mock analysis class to test.""" - - __series__ = series - - @classmethod - def _default_options(cls): - opts = super()._default_options() - if fixed_params: - opts.fixed_parameters = {p: None for p in fixed_params} - - return opts - - return TestAnalysis() - - -class TestFitData(QiskitExperimentsTestCase): - """Unittest for fit data dataclass.""" - - def test_get_value(self): - """Get fit value from fit data object.""" - pcov = np.diag(np.ones(3)) - popt = np.asarray([1.0, 2.0, 3.0]) - fit_params = correlated_values(popt, pcov) - - data = FitData( - popt=fit_params, - popt_keys=["a", "b", "c"], - pcov=pcov, - reduced_chisq=0.0, - dof=0, - x_range=(0, 0), - y_range=(0, 0), - ) - - a_val = data.fitval("a") - self.assertEqual(a_val, fit_params[0]) - - b_val = data.fitval("b") - self.assertEqual(b_val, fit_params[1]) - - c_val = data.fitval("c") - self.assertEqual(c_val, fit_params[2]) - - -class TestCurveAnalysisUnit(QiskitExperimentsTestCase): - """Unittest for curve fit analysis.""" - - def setUp(self): - super().setUp() - self.xvalues = np.linspace(1.0, 5.0, 10) - - # Description of test setting - # - # - This model contains three curves, namely, curve1, curve2, curve3 - # - Each curve can be represented by the same function - # - Parameter amp and baseline are shared among all curves - # - Each curve has unique lamb - # - In total 5 parameters in the fit, namely, p0, p1, p2, p3 - # - self.analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, par2, par3, par4: fit_function.exponential_decay( - x, amp=par0, lamb=par1, baseline=par4 - ), - filter_kwargs={"type": 1, "valid": True}, - model_description=r"p_0 * \exp(p_1 x) + p4", - ), - SeriesDef( - name="curve2", - fit_func=lambda x, par0, par1, par2, par3, par4: fit_function.exponential_decay( - x, amp=par0, lamb=par2, baseline=par4 - ), - filter_kwargs={"type": 2, "valid": True}, - model_description=r"p_0 * \exp(p_2 x) + p4", - ), - SeriesDef( - name="curve3", - fit_func=lambda x, par0, par1, par2, par3, par4: fit_function.exponential_decay( - x, amp=par0, lamb=par3, baseline=par4 - ), - filter_kwargs={"type": 3, "valid": True}, - model_description=r"p_0 * \exp(p_3 x) + p4", - ), - ], - ) - self.err_decimal = 3 - - def test_parsed_fit_params(self): - """Test parsed fit params.""" - self.assertSetEqual( - set(self.analysis._fit_params()), {"par0", "par1", "par2", "par3", "par4"} - ) - - def test_cannot_create_invalid_series_fit(self): - """Test we cannot create invalid analysis instance.""" - invalid_series = [ - SeriesDef( - name="fit1", - fit_func=lambda x, par0: fit_function.exponential_decay(x, amp=par0), - ), - SeriesDef( - name="fit2", - fit_func=lambda x, par1: fit_function.exponential_decay(x, amp=par1), - ), - ] - - instance = create_new_analysis(series=invalid_series) - with self.assertRaises(AnalysisError): - # pylint: disable=pointless-statement - instance.parameters # fit1 has param par0 while fit2 has par1 - - def test_data_extraction(self): - """Test data extraction method.""" - self.analysis.set_options(x_key="xval") - - # data to analyze - test_data0 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": 1.0}, - type=1, - valid=True, - ) - - # fake data - test_data1 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": 0.5}, - type=2, - valid=False, - ) - - # merge two experiment data - for datum in test_data1.data(): - test_data0.add_data(datum) - - self.analysis._extract_curves( - experiment_data=test_data0, data_processor=probability(outcome="1") - ) - - raw_data = self.analysis._data(label="raw_data") - - xdata = raw_data.x - ydata = raw_data.y - sigma = raw_data.y_err - d_index = raw_data.data_index - - # check if the module filter off data: valid=False - self.assertEqual(len(xdata), 20) - - # check x values - ref_x = np.concatenate((self.xvalues, self.xvalues)) - np.testing.assert_array_almost_equal(xdata, ref_x) - - # check y values - ref_y = np.concatenate( - ( - fit_function.exponential_decay(self.xvalues, amp=1.0), - fit_function.exponential_decay(self.xvalues, amp=0.5), - ) - ) - np.testing.assert_array_almost_equal(ydata, ref_y, decimal=self.err_decimal) - - # check series - ref_series = np.concatenate((np.zeros(10, dtype=int), -1 * np.ones(10, dtype=int))) - self.assertListEqual(list(d_index), list(ref_series)) - - # check y errors - ref_yerr = ref_y * (1 - ref_y) / 100000 - np.testing.assert_array_almost_equal(sigma, ref_yerr, decimal=self.err_decimal) - - def test_get_subset(self): - """Test that get subset data from full data array.""" - # data to analyze - fake_data = [ - {"data": 1, "metadata": {"xval": 1, "type": 1, "valid": True}}, - {"data": 2, "metadata": {"xval": 2, "type": 2, "valid": True}}, - {"data": 3, "metadata": {"xval": 3, "type": 1, "valid": True}}, - {"data": 4, "metadata": {"xval": 4, "type": 3, "valid": True}}, - {"data": 5, "metadata": {"xval": 5, "type": 3, "valid": True}}, - {"data": 6, "metadata": {"xval": 6, "type": 4, "valid": True}}, # this if fake - ] - expdata = ExperimentData(experiment=FakeExperiment()) - for datum in fake_data: - expdata.add_data(datum) - - def _processor(datum): - return datum["data"], datum["data"] * 2 - - self.analysis.set_options(x_key="xval") - self.analysis._extract_curves(expdata, data_processor=_processor) - - filt_data = self.analysis._data(series_name="curve1") - np.testing.assert_array_equal(filt_data.x, np.asarray([1, 3], dtype=float)) - np.testing.assert_array_equal(filt_data.y, np.asarray([1, 3], dtype=float)) - np.testing.assert_array_equal(filt_data.y_err, np.asarray([2, 6], dtype=float)) - - filt_data = self.analysis._data(series_name="curve2") - np.testing.assert_array_equal(filt_data.x, np.asarray([2], dtype=float)) - np.testing.assert_array_equal(filt_data.y, np.asarray([2], dtype=float)) - np.testing.assert_array_equal(filt_data.y_err, np.asarray([4], dtype=float)) - - filt_data = self.analysis._data(series_name="curve3") - np.testing.assert_array_equal(filt_data.x, np.asarray([4, 5], dtype=float)) - np.testing.assert_array_equal(filt_data.y, np.asarray([4, 5], dtype=float)) - np.testing.assert_array_equal(filt_data.y_err, np.asarray([8, 10], dtype=float)) - - -class TestCurveAnalysisIntegration(QiskitExperimentsTestCase): - """Integration test for curve fit analysis through entire analysis.run function.""" - - def setUp(self): - super().setUp() - self.xvalues = np.linspace(0.1, 1, 50) - self.err_decimal = 2 - - def test_run_single_curve_analysis(self): - """Test analysis for single curve.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, par2, par3: fit_function.exponential_decay( - x, amp=par0, lamb=par1, x0=par2, baseline=par3 - ), - model_description=r"p_0 \exp(p_1 x + p_2) + p_3", - ) - ], - ) - ref_p0 = 0.9 - ref_p1 = 2.5 - ref_p2 = 0.0 - ref_p3 = 0.1 - - test_data = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p1, "x0": ref_p2, "baseline": ref_p3}, - ) - analysis.set_options( - p0={"par0": ref_p0, "par1": ref_p1, "par2": ref_p2, "par3": ref_p3}, - result_parameters=[ParameterRepr("par1", "parameter_name", "unit")], - ) - - results, _ = analysis._run_analysis(test_data) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p2, ref_p3]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - self.assertEqual(result.extra["dof"], 46) - self.assertListEqual(result.extra["popt_keys"], ["par0", "par1", "par2", "par3"]) - self.assertDictEqual(result.extra["fit_models"], {"curve1": r"p_0 \exp(p_1 x + p_2) + p_3"}) - - # special entry formatted for database - result = results[1] - self.assertEqual(result.name, "parameter_name") - self.assertEqual(result.extra["unit"], "unit") - self.assertAlmostEqual(result.value.nominal_value, ref_p1, places=self.err_decimal) - - def test_run_single_curve_fail(self): - """Test analysis returns status when it fails.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, par2, par3: fit_function.exponential_decay( - x, amp=par0, lamb=par1, x0=par2, baseline=par3 - ), - ) - ], - ) - ref_p0 = 0.9 - ref_p1 = 2.5 - ref_p2 = 0.0 - ref_p3 = 0.1 - - test_data = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p1, "x0": ref_p2, "baseline": ref_p3}, - ) - analysis.set_options( - p0={"par0": ref_p0, "par1": ref_p1, "par2": ref_p2, "par3": ref_p3}, - bounds={"par0": [-10, 0], "par1": [-10, 0], "par2": [-10, 0], "par3": [-10, 0]}, - return_data_points=True, - ) - - # Try to fit with infeasible parameter boundary. This should fail. - results, _ = analysis._run_analysis(test_data) - - # This returns only data point entry - self.assertEqual(len(results), 1) - self.assertEqual(results[0].name, "@Data_TestAnalysis") - - def test_run_two_curves_with_same_fitfunc(self): - """Test analysis for two curves. Curves shares fit model.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, par2, par3, par4: fit_function.exponential_decay( - x, amp=par0, lamb=par1, x0=par3, baseline=par4 - ), - filter_kwargs={"exp": 0}, - ), - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, par2, par3, par4: fit_function.exponential_decay( - x, amp=par0, lamb=par2, x0=par3, baseline=par4 - ), - filter_kwargs={"exp": 1}, - ), - ], - ) - ref_p0 = 0.9 - ref_p1 = 7.0 - ref_p2 = 5.0 - ref_p3 = 0.0 - ref_p4 = 0.1 - - test_data0 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p1, "x0": ref_p3, "baseline": ref_p4}, - exp=0, - ) - - test_data1 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p2, "x0": ref_p3, "baseline": ref_p4}, - exp=1, - ) - - # merge two experiment data - for datum in test_data1.data(): - test_data0.add_data(datum) - - analysis.set_options( - p0={"par0": ref_p0, "par1": ref_p1, "par2": ref_p2, "par3": ref_p3, "par4": ref_p4} - ) - results, _ = analysis._run_analysis(test_data0) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p2, ref_p3, ref_p4]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - - def test_run_two_curves_with_two_fitfuncs(self): - """Test analysis for two curves. Curves shares fit parameters.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, par2, par3: fit_function.cos( - x, amp=par0, freq=par1, phase=par2, baseline=par3 - ), - filter_kwargs={"exp": 0}, - ), - SeriesDef( - name="curve2", - fit_func=lambda x, par0, par1, par2, par3: fit_function.sin( - x, amp=par0, freq=par1, phase=par2, baseline=par3 - ), - filter_kwargs={"exp": 1}, - ), - ], - ) - ref_p0 = 0.1 - ref_p1 = 2 - ref_p2 = -0.3 - ref_p3 = 0.5 - - test_data0 = simulate_output_data( - func=fit_function.cos, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - exp=0, - ) - - test_data1 = simulate_output_data( - func=fit_function.sin, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - exp=1, - ) - - # merge two experiment data - for datum in test_data1.data(): - test_data0.add_data(datum) - - analysis.set_options(p0={"par0": ref_p0, "par1": ref_p1, "par2": ref_p2, "par3": ref_p3}) - results, _ = analysis._run_analysis(test_data0) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p2, ref_p3]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - - def test_run_fixed_parameters(self): - """Test analysis when some of parameters are fixed.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, fixed_par2, par3: fit_function.cos( - x, amp=par0, freq=par1, phase=fixed_par2, baseline=par3 - ), - ), - ], - fixed_params=["fixed_par2"], - ) - - ref_p0 = 0.1 - ref_p1 = 2 - ref_p2 = -0.3 - ref_p3 = 0.5 - - test_data = simulate_output_data( - func=fit_function.cos, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - ) - - analysis.set_options( - p0={"par0": ref_p0, "par1": ref_p1, "par3": ref_p3}, - fixed_parameters={"fixed_par2": ref_p2}, - ) - - results, _ = analysis._run_analysis(test_data) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p3]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - - def test_fixed_param_is_missing(self): - """Test raising an analysis error when fixed parameter is missing.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, par0, par1, fixed_par2, par3: fit_function.cos( - x, amp=par0, freq=par1, phase=fixed_par2, baseline=par3 - ), - ), - ], - fixed_params=["fixed_p2"], - ) - - ref_p0 = 0.1 - ref_p1 = 2 - ref_p2 = -0.3 - ref_p3 = 0.5 - - test_data = simulate_output_data( - func=fit_function.cos, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - ) - # do not define fixed_p2 here - analysis.set_options(p0={"par0": ref_p0, "par1": ref_p1, "par3": ref_p3}) - with self.assertRaises(AnalysisError): - analysis._run_analysis(test_data) - - -class TestFitOptions(QiskitExperimentsTestCase): - """Unittest for fit option object.""" - - def test_empty(self): - """Test if default value is automatically filled.""" - opt = FitOptions(["par0", "par1", "par2"]) - - # bounds should be default to inf tuple. otherwise crashes the scipy fitter. - ref_opts = { - "p0": {"par0": None, "par1": None, "par2": None}, - "bounds": { - "par0": (-np.inf, np.inf), - "par1": (-np.inf, np.inf), - "par2": (-np.inf, np.inf), - }, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_create_option_with_dict(self): - """Create option and fill with dictionary.""" - opt = FitOptions( - ["par0", "par1", "par2"], - default_p0={"par0": 0, "par1": 1, "par2": 2}, - default_bounds={"par0": (0, 1), "par1": (1, 2), "par2": (2, 3)}, - ) - - ref_opts = { - "p0": {"par0": 0.0, "par1": 1.0, "par2": 2.0}, - "bounds": {"par0": (0.0, 1.0), "par1": (1.0, 2.0), "par2": (2.0, 3.0)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_create_option_with_array(self): - """Create option and fill with array.""" - opt = FitOptions( - ["par0", "par1", "par2"], - default_p0=[0, 1, 2], - default_bounds=[(0, 1), (1, 2), (2, 3)], - ) - - ref_opts = { - "p0": {"par0": 0.0, "par1": 1.0, "par2": 2.0}, - "bounds": {"par0": (0.0, 1.0), "par1": (1.0, 2.0), "par2": (2.0, 3.0)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_override_partial_dict(self): - """Create option and override value with partial dictionary.""" - opt = FitOptions(["par0", "par1", "par2"]) - opt.p0.set_if_empty(par1=3) - - ref_opts = { - "p0": {"par0": None, "par1": 3.0, "par2": None}, - "bounds": { - "par0": (-np.inf, np.inf), - "par1": (-np.inf, np.inf), - "par2": (-np.inf, np.inf), - }, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_cannot_override_assigned_value(self): - """Test cannot override already assigned value.""" - opt = FitOptions(["par0", "par1", "par2"]) - opt.p0.set_if_empty(par1=3) - opt.p0.set_if_empty(par1=5) - - ref_opts = { - "p0": {"par0": None, "par1": 3.0, "par2": None}, - "bounds": { - "par0": (-np.inf, np.inf), - "par1": (-np.inf, np.inf), - "par2": (-np.inf, np.inf), - }, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_can_override_assigned_value_with_dict_access(self): - """Test override already assigned value with direct dict access.""" - opt = FitOptions(["par0", "par1", "par2"]) - opt.p0["par1"] = 3 - opt.p0["par1"] = 5 - - ref_opts = { - "p0": {"par0": None, "par1": 5.0, "par2": None}, - "bounds": { - "par0": (-np.inf, np.inf), - "par1": (-np.inf, np.inf), - "par2": (-np.inf, np.inf), - }, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_cannot_override_user_option(self): - """Test cannot override already assigned value.""" - opt = FitOptions(["par0", "par1", "par2"], default_p0={"par1": 3}) - opt.p0.set_if_empty(par1=5) - - ref_opts = { - "p0": {"par0": None, "par1": 3, "par2": None}, - "bounds": { - "par0": (-np.inf, np.inf), - "par1": (-np.inf, np.inf), - "par2": (-np.inf, np.inf), - }, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_set_operation(self): - """Test if set works and duplicated entry is removed.""" - opt1 = FitOptions(["par0", "par1"], default_p0=[0, 1]) - opt2 = FitOptions(["par0", "par1"], default_p0=[0, 1]) - opt3 = FitOptions(["par0", "par1"], default_p0=[0, 2]) - - opts = set() - opts.add(opt1) - opts.add(opt2) - opts.add(opt3) - - self.assertEqual(len(opts), 2) - - def test_detect_invalid_p0(self): - """Test if invalid p0 raises Error.""" - with self.assertRaises(AnalysisError): - # less element - FitOptions(["par0", "par1", "par2"], default_p0=[0, 1]) - - def test_detect_invalid_bounds(self): - """Test if invalid bounds raises Error.""" - with self.assertRaises(AnalysisError): - # less element - FitOptions(["par0", "par1", "par2"], default_bounds=[(0, 1), (1, 2)]) - - with self.assertRaises(AnalysisError): - # not min-max tuple - FitOptions(["par0", "par1", "par2"], default_bounds=[0, 1, 2]) - - with self.assertRaises(AnalysisError): - # max-min tuple - FitOptions(["par0", "par1", "par2"], default_bounds=[(1, 0), (2, 1), (3, 2)]) - - def test_detect_invalid_key(self): - """Test if invalid key raises Error.""" - opt = FitOptions(["par0", "par1", "par2"]) - - with self.assertRaises(AnalysisError): - opt.p0.set_if_empty(par3=3) - - def test_set_extra_options(self): - """Add extra fitter options.""" - opt = FitOptions( - ["par0", "par1", "par2"], default_p0=[0, 1, 2], default_bounds=[(0, 1), (1, 2), (2, 3)] - ) - opt.add_extra_options(ex1=0, ex2=1) - - ref_opts = { - "p0": {"par0": 0.0, "par1": 1.0, "par2": 2.0}, - "bounds": {"par0": (0.0, 1.0), "par1": (1.0, 2.0), "par2": (2.0, 3.0)}, - "ex1": 0, - "ex2": 1, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_complicated(self): - """Test for realistic operations for algorithmic guess with user options.""" - user_p0 = {"par0": 1, "par1": None} - user_bounds = {"par0": None, "par1": (-100, 100)} - - opt = FitOptions( - ["par0", "par1", "par2"], - default_p0=user_p0, - default_bounds=user_bounds, - ) - - # similar computation in algorithmic guess - - opt.p0.set_if_empty(par0=5) # this is ignored because user already provided initial guess - opt.p0.set_if_empty(par1=opt.p0["par0"] * 2 + 3) # user provided guess propagates - - opt.bounds.set_if_empty(par0=(0, 10)) # this will be set - opt.add_extra_options(fitter="algo1") - - opt1 = opt.copy() # copy options while keeping previous values - opt1.p0.set_if_empty(par2=opt1.p0["par0"] + opt1.p0["par1"]) - - opt2 = opt.copy() - opt2.p0.set_if_empty(par2=opt2.p0["par0"] * 2) # add another p2 value - - ref_opt1 = { - "p0": {"par0": 1.0, "par1": 5.0, "par2": 6.0}, - "bounds": {"par0": (0.0, 10.0), "par1": (-100.0, 100.0), "par2": (-np.inf, np.inf)}, - "fitter": "algo1", - } - - ref_opt2 = { - "p0": {"par0": 1.0, "par1": 5.0, "par2": 2.0}, - "bounds": {"par0": (0.0, 10.0), "par1": (-100.0, 100.0), "par2": (-np.inf, np.inf)}, - "fitter": "algo1", - } - - self.assertDictEqual(opt1.options, ref_opt1) - self.assertDictEqual(opt2.options, ref_opt2) - - -class TestBackwardCompatibility(QiskitExperimentsTestCase): - """Test case for backward compatibility.""" - - def test_old_fixed_param_attributes(self): - """Test if old class structure for fixed param is still supported.""" - - class _DeprecatedAnalysis(CurveAnalysis): - __series__ = [ - SeriesDef( - fit_func=lambda x, par0, par1, par2, par3: fit_function.exponential_decay( - x, amp=par0, lamb=par1, x0=par2, baseline=par3 - ), - ) - ] - - __fixed_parameters__ = ["par1"] - - @classmethod - def _default_options(cls): - opts = super()._default_options() - opts.par1 = 2 - - return opts - - with self.assertWarns(DeprecationWarning): - instance = _DeprecatedAnalysis() - - self.assertDictEqual(instance.options.fixed_parameters, {"par1": 2}) - - def test_loading_data_with_deprecated_fixed_param(self): - """Test loading old data with fixed parameters as standalone options.""" - - class _DeprecatedAnalysis(CurveAnalysis): - __series__ = [ - SeriesDef( - fit_func=lambda x, par0, par1, par2, par3: fit_function.exponential_decay( - x, amp=par0, lamb=par1, x0=par2, baseline=par3 - ), - ) - ] - - with self.assertWarns(DeprecationWarning): - # old option data structure, i.e. fixed param as a standalone option - # the analysis instance fixed parameters might be set via the experiment instance - instance = _DeprecatedAnalysis.from_config({"options": {"par1": 2}}) - - self.assertDictEqual(instance.options.fixed_parameters, {"par1": 2}) diff --git a/test/curve_analysis/test_fit_options.py b/test/curve_analysis/test_fit_options.py new file mode 100644 index 0000000000..2e54dbd44a --- /dev/null +++ b/test/curve_analysis/test_fit_options.py @@ -0,0 +1,231 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test for fit options.""" +from test.base import QiskitExperimentsTestCase + +import numpy as np + +from qiskit_experiments.curve_analysis.curve_data import FitOptions +from qiskit_experiments.exceptions import AnalysisError + + +class TestFitOptions(QiskitExperimentsTestCase): + """Unittest for fit option object.""" + + def test_empty(self): + """Test if default value is automatically filled.""" + opt = FitOptions(["par0", "par1", "par2"]) + + # bounds should be default to inf tuple. otherwise crashes the scipy fitter. + ref_opts = { + "p0": {"par0": None, "par1": None, "par2": None}, + "bounds": { + "par0": (-np.inf, np.inf), + "par1": (-np.inf, np.inf), + "par2": (-np.inf, np.inf), + }, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_create_option_with_dict(self): + """Create option and fill with dictionary.""" + opt = FitOptions( + ["par0", "par1", "par2"], + default_p0={"par0": 0, "par1": 1, "par2": 2}, + default_bounds={"par0": (0, 1), "par1": (1, 2), "par2": (2, 3)}, + ) + + ref_opts = { + "p0": {"par0": 0.0, "par1": 1.0, "par2": 2.0}, + "bounds": {"par0": (0.0, 1.0), "par1": (1.0, 2.0), "par2": (2.0, 3.0)}, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_create_option_with_array(self): + """Create option and fill with array.""" + opt = FitOptions( + ["par0", "par1", "par2"], + default_p0=[0, 1, 2], + default_bounds=[(0, 1), (1, 2), (2, 3)], + ) + + ref_opts = { + "p0": {"par0": 0.0, "par1": 1.0, "par2": 2.0}, + "bounds": {"par0": (0.0, 1.0), "par1": (1.0, 2.0), "par2": (2.0, 3.0)}, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_override_partial_dict(self): + """Create option and override value with partial dictionary.""" + opt = FitOptions(["par0", "par1", "par2"]) + opt.p0.set_if_empty(par1=3) + + ref_opts = { + "p0": {"par0": None, "par1": 3.0, "par2": None}, + "bounds": { + "par0": (-np.inf, np.inf), + "par1": (-np.inf, np.inf), + "par2": (-np.inf, np.inf), + }, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_cannot_override_assigned_value(self): + """Test cannot override already assigned value.""" + opt = FitOptions(["par0", "par1", "par2"]) + opt.p0.set_if_empty(par1=3) + opt.p0.set_if_empty(par1=5) + + ref_opts = { + "p0": {"par0": None, "par1": 3.0, "par2": None}, + "bounds": { + "par0": (-np.inf, np.inf), + "par1": (-np.inf, np.inf), + "par2": (-np.inf, np.inf), + }, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_can_override_assigned_value_with_dict_access(self): + """Test override already assigned value with direct dict access.""" + opt = FitOptions(["par0", "par1", "par2"]) + opt.p0["par1"] = 3 + opt.p0["par1"] = 5 + + ref_opts = { + "p0": {"par0": None, "par1": 5.0, "par2": None}, + "bounds": { + "par0": (-np.inf, np.inf), + "par1": (-np.inf, np.inf), + "par2": (-np.inf, np.inf), + }, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_cannot_override_user_option(self): + """Test cannot override already assigned value.""" + opt = FitOptions(["par0", "par1", "par2"], default_p0={"par1": 3}) + opt.p0.set_if_empty(par1=5) + + ref_opts = { + "p0": {"par0": None, "par1": 3, "par2": None}, + "bounds": { + "par0": (-np.inf, np.inf), + "par1": (-np.inf, np.inf), + "par2": (-np.inf, np.inf), + }, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_set_operation(self): + """Test if set works and duplicated entry is removed.""" + opt1 = FitOptions(["par0", "par1"], default_p0=[0, 1]) + opt2 = FitOptions(["par0", "par1"], default_p0=[0, 1]) + opt3 = FitOptions(["par0", "par1"], default_p0=[0, 2]) + + opts = set() + opts.add(opt1) + opts.add(opt2) + opts.add(opt3) + + self.assertEqual(len(opts), 2) + + def test_detect_invalid_p0(self): + """Test if invalid p0 raises Error.""" + with self.assertRaises(AnalysisError): + # less element + FitOptions(["par0", "par1", "par2"], default_p0=[0, 1]) + + def test_detect_invalid_bounds(self): + """Test if invalid bounds raises Error.""" + with self.assertRaises(AnalysisError): + # less element + FitOptions(["par0", "par1", "par2"], default_bounds=[(0, 1), (1, 2)]) + + with self.assertRaises(AnalysisError): + # not min-max tuple + FitOptions(["par0", "par1", "par2"], default_bounds=[0, 1, 2]) + + with self.assertRaises(AnalysisError): + # max-min tuple + FitOptions(["par0", "par1", "par2"], default_bounds=[(1, 0), (2, 1), (3, 2)]) + + def test_detect_invalid_key(self): + """Test if invalid key raises Error.""" + opt = FitOptions(["par0", "par1", "par2"]) + + with self.assertRaises(AnalysisError): + opt.p0.set_if_empty(par3=3) + + def test_set_extra_options(self): + """Add extra fitter options.""" + opt = FitOptions( + ["par0", "par1", "par2"], default_p0=[0, 1, 2], default_bounds=[(0, 1), (1, 2), (2, 3)] + ) + opt.add_extra_options(ex1=0, ex2=1) + + ref_opts = { + "p0": {"par0": 0.0, "par1": 1.0, "par2": 2.0}, + "bounds": {"par0": (0.0, 1.0), "par1": (1.0, 2.0), "par2": (2.0, 3.0)}, + "ex1": 0, + "ex2": 1, + } + + self.assertDictEqual(opt.options, ref_opts) + + def test_complicated(self): + """Test for realistic operations for algorithmic guess with user options.""" + user_p0 = {"par0": 1, "par1": None} + user_bounds = {"par0": None, "par1": (-100, 100)} + + opt = FitOptions( + ["par0", "par1", "par2"], + default_p0=user_p0, + default_bounds=user_bounds, + ) + + # similar computation in algorithmic guess + + opt.p0.set_if_empty(par0=5) # this is ignored because user already provided initial guess + opt.p0.set_if_empty(par1=opt.p0["par0"] * 2 + 3) # user provided guess propagates + + opt.bounds.set_if_empty(par0=(0, 10)) # this will be set + opt.add_extra_options(fitter="algo1") + + opt1 = opt.copy() # copy options while keeping previous values + opt1.p0.set_if_empty(par2=opt1.p0["par0"] + opt1.p0["par1"]) + + opt2 = opt.copy() + opt2.p0.set_if_empty(par2=opt2.p0["par0"] * 2) # add another p2 value + + ref_opt1 = { + "p0": {"par0": 1.0, "par1": 5.0, "par2": 6.0}, + "bounds": {"par0": (0.0, 10.0), "par1": (-100.0, 100.0), "par2": (-np.inf, np.inf)}, + "fitter": "algo1", + } + + ref_opt2 = { + "p0": {"par0": 1.0, "par1": 5.0, "par2": 2.0}, + "bounds": {"par0": (0.0, 10.0), "par1": (-100.0, 100.0), "par2": (-np.inf, np.inf)}, + "fitter": "algo1", + } + + self.assertDictEqual(opt1.options, ref_opt1) + self.assertDictEqual(opt2.options, ref_opt2)