Skip to content

Commit

Permalink
I12b unscented kalman edits (#180)
Browse files Browse the repository at this point in the history
* Move cost from call to evaluate

* Rename and add n_states

* Rename and update UkfFilter to SquareRootUKF

Update the paper reference, the value of kappa (from 1 to 0), the allowed input values to observe, and the SquareRootUKF functions: init, filtered_cholupdate and cholupdate to cope with downdates as well as updates.

* Add example and update tests

* Rename exponential_decay as standalone/model

* Make observers a type of problem

- update model parameters to parameter_set
- add n_outputs to BaseCost
- remove observer from FittingProblem
- change solver to _solver
- add evaluate and problem inputs to Observer
- except all Exception from call to solver
- add dataset to UnscentedKalman
- update tests to match

* Update to cope with subset of states

* Add spm_UKF example

* Update bool_mask

* Update state indices
  • Loading branch information
NicolaCourtier authored Feb 7, 2024
1 parent 68bf5fa commit 6d0d460
Show file tree
Hide file tree
Showing 12 changed files with 518 additions and 155 deletions.
116 changes: 116 additions & 0 deletions examples/scripts/exp_UKF.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import pybop
import pybamm
import numpy as np
from examples.standalone.model import ExponentialDecay

# Parameter set and model definition
parameter_set = pybamm.ParameterValues({"k": "[input]", "y0": "[input]"})
model = ExponentialDecay(parameter_set=parameter_set, n_states=1)
x0 = np.array([0.1, 1.0])

# Fitting parameters
parameters = [
pybop.Parameter(
"k",
prior=pybop.Gaussian(0.1, 0.05),
bounds=[0, 1],
),
pybop.Parameter(
"y0",
prior=pybop.Gaussian(1, 0.05),
bounds=[0, 3],
),
]

# Verification: save fixed inputs for testing
inputs = dict()
for i, param in enumerate(parameters):
inputs[param.name] = x0[i]

# Make a prediction with measurement noise
sigma = 1e-2
t_eval = np.linspace(0, 20, 10)
values = model.predict(t_eval=t_eval, inputs=inputs)
values = values["2y"].data
corrupt_values = values + np.random.normal(0, sigma, len(t_eval))

# Verification step: compute the analytical solution for 2y
expected_values = 2 * inputs["y0"] * np.exp(-inputs["k"] * t_eval)

# Verification step: make another prediction using the Observer class
model.build(parameters=parameters)
simulator = pybop.Observer(parameters, model, signal=["2y"], x0=x0)
simulator._time_data = t_eval
measurements = simulator.evaluate(x0)
measurements = measurements[:, 0]

# Verification step: Compare by plotting
go = pybop.PlotlyManager().go
line1 = go.Scatter(x=t_eval, y=corrupt_values, name="Corrupt values", mode="markers")
line2 = go.Scatter(
x=t_eval, y=expected_values, name="Expected trajectory", mode="lines"
)
line3 = go.Scatter(x=t_eval, y=measurements, name="Observed values", mode="markers")
fig = go.Figure(data=[line1, line2, line3])

# Form dataset
dataset = pybop.Dataset(
{
"Time [s]": t_eval,
"Current function [A]": 0 * t_eval, # placeholder
"2y": corrupt_values,
}
)

# Build the model to get the number of states
model.build(dataset=dataset.data, parameters=parameters)

# Define the UKF observer
signal = ["2y"]
n_states = model.n_states
n_signals = len(signal)
covariance = np.diag([sigma**2] * n_states)
process_noise = np.diag([1e-6] * n_states)
measurement_noise = np.diag([sigma**2] * n_signals)
observer = pybop.UnscentedKalmanFilterObserver(
parameters,
model,
covariance,
process_noise,
measurement_noise,
dataset,
signal=signal,
x0=x0,
)

# Verification step: Find the maximum likelihood estimate given the true parameters
estimation = observer.evaluate(x0)
estimation = estimation[:, 0]

# Verification step: Add the estimate to the plot
line4 = go.Scatter(x=t_eval, y=estimation, name="Estimated trajectory", mode="lines")
fig.add_trace(line4)
fig.show()

# Generate problem, cost function, and optimisation class
cost = pybop.ObserverCost(observer)
optim = pybop.Optimisation(cost, optimiser=pybop.CMAES, verbose=True)

# Run optimisation
x, final_cost = optim.run()
print("Estimated parameters:", x)

# Plot the timeseries output (requires model that returns Voltage)
pybop.quick_plot(x, cost, title="Optimised Comparison")

# Plot convergence
pybop.plot_convergence(optim)

# Plot the parameter traces
pybop.plot_parameters(optim)

# Plot the cost landscape
pybop.plot_cost2d(cost, steps=15)

# Plot the cost landscape with optimisation path
pybop.plot_cost2d(cost, optim=optim, steps=15)
82 changes: 82 additions & 0 deletions examples/scripts/spm_UKF.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import pybop
import numpy as np

# Parameter set and model definition
parameter_set = pybop.ParameterSet.pybamm("Chen2020")
model = pybop.lithium_ion.SPM(parameter_set=parameter_set)

# Fitting parameters
parameters = [
pybop.Parameter(
"Negative electrode active material volume fraction",
prior=pybop.Gaussian(0.6, 0.05),
bounds=[0.5, 0.8],
),
pybop.Parameter(
"Positive electrode active material volume fraction",
prior=pybop.Gaussian(0.48, 0.05),
bounds=[0.4, 0.7],
),
]

# Make a prediction with measurement noise
sigma = 0.001
t_eval = np.arange(0, 300, 2)
values = model.predict(t_eval=t_eval)
corrupt_values = values["Voltage [V]"].data + np.random.normal(0, sigma, len(t_eval))

# Form dataset
dataset = pybop.Dataset(
{
"Time [s]": t_eval,
"Current function [A]": values["Current [A]"].data,
"Voltage [V]": corrupt_values,
}
)

# Build the model to get the number of states
model.build(dataset=dataset.data, parameters=parameters)

# Define the UKF observer, setting the particle boundaries as uncertain states
signal = ["Voltage [V]"]
n_states = model.n_states
n_signals = len(signal)
covariance = np.diag([0] * 19 + [sigma**2] + [0] * 19 + [sigma**2])
process_noise = np.diag([0] * 19 + [1e-6] + [0] * 19 + [1e-6])
measurement_noise = np.diag([sigma**2])
observer = pybop.UnscentedKalmanFilterObserver(
parameters,
model,
covariance,
process_noise,
measurement_noise,
dataset,
signal=signal,
)

# Generate problem, cost function, and optimisation class
cost = pybop.ObserverCost(observer)
optim = pybop.Optimisation(cost, optimiser=pybop.PSO, verbose=True)

# Parameter identification using the current observer implementation is very slow
# so let's restrict the number of iterations and reduce the number of plots
optim.set_max_iterations(5)

# Run optimisation
x, final_cost = optim.run()
print("Estimated parameters:", x)

# Plot the timeseries output (requires model that returns Voltage)
pybop.quick_plot(x, cost, title="Optimised Comparison")

# # Plot convergence
# pybop.plot_convergence(optim)

# # Plot the parameter traces
# pybop.plot_parameters(optim)

# # Plot the cost landscape
# pybop.plot_cost2d(cost, steps=5)

# # Plot the cost landscape with optimisation path
# pybop.plot_cost2d(cost, optim=optim, steps=5)
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ class ExponentialDecay(BaseModel):
def __init__(
self,
name: str = "Constant Model",
parameters: pybamm.ParameterValues = None,
nstate: int = 1,
parameter_set: pybamm.ParameterValues = None,
n_states: int = 1,
):
super().__init__()
self.nstate = nstate
if nstate < 1:
raise ValueError("nstate must be greater than 0")
self.n_states = n_states
if n_states < 1:
raise ValueError("The number of states (n_states) must be greater than 0")

Check warning on line 24 in examples/standalone/model.py

View check run for this annotation

Codecov / codecov/patch

examples/standalone/model.py#L24

Added line #L24 was not covered by tests
self.pybamm_model = pybamm.BaseModel()
ys = [pybamm.Variable(f"y_{i}") for i in range(nstate)]
ys = [pybamm.Variable(f"y_{i}") for i in range(n_states)]
k = pybamm.Parameter("k")
y0 = pybamm.Parameter("y0")
self.pybamm_model.rhs = {y: -k * y for y in ys}
Expand All @@ -41,7 +41,7 @@ def __init__(
self.name = name

self.default_parameter_values = (
default_parameter_values if parameters is None else parameters
default_parameter_values if parameter_set is None else parameter_set
)
self._parameter_set = self.default_parameter_values
self._unprocessed_parameter_set = self._parameter_set
Expand Down
30 changes: 14 additions & 16 deletions pybop/_costs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import numpy as np

from pybop.observers.observer import Observer
from pybop._problem import FittingProblem


class BaseCost:
Expand All @@ -26,6 +25,8 @@ class BaseCost:
The bounds for the model parameters.
n_parameters : int
The number of parameters in the model.
n_outputs : int
The number of outputs in the model.
"""

def __init__(self, problem):
Expand All @@ -35,6 +36,7 @@ def __init__(self, problem):
self.x0 = problem.x0
self.bounds = problem.bounds
self.n_parameters = problem.n_parameters
self.n_outputs = problem.n_outputs

def __call__(self, x, grad=None):
"""
Expand Down Expand Up @@ -255,13 +257,13 @@ def _evaluateS1(self, x):
y, dy = self.problem.evaluateS1(x)
if len(y) < len(self._target):
e = np.float64(np.inf)
de = self._de * np.ones(self.problem.n_parameters)
de = self._de * np.ones(self.n_parameters)
else:
dy = dy.reshape(
(
self.problem.n_time_data,
self.problem.n_outputs,
self.problem.n_parameters,
self.n_outputs,
self.n_parameters,
)
)
r = y - self._target
Expand Down Expand Up @@ -297,11 +299,11 @@ class ObserverCost(BaseCost):
"""

def __init__(self, problem: FittingProblem, observer: Observer):
super(ObserverCost, self).__init__(problem)
def __init__(self, observer: Observer):
super().__init__(problem=observer)
self._observer = observer

def __call__(self, x, grad=None):
def _evaluate(self, x, grad=None):
"""
Calculate the observer cost for a given set of parameters.
Expand All @@ -317,16 +319,12 @@ def __call__(self, x, grad=None):
-------
float
The observer cost (negative of the log likelihood).
"""
try:
inputs = {key: x[i] for i, key in enumerate(self._observer._model.fit_keys)}
log_likelihood = self._observer.log_likelihood(
self.problem.target(), self.problem.time_data(), inputs
)
return -log_likelihood
except Exception as e:
raise ValueError(f"Error in cost calculation: {e}")
inputs = {key: x[i] for i, key in enumerate(self._observer._model.fit_keys)}
log_likelihood = self._observer.log_likelihood(
self._target, self._observer.time_data(), inputs
)
return -log_likelihood

def evaluateS1(self, x):
"""
Expand Down
12 changes: 5 additions & 7 deletions pybop/_problem.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from typing import Optional
import numpy as np

from pybop.observers.observer import Observer


class BaseProblem:
"""
Expand All @@ -16,6 +13,8 @@ class BaseProblem:
The model to be used for the problem (default: None).
check_model : bool, optional
Flag to indicate if the model should be checked (default: True).
signal: List[str]
The signal to observe.
init_soc : float, optional
Initial state of charge (default: None).
x0 : np.ndarray, optional
Expand Down Expand Up @@ -132,8 +131,8 @@ class FittingProblem(BaseProblem):
The model to fit.
parameters : list
List of parameters for the problem.
dataset : list
List of data objects to fit the model to.
dataset : Dataset
Dataset object containing the data to fit the model to.
signal : str, optional
The signal to fit (default: "Voltage [V]").
"""
Expand All @@ -147,15 +146,14 @@ def __init__(
signal=["Voltage [V]"],
init_soc=None,
x0=None,
observer: Optional[Observer] = None,
):
super().__init__(parameters, model, check_model, signal, init_soc, x0)
self._dataset = dataset.data

# Check that the dataset contains time and current
for name in ["Time [s]", "Current function [A]"] + self.signal:
if name not in self._dataset:
raise ValueError(f"expected {name} in list of dataset")
raise ValueError(f"Expected {name} in list of dataset")

self._time_data = self._dataset["Time [s]"]
self.n_time_data = len(self._time_data)
Expand Down
8 changes: 5 additions & 3 deletions pybop/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def build(
# Clear solver and setup model
self._solver._model_set_up = {}

self.n_states = self._built_model.len_rhs_and_alg # len_rhs + len_alg

def set_init_soc(self, init_soc):
"""
Set the initial state of charge for the battery model.
Expand Down Expand Up @@ -214,7 +216,7 @@ def step(self, state: TimeSeriesState, time: np.ndarray) -> TimeSeriesState:
The time to predict the system to (in whatever time units the model is in)
"""
dt = time - state.t
new_sol = self.solver.step(
new_sol = self._solver.step(
state.sol, self.built_model, dt, npts=2, inputs=state.inputs, save=False
)
return TimeSeriesState(sol=new_sol, inputs=state.inputs, t=time)
Expand Down Expand Up @@ -252,7 +254,7 @@ def simulate(self, inputs, t_eval) -> np.ndarray[np.float64]:
inputs=inputs,
allow_infeasible_solutions=self.allow_infeasible_solutions,
):
sol = self.solver.solve(self.built_model, inputs=inputs, t_eval=t_eval)
sol = self._solver.solve(self.built_model, inputs=inputs, t_eval=t_eval)

predictions = [sol[signal].data for signal in self.signal]

Expand Down Expand Up @@ -295,7 +297,7 @@ def simulateS1(self, inputs, t_eval):
inputs=inputs,
allow_infeasible_solutions=self.allow_infeasible_solutions,
):
sol = self.solver.solve(
sol = self._solver.solve(
self.built_model,
inputs=inputs,
t_eval=t_eval,
Expand Down
Loading

0 comments on commit 6d0d460

Please sign in to comment.