-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvon_mises_fisher.py
212 lines (173 loc) · 7.09 KB
/
von_mises_fisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import math
import torch
from torch.distributions.kl import register_kl
from ive import ive, ive_fraction_approx, ive_fraction_approx2
from hyperspherical_uniform import (
HypersphericalUniform,
)
class VonMisesFisher(torch.distributions.Distribution):
arg_constraints = {
"loc": torch.distributions.constraints.real,
"scale": torch.distributions.constraints.positive,
}
support = torch.distributions.constraints.real
has_rsample = True
_mean_carrier_measure = 0
@property
def mean(self):
# option 1:
return self.loc * (
ive(self.__m / 2, self.scale) / ive(self.__m / 2 - 1, self.scale)
)
# option 2:
# return self.loc * ive_fraction_approx(torch.tensor(self.__m / 2), self.scale)
# options 3:
# return self.loc * ive_fraction_approx2(torch.tensor(self.__m / 2), self.scale)
@property
def stddev(self):
return self.scale
def __init__(self, loc, scale, validate_args=None, k=1):
self.dtype = loc.dtype
self.loc = loc
self.scale = scale
self.device = loc.device
self.__m = loc.shape[-1]
self.__e1 = (torch.Tensor([1.0] + [0] * (loc.shape[-1] - 1))).to(self.device)
self.k = k
super().__init__(self.loc.size(), validate_args=validate_args)
def sample(self, shape=torch.Size()):
with torch.no_grad():
return self.rsample(shape)
def rsample(self, shape=torch.Size()):
shape = shape if isinstance(shape, torch.Size) else torch.Size([shape])
w = (
self.__sample_w3(shape=shape)
if self.__m == 3
else self.__sample_w_rej(shape=shape)
)
v = (
torch.distributions.Normal(0, 1)
.sample(shape + torch.Size(self.loc.shape))
.to(self.device)
.transpose(0, -1)[1:]
).transpose(0, -1)
v = v / v.norm(dim=-1, keepdim=True)
w_ = torch.sqrt(torch.clamp(1 - (w ** 2), 1e-10))
# w= torch.unsqueeze(w,-1)
# w_ = torch.unsqueeze(w_,-1)
# print(torch.unsqueeze(w_, -1).shape)
x = torch.cat((w, w_ * v), -1)
z = self.__householder_rotation(x)
return z.type(self.dtype)
def __sample_w3(self, shape):
shape = shape + torch.Size(self.scale.shape)
u = torch.distributions.Uniform(0, 1).sample(shape).to(self.device)
self.__w = (
1
+ torch.stack(
[torch.log(u), torch.log(1 - u) - 2 * self.scale], dim=0
).logsumexp(0)
/ self.scale
)
return self.__w
def __sample_w_rej(self, shape):
c = torch.sqrt((4 * (self.scale ** 2)) + (self.__m - 1) ** 2)
b_true = (-2 * self.scale + c) / (self.__m - 1)
# using Taylor approximation with a smooth swift from 10 < scale < 11
# to avoid numerical errors for large scale
b_app = (self.__m - 1) / (4 * self.scale)
s = torch.min(
torch.max(
torch.tensor([0.0], dtype=self.dtype, device=self.device),
self.scale - 10,
),
torch.tensor([1.0], dtype=self.dtype, device=self.device),
)
b = b_app * s + b_true * (1 - s)
a = (self.__m - 1 + 2 * self.scale + c) / 4
d = (4 * a * b) / (1 + b) - (self.__m - 1) * math.log(self.__m - 1)
self.__b, (self.__e, self.__w) = b, self.__while_loop(b, a, d, shape, k=self.k)
return self.__w
@staticmethod
def first_nonzero(x, dim, invalid_val=-1):
mask = x > 0
idx = torch.where(
mask.any(dim=dim),
mask.float().argmax(dim=1).squeeze(),
torch.tensor(invalid_val, device=x.device),
)
return idx
def __while_loop(self, b, a, d, shape, k=20, eps=1e-20):
# matrix while loop: samples a matrix of [A, k] samples, to avoid looping all together
b, a, d = [
e.repeat(*shape, *([1] * len(self.scale.shape))).reshape(-1, 1)
for e in (b, a, d)
]
w, e, bool_mask = (
torch.zeros_like(b).to(self.device),
torch.zeros_like(b).to(self.device),
(torch.ones_like(b) == 1).to(self.device),
)
sample_shape = torch.Size([b.shape[0], k])
shape = shape + torch.Size(self.scale.shape)
while bool_mask.sum() != 0:
con1 = torch.tensor((self.__m - 1) / 2, dtype=torch.float64)
con2 = torch.tensor((self.__m - 1) / 2, dtype=torch.float64)
e_ = (
torch.distributions.Beta(con1, con2)
.sample(sample_shape)
.to(self.device)
.type(self.dtype)
)
u = (
torch.distributions.Uniform(0 + eps, 1 - eps)
.sample(sample_shape)
.to(self.device)
.type(self.dtype)
)
w_ = (1 - (1 + b) * e_) / (1 - (1 - b) * e_)
t = (2 * a * b) / (1 - (1 - b) * e_)
accept = ((self.__m - 1.0) * t.log() - t + d) > torch.log(u)
accept_idx = self.first_nonzero(accept, dim=-1, invalid_val=-1).unsqueeze(1)
accept_idx_clamped = accept_idx.clamp(0)
# we use .abs(), in order to not get -1 index issues, the -1 is still used afterwards
w_ = w_.gather(1, accept_idx_clamped.view(-1, 1))
e_ = e_.gather(1, accept_idx_clamped.view(-1, 1))
reject = accept_idx < 0
accept = ~reject if torch.__version__ >= "1.2.0" else 1 - reject
w[bool_mask * accept] = w_[bool_mask * accept]
e[bool_mask * accept] = e_[bool_mask * accept]
bool_mask[bool_mask * accept] = reject[bool_mask * accept]
return e.reshape(shape), w.reshape(shape)
def __householder_rotation(self, x):
u = self.__e1 - self.loc
u = u / (u.norm(dim=-1, keepdim=True) + 1e-5)
z = x - 2 * (x * u).sum(-1, keepdim=True) * u
return z
def entropy(self):
# option 1:
output = (
-self.scale
* ive(self.__m / 2, self.scale)
/ ive((self.__m / 2) - 1, self.scale)
)
# option 2:
# output = - self.scale * ive_fraction_approx(torch.tensor(self.__m / 2), self.scale)
# option 3:
# output = - self.scale * ive_fraction_approx2(torch.tensor(self.__m / 2), self.scale)
return output.view(*(output.shape[:-1])) + self._log_normalization()
def log_prob(self, x):
return self._log_unnormalized_prob(x) - self._log_normalization()
def _log_unnormalized_prob(self, x):
output = self.scale * (self.loc * x).sum(-1, keepdim=True)
return output.view(*(output.shape[:-1]))
def _log_normalization(self):
output = -(
(self.__m / 2 - 1) * torch.log(self.scale)
- (self.__m / 2) * math.log(2 * math.pi)
- (self.scale + torch.log(ive(self.__m / 2 - 1, self.scale)))
)
return output.view(*(output.shape[:-1]))
@register_kl(VonMisesFisher, HypersphericalUniform)
def _kl_vmf_uniform(vmf, hyu):
return -vmf.entropy() + hyu.entropy()