"""
A module for working with additive and multiplicative functionals.
"""
from collections import namedtuple

import numpy as np
import scipy.linalg as la
import quantecon as qe
add_decomp = namedtuple('additive_decomp', 'ν H g')
mult_decomp = namedtuple('multiplicative_decomp', 'ν_tilde H g')
class AMF:
"""
A class for transforming an additive (multiplicative) functional into a
QuantEcon linear state space system. It uses the first-order VAR
representation to build the LSS representation using the
`LinearStateSpace` class.
First-order VAR representation:
.. math::
x_{t+1} = Ax_{t} + Bz_{t+1}
y_{t+1}-y_{t} = ν + Dx_{t} + Fz_{t+1}
Linear State Space (LSS) representation:
.. math::
\hat{x}_{t+1} = \hat{A}\hat{x}_{t}+\hat{B}z_{t+1}
\hat{y}_{t} = \hat{D}\hat{x}_{t}
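In the LSS representation constructed below, the stacked state is
[1, t, x_{t}, y_{t}, m_{t}], where m_{t} is the martingale component of y,
and the observables stack [x_{t}, y_{t}, m_{t}, s_{t}, tau_{t}], with
s_{t} = -g(x_{t}) the stationary component and tau_{t} = tν the trend.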
Parameters
----------
A : array_like(float, ndim=2)
Part of the first-order vector autoregression equation. It should be an
`nx x nx` matrix.
B : array_like(float, ndim=2)
Part of the first-order vector autoregression equation. It should be an
`nx x nk` matrix.
D : array_like(float, ndim=2)
Part of the nonstationary random process. It should be an `ny x nx`
matrix.
F : array_like or None, optional(default=None)
Part of the nonstationary random process. If array_like, it should be
an `ny x nk` matrix.
ν : array_like or float or None, optional(default=None)
Part of the nonstationary random process. If array_like, it should be
an `ny x 1` matrix.
Attributes
----------
A, B, D, F, ν : See Parameters.
nx, nk, ny : scalar(int)
Dimensions of the state, shock and observation vectors, inferred from the
shapes of `B` and `D`.
lss : Instance of `LinearStateSpace`.
LSS representation of the additive (multiplicative) functional.
additive_decomp : namedtuple
A namedtuple containing the following items:
::
"ν" : unconditional mean difference in Y
"H" : coefficient for the (linear) martingale component
"g" : coefficient for the stationary component g(x)
multiplicative_decomp : namedtuple
A namedtuple containing the following items:
::
"ν_tilde" : eigenvalue
"H" : coefficient for the (linear) martingale component
"g" : coefficient for the stationary component g(x)
Examples
--------
Consider the following example:
>>> import numpy as np
>>> import quantecon as qe
>>> ϕ_1, ϕ_2, ϕ_3, ϕ_4 = 0.5, -0.2, 0, 0.5
>>> σ = 0.01
>>> ν = 0.01 # Growth rate
>>> A = np.array([[ϕ_1, ϕ_2, ϕ_3, ϕ_4],
... [ 1, 0, 0, 0],
... [ 0, 1, 0, 0],
... [ 0, 0, 1, 0]])
>>> B = np.array([[σ, 0, 0, 0]]).T
>>> D = np.array([[1, 0, 0, 0]]) @ A
>>> F = np.array([[1, 0, 0, 0]]) @ B
>>> amf = qe.AMF(A, B, D, F, ν=ν)
The additive decomposition can be accessed by:
>>> amf.additive_decomp
additive_decomp(ν=array([[0.01]]), H=array([[0.05]]),
g=array([[4. , 1.5, 2.5, 2.5]]))
The multiplicative decomposition can be accessed by:
>>> amf.multiplicative_decomp
multiplicative_decomp(ν_tilde=array([[0.01125]]), H=array([[0.05]]),
g=array([[4. , 1.5, 2.5, 2.5]]))
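The LSS representation is available as the `lss` attribute (an instance of
`qe.LinearStateSpace`); for example, sample paths of the stacked state and
observables can be simulated with its `simulate` method (draws are random,
so no output is shown):
>>> x_path, y_path = amf.lss.simulate(ts_length=100)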
References
----------
.. [1] Lars Peter Hansen and Thomas J Sargent. Robustness. Princeton
university press, 2008.
.. [2] Lars Peter Hansen and José A Scheinkman. Long-term risk: An operator
approach. Econometrica, 77(1):177–234, 2009.
"""
def __init__(self, A, B, D, F=None, ν=None):
# = Set Inputs = #
self.A = np.atleast_2d(A)
self.B = np.atleast_2d(B)
self.nx, self.nk = self.B.shape
self.D = np.atleast_2d(D)
self.ny = self.D.shape[0]
if hasattr(F, '__getitem__'):
self.F = np.atleast_2d(F) # F is array_like
else:
self.F = np.zeros((self.ny, self.nk))  # default: no direct shock loading on y
if hasattr(ν, '__getitem__') or isinstance(ν, (int, float)):
self.ν = np.atleast_2d(ν) # ν is array_like or a scalar
else:
self.ν = np.zeros((self.ny, 1))
# = Check dimensions = #
self._attr_dims_check()
# = Check shape = #
self._attr_shape_check()
# = Compute Additive Decomposition = #
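# g(x) = D (I - A)^{-1} x is the stationary component and
# H = F + D (I - A)^{-1} B loads the martingale increments (see Notes).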
eye = np.identity(self.nx)
A_res = la.solve(eye - self.A, eye)
g = self.D @ A_res
H = self.F + self.D @ A_res @ self.B
self.additive_decomp = add_decomp(self.ν, H, g)
# = Compute Multiplicative Decomposition = #
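# For z ~ N(0, I), log E[exp(H z)] = 0.5 * diag(H H'), so the drift of the
# multiplicative functional is ν + 0.5 * diag(H H').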
ν_tilde = self.ν + (.5) * np.expand_dims(np.diag(H @ H.T), 1)
self.multiplicative_decomp = mult_decomp(ν_tilde, H, g)
# = Construct LSS = #
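# Convenience arrays used to assemble the block matrices below: the prefix
# gives the dimension, the digit the fill value (ny1m is an identity
# matrix), and the trailing 'c', 'r' or 'm' marks column, row or matrix.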
nx0c = np.zeros((self.nx, 1))
nx0r = np.zeros(self.nx)
nx1 = np.ones(self.nx)
nk0 = np.zeros(self.nk)
ny0c = np.zeros((self.ny, 1))
ny0r = np.zeros(self.ny)
ny1m = np.eye(self.ny)
ny0m = np.zeros((self.ny, self.ny))
nyx0m = np.zeros_like(self.D)
x0 = self._construct_x0(nx0r, ny0r)
A_bar = self._construct_A_bar(x0, nx0c, nyx0m, ny0c, ny1m, ny0m)
B_bar = self._construct_B_bar(nk0, H)
G_bar = self._construct_G_bar(nx0c, self.nx, nyx0m, ny0c, ny1m, ny0m,
g)
H_bar = self._construct_H_bar(self.nx, self.ny, self.nk)
Sigma_0 = self._construct_Sigma_0(x0)
self.lss = qe.LinearStateSpace(A_bar, B_bar, G_bar, H_bar, mu_0=x0,
Sigma_0=Sigma_0)
def _construct_x0(self, nx0r, ny0r):
"Construct initial state x0 for LSS instance."
x0 = np.hstack([1, 0, nx0r, ny0r, ny0r])
return x0
def _construct_A_bar(self, x0, nx0c, nyx0m, ny0c, ny1m, ny0m):
"Construct A matrix for LSS instance."
# Order of states is: [1, t, x_{t}, y_{t}, m_{t}]
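# In block form (columns ordered like the state) A_bar is
#     [1  0  0  0  0]      constant
#     [1  1  0  0  0]      time trend t
#     [0  0  A  0  0]      x_{t+1} = A x_t + B z_{t+1}
#     [ν  0  D  I  0]      y_{t+1} = ν + D x_t + y_t + F z_{t+1}
#     [0  0  0  0  I]      m_{t+1} = m_t + H z_{t+1}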
# Transition for 1
A1 = x0.copy()
# Transition for t
A2 = x0.copy()
A2[1] = 1.
# Transition for x_{t+1}
A3 = np.hstack([nx0c, nx0c, self.A, nyx0m.T, nyx0m.T])
# Transition for y_{t+1}
A4 = np.hstack([self.ν, ny0c, self.D, ny1m, ny0m])
# Transition for m_{t+1}
A5 = np.hstack([ny0c, ny0c, nyx0m, ny0m, ny1m])
A_bar = np.vstack([A1, A2, A3, A4, A5])
return A_bar
def _construct_B_bar(self, nk0, H):
"Construct B matrix for LSS instance."
B_bar = np.vstack([nk0, nk0, self.B, self.F, H])
return B_bar
def _construct_G_bar(self, nx0c, nx, nyx0m, ny0c, ny1m, ny0m, g):
"Construct G matrix for LSS instance."
# Order of observation is: [x_{t}, y_{t}, m_{t}, s_{t}, tau_{t}]
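# In block form (columns ordered like the state) G_bar is
#     [0  0  I  0  0]      x_t
#     [0  0  0  I  0]      y_t
#     [0  0  0  0  I]      m_t
#     [0  0 -g  0  0]      s_t  = -g x_t  (stationary component)
#     [0  ν  0  0  0]      tau_t = ν t    (trend component)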
# Selector for x_{t}
G1 = np.hstack([nx0c, nx0c, np.eye(nx), nyx0m.T, nyx0m.T])
# Selector for y_{t}
G2 = np.hstack([ny0c, ny0c, nyx0m, ny1m, ny0m])
# Selector for martingale m_{t}
G3 = np.hstack([ny0c, ny0c, nyx0m, ny0m, ny1m])
# Selector for stationary s_{t}
G4 = np.hstack([ny0c, ny0c, -g, ny0m, ny0m])
# Selector for trend tau_{t}
G5 = np.hstack([ny0c, self.ν, nyx0m, ny0m, ny0m])
G_bar = np.vstack([G1, G2, G3, G4, G5])
return G_bar
def _construct_H_bar(self, nx, ny, nk):
"Construct H matrix for LSS instance."
# One row per observable (nx + 4 * ny of them); no measurement noise
H_bar = np.zeros((nx + 4 * ny, nk))
return H_bar
def _construct_Sigma_0(self, x0):
"Construct initial covariance matrix Sigma_0 for LSS instance."
Sigma_0 = np.zeros((len(x0), len(x0)))
return Sigma_0
def _attr_dims_check(self):
"Check the dimensions of attributes."
inputs = {'A': self.A, 'B': self.B, 'D': self.D, 'F': self.F,
'ν': self.ν}
for input_name, input_val in inputs.items():
if input_val.ndim != 2:
raise ValueError(input_name + ' must have 2 dimensions.')
def _attr_shape_check(self):
"Check the shape of attributes."
same_dim_pairs = {'first': (0, {'A and B': [self.A, self.B],
'D and F': [self.D, self.F],
'D and ν': [self.D, self.ν]}),
'second': (1, {'A and D': [self.A, self.D],
'B and F': [self.B, self.F]})}
for dim_name, (dim_idx, pairs) in same_dim_pairs.items():
for pair_name, (e0, e1) in pairs.items():
if e0.shape[dim_idx] != e1.shape[dim_idx]:
raise ValueError('The ' + dim_name + ' dimensions of ' +
pair_name + ' must match.')
if self.A.shape[0] != self.A.shape[1]:
raise ValueError('A (shape: %s) must be a square matrix.' %
(self.A.shape, ))
# F.shape[0] == ν.shape[0] holds by transitivity
# Same for D.shape[1] == B.shape[0] == A.shape[0]
def loglikelihood_path(self, x, y):
"""
Computes the log-likelihood path associated with a path of the state `x`
and of the additive functional `y`, assuming standard normal shocks.
Parameters
----------
x : ndarray(float, ndim=2)
A path of observations for the state variable, of shape `(nx, T)`.
y : ndarray(float, ndim=2)
A path of observations for the random process, of shape `(ny, T)`.
Returns
-------
llh : ndarray(float, ndim=1)
An array containing the loglikelihood path.
"""
k, T = y.shape
FF = self.F @ self.F.T
FF_inv = la.inv(FF)
# Innovations F @ z_{t+1} implied by the observed increments
temp = y[:, 1:] - y[:, :-1] - self.ν - self.D @ x[:, :-1]
# Quadratic form temp' (FF')^{-1} temp, period by period
obs = np.sum(temp * (FF_inv @ temp), axis=0)
obssum = np.cumsum(obs)
scalar = (np.log(la.det(FF)) + k * np.log(2 * np.pi)) * np.arange(1, T)
llh = (-0.5) * (obssum + scalar)
return llh
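

if __name__ == "__main__":
    # A minimal usage sketch (illustrative only, not part of the class):
    # build the AMF from the docstring example, inspect the decompositions,
    # simulate the underlying first-order VAR by hand and evaluate the
    # log-likelihood path of the simulated observations.
    ϕ_1, ϕ_2, ϕ_3, ϕ_4 = 0.5, -0.2, 0, 0.5
    σ = 0.01
    ν = 0.01  # Growth rate
    A = np.array([[ϕ_1, ϕ_2, ϕ_3, ϕ_4],
                  [  1,   0,   0,   0],
                  [  0,   1,   0,   0],
                  [  0,   0,   1,   0]])
    B = np.array([[σ, 0, 0, 0]]).T
    D = np.array([[1, 0, 0, 0]]) @ A
    F = np.array([[1, 0, 0, 0]]) @ B
    amf = AMF(A, B, D, F, ν=ν)
    print(amf.additive_decomp)
    print(amf.multiplicative_decomp)

    # Simulate x_{t+1} = A x_t + B z_{t+1} and
    # y_{t+1} = y_t + ν + D x_t + F z_{t+1} with standard normal shocks
    T = 200
    nx, nk = B.shape
    ny = D.shape[0]
    z = np.random.standard_normal((nk, T))
    x = np.zeros((nx, T))
    y = np.zeros((ny, T))
    for t in range(T - 1):
        x[:, t+1] = A @ x[:, t] + B @ z[:, t+1]
        y[:, t+1] = y[:, t] + ν + D @ x[:, t] + F @ z[:, t+1]
    print(amf.loglikelihood_path(x, y)[-1])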