diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a0380187..f7e08058c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Features +- [#364](https://github.com/pybop-team/PyBOP/pull/364) - Adds the MultiFittingProblem class and the multi_fitting example script. - [#444](https://github.com/pybop-team/PyBOP/issues/444) - Merge `BaseModel` `build()` and `rebuild()` functionality. - [#435](https://github.com/pybop-team/PyBOP/pull/435) - Adds SLF001 linting for private members. - [#418](https://github.com/pybop-team/PyBOP/issues/418) - Wraps the `get_parameter_info` method from PyBaMM to get a dictionary of parameter names and types. @@ -28,6 +29,7 @@ ## Bug Fixes + ## Breaking Changes # [v24.6](https://github.com/pybop-team/PyBOP/tree/v24.6) - 2024-07-08 diff --git a/examples/scripts/multi_fitting.py b/examples/scripts/multi_fitting.py new file mode 100644 index 000000000..186575cc5 --- /dev/null +++ b/examples/scripts/multi_fitting.py @@ -0,0 +1,79 @@ +import numpy as np + +import pybop + +# Parameter set and model definition +parameter_set = pybop.ParameterSet.pybamm("Chen2020") +model = pybop.lithium_ion.SPM(parameter_set=parameter_set) + +# Fitting parameters +parameters = pybop.Parameters( + pybop.Parameter( + "Negative electrode active material volume fraction", + prior=pybop.Gaussian(0.68, 0.05), + true_value=parameter_set["Negative electrode active material volume fraction"], + ), + pybop.Parameter( + "Positive electrode active material volume fraction", + prior=pybop.Gaussian(0.58, 0.05), + true_value=parameter_set["Positive electrode active material volume fraction"], + ), +) + +# Generate a dataset and a fitting problem +sigma = 0.001 +experiment = pybop.Experiment([("Discharge at 0.5C for 2 minutes (4 second period)")]) +values = model.predict(initial_state={"Initial SoC": 0.8}, experiment=experiment) +dataset_1 = pybop.Dataset( + { + "Time [s]": values["Time [s]"].data, + "Current function [A]": values["Current [A]"].data, + "Voltage [V]": values["Voltage [V]"].data + + np.random.normal(0, sigma, len(values["Voltage [V]"].data)), + } +) +problem_1 = pybop.FittingProblem(model, parameters, dataset_1) + +# Generate a second dataset and problem +model = model.new_copy() +experiment = pybop.Experiment([("Discharge at 1C for 1 minutes (4 second period)")]) +values = model.predict(initial_state={"Initial SoC": 0.8}, experiment=experiment) +dataset_2 = pybop.Dataset( + { + "Time [s]": values["Time [s]"].data, + "Current function [A]": values["Current [A]"].data, + "Voltage [V]": values["Voltage [V]"].data + + np.random.normal(0, sigma, len(values["Voltage [V]"].data)), + } +) +problem_2 = pybop.FittingProblem(model, parameters, dataset_2) + +# Combine the problems into one +problem = pybop.MultiFittingProblem(problem_1, problem_2) + +# Generate the cost function and optimisation class +cost = pybop.SumSquaredError(problem) +optim = pybop.IRPropMin( + cost, + verbose=True, + max_iterations=100, +) + +# Run optimisation +x, final_cost = optim.run() +print("True parameters:", parameters.true_value()) +print("Estimated parameters:", x) + +# Plot the timeseries output +pybop.quick_plot(problem_1, problem_inputs=x, title="Optimised Comparison") +pybop.quick_plot(problem_2, problem_inputs=x, title="Optimised Comparison") + +# Plot convergence +pybop.plot_convergence(optim) + +# Plot the parameter traces +pybop.plot_parameters(optim) + +# Plot the cost landscape with optimisation path +bounds = np.array([[0.5, 0.8], [0.4, 0.7]]) +pybop.plot2d(optim, bounds=bounds, steps=15) diff --git a/pybop/__init__.py b/pybop/__init__.py index ff8c08a81..49c7eb83e 100644 --- a/pybop/__init__.py +++ b/pybop/__init__.py @@ -87,6 +87,7 @@ # from .problems.base_problem import BaseProblem from .problems.fitting_problem import FittingProblem +from .problems.multi_fitting_problem import MultiFittingProblem from .problems.design_problem import DesignProblem # diff --git a/pybop/costs/_weighted_cost.py b/pybop/costs/_weighted_cost.py index d681fa994..0d788218f 100644 --- a/pybop/costs/_weighted_cost.py +++ b/pybop/costs/_weighted_cost.py @@ -16,14 +16,14 @@ class WeightedCost(BaseCost): Attributes --------------------- - costs : list[pybop.BaseCost] - A list of PyBOP cost objects. + costs : pybop.BaseCost + The individual PyBOP cost objects. weights : list[float] A list of values with which to weight the cost values. - _has_identical_problems : bool + has_identical_problems : bool If True, the shared problem will be evaluated once and saved before the self.compute() method of each cost is called (default: False). - _has_separable_problem: bool + has_separable_problem: bool This attribute must be set to False for WeightedCost objects. If the corresponding attribute of an individual cost is True, the problem is separable from the cost function and will be evaluated before the diff --git a/pybop/models/base_model.py b/pybop/models/base_model.py index 75488d730..932587cf5 100644 --- a/pybop/models/base_model.py +++ b/pybop/models/base_model.py @@ -302,7 +302,8 @@ def classify_parameters( Parameters ---------- parameters : Parameters, optional - The optimisation parameters. Defaults to None, resulting in the internal `pybop.Parameters` object to be used. + The optimisation parameters. Defaults to None, resulting in the internal + `pybop.Parameters` object to be used. inputs : Inputs, optional The input parameters for the simulation (default: None). """ @@ -337,6 +338,7 @@ def classify_parameters( if requires_rebuild: self.clear() self._geometry = self.pybamm_model.default_geometry + # Update both the active and unprocessed parameter sets for consistency self._parameter_set.update(rebuild_parameters) self._unprocessed_parameter_set.update(rebuild_parameters) @@ -530,7 +532,7 @@ def predict( if PyBaMM models are not supported by the current simulation method. """ - if self._unprocessed_model is None: + if self.pybamm_model is None: raise ValueError( "The predict method currently only supports PyBaMM models." ) @@ -644,6 +646,32 @@ def copy(self): """ return copy.copy(self) + def new_copy(self): + """ + Return a new copy of the model, explicitly copying all the mutable attributes + to avoid issues with shared objects. + + Returns + ------- + BaseModel + A new copy of the model. + """ + model_class = type(self) + if self.pybamm_model is None: + model_args = {"parameter_set": self.parameter_set.copy()} + else: + model_args = { + "options": self._unprocessed_model.options, + "parameter_set": self._unprocessed_parameter_set.copy(), + "geometry": self.pybamm_model.default_geometry.copy(), + "submesh_types": self.pybamm_model.default_submesh_types.copy(), + "var_pts": self.pybamm_model.default_var_pts.copy(), + "spatial_methods": self.pybamm_model.default_spatial_methods.copy(), + "solver": self.pybamm_model.default_solver.copy(), + } + + return model_class(**model_args) + def get_parameter_info(self, print_info: bool = False): """ Extracts the parameter names and types and returns them as a dictionary. diff --git a/pybop/optimisers/base_optimiser.py b/pybop/optimisers/base_optimiser.py index e6758db1a..a3de5b662 100644 --- a/pybop/optimisers/base_optimiser.py +++ b/pybop/optimisers/base_optimiser.py @@ -244,7 +244,11 @@ def set_allow_infeasible_solutions(self, allow=True): self.physical_viability = allow self.allow_infeasible_solutions = allow - if hasattr(self.cost, "problem") and hasattr(self.cost.problem, "_model"): + if ( + hasattr(self.cost, "problem") + and hasattr(self.cost.problem, "model") + and self.cost.problem.model is not None + ): self.cost.problem.model.allow_infeasible_solutions = ( self.allow_infeasible_solutions ) diff --git a/pybop/problems/base_problem.py b/pybop/problems/base_problem.py index 216ce092f..72fc4c197 100644 --- a/pybop/problems/base_problem.py +++ b/pybop/problems/base_problem.py @@ -63,12 +63,23 @@ def __init__( self.check_model = check_model self.signal = signal or ["Voltage [V]"] self.additional_variables = additional_variables or [] - self.initial_state = initial_state + self.set_initial_state(initial_state) self._dataset = None self._time_data = None self._target = None self.verbose = False + def set_initial_state(self, initial_state: Optional[dict] = None): + """ + Set the initial state to be applied to evaluations of the problem. + + Parameters + ---------- + initial_state : dict, optional + A valid initial state (default: None). + """ + self.initial_state = initial_state + @property def n_parameters(self): return len(self.parameters) @@ -156,3 +167,7 @@ def time_data(self): @time_data.setter def time_data(self, time_data): self._time_data = time_data + + @property + def dataset(self): + return self._dataset diff --git a/pybop/problems/design_problem.py b/pybop/problems/design_problem.py index 8dd4ca065..57c3597c1 100644 --- a/pybop/problems/design_problem.py +++ b/pybop/problems/design_problem.py @@ -47,22 +47,6 @@ def __init__( additional_variables.extend(["Time [s]", "Current [A]"]) additional_variables = list(set(additional_variables)) - if initial_state is None: - if isinstance(model, ECircuitModel): - initial_state = {"Initial SoC": model.parameter_set["Initial SoC"]} - else: - initial_state = {"Initial SoC": 1.0} # default value - elif "Initial open-circuit voltage [V]" in initial_state.keys(): - warnings.warn( - "It is usually better to define an initial state of charge as the " - "initial_state for a DesignProblem because this state will scale with " - "design properties such as the capacity of the battery, as opposed to the " - "initial open-circuit voltage which may correspond to a different state " - "of charge for each design.", - UserWarning, - stacklevel=1, - ) - super().__init__( parameters, model, check_model, signal, additional_variables, initial_state ) @@ -78,6 +62,33 @@ def __init__( "Non-physical point encountered", ] + def set_initial_state(self, initial_state): + """ + Set the initial state to be applied to evaluations of the problem. + + Parameters + ---------- + initial_state : dict, optional + A valid initial state (default: None). + """ + if initial_state is None: + if isinstance(self.model, ECircuitModel): + initial_state = {"Initial SoC": self.model.parameter_set["Initial SoC"]} + else: + initial_state = {"Initial SoC": 1.0} # default value + elif "Initial open-circuit voltage [V]" in initial_state.keys(): + warnings.warn( + "It is usually better to define an initial state of charge as the " + "initial_state for a DesignProblem because this state will scale with " + "design properties such as the capacity of the battery, as opposed to the " + "initial open-circuit voltage which may correspond to a different state " + "of charge for each design.", + UserWarning, + stacklevel=1, + ) + + self.initial_state = initial_state + def evaluate(self, inputs: Inputs, update_capacity=False): """ Evaluate the model with the given parameters and return the signal. diff --git a/pybop/problems/fitting_problem.py b/pybop/problems/fitting_problem.py index f3bb10232..50ca2cf61 100644 --- a/pybop/problems/fitting_problem.py +++ b/pybop/problems/fitting_problem.py @@ -21,12 +21,25 @@ class FittingProblem(BaseProblem): An object or list of the parameters for the problem. dataset : Dataset Dataset object containing the data to fit the model to. + check_model : bool, optional + Flag to indicate if the model should be checked (default: True). signal : str, optional The variable used for fitting (default: "Voltage [V]"). additional_variables : list[str], optional Additional variables to observe and store in the solution (default additions are: ["Time [s]"]). initial_state : dict, optional A valid initial state, e.g. the initial open-circuit voltage (default: None). + + Additional Attributes + --------------------- + dataset : dictionary + The dictionary from a Dataset object containing the signal keys and values to fit the model to. + time_data : np.ndarray + The time points in the dataset. + n_time_data : int + The number of time points. + target : np.ndarray + The target values of the signals. """ def __init__( @@ -44,19 +57,6 @@ def __init__( additional_variables.extend(["Time [s]"]) additional_variables = list(set(additional_variables)) - if initial_state is not None and "Initial SoC" in initial_state.keys(): - warnings.warn( - "It is usually better to define an initial open-circuit voltage as the " - "initial_state for a FittingProblem because this value can typically be " - "obtained from the data, unlike the intrinsic initial state of charge. " - "In the case where the fitting parameters do not change the OCV-SOC " - "relationship, the initial state of charge may be passed to the model " - 'using, for example, `model.set_initial_state({"Initial SoC": 1.0})` ' - "before constructing the FittingProblem.", - UserWarning, - stacklevel=1, - ) - super().__init__( parameters, model, check_model, signal, additional_variables, initial_state ) @@ -82,6 +82,30 @@ def __init__( initial_state=self.initial_state, ) + def set_initial_state(self, initial_state: Optional[dict] = None): + """ + Set the initial state to be applied to evaluations of the problem. + + Parameters + ---------- + initial_state : dict, optional + A valid initial state (default: None). + """ + if initial_state is not None and "Initial SoC" in initial_state.keys(): + warnings.warn( + "It is usually better to define an initial open-circuit voltage as the " + "initial_state for a FittingProblem because this value can typically be " + "obtained from the data, unlike the intrinsic initial state of charge. " + "In the case where the fitting parameters do not change the OCV-SOC " + "relationship, the initial state of charge may be passed to the model " + 'using, for example, `model.set_initial_state({"Initial SoC": 1.0})` ' + "before constructing the FittingProblem.", + UserWarning, + stacklevel=1, + ) + + self.initial_state = initial_state + def evaluate( self, inputs: Inputs, update_capacity=False ) -> dict[str, np.ndarray[np.float64]]: @@ -130,6 +154,7 @@ def evaluateS1(self, inputs: Inputs): dy/dx(t) evaluated with given inputs. """ inputs = self.parameters.verify(inputs) + self.parameters.update(values=list(inputs.values())) try: sol = self._model.simulateS1( diff --git a/pybop/problems/multi_fitting_problem.py b/pybop/problems/multi_fitting_problem.py new file mode 100644 index 000000000..263fc92dc --- /dev/null +++ b/pybop/problems/multi_fitting_problem.py @@ -0,0 +1,140 @@ +from typing import Optional + +import numpy as np + +from pybop import BaseProblem, Dataset +from pybop.parameters.parameter import Inputs, Parameters + + +class MultiFittingProblem(BaseProblem): + """ + Problem class for joining mulitple fitting problems into one combined fitting problem. + + Extends `BaseProblem` in a similar way to FittingProblem but for multiple parameter + estimation problems, which must first be defined individually. + + Additional Attributes + --------------------- + problems : pybop.FittingProblem + The individual PyBOP fitting problems. + """ + + def __init__(self, *args): + self.problems = [] + models_to_check = [] + for problem in args: + self.problems.append(problem) + if problem.model is not None: + models_to_check.append(problem.model) + + # Check that there are no copies of the same model + if len(set(models_to_check)) < len(models_to_check): + raise ValueError("Make a new_copy of the model for each problem.") + + # Compile the set of parameters, ignoring duplicates + combined_parameters = Parameters() + for problem in self.problems: + combined_parameters.join(problem.parameters) + + # Combine the target datasets + combined_time_data = [] + combined_signal = [] + for problem in self.problems: + for signal in problem.signal: + combined_time_data.extend(problem.time_data) + combined_signal.extend(problem.target[signal]) + combined_dataset = Dataset( + { + "Time [s]": np.asarray(combined_time_data), + "Combined signal": np.asarray(combined_signal), + } + ) + + super().__init__( + parameters=combined_parameters, + model=None, + signal=["Combined signal"], + ) + self._dataset = combined_dataset.data + self.parameters.initial_value() + + # Unpack time and target data + self._time_data = self._dataset["Time [s]"] + self.n_time_data = len(self._time_data) + self.set_target(combined_dataset) + + def set_initial_state(self, initial_state: Optional[dict] = None): + """ + Set the initial state to be applied to evaluations of the problem. + + Parameters + ---------- + initial_state : dict, optional + A valid initial state (default: None). + """ + for problem in self.problems: + problem.set_initial_state(initial_state) + + def evaluate(self, inputs: Inputs, **kwargs): + """ + Evaluate the model with the given parameters and return the signal. + + Parameters + ---------- + inputs : Inputs + Parameters for evaluation of the model. + + Returns + ------- + y : np.ndarray + The model output y(t) simulated with given inputs. + """ + inputs = self.parameters.verify(inputs) + self.parameters.update(values=list(inputs.values())) + + combined_signal = [] + + for problem in self.problems: + problem_inputs = problem.parameters.as_dict() + signal_values = problem.evaluate(problem_inputs, **kwargs) + + # Collect signals + for signal in problem.signal: + combined_signal.extend(signal_values[signal]) + + return {"Combined signal": np.asarray(combined_signal)} + + def evaluateS1(self, inputs: Inputs): + """ + Evaluate the model with the given parameters and return the signal and its derivatives. + + Parameters + ---------- + inputs : Inputs + Parameters for evaluation of the model. + + Returns + ------- + tuple[dict, np.ndarray] + A tuple containing the simulation result y(t) as a dictionary and the sensitivities + dy/dx(t) evaluated with given inputs. + """ + inputs = self.parameters.verify(inputs) + self.parameters.update(values=list(inputs.values())) + + combined_signal = [] + all_derivatives = [] + + for problem in self.problems: + problem_inputs = problem.parameters.as_dict() + signal_values, dyi = problem.evaluateS1(problem_inputs) + + # Collect signals and derivatives + for signal in problem.signal: + combined_signal.extend(signal_values[signal]) + all_derivatives.append(dyi) + + y = {"Combined signal": np.asarray(combined_signal)} + dy = np.concatenate(all_derivatives) if all_derivatives else None + + return (y, dy) diff --git a/tests/integration/test_model_experiment_changes.py b/tests/integration/test_model_experiment_changes.py index 157fbf4c5..fec94452b 100644 --- a/tests/integration/test_model_experiment_changes.py +++ b/tests/integration/test_model_experiment_changes.py @@ -107,3 +107,53 @@ def final_cost(self, solution, model, parameters): optim = pybop.PSO(cost) x, final_cost = optim.run() return final_cost + + @pytest.mark.integration + def test_multi_fitting_problem(self): + parameter_set = pybop.ParameterSet.pybamm("Chen2020") + parameters = pybop.Parameter( + "Negative electrode active material volume fraction", + prior=pybop.Gaussian(0.68, 0.05), + true_value=parameter_set[ + "Negative electrode active material volume fraction" + ], + ) + + model_1 = pybop.lithium_ion.SPM(parameter_set=parameter_set) + experiment_1 = pybop.Experiment( + ["Discharge at 1C until 3 V (4 seconds period)"] + ) + solution_1 = model_1.predict(experiment=experiment_1) + dataset_1 = pybop.Dataset( + { + "Time [s]": solution_1["Time [s]"].data, + "Current function [A]": solution_1["Current [A]"].data, + "Voltage [V]": solution_1["Voltage [V]"].data, + } + ) + + model_2 = pybop.lithium_ion.SPMe(parameter_set=parameter_set.copy()) + experiment_2 = pybop.Experiment( + ["Discharge at 3C until 3 V (4 seconds period)"] + ) + solution_2 = model_2.predict(experiment=experiment_2) + dataset_2 = pybop.Dataset( + { + "Time [s]": solution_2["Time [s]"].data, + "Current function [A]": solution_2["Current [A]"].data, + "Voltage [V]": solution_2["Voltage [V]"].data, + } + ) + + # Define a problem for each dataset and combine them into one + problem_1 = pybop.FittingProblem(model_1, parameters, dataset_1) + problem_2 = pybop.FittingProblem(model_2, parameters, dataset_2) + problem = pybop.MultiFittingProblem(problem_1, problem_2) + cost = pybop.RootMeanSquaredError(problem) + + # Test with a gradient and non-gradient-based optimiser + for optimiser in [pybop.SNES, pybop.IRPropMin]: + optim = optimiser(cost) + x, final_cost = optim.run() + np.testing.assert_allclose(x, parameters.true_value, atol=2e-5) + np.testing.assert_allclose(final_cost, 0, atol=2e-5) diff --git a/tests/unit/test_cost.py b/tests/unit/test_cost.py index c4061df17..fcab4ac94 100644 --- a/tests/unit/test_cost.py +++ b/tests/unit/test_cost.py @@ -64,7 +64,6 @@ def problem(self, model, parameters, dataset, signal, request): cut_off = request.param model.parameter_set.update({"Lower voltage cut-off [V]": cut_off}) problem = pybop.FittingProblem(model, parameters, dataset, signal=signal) - problem.dataset = dataset # add this to pass the pybop dataset to cost return problem @pytest.fixture( @@ -97,7 +96,7 @@ def cost(self, problem, request): process_diag[1] = 1e-4 sigma0 = np.diag(sigma_diag) process = np.diag(process_diag) - dataset = problem.dataset + dataset = pybop.Dataset(data_dictionary=problem.dataset) return cls( pybop.UnscentedKalmanFilterObserver( problem.parameters, @@ -159,8 +158,9 @@ def test_MAP(self, problem): "Negative electrode active material volume fraction", prior=pybop.Uniform(0.55, 0.6), ) + dataset = pybop.Dataset(data_dictionary=problem.dataset) problem_non_finite = pybop.FittingProblem( - problem.model, parameter, problem.dataset, signal=problem.signal + problem.model, parameter, dataset, signal=problem.signal ) likelihood = pybop.MAP( problem_non_finite, pybop.GaussianLogLikelihoodKnownSigma, sigma0=0.01 diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py index f0723ee26..9fe434cfc 100644 --- a/tests/unit/test_models.py +++ b/tests/unit/test_models.py @@ -74,7 +74,7 @@ def test_non_default_solver(self): @pytest.mark.unit def test_predict_without_pybamm(self, model): - model._unprocessed_model = None + model.pybamm_model = None with pytest.raises( ValueError, diff --git a/tests/unit/test_problem.py b/tests/unit/test_problem.py index 3eade064c..b7fded8d6 100644 --- a/tests/unit/test_problem.py +++ b/tests/unit/test_problem.py @@ -182,6 +182,47 @@ def test_fitting_problem(self, parameters, dataset, model, signal): with pytest.raises(ValueError): pybop.FittingProblem(model, parameters, bad_dataset, signal=two_signals) + @pytest.mark.unit + def test_multi_fitting_problem(self, model, parameters, dataset, signal): + problem_1 = pybop.FittingProblem(model, parameters, dataset, signal=signal) + + with pytest.raises( + ValueError, match="Make a new_copy of the model for each problem." + ): + pybop.MultiFittingProblem(problem_1, problem_1) + + # Generate a second fitting problem + model = model.new_copy() + experiment = pybop.Experiment( + ["Discharge at 1C for 5 minutes (1 second period)"] + ) + values = model.predict( + initial_state={"Initial SoC": 0.8}, experiment=experiment + ) + dataset_2 = pybop.Dataset( + { + "Time [s]": values["Time [s]"].data, + "Current function [A]": values["Current [A]"].data, + "Voltage [V]": values["Voltage [V]"].data, + } + ) + problem_2 = pybop.FittingProblem(model, parameters, dataset_2, signal=signal) + combined_problem = pybop.MultiFittingProblem(problem_1, problem_2) + + assert combined_problem._model is None + + assert len(combined_problem._dataset["Time [s]"]) == len( + problem_1._dataset["Time [s]"] + ) + len(problem_2._dataset["Time [s]"]) + assert len(combined_problem._dataset["Combined signal"]) == len( + problem_1._dataset[signal] + ) + len(problem_2._dataset[signal]) + + y = combined_problem.evaluate(inputs=[1e-5, 1e-5]) + assert len(y["Combined signal"]) == len( + combined_problem._dataset["Combined signal"] + ) + @pytest.mark.unit def test_design_problem(self, parameters, experiment, model): with pytest.warns(UserWarning) as record: