import copy
import logging
from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
from typing import Optional, Union
import numpy as np
import pandas as pd
from ..C import FVAL, GRAD, HESS, MODE_FUN, MODE_RES, RES, SRES, ModeType
from ..history import NoHistory, create_history
from .pre_post_process import FixedParametersProcessor, PrePostProcessor
ResultDict = dict[str, Union[float, np.ndarray, dict]]
logger = logging.getLogger(__name__)
class ObjectiveBase(ABC):
"""
Abstract objective class.
The objective class is a simple wrapper around the objective function,
giving a standardized way of calling. Apart from that, it manages several
things including fixing of parameters and history.
The objective function is assumed to be in the format of a cost function,
log-likelihood function, or log-posterior function. These functions are
subject to minimization. For profiling and sampling, the sign is internally
flipped; all returned and stored values are, however, given as returned
by this objective function. If maximization is to be performed, the sign
should be flipped before creating the objective function.
Parameters
----------
x_names:
Parameter names that can be optionally used in, e.g., history or
gradient checks.
Attributes
----------
history:
For storing the call history. Initialized by the methods, e.g. the
optimizer, in `initialize_history()`.
pre_post_processor:
Preprocess input values to and postprocess output values from
__call__. Configured in `update_from_problem()`.
"""
def __init__(
self,
x_names: Optional[Sequence[str]] = None,
):
self._x_names = x_names
self.pre_post_processor = PrePostProcessor()
self.history = NoHistory()
def __deepcopy__(self, memodict=None) -> "ObjectiveBase":
"""Create deepcopy of objective object."""
other = type(self)() # maintain type for derived classes
for attr, val in self.__dict__.items():
other.__dict__[attr] = copy.deepcopy(val)
return other
# The following has_ properties can be used to find out what values
# the objective supports.
@property
def has_fun(self) -> bool:
"""Check whether function is defined."""
return self.check_sensi_orders((0,), MODE_FUN)
@property
def has_grad(self) -> bool:
"""Check whether gradient is defined."""
return self.check_sensi_orders((1,), MODE_FUN)
@property
def has_hess(self) -> bool:
"""Check whether Hessian is defined."""
return self.check_sensi_orders((2,), MODE_FUN)
@property
def has_hessp(self) -> bool:
"""Check whether Hessian-vector product is defined."""
# Not supported yet
return False
@property
def has_res(self) -> bool:
"""Check whether residuals are defined."""
return self.check_sensi_orders((0,), MODE_RES)
@property
def has_sres(self) -> bool:
"""Check whether residual sensitivities are defined."""
return self.check_sensi_orders((1,), MODE_RES)
@property
def x_names(self) -> Union[list[str], None]:
"""Parameter names."""
if self._x_names is None:
return self._x_names
# change from numpy array with `str_` dtype to list with `str` dtype
# to avoid issues when writing to hdf (and correctness of typehint)
return [
str(name)
for name in self.pre_post_processor.reduce(
np.asarray(self._x_names)
)
]
def initialize(self):
"""
Initialize the objective function.
This function is used at the beginning of an analysis, e.g.
optimization, and can e.g. reset the objective memory.
By default, this does nothing.
"""
def create_history(self, id, x_names, options):
"""See `history.generate.create_history` documentation."""
return create_history(id, x_names, options)
def __call__(
self,
x: np.ndarray,
sensi_orders: tuple[int, ...] = (0,),
mode: ModeType = MODE_FUN,
return_dict: bool = False,
**kwargs,
) -> Union[float, np.ndarray, tuple, ResultDict]:
"""
Obtain arbitrary sensitivities.
This is the central method which is always called, also by the
get_* methods.
There are different ways in which an optimizer calls the objective
function, and in how the objective function provides information
(e.g. derivatives via separate functions or along with the function
values). The different calling modes increase efficiency in space
and time and make the objective flexible.
Parameters
----------
x:
The parameters for which to evaluate the objective function.
sensi_orders:
Specifies which sensitivities to compute, e.g. (0,1) -> fval, grad.
mode:
Whether to compute function values or residuals.
return_dict:
If False (default), the result is a Tuple of the requested values
in the requested order. Tuples of length one are flattened.
If True, instead a dict is returned which can carry further
information.
Returns
-------
result:
By default, this is a tuple of the requested function values
and derivatives in the requested order (if only 1 value, the tuple
is flattened). If `return_dict`, then instead a dict is returned
with function values and derivatives indicated by ids.
"""
# copy parameter vector to prevent side effects
# np.array creates a copy of x already
x = np.array(x)
# check input
if not self.check_mode(mode):
raise ValueError(
f"This Objective cannot be called with mode" f"={mode}."
)
if not self.check_sensi_orders(sensi_orders, mode):
raise ValueError(
f"This Objective cannot be called with "
f"sensi_orders= {sensi_orders} and mode={mode}."
)
# pre-process
x_full = self.pre_post_processor.preprocess(x=x)
# compute result
result = self.call_unprocessed(
x=x_full, sensi_orders=sensi_orders, mode=mode, **kwargs
)
# post-process
result = self.pre_post_processor.postprocess(result=result)
# update history
self.history.update(
x=x, sensi_orders=sensi_orders, mode=mode, result=result
)
# map to output format
if not return_dict:
result = ObjectiveBase.output_to_tuple(
sensi_orders=sensi_orders, mode=mode, **result
)
return result
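# Illustrative usage sketch (not executed here): assuming `obj` is an
# instance of a concrete ObjectiveBase subclass and `x0` a parameter vector
# of matching dimension, typical calls look like:
#
#     fval = obj(x0)                              # function value only
#     fval, grad = obj(x0, sensi_orders=(0, 1))   # value and gradient
#     result = obj(x0, sensi_orders=(0, 1), return_dict=True)
#     # `result` is then a dict keyed by the constants FVAL, GRAD, ...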
@abstractmethod
def call_unprocessed(
self,
x: np.ndarray,
sensi_orders: tuple[int, ...],
mode: ModeType,
**kwargs,
) -> ResultDict:
"""
Call objective function without pre- or post-processing and formatting.
Parameters
----------
x:
The parameters for which to evaluate the objective function.
sensi_orders:
Specifies which sensitivities to compute, e.g. (0,1) -> fval, grad.
mode:
Whether to compute function values or residuals.
Returns
-------
result:
A dict containing the results.
"""
def check_mode(self, mode: ModeType) -> bool:
"""
Check if the objective is able to compute in the requested mode.
Either `check_mode` or the `fun_...` functions
must be overwritten in derived classes.
Parameters
----------
mode:
Whether to compute function values or residuals.
Returns
-------
flag:
Boolean indicating whether mode is supported
"""
if mode == MODE_FUN:
return self.has_fun
elif mode == MODE_RES:
return self.has_res
else:
raise ValueError(f"Unknown mode {mode}.")
def get_config(self) -> dict:
"""
Get the configuration information of the objective function.
Return it as a dictionary.
"""
info = {"type": self.__class__.__name__}
return info
def check_sensi_orders(
self,
sensi_orders: tuple[int, ...],
mode: ModeType,
) -> bool:
"""
Check if the objective is able to compute the requested sensitivities.
Either `check_sensi_orders` or the `fun_...` functions
must be overwritten in derived classes.
Parameters
----------
sensi_orders:
Specifies which sensitivities to compute, e.g. (0,1) -> fval, grad.
mode:
Whether to compute function values or residuals.
Returns
-------
flag:
Boolean indicating whether combination of sensi_orders and mode
is supported
"""
if not sensi_orders:
return True
if (
mode == MODE_FUN
and (
0 in sensi_orders
and not self.has_fun
or 1 in sensi_orders
and not self.has_grad
or 2 in sensi_orders
and not self.has_hess
or max(sensi_orders) > 2
)
) or (
mode == MODE_RES
and (
0 in sensi_orders
and not self.has_res
or 1 in sensi_orders
and not self.has_sres
or max(sensi_orders) > 1
)
):
return False
return True
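# Illustrative sketch: callers such as optimizers can query capabilities
# before evaluating, assuming `obj` is a concrete objective and `x0` a
# suitable parameter vector:
#
#     if obj.has_grad and obj.check_sensi_orders((0, 1), MODE_FUN):
#         fval, grad = obj(x0, sensi_orders=(0, 1), mode=MODE_FUN)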
@staticmethod
def output_to_tuple(
sensi_orders: tuple[int, ...],
mode: ModeType,
**kwargs: Union[float, np.ndarray],
) -> tuple:
"""
Return values as requested by the caller.
Usually only a subset of outputs is demanded. A single output is
returned as-is; multiple outputs are returned as a tuple in the order
(fval, grad, hess) for MODE_FUN, or (res, sres) for MODE_RES.
"""
output = ()
if mode == MODE_FUN:
if 0 in sensi_orders:
output += (kwargs[FVAL],)
if 1 in sensi_orders:
output += (kwargs[GRAD],)
if 2 in sensi_orders:
output += (kwargs[HESS],)
elif mode == MODE_RES:
if 0 in sensi_orders:
output += (kwargs[RES],)
if 1 in sensi_orders:
output += (kwargs[SRES],)
if len(output) == 1:
output = output[0]
return output
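# Illustrative sketch of the mapping performed by `output_to_tuple`, using
# the key constants imported from ..C (the numbers are made up):
#
#     result = {FVAL: 1.0, GRAD: np.zeros(3)}
#     fval, grad = ObjectiveBase.output_to_tuple(
#         sensi_orders=(0, 1), mode=MODE_FUN, **result
#     )
#     # with sensi_orders=(0,), the bare value 1.0 would be returned instead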
# The following are convenience functions for getting specific outputs.
def get_fval(self, x: np.ndarray) -> float:
"""Get the function value at x."""
fval = self(x, (0,), MODE_FUN)
return fval
def get_grad(self, x: np.ndarray) -> np.ndarray:
"""Get the gradient at x."""
grad = self(x, (1,), MODE_FUN)
return grad
def get_hess(self, x: np.ndarray) -> np.ndarray:
"""Get the Hessian at x."""
hess = self(x, (2,), MODE_FUN)
return hess
def get_res(self, x: np.ndarray) -> np.ndarray:
"""Get the residuals at x."""
res = self(x, (0,), MODE_RES)
return res
def get_sres(self, x: np.ndarray) -> np.ndarray:
"""Get the residual sensitivities at x."""
sres = self(x, (1,), MODE_RES)
return sres
def update_from_problem(
self,
dim_full: int,
x_free_indices: Sequence[int],
x_fixed_indices: Sequence[int],
x_fixed_vals: Sequence[float],
):
"""
Handle fixed parameters.
Later, the objective will be given parameter vectors x of dimension
dim, which have to be filled up with fixed parameter values to form
a vector of dimension dim_full >= dim. This vector is then used to
compute function value and derivatives. The derivatives must later
be reduced again to dimension dim.
This makes the fixing of parameters transparent to the caller.
The pre_post_processor is replaced by a FixedParametersProcessor,
whose preprocess and postprocess methods implement the filling-up
and reduction described above.
Parameters
----------
dim_full:
Dimension of the full vector including fixed parameters.
x_free_indices:
Vector containing the indices (zero-based) of free parameters
(complementary to x_fixed_indices).
x_fixed_indices:
Vector containing the indices (zero-based) of parameter components
that are not to be optimized.
x_fixed_vals:
Vector of the same length as x_fixed_indices, containing the values
of the fixed parameters.
"""
self.pre_post_processor = FixedParametersProcessor(
dim_full=dim_full,
x_free_indices=x_free_indices,
x_fixed_indices=x_fixed_indices,
x_fixed_vals=x_fixed_vals,
)
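# Illustrative sketch (hypothetical values): fixing the middle one of three
# parameters; afterwards the objective is called with reduced vectors of
# dimension 2, and derivatives are returned in the reduced dimension as well:
#
#     obj.update_from_problem(
#         dim_full=3,
#         x_free_indices=[0, 2],
#         x_fixed_indices=[1],
#         x_fixed_vals=[0.5],
#     )
#     fval, grad = obj(np.array([1.0, 2.0]), sensi_orders=(0, 1))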
def check_grad_multi_eps(
self,
*args,
multi_eps: Optional[Iterable] = None,
label: str = "rel_err",
**kwargs,
):
"""
Compare gradient evaluation.
Equivalent to the `ObjectiveBase.check_grad` method, except multiple
finite difference step sizes are tested. The result contains the
lowest finite difference for each parameter, and the corresponding
finite difference step size.
Parameters
----------
All `ObjectiveBase.check_grad` method parameters.
multi_eps:
The finite difference step sizes to be tested.
label:
The label of the column that will be minimized for each parameter.
Valid options are the column labels of the dataframe returned by
the `ObjectiveBase.check_grad` method.
"""
if "eps" in kwargs:
raise KeyError(
"Please use the `multi_eps` (not the `eps`) argument with "
"`check_grad_multi_eps` to specify step sizes."
)
if multi_eps is None:
multi_eps = {1e-1, 1e-3, 1e-5, 1e-7, 1e-9}
results = {}
for eps in multi_eps:
results[eps] = self.check_grad(*args, **kwargs, eps=eps)
# The combined result is, for each parameter, the gradient check from
# the step size (`eps`) that produced the smallest error (`label`).
combined_result = None
for eps, result in results.items():
result["eps"] = eps
if combined_result is None:
combined_result = result
continue
# Replace rows in `combined_result` with corresponding rows
# in `result` that have an improved value in column `label`.
mask_improvements = result[label] < combined_result[label]
combined_result.loc[mask_improvements, :] = result.loc[
mask_improvements, :
]
return combined_result
def check_grad(
self,
x: np.ndarray,
x_indices: Sequence[int] = None,
eps: float = 1e-5,
verbosity: int = 1,
mode: ModeType = MODE_FUN,
order: int = 0,
detailed: bool = False,
) -> pd.DataFrame:
"""
Compare gradient evaluation.
Firstly approximate via finite differences, and secondly use the
objective gradient.
Parameters
----------
x:
The parameters for which to evaluate the gradient.
x_indices:
Indices for which to compute gradients. Default: all.
eps:
Finite differences step size.
verbosity:
Level of verbosity for function output.
0: no output,
1: summary for all parameters,
2: summary for individual parameters.
mode:
Residual (MODE_RES) or objective function value (MODE_FUN)
computation mode.
order:
Derivative order, either gradient (0) or Hessian (1).
detailed:
Toggle whether additional values are returned. The additional values
are the function values at the shifted points, and the deviation of
the gradient from the central finite difference, normalized by the
standard deviation of the finite differences and by the mean pairwise
difference between them, respectively.
Returns
-------
result:
gradient, finite difference approximations and error estimates.
"""
if x_indices is None:
x_indices = list(range(len(x)))
# function value and objective gradient
fval, grad = self(x, (0 + order, 1 + order), mode)
grad_list = []
fd_f_list = []
fd_b_list = []
fd_c_list = []
fd_err_list = []
abs_err_list = []
rel_err_list = []
if detailed:
fval_p_list = []
fval_m_list = []
std_check_list = []
mean_check_list = []
# loop over indices
for ix in x_indices:
# forward (plus) point
x_p = copy.deepcopy(x)
x_p[ix] += eps
fval_p = self(x_p, (0,), mode)
# backward (minus) point
x_m = copy.deepcopy(x)
x_m[ix] -= eps
fval_m = self(x_m, (0,), mode)
# finite differences
fd_f_ix = (fval_p - fval) / eps
fd_b_ix = (fval - fval_m) / eps
fd_c_ix = (fval_p - fval_m) / (2 * eps)
# gradient in direction ix
grad_ix = grad[ix] if grad.ndim == 1 else grad[:, ix]
# errors
fd_err_ix = abs(fd_f_ix - fd_b_ix)
abs_err_ix = abs(grad_ix - fd_c_ix)
rel_err_ix = abs(abs_err_ix / (fd_c_ix + eps))
if detailed:
std_check_ix = (grad_ix - fd_c_ix) / np.std(
[fd_f_ix, fd_b_ix, fd_c_ix]
)
mean_check_ix = abs(grad_ix - fd_c_ix) / np.mean(
[
abs(fd_f_ix - fd_b_ix),
abs(fd_f_ix - fd_c_ix),
abs(fd_b_ix - fd_c_ix),
]
)
# log for dimension ix
if verbosity > 1:
logger.info(
f"index: {ix}\n"
f"grad: {grad_ix}\n"
f"fd_f: {fd_f_ix}\n"
f"fd_b: {fd_b_ix}\n"
f"fd_c: {fd_c_ix}\n"
f"fd_err: {fd_err_ix}\n"
f"abs_err: {abs_err_ix}\n"
f"rel_err: {rel_err_ix}\n"
)
# append to lists
grad_list.append(grad_ix)
fd_f_list.append(fd_f_ix)
fd_b_list.append(fd_b_ix)
fd_c_list.append(fd_c_ix)
fd_err_list.append(np.mean(fd_err_ix))
abs_err_list.append(np.mean(abs_err_ix))
rel_err_list.append(np.mean(rel_err_ix))
if detailed:
fval_p_list.append(fval_p)
fval_m_list.append(fval_m)
std_check_list.append(std_check_ix)
mean_check_list.append(mean_check_ix)
# create data dictionary for dataframe
data = {
"grad": grad_list,
"fd_f": fd_f_list,
"fd_b": fd_b_list,
"fd_c": fd_c_list,
"fd_err": fd_err_list,
"abs_err": abs_err_list,
"rel_err": rel_err_list,
}
# update data dictionary if detailed output is requested
if detailed:
prefix_data = {
"fval": [fval] * len(x_indices),
"fval_p": fval_p_list,
"fval_m": fval_m_list,
}
std_str = "(grad-fd_c)/std({fd_f,fd_b,fd_c})"
mean_str = "|grad-fd_c|/mean(|fd_f-fd_b|,|fd_f-fd_c|,|fd_b-fd_c|)"
postfix_data = {
std_str: std_check_list,
mean_str: mean_check_list,
}
data = {**prefix_data, **data, **postfix_data}
# create dataframe
result = pd.DataFrame(
data=data,
index=[
self.x_names[ix] if self.x_names is not None else f"x_{ix}"
for ix in x_indices
],
)
# log full result
if verbosity > 0:
logger.info(result)
return result
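# Illustrative sketch, assuming `obj` is a concrete objective providing an
# analytical gradient: compare it against finite differences at one point,
# for a single step size and across several step sizes:
#
#     df = obj.check_grad(np.ones(3), eps=1e-5, verbosity=0)
#     df_best = obj.check_grad_multi_eps(np.ones(3), multi_eps=[1e-3, 1e-5, 1e-7])
#     # small values in the `rel_err` / `abs_err` columns indicate agreement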
def check_gradients_match_finite_differences(
self,
*args,
x: np.ndarray = None,
x_free: Sequence[int] = None,
rtol: float = 1e-2,
atol: float = 1e-3,
mode: ModeType = None,
order: int = 0,
multi_eps=None,
**kwargs,
) -> bool:
"""Check if gradients match finite differences (FDs).
Parameters
----------
x: The parameters for which to evaluate the gradient.
x_free: Indices for which to compute gradients.
rtol: Relative error tolerance.
atol: Absolute error tolerance.
mode: Function values (MODE_FUN) or residuals (MODE_RES).
order: Derivative order, 0 for gradient, 1 for Hessian.
multi_eps: Multiple finite difference step sizes to test.
Returns
-------
bool
Whether the gradients match the finite differences (True) or not (False).
"""
par = np.asarray(x)
if x_free is None:
free_indices = par
else:
free_indices = par[x_free]
dfs = []
if mode is None:
modes = [MODE_FUN, MODE_RES]
else:
modes = [mode]
if multi_eps is None:
multi_eps = np.array([10 ** (-i) for i in range(3, 9)])
for mode in modes:
try:
dfs.append(
self.check_grad_multi_eps(
free_indices,
*args,
**kwargs,
mode=mode,
multi_eps=multi_eps,
)
)
except (RuntimeError, ValueError):
# This may happen if the PEtab problem is not well defined, or if
# the forward sensitivity computation fails for the given tolerances.
return False
return all(
np.all(
(mode_df.rel_err.values < rtol)
| (mode_df.abs_err.values < atol)
)
for mode_df in dfs
)
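# ---------------------------------------------------------------------------
# Illustrative, self-contained sketch (hypothetical, not used by pyPESTO
# itself): a minimal concrete subclass with an analytical gradient, which
# exercises __call__ and check_grad. The __main__ block is only meaningful
# when the relative imports at the top of this module resolve.
class _QuadraticObjective(ObjectiveBase):
    """f(x) = 0.5 * ||x||^2 with exact gradient, for demonstration only."""

    def call_unprocessed(self, x, sensi_orders, mode, **kwargs):
        # return only the requested sensitivity orders, keyed by constants
        result = {}
        if mode == MODE_FUN:
            if 0 in sensi_orders:
                result[FVAL] = 0.5 * float(np.dot(x, x))
            if 1 in sensi_orders:
                result[GRAD] = np.array(x)
        return result

    def check_mode(self, mode):
        # only function values are supported, no residuals
        return mode == MODE_FUN

    def check_sensi_orders(self, sensi_orders, mode):
        # function value and gradient, but no Hessian
        return mode == MODE_FUN and max(sensi_orders, default=0) <= 1


if __name__ == "__main__":
    obj = _QuadraticObjective(x_names=["a", "b", "c"])
    fval, grad = obj(np.array([1.0, 2.0, 3.0]), sensi_orders=(0, 1))
    print(f"fval={fval}, grad={grad}")
    # the analytical gradient should agree with finite differences
    print(obj.check_grad(np.array([1.0, 2.0, 3.0]), verbosity=0))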