diff --git a/benchmark/benchmarks/integration/ex_two_datasets/benchmark.py b/benchmark/benchmarks/integration/ex_two_datasets/benchmark.py
index e6279b1f8..8a02f2bce 100644
--- a/benchmark/benchmarks/integration/ex_two_datasets/benchmark.py
+++ b/benchmark/benchmarks/integration/ex_two_datasets/benchmark.py
@@ -1,11 +1,6 @@
-import pickle
 from pathlib import Path

-from scipy.optimize import OptimizeResult
-
-from glotaran.analysis.optimize import _create_result
 from glotaran.analysis.optimize import optimize
-from glotaran.analysis.problem_grouped import GroupedProblem
 from glotaran.io import load_dataset
 from glotaran.io import load_model
 from glotaran.io import load_parameters
@@ -37,24 +32,6 @@ def setup(self):
             non_negative_least_squares=True,
             optimization_method="TrustRegionReflection",
         )
-        # Values extracted from a previous run of IntegrationTwoDatasets.time_optimize()
-        self.problem = GroupedProblem(self.scheme)
-        # pickled OptimizeResult
-        with open(SCRIPT_DIR / "data/ls_result.pcl", "rb") as ls_result_file:
-            self.ls_result: OptimizeResult = pickle.load(ls_result_file)
-        self.free_parameter_labels = [
-            "inputs.2",
-            "inputs.3",
-            "inputs.7",
-            "inputs.8",
-            "scale.2",
-            "rates.k1",
-            "rates.k2",
-            "rates.k3",
-            "irf.center",
-            "irf.width",
-        ]
-        self.termination_reason = "The maximum number of function evaluations is exceeded."

     def time_optimize(self):
         optimize(self.scheme)
@@ -62,16 +39,6 @@ def time_optimize(self):
     def peakmem_optimize(self):
         optimize(self.scheme)

-    def time_create_result(self):
-        _create_result(
-            self.problem, self.ls_result, self.free_parameter_labels, self.termination_reason
-        )
-
-    def peakmem_create_result(self):
-        _create_result(
-            self.problem, self.ls_result, self.free_parameter_labels, self.termination_reason
-        )
-

 if __name__ == "__main__":
     test = IntegrationTwoDatasets()
diff --git a/benchmark/pytest/analysis/test_problem.py b/benchmark/pytest/analysis/test_optimization_group.py
similarity index 59%
rename from benchmark/pytest/analysis/test_problem.py
rename to benchmark/pytest/analysis/test_optimization_group.py
index 16aa886c3..f7c9a5c2e 100644
--- a/benchmark/pytest/analysis/test_problem.py
+++ b/benchmark/pytest/analysis/test_optimization_group.py
@@ -6,8 +6,7 @@
 import pytest
 import xarray as xr

-from glotaran.analysis.problem_grouped import GroupedProblem
-from glotaran.analysis.problem_ungrouped import UngroupedProblem
+from glotaran.analysis.optimization_group import OptimizationGroup
 from glotaran.model import Megacomplex
 from glotaran.model import Model
 from glotaran.model import megacomplex
@@ -55,9 +54,10 @@ def finalize_data(
 @monkeypatch_plugin_registry(test_megacomplex={"benchmark": BenchmarkMegacomplex})
-def setup_model(index_dependent):
+def setup_model(index_dependent, link_clp):
     model_dict = {
         "megacomplex": {"m1": {"is_index_dependent": index_dependent}},
+        "dataset_groups": {"default": {"link_clp": link_clp}},
         "dataset": {
             "dataset1": {"megacomplex": ["m1"]},
             "dataset2": {"megacomplex": ["m1"]},
         },
     }
@@ -83,90 +83,93 @@ def setup_scheme(model):
     )


-def setup_problem(scheme, grouped):
-    return GroupedProblem(scheme) if grouped else UngroupedProblem(scheme)
+def setup_optimization_group(scheme):
+    return OptimizationGroup(scheme, scheme.model.get_dataset_groups()["default"])


 def test_benchmark_bag_creation(benchmark):
-    model = setup_model(False)
+    model = setup_model(False, True)
     assert model.valid()
     scheme = setup_scheme(model)
-    problem = setup_problem(scheme, True)
+    optimization_group = setup_optimization_group(scheme)

-
benchmark(problem.init_bag) + benchmark(optimization_group._calculator.init_bag) -@pytest.mark.parametrize("grouped", [True, False]) +@pytest.mark.parametrize("link_clp", [True, False]) @pytest.mark.parametrize("index_dependent", [True, False]) -def test_benchmark_calculate_matrix(benchmark, grouped, index_dependent): +def test_benchmark_calculate_matrix(benchmark, link_clp, index_dependent): - model = setup_model(index_dependent) + model = setup_model(index_dependent, link_clp) assert model.valid() scheme = setup_scheme(model) - problem = setup_problem(scheme, grouped) + optimization_group = setup_optimization_group(scheme) - if grouped: - problem.init_bag() + if link_clp: + optimization_group._calculator.init_bag() - benchmark(problem.calculate_matrices) + benchmark(optimization_group._calculator.calculate_matrices) -@pytest.mark.parametrize("grouped", [True, False]) +@pytest.mark.parametrize("link_clp", [True, False]) @pytest.mark.parametrize("index_dependent", [True, False]) -def test_benchmark_calculate_residual(benchmark, grouped, index_dependent): +def test_benchmark_calculate_residual(benchmark, link_clp, index_dependent): - model = setup_model(index_dependent) + model = setup_model(index_dependent, link_clp) assert model.valid() scheme = setup_scheme(model) - problem = setup_problem(scheme, grouped) + optimization_group = setup_optimization_group(scheme) - if grouped: - problem.init_bag() - problem.calculate_matrices() + if link_clp: + optimization_group._calculator.init_bag() - benchmark(problem.calculate_residual) + optimization_group._calculator.calculate_matrices() + benchmark(optimization_group._calculator.calculate_residual) -@pytest.mark.parametrize("grouped", [True, False]) + +@pytest.mark.parametrize("link_clp", [True, False]) @pytest.mark.parametrize("index_dependent", [True, False]) -def test_benchmark_calculate_result_data(benchmark, grouped, index_dependent): +def test_benchmark_calculate_result_data(benchmark, link_clp, index_dependent): - model = setup_model(index_dependent) + model = setup_model(index_dependent, link_clp) assert model.valid() scheme = setup_scheme(model) - problem = setup_problem(scheme, grouped) + optimization_group = setup_optimization_group(scheme) + + if link_clp: + optimization_group._calculator.init_bag() + + optimization_group._calculator.calculate_matrices() - if grouped: - problem.init_bag() - problem.calculate_matrices() - problem.calculate_residual() + optimization_group._calculator.calculate_residual() - benchmark(problem.create_result_data) + benchmark(optimization_group.create_result_data) # @pytest.mark.skip(reason="To time consuming atm.") -@pytest.mark.parametrize("grouped", [True, False]) +@pytest.mark.parametrize("link_clp", [True, False]) @pytest.mark.parametrize("index_dependent", [True, False]) -def test_benchmark_optimize_20_runs(benchmark, grouped, index_dependent): +def test_benchmark_optimize_20_runs(benchmark, link_clp, index_dependent): - model = setup_model(index_dependent) + model = setup_model(index_dependent, link_clp) assert model.valid() scheme = setup_scheme(model) - problem = setup_problem(scheme, grouped) + optimization_group = setup_optimization_group(scheme) @benchmark def run(): - if grouped: - problem.init_bag() + if link_clp: + optimization_group._calculator.init_bag() for _ in range(20): - problem.reset() - problem.full_penalty + optimization_group.reset() + optimization_group._calculator.calculate_full_penalty() - problem.create_result_data() + optimization_group.create_result_data() diff --git 
a/glotaran/analysis/problem.py b/glotaran/analysis/optimization_group.py similarity index 74% rename from glotaran/analysis/problem.py rename to glotaran/analysis/optimization_group.py index 91cf0fadc..36a4081f5 100644 --- a/glotaran/analysis/problem.py +++ b/glotaran/analysis/optimization_group.py @@ -2,17 +2,23 @@ import warnings from typing import TYPE_CHECKING -from typing import Dict -from typing import NamedTuple from typing import TypeVar import numpy as np import xarray as xr from glotaran.analysis.nnls import residual_nnls +from glotaran.analysis.optimization_group_calculator import OptimizationGroupCalculator +from glotaran.analysis.optimization_group_calculator_linked import ( + OptimizationGroupCalculatorLinked, +) +from glotaran.analysis.optimization_group_calculator_unlinked import ( + OptimizationGroupCalculatorUnlinked, +) from glotaran.analysis.util import get_min_max_from_interval from glotaran.analysis.variable_projection import residual_variable_projection from glotaran.io.prepare_dataset import add_svd_to_dataset +from glotaran.model import DatasetGroup from glotaran.model import DatasetModel from glotaran.model import Model from glotaran.parameter import ParameterGroup @@ -33,67 +39,59 @@ def __init__(self): super().__init__("Parameter not initialized") -class UngroupedProblemDescriptor(NamedTuple): - dataset: DatasetModel - data: xr.DataArray - model_axis: np.ndarray - global_axis: np.ndarray - weight: xr.DataArray - - -class GroupedProblemDescriptor(NamedTuple): - label: str - indices: dict[str, int] - axis: dict[str, np.ndarray] - - -class ProblemGroup(NamedTuple): - data: np.ndarray - weight: np.ndarray - has_scaling: bool - """Indicates if at least one dataset in the group needs scaling.""" - group: str - """The concatenated labels of the involved datasets.""" - data_sizes: list[int] - """Holds the sizes of the concatenated datasets.""" - descriptor: list[GroupedProblemDescriptor] - - -UngroupedBag = Dict[str, UngroupedProblemDescriptor] - XrDataContainer = TypeVar("XrDataContainer", xr.DataArray, xr.Dataset) +residual_functions = { + "variable_projection": residual_variable_projection, + "non_negative_least_squares": residual_nnls, +} -class Problem: - """A Problem class""" - def __init__(self, scheme: Scheme): - """Initializes the Problem class from a scheme (:class:`glotaran.analysis.scheme.Scheme`) +class OptimizationGroup: + def __init__( + self, + scheme: Scheme, + dataset_group: DatasetGroup, + ): + """Create OptimizationGroup instance from a scheme (:class:`glotaran.analysis.scheme.Scheme`) Args: scheme (Scheme): An instance of :class:`glotaran.analysis.scheme.Scheme` which defines your model, parameters, and data """ - self._scheme = scheme - self._model = scheme.model - - self._bag = None - - self._residual_function = ( - residual_nnls if scheme.non_negative_least_squares else residual_variable_projection - ) - self._parameters = None - self._dataset_models = None + if scheme.parameters is None: + raise ParameterNotInitializedError + self._parameters = scheme.parameters.copy() + self._dataset_group_model = dataset_group.model + self._clp_link_tolerance = scheme.clp_link_tolerance + + try: + self._residual_function = residual_functions[dataset_group.model.residual_function] + except KeyError: + raise ValueError( + f"Unknown residual function '{dataset_group.model.residual_function}', " + f"allowed functions are: {list(residual_functions.keys())}." 
+ ) + self._dataset_models = dataset_group.dataset_models self._overwrite_index_dependent = self.model.need_index_dependent() - self._parameters = scheme.parameters.copy() - self._parameter_history = ParameterHistory() self._model.validate(raise_exception=True) - self._prepare_data(scheme.data) + self._prepare_data(scheme, list(dataset_group.dataset_models.keys())) + self._dataset_labels = list(self.data.keys()) + + link_clp = dataset_group.model.link_clp + if link_clp is None: + link_clp = self.model.is_groupable(self.parameters, self.data) + + self._calculator: OptimizationGroupCalculator = ( + OptimizationGroupCalculatorLinked(self) + if link_clp + else OptimizationGroupCalculatorUnlinked(self) + ) # all of the above are always not None @@ -104,19 +102,8 @@ def __init__(self, scheme: Scheme): self._weighted_residuals = None self._residuals = None self._additional_penalty = None - self._full_axis = None self._full_penalty = None - @property - def scheme(self) -> Scheme: - """Property providing access to the used scheme - - Returns: - Scheme: An instance of :class:`glotaran.analysis.scheme.Scheme` - Provides access to data, model, parameters and optimization arguments. - """ - return self._scheme - @property def model(self) -> Model: """Property providing access to the used model @@ -145,10 +132,6 @@ def parameters(self, parameters: ParameterGroup): self._parameters = parameters self.reset() - @property - def parameter_history(self) -> ParameterHistory: - return self._parameter_history - @property def dataset_models(self) -> dict[str, DatasetModel]: return self._dataset_models @@ -158,7 +141,7 @@ def matrices( self, ) -> dict[str, np.ndarray | list[np.ndarray]]: if self._matrices is None: - self.calculate_matrices() + self._calculator.calculate_matrices() return self._matrices @property @@ -166,7 +149,7 @@ def reduced_matrices( self, ) -> dict[str, np.ndarray] | dict[str, list[np.ndarray]] | list[np.ndarray]: if self._reduced_matrices is None: - self.calculate_matrices() + self._calculator.calculate_matrices() return self._reduced_matrices @property @@ -174,7 +157,7 @@ def reduced_clps( self, ) -> dict[str, list[np.ndarray]]: if self._reduced_clps is None: - self.calculate_residual() + self._calculator.calculate_residual() return self._reduced_clps @property @@ -182,7 +165,7 @@ def clps( self, ) -> dict[str, list[np.ndarray]]: if self._clps is None: - self.calculate_residual() + self._calculator.calculate_residual() return self._clps @property @@ -190,7 +173,7 @@ def weighted_residuals( self, ) -> dict[str, list[np.ndarray]]: if self._weighted_residuals is None: - self.calculate_residual() + self._calculator.calculate_residual() return self._weighted_residuals @property @@ -198,7 +181,7 @@ def residuals( self, ) -> dict[str, list[np.ndarray]]: if self._residuals is None: - self.calculate_residual() + self._calculator.calculate_residual() return self._residuals @property @@ -206,25 +189,25 @@ def additional_penalty( self, ) -> dict[str, list[float]]: if self._additional_penalty is None: - self.calculate_residual() + self._calculator.calculate_residual() return self._additional_penalty @property def full_penalty(self) -> np.ndarray: - raise NotImplementedError + if self._full_penalty is None: + self._calculator.calculate_full_penalty() + return self._full_penalty @property def cost(self) -> float: return 0.5 * np.dot(self.full_penalty, self.full_penalty) - def save_parameters_for_history(self): - self._parameter_history.append(self._parameters) - def reset(self): """Resets all results and 
`DatasetModels`. Use after updating parameters.""" self._dataset_models = { label: dataset_model.fill(self._model, self._parameters).set_data(self.data[label]) - for label, dataset_model in self._model.dataset.items() + for label, dataset_model in self.model.dataset.items() + if label in self._dataset_labels } if self._overwrite_index_dependent: for d in self._dataset_models.values(): @@ -241,10 +224,12 @@ def _reset_results(self): self._additional_penalty = None self._full_penalty = None - def _prepare_data(self, data: dict[str, xr.DataArray | xr.Dataset]): + def _prepare_data(self, scheme: Scheme, labels: list[str]): self._data = {} self._dataset_models = {} - for label, dataset in data.items(): + for label, dataset in scheme.data.items(): + if label not in labels: + continue if isinstance(dataset, xr.DataArray): dataset = dataset.to_dataset(name="data") @@ -261,7 +246,7 @@ def _prepare_data(self, data: dict[str, xr.DataArray | xr.Dataset]): dataset, ordered_dims=[model_dimension, global_dimension] ) - if self.scheme.add_svd: + if scheme.add_svd: add_svd_to_dataset(dataset, lsv_dim=model_dimension, rsv_dim=global_dimension) self._add_weight(label, dataset) @@ -324,16 +309,22 @@ def _add_weight(self, label, dataset): ) dataset.weight[idx] *= weight.value - def create_result_data(self, copy: bool = True, success: bool = True) -> dict[str, xr.Dataset]: + def create_result_data( + self, + parameter_history: ParameterHistory = None, + copy: bool = True, + success: bool = True, + add_svd: bool = True, + ) -> dict[str, xr.Dataset]: if not success: - if self.parameter_history.number_of_records > 1: - self.parameters.set_from_history(self.parameter_history, -2) + if parameter_history is not None and parameter_history.number_of_records > 1: + self.parameters.set_from_history(parameter_history, -2) else: raise InitialParameterError() self.reset() - self.prepare_result_creation() + self._calculator.prepare_result_creation() result_data = {} for label, dataset_model in self.dataset_models.items(): result_data[label] = self.create_result_dataset(label, copy=copy) @@ -341,7 +332,9 @@ def create_result_data(self, copy: bool = True, success: bool = True) -> dict[st return result_data - def create_result_dataset(self, label: str, copy: bool = True) -> xr.Dataset: + def create_result_dataset( + self, label: str, copy: bool = True, add_svd: bool = True + ) -> xr.Dataset: dataset = self.data[label] dataset_model = self.dataset_models[label] global_dimension = dataset_model.get_global_dimension() @@ -349,12 +342,12 @@ def create_result_dataset(self, label: str, copy: bool = True) -> xr.Dataset: if copy: dataset = dataset.copy() if dataset_model.is_index_dependent(): - dataset = self.create_index_dependent_result_dataset(label, dataset) + dataset = self._calculator.create_index_dependent_result_dataset(label, dataset) else: - dataset = self.create_index_independent_result_dataset(label, dataset) + dataset = self._calculator.create_index_independent_result_dataset(label, dataset) # TODO: adapt tests to handle add_svd=False - if self.scheme.add_svd: + if add_svd: self._create_svd("weighted_residual", dataset, model_dimension, global_dimension) self._create_svd("residual", dataset, model_dimension, global_dimension) @@ -390,22 +383,3 @@ def _create_svd(self, name: str, dataset: xr.Dataset, lsv_dim: str, rsv_dim: str add_svd_to_dataset( dataset, name=name, lsv_dim=lsv_dim, rsv_dim=rsv_dim, data_array=data_array ) - - def create_index_dependent_result_dataset(self, label: str, dataset: xr.Dataset) -> xr.Dataset: - 
"""Creates a result datasets for index dependent matrices.""" - raise NotImplementedError - - def create_index_independent_result_dataset( - self, label: str, dataset: xr.Dataset - ) -> xr.Dataset: - """Creates a result datasets for index independent matrices.""" - raise NotImplementedError - - def calculate_matrices(self): - raise NotImplementedError - - def calculate_residual(self): - raise NotImplementedError - - def prepare_result_creation(self): - pass diff --git a/glotaran/analysis/optimization_group_calculator.py b/glotaran/analysis/optimization_group_calculator.py new file mode 100644 index 000000000..2f200629d --- /dev/null +++ b/glotaran/analysis/optimization_group_calculator.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np +import xarray as xr + +if TYPE_CHECKING: + from glotaran.analysis.optimization_group import OptimizationGroup + + +class OptimizationGroupCalculator: + """A Problem class""" + + def __init__(self, group: OptimizationGroup): + self._group = group + + def calculate_matrices(self): + raise NotImplementedError + + def calculate_residual(self): + raise NotImplementedError + + def calculate_full_penalty(self) -> np.ndarray: + raise NotImplementedError + + def prepare_result_creation(self): + pass + + def create_index_dependent_result_dataset(self, label: str, dataset: xr.Dataset) -> xr.Dataset: + """Creates a result datasets for index dependent matrices.""" + raise NotImplementedError + + def create_index_independent_result_dataset( + self, label: str, dataset: xr.Dataset + ) -> xr.Dataset: + """Creates a result datasets for index independent matrices.""" + raise NotImplementedError diff --git a/glotaran/analysis/problem_grouped.py b/glotaran/analysis/optimization_group_calculator_linked.py similarity index 64% rename from glotaran/analysis/problem_grouped.py rename to glotaran/analysis/optimization_group_calculator_linked.py index 31c047f5a..6da488894 100644 --- a/glotaran/analysis/problem_grouped.py +++ b/glotaran/analysis/optimization_group_calculator_linked.py @@ -2,16 +2,15 @@ import collections import itertools +from typing import TYPE_CHECKING from typing import Any from typing import Deque +from typing import NamedTuple import numpy as np import xarray as xr -from glotaran.analysis.problem import GroupedProblemDescriptor -from glotaran.analysis.problem import ParameterNotInitializedError -from glotaran.analysis.problem import Problem -from glotaran.analysis.problem import ProblemGroup +from glotaran.analysis.optimization_group_calculator import OptimizationGroupCalculator from glotaran.analysis.util import CalculatedMatrix from glotaran.analysis.util import apply_weight from glotaran.analysis.util import calculate_clp_penalties @@ -21,26 +20,48 @@ from glotaran.analysis.util import reduce_matrix from glotaran.analysis.util import retrieve_clps from glotaran.model import DatasetModel -from glotaran.project import Scheme -Bag = Deque[ProblemGroup] +if TYPE_CHECKING: + from glotaran.analysis.optimization_group import OptimizationGroup -class GroupedProblem(Problem): - """Represents a problem where the data is grouped.""" +class DatasetIndexModel(NamedTuple): + """A model which contains a dataset label and index information.""" - def __init__(self, scheme: Scheme): - """Initializes the Problem class from a scheme (:class:`glotaran.analysis.scheme.Scheme`) + label: str + indices: dict[str, int] + axis: dict[str, np.ndarray] - Args: - scheme (Scheme): An instance of 
:class:`glotaran.analysis.scheme.Scheme` - which defines your model, parameters, and data - """ - super().__init__(scheme=scheme) + +class DatasetIndexModelGroup(NamedTuple): + """A model which contains information about a group of dataset with linked clp.""" + + data: np.ndarray + weight: np.ndarray + has_scaling: bool + """Indicates if at least one dataset in the group needs scaling.""" + group: str + """The concatenated labels of the involved datasets.""" + data_sizes: list[int] + """Holds the sizes of the concatenated datasets.""" + dataset_models: list[DatasetIndexModel] + + +Bag = Deque[DatasetIndexModelGroup] +"""A deque of dataset group index models.""" + + +class OptimizationGroupCalculatorLinked(OptimizationGroupCalculator): + """A class to calculate a set of datasets with linked CLP.""" + + def __init__(self, group: OptimizationGroup): + super().__init__(group) + self._bag = None + self._full_axis = None # TODO: grouping should be user controlled not inferred automatically - global_dimensions = {d.get_global_dimension() for d in self.dataset_models.values()} - model_dimensions = {d.get_model_dimension() for d in self.dataset_models.values()} + global_dimensions = {d.get_global_dimension() for d in group.dataset_models.values()} + model_dimensions = {d.get_model_dimension() for d in group.dataset_models.values()} if len(global_dimensions) != 1: raise ValueError( f"Cannot group datasets. Global dimensions '{global_dimensions}' do not match." @@ -49,16 +70,16 @@ def __init__(self, scheme: Scheme): raise ValueError( f"Cannot group datasets. Model dimension '{model_dimensions}' do not match." ) - self._index_dependent = any(d.is_index_dependent() for d in self.dataset_models.values()) + self._index_dependent = any(d.is_index_dependent() for d in group.dataset_models.values()) self._global_dimension = global_dimensions.pop() self._model_dimension = model_dimensions.pop() self._group_clp_labels = None self._groups = None - self._has_weights = any("weight" in d for d in self._data.values()) + self._has_weights = any("weight" in d for d in group.data.values()) @property def bag(self) -> Bag: - if not self._bag: + if self._bag is None: self.init_bag() return self._bag @@ -66,7 +87,7 @@ def init_bag(self): """Initializes a grouped problem bag.""" self._bag = None datasets = None - for label, dataset_model in self.dataset_models.items(): + for label, dataset_model in self._group.dataset_models.items(): data = dataset_model.get_data() weight = dataset_model.get_weight() @@ -79,14 +100,14 @@ def init_bag(self): if self._bag is None: self._bag = collections.deque( - ProblemGroup( + DatasetIndexModelGroup( data=data[:, i], weight=weight[:, i] if weight is not None else None, has_scaling=has_scaling, group=label, data_sizes=[model_axis.size], - descriptor=[ - GroupedProblemDescriptor( + dataset_models=[ + DatasetIndexModel( label, { self._global_dimension: i, @@ -119,12 +140,12 @@ def _append_to_grouped_bag( weight: xr.DataArray, has_scaling: bool, ): - i1, i2 = find_overlap(self._full_axis, global_axis, atol=self._scheme.group_tolerance) + i1, i2 = find_overlap(self._full_axis, global_axis, atol=self._group._clp_link_tolerance) for i, j in enumerate(i1): datasets[j].append(label) data_stripe = data[:, i2[i]] - self._bag[j] = ProblemGroup( + self._bag[j] = DatasetIndexModelGroup( data=np.concatenate( [ self._bag[j].data, @@ -137,9 +158,9 @@ def _append_to_grouped_bag( has_scaling=has_scaling or self._bag[j].has_scaling, group=self._bag[j].group + label, data_sizes=self._bag[j].data_sizes + 
[data_stripe.size], - descriptor=self._bag[j].descriptor + dataset_models=self._bag[j].dataset_models + [ - GroupedProblemDescriptor( + DatasetIndexModel( label, { self._global_dimension: i2[i], @@ -157,14 +178,14 @@ def _append_to_grouped_bag( end_overlap = i2[-1] + 1 if len(i2) != 0 else 0 for i in itertools.chain(range(begin_overlap), range(end_overlap, len(global_axis))): data_stripe = data[:, i] - problem = ProblemGroup( + problem = DatasetIndexModelGroup( data=data_stripe, weight=weight[:, i] if weight is not None else None, has_scaling=has_scaling, group=label, data_sizes=[data_stripe.size], - descriptor=[ - GroupedProblemDescriptor( + dataset_models=[ + DatasetIndexModel( label, { self._global_dimension: i, @@ -192,8 +213,6 @@ def groups(self) -> dict[str, list[str]]: return self._groups def calculate_matrices(self): - if self._parameters is None: - raise ParameterNotInitializedError if self._index_dependent: self.calculate_index_dependent_matrices() else: @@ -205,73 +224,78 @@ def calculate_index_dependent_matrices( """Calculates the index dependent model matrices.""" def calculate_group( - group: ProblemGroup, descriptors: dict[str, DatasetModel] + group_model: DatasetIndexModelGroup, dataset_models: dict[str, DatasetModel] ) -> tuple[list[CalculatedMatrix], list[str], CalculatedMatrix]: matrices = [ calculate_matrix( - descriptors[problem.label], - problem.indices, + dataset_models[dataset_index_model.label], + dataset_index_model.indices, ) - for problem in group.descriptor + for dataset_index_model in group_model.dataset_models ] - global_index = group.descriptor[0].indices[self._global_dimension] - global_index = group.descriptor[0].axis[self._global_dimension][global_index] + global_index = group_model.dataset_models[0].indices[self._global_dimension] + global_index = group_model.dataset_models[0].axis[self._global_dimension][global_index] combined_matrix = combine_matrices(matrices) group_clp_labels = combined_matrix.clp_labels reduced_matrix = reduce_matrix( - combined_matrix, self.model, self.parameters, global_index + combined_matrix, self._group.model, self._group.parameters, global_index ) return matrices, group_clp_labels, reduced_matrix - results = list(map(lambda group: calculate_group(group, self.dataset_models), self.bag)) + results = list( + map( + lambda group_model: calculate_group(group_model, self._group.dataset_models), + self.bag, + ) + ) matrices = list(map(lambda result: result[0], results)) - self._matrices = {} - for i, grouped_problem in enumerate(self._bag): - for j, descriptor in enumerate(grouped_problem.descriptor): - if descriptor.label not in self._matrices: - self._matrices[descriptor.label] = [] - self._matrices[descriptor.label].append(matrices[i][j]) + self._group._matrices = {} + for i, group_model in enumerate(self._bag): + for j, dataset_index_model in enumerate(group_model.dataset_models): + if dataset_index_model.label not in self._group._matrices: + self._group._matrices[dataset_index_model.label] = [] + self._group._matrices[dataset_index_model.label].append(matrices[i][j]) self._group_clp_labels = list(map(lambda result: result[1], results)) - self._reduced_matrices = list(map(lambda result: result[2], results)) - return self._matrices, self._reduced_matrices + self._group._reduced_matrices = list(map(lambda result: result[2], results)) + return self._group._matrices, self._group._reduced_matrices def calculate_index_independent_matrices( self, ) -> tuple[dict[str, CalculatedMatrix], dict[str, CalculatedMatrix],]: """Calculates the 
index independent model matrices.""" - self._matrices = {} + self._group._matrices = {} + self._group._reduced_matrices = {} self._group_clp_labels = {} - self._reduced_matrices = {} - for label, dataset_model in self.dataset_models.items(): - self._matrices[label] = calculate_matrix( + for label, dataset_model in self._group.dataset_models.items(): + self._group._matrices[label] = calculate_matrix( dataset_model, {}, ) - self._group_clp_labels[label] = self._matrices[label].clp_labels - self._reduced_matrices[label] = reduce_matrix( - self._matrices[label], - self.model, - self.parameters, + self._group_clp_labels[label] = self._group._matrices[label].clp_labels + self._group._reduced_matrices[label] = reduce_matrix( + self._group._matrices[label], + self._group.model, + self._group.parameters, None, ) for group_label, group in self.groups.items(): - if group_label not in self._matrices: - self._reduced_matrices[group_label] = combine_matrices( - [self._reduced_matrices[label] for label in group] + if group_label not in self._group._matrices: + self._group._reduced_matrices[group_label] = combine_matrices( + [self._group._reduced_matrices[label] for label in group] ) group_clp_labels = [] for label in group: - for clp_label in self._matrices[label].clp_labels: + for clp_label in self._group._matrices[label].clp_labels: if clp_label not in group_clp_labels: group_clp_labels.append(clp_label) self._group_clp_labels[group_label] = group_clp_labels - return self._matrices, self._reduced_matrices + return self._group._matrices, self._group._reduced_matrices def calculate_residual(self): results = ( @@ -279,7 +303,7 @@ def calculate_residual(self): map( self._index_dependent_residual, self.bag, - self.reduced_matrices, + self._group.reduced_matrices, self._group_clp_labels, self._full_axis, ) @@ -291,22 +315,27 @@ def calculate_residual(self): self._clp_labels = list(map(lambda result: result[0], results)) self._grouped_clps = list(map(lambda result: result[1], results)) - self._weighted_residuals = list(map(lambda result: result[2], results)) - self._residuals = list(map(lambda result: result[3], results)) - self._additional_penalty = calculate_clp_penalties( - self.model, - self.parameters, + self._group._weighted_residuals = list(map(lambda result: result[2], results)) + self._group._residuals = list(map(lambda result: result[3], results)) + self._group._additional_penalty = calculate_clp_penalties( + self._group.model, + self._group.parameters, self._clp_labels, self._grouped_clps, self._full_axis, - self.dataset_models, + self._group.dataset_models, ) - return self._reduced_clps, self._clps, self._weighted_residuals, self._residuals + return ( + self._group._reduced_clps, + self._group._clps, + self._group._weighted_residuals, + self._group._residuals, + ) def _index_dependent_residual( self, - problem: ProblemGroup, + group_model: DatasetIndexModelGroup, matrix: CalculatedMatrix, clp_labels: str, index: Any, @@ -314,72 +343,76 @@ def _index_dependent_residual( reduced_clp_labels = matrix.clp_labels matrix = matrix.matrix - if problem.weight is not None: - apply_weight(matrix, problem.weight) - data = problem.data - if problem.has_scaling: - for i, descriptor in enumerate(problem.descriptor): - label = descriptor.label - if self.dataset_models[label] is not None: - start = sum(problem.data_sizes[0:i]) - end = start + problem.data_sizes[i] - matrix[start:end, :] *= self.dataset_models[label].scale - - reduced_clps, weighted_residual = self._residual_function(matrix, data) + if 
group_model.weight is not None: + apply_weight(matrix, group_model.weight) + data = group_model.data + + self._apply_scale(group_model, matrix) + + reduced_clps, weighted_residual = self._group._residual_function(matrix, data) clps = retrieve_clps( - self.model, - self.parameters, + self._group.model, + self._group.parameters, clp_labels, reduced_clp_labels, reduced_clps, index, ) residual = ( - weighted_residual / problem.weight if problem.weight is not None else weighted_residual + weighted_residual / group_model.weight + if group_model.weight is not None + else weighted_residual ) return clp_labels, clps, weighted_residual, residual - def _index_independent_residual(self, problem: ProblemGroup, index: Any): - matrix = self.reduced_matrices[problem.group] + def _index_independent_residual(self, group_model: DatasetIndexModelGroup, index: Any): + matrix = self._group.reduced_matrices[group_model.group] reduced_clp_labels = matrix.clp_labels matrix = matrix.matrix.copy() - if problem.weight is not None: - apply_weight(matrix, problem.weight) - data = problem.data - if problem.has_scaling: - for i, descriptor in enumerate(problem.descriptor): - label = descriptor.label - if self.dataset_models[label] is not None: - start = sum(problem.data_sizes[0:i]) - end = start + problem.data_sizes[i] - matrix[start:end, :] *= self.dataset_models[label].scale - reduced_clps, weighted_residual = self._residual_function(matrix, data) - clp_labels = self._group_clp_labels[problem.group] + if group_model.weight is not None: + apply_weight(matrix, group_model.weight) + data = group_model.data + + self._apply_scale(group_model, matrix) + + reduced_clps, weighted_residual = self._group._residual_function(matrix, data) + clp_labels = self._group_clp_labels[group_model.group] clps = retrieve_clps( - self.model, - self.parameters, + self._group.model, + self._group.parameters, clp_labels, reduced_clp_labels, reduced_clps, index, ) residual = ( - weighted_residual / problem.weight if problem.weight is not None else weighted_residual + weighted_residual / group_model.weight + if group_model.weight is not None + else weighted_residual ) return clp_labels, clps, weighted_residual, residual + def _apply_scale(self, group_model: DatasetIndexModelGroup, matrix: np.ndarray): + if group_model.has_scaling: + for i, index_model in enumerate(group_model.dataset_models): + label = index_model.label + if self._group.dataset_models[label] is not None: + start = sum(group_model.data_sizes[0:i]) + end = start + group_model.data_sizes[i] + matrix[start:end, :] *= self._group.dataset_models[label].scale + def prepare_result_creation(self): - if self._residuals is None: + if self._group._residuals is None: self.calculate_residual() full_clp_labels = self._clp_labels full_clps = self._grouped_clps - self._clps = {} - for label, matrix in self.matrices.items(): + self._group._clps = {} + for label, matrix in self._group.matrices.items(): # TODO deal with different clps at indices clp_labels = matrix[0].clp_labels if self._index_dependent else matrix.clp_labels # find offset in the full axis - global_axis = self.dataset_models[label].get_global_axis() + global_axis = self._group.dataset_models[label].get_global_axis() offset = find_closest_index(global_axis[0], self._full_axis) clps = [] @@ -389,7 +422,7 @@ def prepare_result_creation(self): mask = [full_index_clp_labels.index(clp_label) for clp_label in clp_labels] clps.append(index_clps[mask]) - self._clps[label] = xr.DataArray( + self._group._clps[label] = xr.DataArray( clps, 
coords=((self._global_dimension, global_axis), ("clp_label", clp_labels)), ) @@ -401,9 +434,9 @@ def create_index_dependent_result_dataset(self, label: str, dataset: xr.Dataset) if label in grouped_problem.group: group_index = [ - descriptor.label for descriptor in grouped_problem.descriptor + descriptor.label for descriptor in grouped_problem.dataset_models ].index(label) - group_descriptor = grouped_problem.descriptor[group_index] + group_descriptor = grouped_problem.dataset_models[group_index] global_index = group_descriptor.indices[self._global_dimension] global_index = group_descriptor.axis[self._global_dimension][global_index] @@ -417,9 +450,9 @@ def create_index_dependent_result_dataset(self, label: str, dataset: xr.Dataset) (self._model_dimension), ("clp_label"), ), - np.asarray([m.matrix for m in self.matrices[label]]), + np.asarray([m.matrix for m in self._group.matrices[label]]), ) - dataset["clp"] = self.clps[label] + dataset["clp"] = self._group.clps[label] return dataset @@ -433,17 +466,17 @@ def create_index_independent_result_dataset( (self._model_dimension), ("clp_label"), ), - self.matrices[label].matrix, + self._group.matrices[label].matrix, ) - dataset["clp"] = self.clps[label] + dataset["clp"] = self._group.clps[label] for index, grouped_problem in enumerate(self.bag): if label in grouped_problem.group: group_index = [ - descriptor.label for descriptor in grouped_problem.descriptor + descriptor.label for descriptor in grouped_problem.dataset_models ].index(label) - group_descriptor = grouped_problem.descriptor[group_index] + group_descriptor = grouped_problem.dataset_models[group_index] global_index = group_descriptor.indices[self._global_dimension] global_index = group_descriptor.axis[self._global_dimension][global_index] @@ -456,7 +489,7 @@ def create_index_independent_result_dataset( def _add_grouped_residual_to_dataset( self, dataset: xr.Dataset, - grouped_problem: ProblemGroup, + grouped_problem: DatasetIndexModelGroup, index: int, group_index: int, global_index: int, @@ -474,30 +507,31 @@ def _add_grouped_residual_to_dataset( ) start = sum( - self.data[grouped_problem.descriptor[i].label].coords[self._model_dimension].size + self._group.data[grouped_problem.dataset_models[i].label] + .coords[self._model_dimension] + .size for i in range(group_index) ) end = start + dataset.coords[self._model_dimension].size dataset.weighted_residual.loc[ {self._global_dimension: global_index} - ] = self.weighted_residuals[index][start:end] - dataset.residual.loc[{self._global_dimension: global_index}] = self.residuals[index][ - start:end - ] + ] = self._group.weighted_residuals[index][start:end] + dataset.residual.loc[{self._global_dimension: global_index}] = self._group.residuals[ + index + ][start:end] - @property - def full_penalty(self) -> np.ndarray: - if self._full_penalty is None: - residuals = self.weighted_residuals - additional_penalty = self.additional_penalty + def calculate_full_penalty(self) -> np.ndarray: + if self._group._full_penalty is None: + residuals = self._group.weighted_residuals + additional_penalty = self._group.additional_penalty - self._full_penalty = ( + self._group._full_penalty = ( np.concatenate((np.concatenate(residuals), additional_penalty)) if additional_penalty is not None else np.concatenate(residuals) ) - return self._full_penalty + return self._group._full_penalty def combine_matrices(matrices: list[CalculatedMatrix]) -> CalculatedMatrix: diff --git a/glotaran/analysis/problem_ungrouped.py 
b/glotaran/analysis/optimization_group_calculator_unlinked.py similarity index 61% rename from glotaran/analysis/problem_ungrouped.py rename to glotaran/analysis/optimization_group_calculator_unlinked.py index dbe0df5ae..d261f523b 100644 --- a/glotaran/analysis/problem_ungrouped.py +++ b/glotaran/analysis/optimization_group_calculator_unlinked.py @@ -1,10 +1,11 @@ from __future__ import annotations +from typing import TYPE_CHECKING + import numpy as np import xarray as xr -from glotaran.analysis.problem import ParameterNotInitializedError -from glotaran.analysis.problem import Problem +from glotaran.analysis.optimization_group_calculator import OptimizationGroupCalculator from glotaran.analysis.util import CalculatedMatrix from glotaran.analysis.util import apply_weight from glotaran.analysis.util import calculate_clp_penalties @@ -12,25 +13,21 @@ from glotaran.analysis.util import reduce_matrix from glotaran.analysis.util import retrieve_clps from glotaran.model import DatasetModel -from glotaran.project import Scheme +if TYPE_CHECKING: + from glotaran.analysis.optimization_group import OptimizationGroup -class UngroupedProblem(Problem): - """Represents a problem where the data is not grouped.""" - def __init__(self, scheme: Scheme): - """Initializes the Problem class from a scheme (:class:`glotaran.analysis.scheme.Scheme`) +class OptimizationGroupCalculatorUnlinked(OptimizationGroupCalculator): + """Represents a problem where the clps are not linked.""" - Args: - scheme (Scheme): An instance of :class:`glotaran.analysis.scheme.Scheme` - which defines your model, parameters, and data - """ - super().__init__(scheme=scheme) + def __init__(self, group: OptimizationGroup): + super().__init__(group) self._global_matrices = {} self._flattened_data = {} self._flattened_weights = {} - for label, dataset_model in self.dataset_models.items(): + for label, dataset_model in group.dataset_models.items(): if dataset_model.has_global_model(): self._flattened_data[label] = dataset_model.get_data().T.flatten() weight = dataset_model.get_weight() @@ -50,14 +47,12 @@ def calculate_matrices( dict[str, CalculatedMatrix | list[CalculatedMatrix]], ]: """Calculates the model matrices.""" - if self._parameters is None: - raise ParameterNotInitializedError - self._matrices = {} + self._group._matrices = {} self._global_matrices = {} - self._reduced_matrices = {} + self._group._reduced_matrices = {} - for label, dataset_model in self.dataset_models.items(): + for label, dataset_model in self._group.dataset_models.items(): if dataset_model.is_index_dependent(): self._calculate_index_dependent_matrix(label, dataset_model) @@ -67,27 +62,29 @@ def calculate_matrices( if dataset_model.has_global_model(): self._calculate_global_matrix(label, dataset_model) - return self._matrices, self._reduced_matrices + return self._group._matrices, self._group._reduced_matrices def _calculate_index_dependent_matrix(self, label: str, dataset_model: DatasetModel): - self._matrices[label] = [] - self._reduced_matrices[label] = [] + self._group._matrices[label] = [] + self._group._reduced_matrices[label] = [] for i, index in enumerate(dataset_model.get_global_axis()): matrix = calculate_matrix( dataset_model, {dataset_model.get_global_dimension(): i}, ) - self._matrices[label].append(matrix) + self._group._matrices[label].append(matrix) if not dataset_model.has_global_model(): - reduced_matrix = reduce_matrix(matrix, self.model, self.parameters, index) - self._reduced_matrices[label].append(reduced_matrix) + reduced_matrix = 
reduce_matrix( + matrix, self._group.model, self._group.parameters, index + ) + self._group._reduced_matrices[label].append(reduced_matrix) def _calculate_index_independent_matrix(self, label: str, dataset_model: DatasetModel): matrix = calculate_matrix(dataset_model, {}) - self._matrices[label] = matrix + self._group._matrices[label] = matrix if not dataset_model.has_global_model(): - reduced_matrix = reduce_matrix(matrix, self.model, self.parameters, None) - self._reduced_matrices[label] = reduced_matrix + reduced_matrix = reduce_matrix(matrix, self._group.model, self._group.parameters, None) + self._group._reduced_matrices[label] = reduced_matrix def _calculate_global_matrix(self, label: str, dataset_model: DatasetModel): matrix = calculate_matrix(dataset_model, {}, as_global_model=True) @@ -103,37 +100,44 @@ def calculate_residual( ]: """Calculates the residuals.""" - self._reduced_clps = {} - self._clps = {} - self._weighted_residuals = {} - self._residuals = {} - self._additional_penalty = [] + self._group._reduced_clps = {} + self._group._clps = {} + self._group._weighted_residuals = {} + self._group._residuals = {} + self._group._additional_penalty = [] - for label, dataset_model in self._dataset_models.items(): + for label, dataset_model in self._group._dataset_models.items(): if dataset_model.has_global_model(): self._calculate_full_model_residual(label, dataset_model) else: self._calculate_residual(label, dataset_model) - self._additional_penalty = ( - np.concatenate(self._additional_penalty) if len(self._additional_penalty) != 0 else [] + self._group._additional_penalty = ( + np.concatenate(self._group._additional_penalty) + if len(self._group._additional_penalty) != 0 + else [] + ) + return ( + self._group._reduced_clps, + self._group._clps, + self._group._weighted_residuals, + self._group._residuals, ) - return self._reduced_clps, self._clps, self._weighted_residuals, self._residuals def _calculate_residual(self, label: str, dataset_model: DatasetModel): - self._reduced_clps[label] = [] - self._clps[label] = [] - self._weighted_residuals[label] = [] - self._residuals[label] = [] + self._group._reduced_clps[label] = [] + self._group._clps[label] = [] + self._group._weighted_residuals[label] = [] + self._group._residuals[label] = [] data = dataset_model.get_data() global_axis = dataset_model.get_global_axis() for i, index in enumerate(global_axis): reduced_clp_labels, reduced_matrix = ( - self.reduced_matrices[label][i] + self._group.reduced_matrices[label][i] if dataset_model.is_index_dependent() - else self.reduced_matrices[label] + else self._group.reduced_matrices[label] ) if not dataset_model.is_index_dependent(): reduced_matrix = reduced_matrix.copy() @@ -145,42 +149,42 @@ def _calculate_residual(self, label: str, dataset_model: DatasetModel): if weight is not None: apply_weight(reduced_matrix, weight[:, i]) - reduced_clps, residual = self._residual_function(reduced_matrix, data[:, i]) + reduced_clps, residual = self._group._residual_function(reduced_matrix, data[:, i]) - self._reduced_clps[label].append(reduced_clps) + self._group._reduced_clps[label].append(reduced_clps) clp_labels = self._get_clp_labels(label, i) - self._clps[label].append( + self._group._clps[label].append( retrieve_clps( - self.model, - self.parameters, + self._group.model, + self._group.parameters, clp_labels, reduced_clp_labels, reduced_clps, index, ) ) - self._weighted_residuals[label].append(residual) + self._group._weighted_residuals[label].append(residual) if weight is not None: - 
self._residuals[label].append(residual / weight[:, i]) + self._group._residuals[label].append(residual / weight[:, i]) else: - self._residuals[label].append(residual) + self._group._residuals[label].append(residual) clp_labels = self._get_clp_labels(label) additional_penalty = calculate_clp_penalties( - self.model, - self.parameters, + self._group.model, + self._group.parameters, clp_labels, - self._clps[label], + self._group._clps[label], global_axis, - self.dataset_models, + self._group.dataset_models, ) if additional_penalty.size != 0: - self._additional_penalty.append(additional_penalty) + self._group._additional_penalty.append(additional_penalty) def _calculate_full_model_residual(self, label: str, dataset_model: DatasetModel): - model_matrix = self.matrices[label] + model_matrix = self._group.matrices[label] global_matrix = self.global_matrices[label].matrix if dataset_model.is_index_dependent(): @@ -196,24 +200,27 @@ def _calculate_full_model_residual(self, label: str, dataset_model: DatasetModel if weight is not None: apply_weight(matrix, weight) data = self._flattened_data[label] - self._clps[label], self._weighted_residuals[label] = self._residual_function(matrix, data) + ( + self._group._clps[label], + self._group._weighted_residuals[label], + ) = self._group._residual_function(matrix, data) - self._residuals[label] = self._weighted_residuals[label] + self._group._residuals[label] = self._group._weighted_residuals[label] if weight is not None: - self._residuals[label] /= weight + self._group._residuals[label] /= weight def _get_clp_labels(self, label: str, index: int = 0): return ( - self.matrices[label][index].clp_labels - if self.dataset_models[label].is_index_dependent() - else self.matrices[label].clp_labels + self._group.matrices[label][index].clp_labels + if self._group.dataset_models[label].is_index_dependent() + else self._group.matrices[label].clp_labels ) def create_index_dependent_result_dataset(self, label: str, dataset: xr.Dataset) -> xr.Dataset: """Creates a result datasets for index dependent matrices.""" - model_dimension = self.dataset_models[label].get_model_dimension() - global_dimension = self.dataset_models[label].get_global_dimension() + model_dimension = self._group.dataset_models[label].get_model_dimension() + global_dimension = self._group.dataset_models[label].get_global_dimension() dataset.coords["clp_label"] = self._get_clp_labels(label) dataset["matrix"] = ( @@ -222,10 +229,10 @@ def create_index_dependent_result_dataset(self, label: str, dataset: xr.Dataset) (model_dimension), ("clp_label"), ), - np.asarray([m.matrix for m in self.matrices[label]]), + np.asarray([m.matrix for m in self._group.matrices[label]]), ) - if self.dataset_models[label].has_global_model(): + if self._group.dataset_models[label].has_global_model(): self._add_global_matrix_to_dataset(label, dataset) self._add_full_model_residual_and_clp_to_dataset(label, dataset) else: @@ -238,9 +245,9 @@ def create_index_independent_result_dataset( ) -> xr.Dataset: """Creates a result datasets for index independent matrices.""" - matrix = self.matrices[label] + matrix = self._group.matrices[label] dataset.coords["clp_label"] = matrix.clp_labels - model_dimension = self.dataset_models[label].get_model_dimension() + model_dimension = self._group.dataset_models[label].get_model_dimension() dataset["matrix"] = ( ( (model_dimension), @@ -249,7 +256,7 @@ def create_index_independent_result_dataset( matrix.matrix, ) - if self.dataset_models[label].has_global_model(): + if 
self._group.dataset_models[label].has_global_model(): self._add_global_matrix_to_dataset(label, dataset) self._add_full_model_residual_and_clp_to_dataset(label, dataset) else: @@ -260,7 +267,7 @@ def create_index_independent_result_dataset( def _add_global_matrix_to_dataset(self, label: str, dataset: xr.Dataset) -> xr.Dataset: matrix = self.global_matrices[label] dataset.coords["global_clp_label"] = matrix.clp_labels - global_dimension = self.dataset_models[label].get_global_dimension() + global_dimension = self._group.dataset_models[label].get_global_dimension() dataset["global_matrix"] = ( ( (global_dimension), @@ -270,39 +277,39 @@ def _add_global_matrix_to_dataset(self, label: str, dataset: xr.Dataset) -> xr.D ) def _add_residual_and_clp_to_dataset(self, label: str, dataset: xr.Dataset): - model_dimension = self.dataset_models[label].get_model_dimension() - global_dimension = self.dataset_models[label].get_global_dimension() + model_dimension = self._group.dataset_models[label].get_model_dimension() + global_dimension = self._group.dataset_models[label].get_global_dimension() dataset["clp"] = ( ( (global_dimension), ("clp_label"), ), - np.asarray(self.clps[label]), + np.asarray(self._group.clps[label]), ) dataset["weighted_residual"] = ( ( (model_dimension), (global_dimension), ), - np.transpose(np.asarray(self.weighted_residuals[label])), + np.transpose(np.asarray(self._group.weighted_residuals[label])), ) dataset["residual"] = ( ( (model_dimension), (global_dimension), ), - np.transpose(np.asarray(self.residuals[label])), + np.transpose(np.asarray(self._group.residuals[label])), ) def _add_full_model_residual_and_clp_to_dataset(self, label: str, dataset: xr.Dataset): - model_dimension = self.dataset_models[label].get_model_dimension() - global_dimension = self.dataset_models[label].get_global_dimension() + model_dimension = self._group.dataset_models[label].get_model_dimension() + global_dimension = self._group.dataset_models[label].get_global_dimension() dataset["clp"] = ( ( ("global_clp_label"), ("clp_label"), ), - self.clps[label].reshape( + self._group.clps[label].reshape( (dataset.coords["global_clp_label"].size, dataset.coords["clp_label"].size) ), ) @@ -311,21 +318,20 @@ def _add_full_model_residual_and_clp_to_dataset(self, label: str, dataset: xr.Da (model_dimension), (global_dimension), ), - self.weighted_residuals[label].T.reshape(dataset.data.shape), + self._group.weighted_residuals[label].T.reshape(dataset.data.shape), ) dataset["residual"] = ( ( (model_dimension), (global_dimension), ), - self.residuals[label].T.reshape(dataset.data.shape), + self._group.residuals[label].T.reshape(dataset.data.shape), ) - @property - def full_penalty(self) -> np.ndarray: - if self._full_penalty is None: - residuals = self.weighted_residuals - additional_penalty = self.additional_penalty + def calculate_full_penalty(self) -> np.ndarray: + if self._group._full_penalty is None: + residuals = self._group.weighted_residuals + additional_penalty = self._group.additional_penalty residuals = [ np.concatenate(residuals[label]) if isinstance(residuals[label], list) @@ -333,9 +339,9 @@ def full_penalty(self) -> np.ndarray: for label in residuals.keys() ] - self._full_penalty = ( + self._group._full_penalty = ( np.concatenate((np.concatenate(residuals), additional_penalty)) if additional_penalty is not None else np.concatenate(residuals) ) - return self._full_penalty + return self._group._full_penalty diff --git a/glotaran/analysis/optimize.py b/glotaran/analysis/optimize.py index 
68c6377eb..0b94009f2 100644
--- a/glotaran/analysis/optimize.py
+++ b/glotaran/analysis/optimize.py
@@ -7,9 +7,8 @@
 from scipy.optimize import least_squares

 from glotaran import __version__ as glotaran_version
-from glotaran.analysis.problem import Problem
-from glotaran.analysis.problem_grouped import GroupedProblem
-from glotaran.analysis.problem_ungrouped import UngroupedProblem
+from glotaran.analysis.optimization_group import OptimizationGroup
+from glotaran.parameter import ParameterHistory
 from glotaran.project import Result
 from glotaran.project import Scheme
@@ -21,34 +20,34 @@
 def optimize(scheme: Scheme, verbose: bool = True, raise_exception: bool = False) -> Result:
-    problem = GroupedProblem(scheme) if scheme.is_grouped() else UngroupedProblem(scheme)
-    return optimize_problem(problem, verbose=verbose, raise_exception=raise_exception)
-
-
-def optimize_problem(
-    problem: Problem, verbose: bool = True, raise_exception: bool = False
-) -> Result:
-
-    if problem.scheme.optimization_method not in SUPPORTED_METHODS:
-        raise ValueError(
-            f"Unsupported optimization method {problem.scheme.optimization_method}. "
-            f"Supported methods are '{list(SUPPORTED_METHODS.keys())}'"
-        )
+    optimization_groups = [
+        OptimizationGroup(scheme, group) for group in scheme.model.get_dataset_groups().values()
+    ]

     (
         free_parameter_labels,
         initial_parameter,
         lower_bounds,
         upper_bounds,
-    ) = problem.scheme.parameters.get_label_value_and_bounds_arrays(exclude_non_vary=True)
-    method = SUPPORTED_METHODS[problem.scheme.optimization_method]
-    nfev = problem.scheme.maximum_number_function_evaluations
-    ftol = problem.scheme.ftol
-    gtol = problem.scheme.gtol
-    xtol = problem.scheme.xtol
+    ) = scheme.parameters.get_label_value_and_bounds_arrays(exclude_non_vary=True)
+
+    if scheme.optimization_method not in SUPPORTED_METHODS:
+        raise ValueError(
+            f"Unsupported optimization method {scheme.optimization_method}. "
+            f"Supported methods are '{list(SUPPORTED_METHODS.keys())}'"
+        )
+    method = SUPPORTED_METHODS[scheme.optimization_method]
+
+    nfev = scheme.maximum_number_function_evaluations
+    ftol = scheme.ftol
+    gtol = scheme.gtol
+    xtol = scheme.xtol
     verbose = 2 if verbose else 0
     termination_reason = ""
+    parameter_history = ParameterHistory()
+    parameter_history.append(scheme.parameters)

     try:
         ls_result = least_squares(
             _calculate_penalty,
@@ -60,7 +59,11 @@ def optimize_problem(
             ftol=ftol,
             gtol=gtol,
             xtol=xtol,
-            kwargs={"free_parameter_labels": free_parameter_labels, "problem": problem},
+            kwargs={
+                "free_parameter_labels": free_parameter_labels,
+                "optimization_groups": optimization_groups,
+                "parameter_history": parameter_history,
+            },
         )
         termination_reason = ls_result.message
     except Exception as e:
@@ -70,29 +73,48 @@
         termination_reason = str(e)
         ls_result = None

-    return _create_result(problem, ls_result, free_parameter_labels, termination_reason)
+    return _create_result(
+        scheme,
+        optimization_groups,
+        ls_result,
+        free_parameter_labels,
+        termination_reason,
+        parameter_history,
+    )


 def _calculate_penalty(
-    parameters: np.ndarray, free_parameter_labels: list[str] = None, problem: Problem = None
+    parameters: np.ndarray,
+    *,
+    free_parameter_labels: list[str],
+    optimization_groups: list[OptimizationGroup],
+    parameter_history: ParameterHistory,
 ):
-    problem.save_parameters_for_history()
-    problem.parameters.set_from_label_and_value_arrays(free_parameter_labels, parameters)
-    problem.reset()
-    return problem.full_penalty
+    for group in optimization_groups:
+        group.parameters.set_from_label_and_value_arrays(free_parameter_labels, parameters)
+        group.reset()
+    parameter_history.append(
+        optimization_groups[0].parameters
+    )  # parameters are the same for all groups
+
+    penalties = [group.full_penalty for group in optimization_groups]
+
+    return np.concatenate(penalties) if len(penalties) != 1 else penalties[0]


 def _create_result(
-    problem: Problem,
+    scheme: Scheme,
+    optimization_groups: list[OptimizationGroup],
     ls_result: OptimizeResult | None,
     free_parameter_labels: list[str],
     termination_reason: str,
+    parameter_history: ParameterHistory,
 ) -> Result:

     success = ls_result is not None

     number_of_function_evaluation = (
-        ls_result.nfev if success else problem.parameter_history.number_of_records
+        ls_result.nfev if success else parameter_history.number_of_records
     )
     number_of_jacobian_evaluation = ls_result.njev if success else None
     optimality = float(ls_result.optimality) if success else None
@@ -105,10 +127,18 @@
     jacobian = ls_result.jac if success else None
     if success:
-        problem.parameters.set_from_label_and_value_arrays(free_parameter_labels, ls_result.x)
-        data = problem.create_result_data(success)
+        for group in optimization_groups:
+            group.parameters.set_from_label_and_value_arrays(free_parameter_labels, ls_result.x)
+            group.reset()
+
+    data = {}
+    for group in optimization_groups:
+        data.update(
+            group.create_result_data(parameter_history, success=success, add_svd=scheme.add_svd)
+        )
+
     # the optimized parameters are those of the last run if the optimization has crashed
-    parameters = problem.parameters
+    parameters = optimization_groups[0].parameters

     covariance_matrix = None
     if success:
         # See PR #706: More robust covariance matrix calculation
@@ -120,17 +150,21 @@
         for label, error in zip(free_parameter_labels, standard_errors):
             parameters.get(label).standard_error = error

+    additional_penalty = [group.additional_penalty
for group in optimization_groups] + + cost = [group.cost for group in optimization_groups] + return Result( - additional_penalty=problem.additional_penalty, - cost=problem.cost, + additional_penalty=additional_penalty, + cost=cost, data=data, glotaran_version=glotaran_version, free_parameter_labels=free_parameter_labels, number_of_function_evaluations=number_of_function_evaluation, - initial_parameters=problem.scheme.parameters, + initial_parameters=scheme.parameters, optimized_parameters=parameters, - parameter_history=problem.parameter_history, - scheme=problem.scheme, + parameter_history=parameter_history, + scheme=scheme, success=success, termination_reason=termination_reason, chi_square=chi_square, diff --git a/glotaran/analysis/test/test_constraints.py b/glotaran/analysis/test/test_constraints.py index 8b10b82ab..e3d9849fe 100644 --- a/glotaran/analysis/test/test_constraints.py +++ b/glotaran/analysis/test/test_constraints.py @@ -2,8 +2,7 @@ import pytest -from glotaran.analysis.problem_grouped import GroupedProblem -from glotaran.analysis.problem_ungrouped import UngroupedProblem +from glotaran.analysis.optimization_group import OptimizationGroup from glotaran.analysis.simulation import simulate from glotaran.analysis.test.models import TwoCompartmentDecay as suite from glotaran.model import ZeroConstraint @@ -11,13 +10,14 @@ @pytest.mark.parametrize("index_dependent", [True, False]) -@pytest.mark.parametrize("grouped", [True, False]) -def test_constraint(index_dependent, grouped): +@pytest.mark.parametrize("link_clp", [True, False]) +def test_constraint(index_dependent, link_clp): model = deepcopy(suite.model) + model.dataset_group_models["default"].link_clp = link_clp model.megacomplex["m1"].is_index_dependent = index_dependent model.clp_constraints.append(ZeroConstraint.from_dict({"target": "s2"})) - print("grouped", grouped, "index_dependent", index_dependent) + print("link_clp", link_clp, "index_dependent", index_dependent) dataset = simulate( suite.sim_model, "dataset1", @@ -25,17 +25,23 @@ def test_constraint(index_dependent, grouped): {"global": suite.global_axis, "model": suite.model_axis}, ) scheme = Scheme(model=model, parameters=suite.initial_parameters, data={"dataset1": dataset}) - problem = GroupedProblem(scheme) if grouped else UngroupedProblem(scheme) + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) if index_dependent: reduced_matrix = ( - problem.reduced_matrices[0] if grouped else problem.reduced_matrices["dataset1"][0] + optimization_group.reduced_matrices[0] + if link_clp + else optimization_group.reduced_matrices["dataset1"][0] ) else: - reduced_matrix = problem.reduced_matrices["dataset1"] - matrix = problem.matrices["dataset1"][0] if index_dependent else problem.matrices["dataset1"] + reduced_matrix = optimization_group.reduced_matrices["dataset1"] + matrix = ( + optimization_group.matrices["dataset1"][0] + if index_dependent + else optimization_group.matrices["dataset1"] + ) - result_data = problem.create_result_data() + result_data = optimization_group.create_result_data() print(result_data) clps = result_data["dataset1"].clp diff --git a/glotaran/analysis/test/test_grouping.py b/glotaran/analysis/test/test_grouping.py index 35ea15f42..e63be10b9 100644 --- a/glotaran/analysis/test/test_grouping.py +++ b/glotaran/analysis/test/test_grouping.py @@ -1,7 +1,7 @@ import numpy as np import xarray as xr -from glotaran.analysis.problem_grouped import GroupedProblem +from glotaran.analysis.optimization_group import 
OptimizationGroup from glotaran.analysis.test.models import SimpleTestModel from glotaran.parameter import ParameterGroup from glotaran.project import Scheme @@ -11,6 +11,7 @@ def test_single_dataset(): model = SimpleTestModel.from_dict( { "megacomplex": {"m1": {"is_index_dependent": False}}, + "dataset_groups": {"default": {"link_clp": True}}, "dataset": { "dataset1": { "megacomplex": ["m1"], @@ -18,10 +19,8 @@ def test_single_dataset(): }, } ) - model.grouped = lambda: True print(model.validate()) assert model.valid() - assert model.grouped() parameters = ParameterGroup.from_list([1, 10]) print(model.validate(parameters)) @@ -36,22 +35,23 @@ def test_single_dataset(): } scheme = Scheme(model, parameters, data) - problem = GroupedProblem(scheme) - bag = problem.bag - datasets = problem.groups + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) + bag = optimization_group._calculator.bag + datasets = optimization_group._calculator.groups assert len(datasets) == 1 assert len(bag) == 3 assert all(p.data.size == 4 for p in bag) - assert all(p.descriptor[0].label == "dataset1" for p in bag) - assert all(all(p.descriptor[0].axis["model"] == model_axis) for p in bag) - assert all(all(p.descriptor[0].axis["global"] == global_axis) for p in bag) - assert [p.descriptor[0].indices["global"] for p in bag] == [0, 1, 2] + assert all(p.dataset_models[0].label == "dataset1" for p in bag) + assert all(all(p.dataset_models[0].axis["model"] == model_axis) for p in bag) + assert all(all(p.dataset_models[0].axis["global"] == global_axis) for p in bag) + assert [p.dataset_models[0].indices["global"] for p in bag] == [0, 1, 2] def test_multi_dataset_no_overlap(): model = SimpleTestModel.from_dict( { "megacomplex": {"m1": {"is_index_dependent": False}}, + "dataset_groups": {"default": {"link_clp": True}}, "dataset": { "dataset1": { "megacomplex": ["m1"], @@ -86,27 +86,28 @@ def test_multi_dataset_no_overlap(): } scheme = Scheme(model, parameters, data) - problem = GroupedProblem(scheme) - bag = list(problem.bag) - assert len(problem.groups) == 2 + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) + bag = list(optimization_group._calculator.bag) + assert len(optimization_group._calculator.groups) == 2 assert len(bag) == 6 assert all(p.data.size == 2 for p in bag[:3]) - assert all(p.descriptor[0].label == "dataset1" for p in bag[:3]) - assert all(all(p.descriptor[0].axis["model"] == model_axis_1) for p in bag[:3]) - assert all(all(p.descriptor[0].axis["global"] == global_axis_1) for p in bag[:3]) - assert [p.descriptor[0].indices["global"] for p in bag[:3]] == [0, 1, 2] + assert all(p.dataset_models[0].label == "dataset1" for p in bag[:3]) + assert all(all(p.dataset_models[0].axis["model"] == model_axis_1) for p in bag[:3]) + assert all(all(p.dataset_models[0].axis["global"] == global_axis_1) for p in bag[:3]) + assert [p.dataset_models[0].indices["global"] for p in bag[:3]] == [0, 1, 2] assert all(p.data.size == 3 for p in bag[3:]) - assert all(p.descriptor[0].label == "dataset2" for p in bag[3:]) - assert all(all(p.descriptor[0].axis["model"] == model_axis_2) for p in bag[3:]) - assert all(all(p.descriptor[0].axis["global"] == global_axis_2) for p in bag[3:]) - assert [p.descriptor[0].indices["global"] for p in bag[3:]] == [0, 1, 2] + assert all(p.dataset_models[0].label == "dataset2" for p in bag[3:]) + assert all(all(p.dataset_models[0].axis["model"] == model_axis_2) for p in bag[3:]) + assert all(all(p.dataset_models[0].axis["global"] == 
global_axis_2) for p in bag[3:]) + assert [p.dataset_models[0].indices["global"] for p in bag[3:]] == [0, 1, 2] def test_multi_dataset_overlap(): model = SimpleTestModel.from_dict( { "megacomplex": {"m1": {"is_index_dependent": False}}, + "dataset_groups": {"default": {"link_clp": True}}, "dataset": { "dataset1": { "megacomplex": ["m1"], @@ -140,29 +141,29 @@ def test_multi_dataset_overlap(): ).to_dataset(name="data"), } - scheme = Scheme(model, parameters, data, group_tolerance=5e-1) - problem = GroupedProblem(scheme) - bag = list(problem.bag) - assert len(problem.groups) == 3 - assert "dataset1dataset2" in problem.groups - assert problem.groups["dataset1dataset2"] == ["dataset1", "dataset2"] + scheme = Scheme(model, parameters, data, clp_link_tolerance=5e-1) + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) + bag = list(optimization_group._calculator.bag) + assert len(optimization_group._calculator.groups) == 3 + assert "dataset1dataset2" in optimization_group._calculator.groups + assert optimization_group._calculator.groups["dataset1dataset2"] == ["dataset1", "dataset2"] assert len(bag) == 6 assert all(p.data.size == 4 for p in bag[:1]) - assert all(p.descriptor[0].label == "dataset1" for p in bag[1:5]) - assert all(all(p.descriptor[0].axis["model"] == model_axis_1) for p in bag[1:5]) - assert all(all(p.descriptor[0].axis["global"] == global_axis_1) for p in bag[1:5]) - assert [p.descriptor[0].indices["global"] for p in bag[1:5]] == [0, 1, 2, 3] + assert all(p.dataset_models[0].label == "dataset1" for p in bag[1:5]) + assert all(all(p.dataset_models[0].axis["model"] == model_axis_1) for p in bag[1:5]) + assert all(all(p.dataset_models[0].axis["global"] == global_axis_1) for p in bag[1:5]) + assert [p.dataset_models[0].indices["global"] for p in bag[1:5]] == [0, 1, 2, 3] assert all(p.data.size == 6 for p in bag[1:4]) - assert all(p.descriptor[1].label == "dataset2" for p in bag[1:4]) - assert all(all(p.descriptor[1].axis["model"] == model_axis_2) for p in bag[1:4]) - assert all(all(p.descriptor[1].axis["global"] == global_axis_2) for p in bag[1:4]) - assert [p.descriptor[1].indices["global"] for p in bag[1:4]] == [1, 2, 3] + assert all(p.dataset_models[1].label == "dataset2" for p in bag[1:4]) + assert all(all(p.dataset_models[1].axis["model"] == model_axis_2) for p in bag[1:4]) + assert all(all(p.dataset_models[1].axis["global"] == global_axis_2) for p in bag[1:4]) + assert [p.dataset_models[1].indices["global"] for p in bag[1:4]] == [1, 2, 3] assert all(p.data.size == 4 for p in bag[5:]) - assert bag[4].descriptor[0].label == "dataset1" - assert bag[5].descriptor[0].label == "dataset2" - assert np.array_equal(bag[4].descriptor[0].axis["model"], model_axis_1) - assert np.array_equal(bag[5].descriptor[0].axis["model"], model_axis_2) - assert [p.descriptor[0].indices["global"] for p in bag[1:4]] == [0, 1, 2] + assert bag[4].dataset_models[0].label == "dataset1" + assert bag[5].dataset_models[0].label == "dataset2" + assert np.array_equal(bag[4].dataset_models[0].axis["model"], model_axis_1) + assert np.array_equal(bag[5].dataset_models[0].axis["model"], model_axis_2) + assert [p.dataset_models[0].indices["global"] for p in bag[1:4]] == [0, 1, 2] diff --git a/glotaran/analysis/test/test_multiple_goups.py b/glotaran/analysis/test/test_multiple_goups.py new file mode 100644 index 000000000..803a3384c --- /dev/null +++ b/glotaran/analysis/test/test_multiple_goups.py @@ -0,0 +1,67 @@ +import numpy as np + +from glotaran.analysis.optimize import optimize 
+from glotaran.analysis.simulation import simulate +from glotaran.analysis.test.models import DecayModel +from glotaran.parameter import ParameterGroup +from glotaran.project import Scheme + + +def test_multiple_groups(): + wanted_parameters = ParameterGroup.from_list([101e-4]) + initial_parameters = ParameterGroup.from_list([100e-5]) + + global_axis = np.asarray([1.0]) + model_axis = np.arange(0, 150, 1.5) + + sim_model_dict = { + "megacomplex": {"m1": {"is_index_dependent": False}, "m2": {"type": "global_complex"}}, + "dataset": { + "dataset1": { + "initial_concentration": [], + "megacomplex": ["m1"], + "global_megacomplex": ["m2"], + "kinetic": ["1"], + } + }, + } + sim_model = DecayModel.from_dict(sim_model_dict) + model_dict = { + "dataset_groups": {"g1": {}, "g2": {"residual_function": "non_negative_least_squares"}}, + "megacomplex": {"m1": {"is_index_dependent": False}}, + "dataset": { + "dataset1": { + "group": "g1", + "initial_concentration": [], + "megacomplex": ["m1"], + "kinetic": ["1"], + }, + "dataset2": { + "group": "g2", + "initial_concentration": [], + "megacomplex": ["m1"], + "kinetic": ["1"], + }, + }, + } + model = DecayModel.from_dict(model_dict) + dataset = simulate( + sim_model, + "dataset1", + wanted_parameters, + {"global": global_axis, "model": model_axis}, + ) + scheme = Scheme( + model=model, + parameters=initial_parameters, + data={"dataset1": dataset, "dataset2": dataset}, + maximum_number_function_evaluations=10, + clp_link_tolerance=0.1, + ) + + result = optimize(scheme, raise_exception=True) + print(result.optimized_parameters) + assert result.success + for label, param in result.optimized_parameters.all(): + if param.vary: + assert np.allclose(param.value, wanted_parameters.get(label).value, rtol=1e-1) diff --git a/glotaran/analysis/test/test_optimization.py b/glotaran/analysis/test/test_optimization.py index 2a8e97ea7..e3eaaab1e 100644 --- a/glotaran/analysis/test/test_optimization.py +++ b/glotaran/analysis/test/test_optimization.py @@ -13,7 +13,7 @@ @pytest.mark.parametrize("is_index_dependent", [True, False]) -@pytest.mark.parametrize("grouped", [True, False]) +@pytest.mark.parametrize("link_clp", [True, False]) @pytest.mark.parametrize("weight", [True, False]) @pytest.mark.parametrize( "method", @@ -27,12 +27,12 @@ "suite", [OneCompartmentDecay, TwoCompartmentDecay, ThreeDatasetDecay, MultichannelMulticomponentDecay], ) -def test_optimization(suite, is_index_dependent, grouped, weight, method): +def test_optimization(suite, is_index_dependent, link_clp, weight, method): model = suite.model model.megacomplex["m1"].is_index_dependent = is_index_dependent - print("Grouped:", grouped) + print("Link CLP:", link_clp) print("Index dependent:", is_index_dependent) sim_model = suite.sim_model @@ -91,11 +91,12 @@ def test_optimization(suite, is_index_dependent, grouped, weight, method): parameters=initial_parameters, data=data, maximum_number_function_evaluations=10, - group=grouped, - group_tolerance=0.1, + clp_link_tolerance=0.1, optimization_method=method, ) + model.dataset_group_models["default"].link_clp = link_clp + result = optimize(scheme, raise_exception=True) print(result.optimized_parameters) assert result.success @@ -149,7 +150,6 @@ def test_optimization_full_model(index_dependent): parameters=parameters, data={"dataset1": dataset}, maximum_number_function_evaluations=10, - group=False, ) result = optimize(scheme, raise_exception=True) diff --git a/glotaran/analysis/test/test_problem.py b/glotaran/analysis/test/test_optimization_group.py similarity 
index 59% rename from glotaran/analysis/test/test_problem.py rename to glotaran/analysis/test/test_optimization_group.py index e3cac4504..bc26587ce 100644 --- a/glotaran/analysis/test/test_problem.py +++ b/glotaran/analysis/test/test_optimization_group.py @@ -4,9 +4,10 @@ import pytest import xarray as xr -from glotaran.analysis.problem import Problem -from glotaran.analysis.problem_grouped import GroupedProblem -from glotaran.analysis.problem_ungrouped import UngroupedProblem +from glotaran.analysis.optimization_group import OptimizationGroup +from glotaran.analysis.optimization_group_calculator_linked import ( + OptimizationGroupCalculatorLinked, +) from glotaran.analysis.simulation import simulate from glotaran.analysis.test.models import FullModel from glotaran.analysis.test.models import MultichannelMulticomponentDecay as suite @@ -19,10 +20,11 @@ @pytest.fixture( scope="module", params=[[True, True], [True, False], [False, True], [False, False]] ) -def problem(request) -> Problem: +def optimization_group(request) -> OptimizationGroup: model = suite.model model.megacomplex["m1"].is_index_dependent = request.param[1] model.is_index_dependent = request.param[1] + model.dataset_group_models["default"].link_clp = request.param[0] dataset = simulate( suite.sim_model, @@ -31,66 +33,68 @@ def problem(request) -> Problem: {"global": suite.global_axis, "model": suite.model_axis}, ) scheme = Scheme(model=model, parameters=suite.initial_parameters, data={"dataset1": dataset}) - problem = GroupedProblem(scheme) if request.param[0] else UngroupedProblem(scheme) - problem.grouped = request.param[0] - return problem + + return OptimizationGroup(scheme, model.get_dataset_groups()["default"]) -def test_problem_bag(problem: Problem): +def test_optimization_group_bag(optimization_group: OptimizationGroup): - if problem.grouped: - bag = problem.bag + if isinstance(optimization_group._calculator, OptimizationGroupCalculatorLinked): + bag = optimization_group._calculator.bag assert isinstance(bag, collections.deque) assert len(bag) == suite.global_axis.size - assert problem.groups == {"dataset1": ["dataset1"]} + assert optimization_group._calculator.groups == {"dataset1": ["dataset1"]} -def test_problem_matrices(problem: Problem): - problem.calculate_matrices() +def test_optimization_group_matrices(optimization_group: OptimizationGroup): + optimization_group._calculator.calculate_matrices() - if problem.grouped: - if problem.model.is_index_dependent: - assert all(isinstance(m, CalculatedMatrix) for m in problem.reduced_matrices) - assert len(problem.reduced_matrices) == suite.global_axis.size + if isinstance(optimization_group._calculator, OptimizationGroupCalculatorLinked): + if optimization_group.model.is_index_dependent: + assert all( + isinstance(m, CalculatedMatrix) for m in optimization_group.reduced_matrices + ) + assert len(optimization_group.reduced_matrices) == suite.global_axis.size else: - assert "dataset1" in problem.reduced_matrices - assert isinstance(problem.reduced_matrices["dataset1"], CalculatedMatrix) + assert "dataset1" in optimization_group.reduced_matrices + assert isinstance(optimization_group.reduced_matrices["dataset1"], CalculatedMatrix) else: - if problem.model.is_index_dependent: - assert isinstance(problem.reduced_matrices, dict) - assert isinstance(problem.reduced_matrices["dataset1"], list) + if optimization_group.model.is_index_dependent: + assert isinstance(optimization_group.reduced_matrices, dict) + assert isinstance(optimization_group.reduced_matrices["dataset1"], list) 
assert all( - isinstance(m, CalculatedMatrix) for m in problem.reduced_matrices["dataset1"] + isinstance(m, CalculatedMatrix) + for m in optimization_group.reduced_matrices["dataset1"] ) else: - assert isinstance(problem.reduced_matrices["dataset1"], CalculatedMatrix) + assert isinstance(optimization_group.reduced_matrices["dataset1"], CalculatedMatrix) - assert isinstance(problem.matrices, dict) - assert "dataset1" in problem.reduced_matrices + assert isinstance(optimization_group.matrices, dict) + assert "dataset1" in optimization_group.reduced_matrices -def test_problem_residuals(problem: Problem): - problem.calculate_residual() - if problem.grouped: - assert isinstance(problem.residuals, list) - assert all(isinstance(r, np.ndarray) for r in problem.residuals) - assert len(problem.residuals) == suite.global_axis.size +def test_optimization_group_residuals(optimization_group: OptimizationGroup): + optimization_group._calculator.calculate_residual() + if isinstance(optimization_group._calculator, OptimizationGroupCalculatorLinked): + assert isinstance(optimization_group.residuals, list) + assert all(isinstance(r, np.ndarray) for r in optimization_group.residuals) + assert len(optimization_group.residuals) == suite.global_axis.size else: - assert isinstance(problem.residuals, dict) - assert "dataset1" in problem.residuals - assert all(isinstance(r, np.ndarray) for r in problem.residuals["dataset1"]) - assert len(problem.residuals["dataset1"]) == suite.global_axis.size + assert isinstance(optimization_group.residuals, dict) + assert "dataset1" in optimization_group.residuals + assert all(isinstance(r, np.ndarray) for r in optimization_group.residuals["dataset1"]) + assert len(optimization_group.residuals["dataset1"]) == suite.global_axis.size -def test_problem_result_data(problem: Problem): +def test_optimization_group_result_data(optimization_group: OptimizationGroup): - data = problem.create_result_data() + data = optimization_group.create_result_data() label = "dataset1" assert label in data dataset = data[label] - dataset_model = problem.dataset_models[label] + dataset_model = optimization_group.dataset_models[label] assert "clp_label" in dataset.coords assert np.array_equal(dataset.clp_label, ["s1", "s2", "s3", "s4"]) @@ -103,7 +107,7 @@ def test_problem_result_data(problem: Problem): assert "matrix" in dataset matrix = dataset.matrix - if problem.model.is_index_dependent: + if optimization_group.model.is_index_dependent: assert len(matrix.shape) == 3 assert matrix.shape[0] == suite.global_axis.size assert matrix.shape[1] == suite.model_axis.size @@ -162,9 +166,9 @@ def test_prepare_data(): ) scheme = Scheme(model, parameters, {"dataset1": dataset}) - problem = Problem(scheme) + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) - data = problem.data["dataset1"] + data = optimization_group.data["dataset1"] print(data) assert "data" in data assert "weight" in data @@ -185,8 +189,9 @@ def test_prepare_data(): assert model.valid() scheme = Scheme(model, parameters, {"dataset1": dataset}) - problem = Problem(scheme) - data = problem.data["dataset1"] + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) + + data = optimization_group.data["dataset1"] assert np.all( data.weight.sel({"global": slice(0, 200), "model": slice(4, 8)}).values == 0.5 * 0.2 ) @@ -197,7 +202,9 @@ def test_prepare_data(): match="Ignoring model weight for dataset 'dataset1'" " because weight is already supplied by dataset.", ): - Problem(Scheme(model, 
parameters, {"dataset1": data})) + OptimizationGroup( + Scheme(model, parameters, {"dataset1": data}), model.get_dataset_groups()["default"] + ) def test_full_model_problem(): @@ -205,9 +212,9 @@ def test_full_model_problem(): scheme = Scheme( model=FullModel.model, parameters=FullModel.parameters, data={"dataset1": dataset} ) - problem = UngroupedProblem(scheme) + optimization_group = OptimizationGroup(scheme, FullModel.model.get_dataset_groups()["default"]) - result = problem.create_result_data()["dataset1"] + result = optimization_group.create_result_data()["dataset1"] assert "global_matrix" in result assert "global_clp_label" in result diff --git a/glotaran/analysis/test/test_penalties.py b/glotaran/analysis/test/test_penalties.py index 43f5c4874..c2a3c8fd4 100644 --- a/glotaran/analysis/test/test_penalties.py +++ b/glotaran/analysis/test/test_penalties.py @@ -3,8 +3,7 @@ import numpy as np import pytest -from glotaran.analysis.problem_grouped import GroupedProblem -from glotaran.analysis.problem_ungrouped import UngroupedProblem +from glotaran.analysis.optimization_group import OptimizationGroup from glotaran.analysis.simulation import simulate from glotaran.analysis.test.models import TwoCompartmentDecay as suite from glotaran.model import EqualAreaPenalty @@ -13,9 +12,10 @@ @pytest.mark.parametrize("index_dependent", [True, False]) -@pytest.mark.parametrize("grouped", [True, False]) -def test_penalties(index_dependent, grouped): +@pytest.mark.parametrize("link_clp", [True, False]) +def test_penalties(index_dependent, link_clp): model = deepcopy(suite.model) + model.dataset_group_models["default"].link_clp = link_clp model.megacomplex["m1"].is_index_dependent = index_dependent model.clp_area_penalties.append( EqualAreaPenalty.from_dict( @@ -33,7 +33,7 @@ def test_penalties(index_dependent, grouped): global_axis = np.arange(50) - print("grouped", grouped, "index_dependent", index_dependent) + print(f"{link_clp=}\n{index_dependent=}") dataset = simulate( suite.sim_model, "dataset1", @@ -41,13 +41,13 @@ def test_penalties(index_dependent, grouped): {"global": global_axis, "model": suite.model_axis}, ) scheme = Scheme(model=model, parameters=parameters, data={"dataset1": dataset}) - problem = GroupedProblem(scheme) if grouped else UngroupedProblem(scheme) + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) - assert isinstance(problem.additional_penalty, np.ndarray) - assert problem.additional_penalty.size == 1 - assert problem.additional_penalty[0] != 0 - assert isinstance(problem.full_penalty, np.ndarray) + assert isinstance(optimization_group.additional_penalty, np.ndarray) + assert optimization_group.additional_penalty.size == 1 + assert optimization_group.additional_penalty[0] != 0 + assert isinstance(optimization_group.full_penalty, np.ndarray) assert ( - problem.full_penalty.size - == (suite.model_axis.size * global_axis.size) + problem.additional_penalty.size + optimization_group.full_penalty.size + == (suite.model_axis.size * global_axis.size) + optimization_group.additional_penalty.size ) diff --git a/glotaran/analysis/test/test_relations.py b/glotaran/analysis/test/test_relations.py index f515c59af..d8cb1e877 100644 --- a/glotaran/analysis/test/test_relations.py +++ b/glotaran/analysis/test/test_relations.py @@ -2,8 +2,7 @@ import pytest -from glotaran.analysis.problem_grouped import GroupedProblem -from glotaran.analysis.problem_ungrouped import UngroupedProblem +from glotaran.analysis.optimization_group import OptimizationGroup from 
glotaran.analysis.simulation import simulate from glotaran.analysis.test.models import TwoCompartmentDecay as suite from glotaran.model import Relation @@ -12,16 +11,17 @@ @pytest.mark.parametrize("index_dependent", [True, False]) -@pytest.mark.parametrize("grouped", [True, False]) -def test_relations(index_dependent, grouped): +@pytest.mark.parametrize("link_clp", [True, False]) +def test_relations(index_dependent, link_clp): model = deepcopy(suite.model) + model.dataset_group_models["default"].link_clp = link_clp model.megacomplex["m1"].is_index_dependent = index_dependent model.clp_relations.append( Relation.from_dict({"source": "s1", "target": "s2", "parameter": "3"}) ) parameters = ParameterGroup.from_list([11e-4, 22e-5, 2]) - print("grouped", grouped, "index_dependent", index_dependent) + print("link_clp", link_clp, "index_dependent", index_dependent) dataset = simulate( suite.sim_model, "dataset1", @@ -29,17 +29,23 @@ def test_relations(index_dependent, grouped): {"global": suite.global_axis, "model": suite.model_axis}, ) scheme = Scheme(model=model, parameters=parameters, data={"dataset1": dataset}) - problem = GroupedProblem(scheme) if grouped else UngroupedProblem(scheme) + optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"]) if index_dependent: reduced_matrix = ( - problem.reduced_matrices[0] if grouped else problem.reduced_matrices["dataset1"][0] + optimization_group.reduced_matrices[0] + if link_clp + else optimization_group.reduced_matrices["dataset1"][0] ) else: - reduced_matrix = problem.reduced_matrices["dataset1"] - matrix = problem.matrices["dataset1"][0] if index_dependent else problem.matrices["dataset1"] + reduced_matrix = optimization_group.reduced_matrices["dataset1"] + matrix = ( + optimization_group.matrices["dataset1"][0] + if index_dependent + else optimization_group.matrices["dataset1"] + ) - result_data = problem.create_result_data() + result_data = optimization_group.create_result_data() print(result_data) clps = result_data["dataset1"].clp diff --git a/glotaran/builtin/io/yml/test/test_save_model.py b/glotaran/builtin/io/yml/test/test_save_model.py index c51b0438e..4705defc0 100644 --- a/glotaran/builtin/io/yml/test/test_save_model.py +++ b/glotaran/builtin/io/yml/test/test_save_model.py @@ -12,10 +12,15 @@ want = """dataset: dataset1: + group: default initial_concentration: j1 irf: irf1 megacomplex: - m1 +dataset_groups: + default: + link_clp: null + residual_function: variable_projection default-megacomplex: decay initial_concentration: j1: diff --git a/glotaran/builtin/io/yml/test/test_save_scheme.py b/glotaran/builtin/io/yml/test/test_save_scheme.py index f44e909d2..00266e81c 100644 --- a/glotaran/builtin/io/yml/test/test_save_scheme.py +++ b/glotaran/builtin/io/yml/test/test_save_scheme.py @@ -19,15 +19,13 @@ want = """add_svd: true +clp_link_tolerance: 0.0 data_files: dataset_1: d.nc ftol: 1.0e-08 -group: null -group_tolerance: 0.0 gtol: 1.0e-08 maximum_number_function_evaluations: null model_file: m.yml -non_negative_least_squares: false optimization_method: TrustRegionReflection parameters_file: p.csv result_path: null diff --git a/glotaran/builtin/megacomplexes/decay/test/test_decay_megacomplex.py b/glotaran/builtin/megacomplexes/decay/test/test_decay_megacomplex.py index 939fb6c1b..162c1ea8d 100644 --- a/glotaran/builtin/megacomplexes/decay/test/test_decay_megacomplex.py +++ b/glotaran/builtin/megacomplexes/decay/test/test_decay_megacomplex.py @@ -233,6 +233,9 @@ def test_kinetic_model(suite, nnls): model = 
suite.model print(model.validate()) assert model.valid() + model.dataset_group_models["default"].method = ( + "non_negative_least_squares" if nnls else "variable_projection" + ) wanted_parameters = suite.wanted_parameters print(model.validate(wanted_parameters)) @@ -256,7 +259,6 @@ def test_kinetic_model(suite, nnls): parameters=initial_parameters, data=data, maximum_number_function_evaluations=20, - non_negative_least_squares=nnls, ) result = optimize(scheme) print(result.optimized_parameters) diff --git a/glotaran/deprecation/modules/test/__init__.py b/glotaran/deprecation/modules/test/__init__.py index 9e67df7e7..f2752ba5c 100644 --- a/glotaran/deprecation/modules/test/__init__.py +++ b/glotaran/deprecation/modules/test/__init__.py @@ -58,8 +58,10 @@ def deprecation_warning_on_call_test_helper( try: result = deprecated_callable(*args, **kwargs) - assert len(record) >= 1 - assert Path(record[0].filename) == Path(__file__) + assert len(record) >= 1, f"{len(record)=}" + assert Path(record[0].filename) == Path( + __file__ + ), f"{Path(record[0].filename)=}, {Path(__file__)=}" return record, result diff --git a/glotaran/deprecation/modules/test/test_project_scheme.py b/glotaran/deprecation/modules/test/test_project_scheme.py index 0bf65635a..8ec5292f4 100644 --- a/glotaran/deprecation/modules/test/test_project_scheme.py +++ b/glotaran/deprecation/modules/test/test_project_scheme.py @@ -1,18 +1,36 @@ """Test deprecated functionality in 'glotaran.project.schmeme'.""" from __future__ import annotations +from functools import lru_cache from typing import TYPE_CHECKING +import pytest import xarray as xr from glotaran.deprecation.modules.test import deprecation_warning_on_call_test_helper from glotaran.project.scheme import Scheme +from glotaran.testing.model_generators import SimpleModelGenerator if TYPE_CHECKING: from pathlib import Path -def test_Scheme_from_yaml_file_method(tmp_path: Path): +@lru_cache(maxsize=1) +def create_test_args(): + """Objects to initialize a ``Scheme`` for testing.""" + generator = SimpleModelGenerator( + rates=[501e-3, 202e-4, 105e-5], + irf={"center": 1.3, "width": 7.8}, + k_matrix="sequential", + ) + model, parameters = generator.model_and_parameters + dataset = xr.DataArray([[1, 2, 3]], coords=[("e", [1]), ("c", [1, 2, 3])]).to_dataset( + name="data" + ) + return model, parameters, dataset + + +def test_scheme_from_yaml_file_method(tmp_path: Path): """Create Scheme from file.""" scheme_path = tmp_path / "scheme.yml" @@ -40,7 +58,6 @@ def test_Scheme_from_yaml_file_method(tmp_path: Path): f""" model_file: {model_path} parameters_file: {parameter_path} - non_negative_least_squares: True maximum_number_function_evaluations: 42 data_files: dataset1: {dataset_path}""" @@ -51,3 +68,61 @@ def test_Scheme_from_yaml_file_method(tmp_path: Path): ) assert isinstance(result, Scheme) + + +def test_scheme_group_tolerance(): + """Argument ``group_tolerance`` raises deprecation and maps to ``clp_link_tolerance``.""" + model, parameters, dataset = create_test_args() + + warnings, result = deprecation_warning_on_call_test_helper( + Scheme, + args=(model, parameters, {"dataset": dataset}), + kwargs={"group_tolerance": 1}, + raise_exception=True, + ) + + assert isinstance(result, Scheme) + assert result.clp_link_tolerance == 1 + assert "glotaran.project.Scheme(..., clp_link_tolerance=...)" in warnings[0].message.args[0] + + +@pytest.mark.parametrize( + "group", + (True, False), +) +def test_scheme_group(group: bool): + """Argument ``group`` raises deprecation and maps to 
``dataset_groups.default.link_clp``.""" + model, parameters, dataset = create_test_args() + + warnings, result = deprecation_warning_on_call_test_helper( + Scheme, + args=(model, parameters, {"dataset": dataset}), + kwargs={"group": group}, + raise_exception=True, + ) + + assert isinstance(result, Scheme) + assert result.model.dataset_group_models["default"].link_clp == group + assert "dataset_groups.default.link_clp" in warnings[0].message.args[0] + + +@pytest.mark.parametrize( + "non_negative_least_squares, expected", + ((True, "non_negative_least_squares"), (False, "variable_projection")), +) +def test_scheme_non_negative_least_squares(non_negative_least_squares: bool, expected: str): + """Argument ``non_negative_least_squares`` raises deprecation and maps to + ``dataset_groups.default.residual_function``. + """ + model, parameters, dataset = create_test_args() + + warnings, result = deprecation_warning_on_call_test_helper( + Scheme, + args=(model, parameters, {"dataset": dataset}), + kwargs={"non_negative_least_squares": non_negative_least_squares}, + raise_exception=True, + ) + + assert isinstance(result, Scheme) + assert result.model.dataset_group_models["default"].residual_function == expected + assert "dataset_groups.default.residual_function" in warnings[0].message.args[0] diff --git a/glotaran/model/__init__.py b/glotaran/model/__init__.py index fc07bcdf3..6c2a6d493 100644 --- a/glotaran/model/__init__.py +++ b/glotaran/model/__init__.py @@ -8,6 +8,8 @@ from glotaran.model.constraint import Constraint from glotaran.model.constraint import OnlyConstraint from glotaran.model.constraint import ZeroConstraint +from glotaran.model.dataset_group import DatasetGroup +from glotaran.model.dataset_group import DatasetGroupModel from glotaran.model.dataset_model import DatasetModel from glotaran.model.item import model_item from glotaran.model.item import model_item_typed diff --git a/glotaran/model/dataset_group.py b/glotaran/model/dataset_group.py new file mode 100644 index 000000000..f41f6f43f --- /dev/null +++ b/glotaran/model/dataset_group.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +from typing import Literal + +from glotaran.model.dataset_model import DatasetModel + + +@dataclass +class DatasetGroupModel: + """A group of datasets which will evaluated independently.""" + + residual_function: Literal[ + "variable_projection", "non_negative_least_squares" + ] = "variable_projection" + """The residual function to use.""" + + link_clp: bool | None = None + """Whether to link the clp parameter.""" + + +@dataclass +class DatasetGroup: + model: DatasetGroupModel + dataset_models: dict[str, DatasetModel] = field(default_factory=dict) diff --git a/glotaran/model/model.py b/glotaran/model/model.py index 1813c652b..cf7b053dc 100644 --- a/glotaran/model/model.py +++ b/glotaran/model/model.py @@ -2,6 +2,7 @@ from __future__ import annotations import copy +from dataclasses import asdict from typing import Any from typing import List from warnings import warn @@ -11,6 +12,8 @@ from glotaran.deprecation import raise_deprecation_error from glotaran.model.clp_penalties import EqualAreaPenalty from glotaran.model.constraint import Constraint +from glotaran.model.dataset_group import DatasetGroup +from glotaran.model.dataset_group import DatasetGroupModel from glotaran.model.dataset_model import create_dataset_model_type from glotaran.model.megacomplex import Megacomplex from glotaran.model.megacomplex import 
create_model_megacomplex_type @@ -30,6 +33,7 @@ } default_dataset_properties = { + "group": {"type": str, "default": "default"}, "megacomplex": List[str], "megacomplex_scale": {"type": List[Parameter], "allow_none": True}, "global_megacomplex": {"type": List[str], "allow_none": True}, @@ -46,10 +50,15 @@ def __init__( *, megacomplex_types: dict[str, type[Megacomplex]], default_megacomplex_type: str | None = None, + dataset_group_models: dict[str, DatasetGroupModel] = None, ): self._megacomplex_types = megacomplex_types self._default_megacomplex_type = default_megacomplex_type or next(iter(megacomplex_types)) + self._dataset_group_models = dataset_group_models or {"default": DatasetGroupModel()} + if "default" not in self._dataset_group_models: + self._dataset_group_models["default"] = DatasetGroupModel() + self._model_items = {} self._dataset_properties = {} self._add_default_items_and_properties() @@ -93,8 +102,16 @@ def from_dict( if "default-megacomplex" in model_dict: model_dict.pop("default-megacomplex", None) + dataset_group_models = model_dict.pop("dataset_groups", None) + if dataset_group_models is not None: + dataset_group_models = { + label: DatasetGroupModel(**group) for label, group in dataset_group_models.items() + } + model = cls( - megacomplex_types=megacomplex_types, default_megacomplex_type=default_megacomplex_type + megacomplex_types=megacomplex_types, + default_megacomplex_type=default_megacomplex_type, + dataset_group_models=dataset_group_models, ) # iterate over items @@ -247,6 +264,10 @@ def megacomplex_types(self) -> dict[str, type[Megacomplex]]: """The megacomplex types used by this model.""" return self._megacomplex_types + @property + def dataset_group_models(self) -> dict[str, DatasetGroupModel]: + return self._dataset_group_models + @property def model_items(self) -> dict[str, type[object]]: """The model_items types used by this model.""" @@ -257,8 +278,25 @@ def global_megacomplex(self) -> dict[str, Megacomplex]: """Alias for `glotaran.model.megacomplex`. 
Needed internally.""" return self.megacomplex + def get_dataset_groups(self) -> dict[str, DatasetGroup]: + groups = {} + for dataset_model in self.dataset.values(): + group = dataset_model.group + if group not in groups: + try: + groups[group] = DatasetGroup(model=self.dataset_group_models[group]) + except KeyError: + raise ValueError(f"Unknown dataset group '{group}'") + groups[group].dataset_models[dataset_model.label] = dataset_model + return groups + def as_dict(self) -> dict: - model_dict = {"default-megacomplex": self.default_megacomplex} + model_dict = { + "default-megacomplex": self.default_megacomplex, + "dataset_groups": { + label: asdict(group) for label, group in self.dataset_group_models.items() + }, + } for item_name in self._model_items: items = getattr(self, item_name) if len(items) == 0: @@ -284,15 +322,16 @@ def need_index_dependent(self) -> bool: return any(i.interval is not None for i in self.clp_constraints + self.clp_relations) def is_groupable(self, parameters: ParameterGroup, data: dict[str, xr.DataArray]) -> bool: - if any(d.has_global_model() for d in self.dataset.values()): + dataset_models = {label: self.dataset[label] for label in data} + if any(d.has_global_model() for d in dataset_models.values()): return False global_dimensions = { d.fill(self, parameters).set_data(data[k]).get_global_dimension() - for k, d in self.dataset.items() + for k, d in dataset_models.items() } model_dimensions = { d.fill(self, parameters).set_data(data[k]).get_model_dimension() - for k, d in self.dataset.items() + for k, d in dataset_models.items() } return len(global_dimensions) == 1 and len(model_dimensions) == 1 diff --git a/glotaran/model/test/test_model.py b/glotaran/model/test/test_model.py index 14eff9741..b80a4c145 100644 --- a/glotaran/model/test/test_model.py +++ b/glotaran/model/test/test_model.py @@ -103,6 +103,9 @@ def test_model_dict(): "m1": {"test_item1": "t2"}, "m2": {"type": "type5", "dimension": "model2"}, }, + "dataset_groups": { + "testgroup": {"residual_function": "non_negative_least_squares", "link_clp": True} + }, "weights": [ { "datasets": ["d1", "d2"], @@ -141,6 +144,7 @@ def test_model_dict(): "test_item_dataset": "t2", "test_property_dataset1": 1, "test_property_dataset2": "bar", + "group": "testgroup", }, }, } @@ -283,6 +287,28 @@ def test_model_misc(test_model: Model): assert test_model.megacomplex["m2"].dimension == "model2" +def test_dataset_group_models(test_model: Model): + groups = test_model.dataset_group_models + assert "default" in groups + assert groups["default"].residual_function == "variable_projection" + assert groups["default"].link_clp is None + assert "testgroup" in groups + assert groups["testgroup"].residual_function == "non_negative_least_squares" + assert groups["testgroup"].link_clp + + +def test_dataset_groups(test_model: Model): + groups = test_model.get_dataset_groups() + assert "default" in groups + assert groups["default"].model.residual_function == "variable_projection" + assert groups["default"].model.link_clp is None + assert "dataset1" in groups["default"].dataset_models + assert "testgroup" in groups + assert groups["testgroup"].model.residual_function == "non_negative_least_squares" + assert groups["testgroup"].model.link_clp + assert "dataset2" in groups["testgroup"].dataset_models + + def test_model_validity(test_model: Model, model_error: Model, parameter: ParameterGroup): print(test_model.test_item1["t1"]) print(test_model.problem_list()) @@ -390,10 +416,14 @@ def test_model_as_dict(): "number": 21, }, }, + 
"dataset_groups": { + "default": {"link_clp": None, "residual_function": "variable_projection"} + }, "dataset": { "dataset1": { "megacomplex": ["m1"], "scale": "scale_1", + "group": "default", }, }, } diff --git a/glotaran/project/scheme.py b/glotaran/project/scheme.py index c62e9b01e..f47a3b252 100644 --- a/glotaran/project/scheme.py +++ b/glotaran/project/scheme.py @@ -1,11 +1,12 @@ """The module for :class:``Scheme``.""" from __future__ import annotations -import warnings from dataclasses import dataclass +from dataclasses import fields from typing import TYPE_CHECKING from glotaran.deprecation import deprecate +from glotaran.deprecation import warn_deprecated from glotaran.io import load_dataset from glotaran.io import load_model from glotaran.io import load_parameters @@ -37,10 +38,11 @@ class Scheme: model_file: str | None = file_representation_field("model", load_model, default=None) parameters_file: str | None = file_representation_field("parameters", load_parameters, None) data_files: dict[str, str] | None = file_representation_field("data", load_dataset, None) - group: bool | None = None - group_tolerance: float = 0.0 - non_negative_least_squares: bool = False + clp_link_tolerance: float = 0.0 maximum_number_function_evaluations: int | None = None + non_negative_least_squares: bool | None = exclude_from_dict_field(None) + group_tolerance: float | None = exclude_from_dict_field(None) + group: bool | None = exclude_from_dict_field(None) add_svd: bool = True ftol: float = 1e-8 gtol: float = 1e-8 @@ -52,6 +54,50 @@ class Scheme: ] = "TrustRegionReflection" result_path: str | None = None + def __post_init__(self): + """Override attributes after initialization.""" + if self.non_negative_least_squares is not None: + warn_deprecated( + deprecated_qual_name_usage=( + "glotaran.project.Scheme(..., non_negative_least_squares=...)" + ), + new_qual_name_usage="dataset_groups.default.residual_function", + to_be_removed_in_version="0.7.0", + check_qual_names=(True, False), + stacklevel=4, + ) + + default_group = self.model.dataset_group_models["default"] + if self.non_negative_least_squares is True: + default_group.residual_function = "non_negative_least_squares" + else: + default_group.residual_function = "variable_projection" + for field in fields(self): + if field.name == "non_negative_least_squares": + field.metadata = {} + + if self.group is not None: + warn_deprecated( + deprecated_qual_name_usage="glotaran.project.Scheme(..., group=...)", + new_qual_name_usage="dataset_groups.default.link_clp", + to_be_removed_in_version="0.7.0", + check_qual_names=(True, False), + stacklevel=4, + ) + self.model.dataset_group_models["default"].link_clp = self.group + for field in fields(self): + if field.name == "group": + field.metadata = {} + + if self.group_tolerance is not None: + warn_deprecated( + deprecated_qual_name_usage="glotaran.project.Scheme(..., group_tolerance=...)", + new_qual_name_usage="glotaran.project.Scheme(..., clp_link_tolerance=...)", + to_be_removed_in_version="0.7.0", + stacklevel=4, + ) + self.clp_link_tolerance = self.group_tolerance + def problem_list(self) -> list[str]: """Return a list with all problems in the model and missing parameters. 
@@ -96,27 +142,16 @@ def markdown(self): markdown_str = "\n\n" markdown_str += "__Scheme__\n\n" - markdown_str += f"* *nnls*: {self.non_negative_least_squares}\n" - markdown_str += f"* *nfev*: {self.maximum_number_function_evaluations}\n" - markdown_str += f"* *group_tolerance*: {self.group_tolerance}\n" + if self.non_negative_least_squares is not None: + markdown_str += f"* *non_negative_least_squares*: {self.non_negative_least_squares}\n" + markdown_str += ( + "* *maximum_number_function_evaluations*: " + f"{self.maximum_number_function_evaluations}\n" + ) + markdown_str += f"* *clp_link_tolerance*: {self.clp_link_tolerance}\n" return model_markdown_str + MarkdownStr(markdown_str) - def is_grouped(self) -> bool: - """Return whether the scheme should be grouped. - - Returns - ------- - bool - Weather the scheme should be grouped. - """ - if self.group is not None and not self.group: - return False - is_groupable = self.model.is_groupable(self.parameters, self.data) - if not is_groupable and self.group is not None: - warnings.warn("Cannot group scheme. Continuing ungrouped.") - return is_groupable - def _repr_markdown_(self) -> str: """Return a markdown representation str. diff --git a/glotaran/project/test/test_scheme.py b/glotaran/project/test/test_scheme.py index 57e04802e..2226a54dd 100644 --- a/glotaran/project/test/test_scheme.py +++ b/glotaran/project/test/test_scheme.py @@ -34,7 +34,6 @@ def mock_scheme(tmp_path: Path) -> Scheme: scheme_yml_str = f""" model_file: {model_path} parameters_file: {parameter_path} - non_negative_least_squares: True maximum_number_function_evaluations: 42 data_files: dataset1: {dataset_path} @@ -55,7 +54,6 @@ def test_scheme(mock_scheme: Scheme): assert mock_scheme.parameters.get("1") == 1.0 assert mock_scheme.parameters.get("2") == 67.0 - assert mock_scheme.non_negative_least_squares assert mock_scheme.maximum_number_function_evaluations == 42 assert "dataset1" in mock_scheme.data diff --git a/glotaran/test/test_spectral_decay.py b/glotaran/test/test_spectral_decay.py index bede508b8..e69edc9f3 100644 --- a/glotaran/test/test_spectral_decay.py +++ b/glotaran/test/test_spectral_decay.py @@ -240,11 +240,15 @@ class ThreeComponentSequential: ], ) @pytest.mark.parametrize("nnls", [True, False]) -def test_kinetic_model(suite, nnls): +def test_decay_model(suite, nnls): model = suite.model print(model.validate()) assert model.valid() + model.dataset_group_models["default"].link_clp = False + model.dataset_group_models["default"].method = ( + "non_negative_least_squares" if nnls else "variable_projection" + ) wanted_parameters = suite.wanted_parameters print(model.validate(wanted_parameters)) @@ -268,8 +272,6 @@ def test_kinetic_model(suite, nnls): parameters=initial_parameters, data=data, maximum_number_function_evaluations=20, - non_negative_least_squares=nnls, - group=False, ) result = optimize(scheme) print(result.optimized_parameters) diff --git a/glotaran/test/test_spectral_decay_full_model.py b/glotaran/test/test_spectral_decay_full_model.py index b21df0e52..070cea158 100644 --- a/glotaran/test/test_spectral_decay_full_model.py +++ b/glotaran/test/test_spectral_decay_full_model.py @@ -180,6 +180,9 @@ def test_kinetic_model(suite, nnls): model = suite.model print(model.validate()) assert model.valid() + model.dataset_group_models["default"].method = ( + "non_negative_least_squares" if nnls else "variable_projection" + ) wanted_parameters = suite.wanted_parameters print(model.validate(wanted_parameters)) @@ -203,8 +206,6 @@ def test_kinetic_model(suite, 
nnls): parameters=initial_parameters, data=data, maximum_number_function_evaluations=20, - non_negative_least_squares=nnls, - group=False, ) result = optimize(scheme) print(result.optimized_parameters) diff --git a/glotaran/test/test_spectral_penalties.py b/glotaran/test/test_spectral_penalties.py index 2d07f5650..6bebb8075 100644 --- a/glotaran/test/test_spectral_penalties.py +++ b/glotaran/test/test_spectral_penalties.py @@ -216,8 +216,7 @@ def test_equal_area_penalties(debug=False): # for both we perturb kinetic parameters a bit to give the optimizer some work pspec_wp = dict(deepcopy(pspec.base), **pspec.equal_area) pspec_wp["kinetic"] = [v * 1.01 for v in pspec_wp["kinetic"]] - pspec_wp.update({"i": [[1, {"vary": False}], 1]}) - + pspec_wp["i"] = [[1, {"vary": False}], 1] pspec_np = dict(deepcopy(pspec.base)) param_wp = ParameterGroup.from_dict(pspec_wp) @@ -245,23 +244,29 @@ def test_equal_area_penalties(debug=False): # %% Optimizing model without penalty (np) + model_np.dataset_group_models["default"].method = ( + "non_negative_least_squares" if optim_spec.nnls else "variable_projection" + ) + dataset = {"dataset1": data} scheme_np = Scheme( model=model_np, parameters=param_np, data=dataset, - non_negative_least_squares=optim_spec.nnls, maximum_number_function_evaluations=optim_spec.max_nfev, ) result_np = optimize(scheme_np) print(result_np) + model_wp.dataset_group_models["default"].method = ( + "non_negative_least_squares" if optim_spec.nnls else "variable_projection" + ) + # %% Optimizing model with penalty fixed inputs (wp_ifix) scheme_wp = Scheme( model=model_wp, parameters=param_wp, data=dataset, - non_negative_least_squares=optim_spec.nnls, maximum_number_function_evaluations=optim_spec.max_nfev, ) result_wp = optimize(scheme_wp)
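
For orientation, the sketch below condenses the usage pattern exercised by the updated tests in this patch: dataset group settings (`link_clp`, `residual_function`) now live on the model under `dataset_groups`, the `Scheme` gains `clp_link_tolerance`, and `OptimizationGroup(scheme, model.get_dataset_groups()["default"])` replaces the old `GroupedProblem`/`UngroupedProblem` classes. It borrows the in-repo test helper `SimpleTestModel` and synthetic data (the transpose and axis names follow the repo's test conventions), so treat it as an illustrative sketch rather than a verified end-to-end script.

```python
import numpy as np
import xarray as xr

from glotaran.analysis.optimization_group import OptimizationGroup
from glotaran.analysis.optimize import optimize
from glotaran.analysis.test.models import SimpleTestModel  # in-repo test helper
from glotaran.parameter import ParameterGroup
from glotaran.project import Scheme

# Dataset group settings are part of the model now, not the Scheme.
model = SimpleTestModel.from_dict(
    {
        "megacomplex": {"m1": {"is_index_dependent": False}},
        "dataset_groups": {
            # replaces Scheme(group=...) and Scheme(non_negative_least_squares=...)
            "default": {"link_clp": True, "residual_function": "variable_projection"},
        },
        "dataset": {"dataset1": {"megacomplex": ["m1"]}},
    }
)
parameters = ParameterGroup.from_list([1, 10])

# Synthetic data mirroring the grouping tests; transposed so the model
# dimension comes first, as the test data in this repository is laid out.
global_axis = [1, 5, 6]
model_axis = [5, 7, 9, 12]
data = (
    xr.DataArray(np.ones((3, 4)), coords=[("global", global_axis), ("model", model_axis)])
    .T.to_dataset(name="data")
)

# group_tolerance is renamed to clp_link_tolerance on the Scheme.
scheme = Scheme(
    model,
    parameters,
    {"dataset1": data},
    maximum_number_function_evaluations=10,
    clp_link_tolerance=0.1,
)

# One OptimizationGroup per dataset group replaces Grouped/UngroupedProblem;
# the low-level calculation steps are reachable via the group, as in the tests.
optimization_group = OptimizationGroup(scheme, model.get_dataset_groups()["default"])
result_data = optimization_group.create_result_data()
print(result_data["dataset1"])

# The top-level entry point is unchanged and now builds one OptimizationGroup
# per dataset group internally.
result = optimize(scheme, raise_exception=True)
print(result.optimized_parameters)
```

The deprecated `Scheme` keyword arguments `group`, `group_tolerance`, and `non_negative_least_squares` still construct a `Scheme` until 0.7.0, but emit deprecation warnings and are mapped onto `dataset_groups.default.link_clp`, `clp_link_tolerance`, and `dataset_groups.default.residual_function` respectively, as covered by the new tests in `glotaran/deprecation/modules/test/test_project_scheme.py`.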