-
Notifications
You must be signed in to change notification settings - Fork 2
/
clustering.py
387 lines (317 loc) · 13.9 KB
/
clustering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project :Awesome-DL-Models
@File :clustering.py
@Author :JackHCC
@Date :2022/2/8 12:13
@Desc :Implement Hierarchical Clustering and K-means
'''
import numpy as np
import math
import time
import random
from scipy.special import comb
from data.utils import load_data
import matplotlib.pyplot as plt
def mahalanobis_distance(X):
"""计算所有样本之间的马哈拉诺比斯距离矩阵"""
n_samples = len(X[0])
# 计算协方差矩阵
S = np.cov(X)
# 计算协方差矩阵的逆矩阵
S = np.linalg.inv(S)
# 构造马哈拉诺比斯距离矩阵
D = np.zeros((n_samples, n_samples)) # 初始化为零矩阵
for i in range(n_samples):
for j in range(i + 1, n_samples):
xi = X[:, i][:, np.newaxis]
xj = X[:, j][:, np.newaxis]
D[i][j] = D[j][i] = np.sqrt((np.dot(np.dot((xi - xj).T, S), (xi - xj))))
return D
def correlation_coefficient(X):
"""计算所有样本之间的相关系数矩阵"""
n_samples = len(X[0])
# 计算均值
means = np.mean(X, axis=0)
# 计算误差平方和
variance = [np.square((X[:, i] - means[i])).sum() for i in range(n_samples)]
# 构造相关系数矩阵
D = np.identity(n_samples) # 初始化为单位矩阵
for i in range(n_samples):
for j in range(i + 1, n_samples):
xi, xj = X[:, i], X[:, j]
numerator = ((xi - means[i]) * (xj - means[j])).sum()
denominator = np.sqrt(variance[i] * variance[j])
if denominator:
D[i][j] = D[j][i] = numerator / denominator
else: # 当出现零方差时
D[i][j] = D[j][i] = np.nan
return D
def cosine(X):
"""计算所有样本之间的夹角余弦矩阵"""
n_samples = len(X[0])
# 计算平方和
square = [np.square(X[:, i]).sum() for i in range(n_samples)]
# 构造夹角余弦矩阵
D = np.identity(n_samples) # 初始化为单位矩阵
for i in range(n_samples):
for j in range(i + 1, n_samples):
xi, xj = X[:, i], X[:, j]
numerator = (xi * xj).sum()
denominator = np.sqrt(square[i] * square[j])
if denominator:
D[i][j] = D[j][i] = numerator / denominator
else: # 当出现零平方和时
D[i][j] = D[j][i] = np.nan
return D
def scatter_matrix(X, G):
"""计算样本散布矩阵
:param X: 样本
:param G: 类别包含的样本
:return: 样本散步矩阵
"""
n_samples = len(G)
n_features = len(X)
# 计算类的中心
means = np.mean(X[:, G], axis=1)
A = np.zeros((n_features, n_features))
for i in range(n_samples):
A += np.dot((X[:, i] - means)[:, np.newaxis], (X[:, i] - means)[:, np.newaxis].T)
return A
def covariance_matrix(X, G):
"""计算样本协方差矩阵"""
n_features = len(X)
A = scatter_matrix(X, G)
S = A / (n_features - 1)
return S
# 定义标准化函数,对每一列特征进行min-max标准化,将数据缩放到0-1之间
# 标准化处理对于计算距离的机器学习方法是非常重要的,因为特征的尺度不同会导致计算出来的距离倾向于尺度大的特征,为保证距离对每一列特征都是公平的,必须将所有特征缩放到同一尺度范围内
def normalize(Xarray):
'''
INPUT:
Xarray - (array) 特征数据数组
OUTPUT:
Xarray - (array) 标准化处理后的特征数据数组
'''
for f in range(Xarray.shape[1]):
maxf = np.max(Xarray[:, f])
minf = np.min(Xarray[:, f])
for n in range(Xarray.shape[0]):
Xarray[n][f] = (Xarray[n][f] - minf) / (maxf - minf)
return Xarray
# 定义计算两条数据间的距离的函数,这里计算的是欧式距离
def cal_distance(xi, xj):
'''
INPUT:
Xi - (array) 第i条特征数据
Xj - (array) 第j条特征数据
OUTPUT:
dist - (float) 两条数据的欧式距离
'''
dist = 0
for col in range(len(xi)):
dist += (xi[col] - xj[col]) ** 2
dist = math.sqrt(dist)
return dist
# 定义计算所有特征数据两两之间距离的函数
def distances(Xarray):
'''
INPUT:
Xarray - (array) 特征数据数组
OUTPUT:
dists - (array) 两两数据的欧式距离数组
'''
dists = np.zeros((Xarray.shape[0], Xarray.shape[0])) # 定义一个数组用来保存两两数据的距离
for n1 in range(Xarray.shape[0]):
for n2 in range(n1):
dists[n1][n2] = cal_distance(Xarray[n1], Xarray[n2])
dists[n2][n1] = dists[n1][n2]
return dists
# 定义计算两类的类间距离的函数,这里计算的是最短距离
def cal_group_dist(g1, g2, group_dict, dists):
'''
INPUT:
g1 - (int) 类别1的标签
g2 - (int) 类别2的标签
group_dict - (dict) 类别字典
dists - (array) 两两数据的欧式距离数组
OUTPUT:
(int) 类间最短距离
'''
d = []
# 循环计算两类之间两两数据的距离
for xi in group_dict[g1]:
for xj in group_dict[g2]:
if xi != xj:
d.append(dists[xi][xj])
return min(d)
# 定义层次聚类函数
def hierarchical_clustering(Xarray, k, dists):
'''
INPUT:
Xarray - (array) 特征数据数组
k - (int) 设定的类别数
dists - (array) 两两数据的欧式距离数组
OUTPUT:
group_dict - (dict) 类别字典
'''
group_dict = {} # 定义一个空字典,用于保存聚类所产生的所有类别
for n in range(Xarray.shape[0]): # 层次聚类是一种聚合聚类方法,首先将每条数据都分到不同的类,数据的类别标签为0-(N-1),其中N为数据条数
group_dict[n] = [n]
newgroup = Xarray.shape[0] # newgroup表示新的类别标签,此时下一个类别标签为N
while len(group_dict.keys()) > k: # 当类别数大于我们所设定的类别数k时,不断循环进行聚类
print('Number of groups:', len(group_dict.keys()))
group_dists = {} # 定义一个空字典,用于保存两两类之间的间距,其中字典的值为元组(g1, g2),表示两个类别标签,字典的键为这两个类别的间距
# 循环计算group_dict中两两类别之间的间距,保存到group_dists中
for g1 in group_dict.keys():
for g2 in group_dict.keys():
if g1 != g2:
if (g1, g2) not in group_dists.values():
d = cal_group_dist(g1, g2, group_dict, dists)
group_dists[d] = (g1, g2)
group_mindist = min(list(group_dists.keys())) # 取类别之间的最小间距
mingroups = group_dists[group_mindist] # 取间距最小的两个类别
new = [] # 定义一个列表,用于保存所产生的新类中包含的数据,这里用之前对每条数据给的类别标签0-(N-1)来表示
for g in mingroups:
new.extend(group_dict[g]) # 将间距最小的两类中包含的数据保存在new列表中
del group_dict[g] # 然后在group_dict中移去这两类
print(newgroup, new)
group_dict[newgroup] = new # 此时聚类所产生的新类中包含的数据即为以上两类的中包含的数据的聚合,给新类贴上类别标签为newgroup,保存到group_dict中
newgroup += 1 # 产生下一个类别标签
return group_dict
# 定义计算类中心的函数,以当前类中所包含数据的各个特征均值作为新的新的类中心
def cal_group_center(group, Xarray):
'''
INPUT:
group - (list) 类所包含的数据列表
Xarray - (array) 特征数据数组
OUTPUT:
center - (array) 新的类中心
'''
center = np.zeros(Xarray.shape[1])
for i in range(Xarray.shape[1]):
for n in group:
center[i] += Xarray[n][i] # 计算当前类中第i个特征的数据之和
center = center / Xarray.shape[0] # 计算各个特征的均值
return center
# 定义k均值聚类函数
def k_means(Xarray, k, iters):
'''
INPUT:
Xarray - (array) 特征数据数组
k - (int) 设定的类别数
iters - (int) 设定的迭代次数
OUTPUT:
group_dict - (dict) 类别字典
scores - (int) 每次迭代的ARI得分列表
'''
center_inds = random.sample(range(Xarray.shape[0]), k) # 从特征数据中随机抽取k个数据索引
centers = [Xarray[ci] for ci in center_inds] # 将这k个数据索引所对应的特征数据作为初始的k个聚类中心
scores = [] # 定义一个空列表用来保存每次迭代的ARI得分
for i in range(iters):
group_dict = {i: [] for i in range(k)} # 定义一个空字典,用于保存聚类所产生的所有类别,其中字典的键为类别标签,值为类别所包含的数据列表,以索引表示每条数据
print('{}/{}'.format(i + 1, iters))
# 循环计算每条数据到各个聚类中心的距离
for n in range(Xarray.shape[0]):
dists = [] # 保存第n条数据到各个聚类中心的距离
for ci in range(k):
dist = cal_distance(Xarray[n], centers[ci])
dists.append(dist)
g = dists.index(min(dists)) # 取距离最近的中心所在的类
group_dict[g].append(n) # 将第n条数据的索引n保存到g类
print(group_dict)
for i in range(k):
centers[i] = cal_group_center(group_dict[i], Xarray) # 根据每一类所包含的数据重新计算类中心
scores.append(adjusted_rand_index(group_dict, Ylist, k)) # 将该轮迭代的ARI得分保存到scores列表
return group_dict, scores
# 定义计算调整兰德系数(ARI)的函数,调整兰德系数是一种聚类方法的常用评估方法
def adjusted_rand_index(group_dict, Ylist, k):
'''
INPUT:
group_dict - (dict) 类别字典
Ylist - (list) 类别标签列表
k - (int) 设定的类别数
OUTPUT:
(int) 调整兰德系数
'''
group_array = np.zeros((k, k)) # 定义一个数组,用来保存聚类所产生的类别标签与给定的外部标签各类别之间共同包含的数据数量
y_dict = {} # 定义一个空字典,用来保存外部标签中各类所包含的数据,结构与group_dict相同
for i in range(len(Ylist)):
if Ylist[i] not in y_dict:
y_dict[Ylist[i]] = [i]
else:
y_dict[Ylist[i]].append(i)
# 循环计算group_array的值
for i in range(k):
for j in range(k):
for n in range(len(Ylist)):
if n in group_dict[list(group_dict.keys())[i]] and n in y_dict[list(y_dict.keys())[j]]:
group_array[i][j] += 1 # 如果数据n同时在group_dict的类别i和y_dict的类别j中,group_array[i][j]的数值加一
RI = 0 # 定义兰德系数(RI)
sum_i = np.zeros(k) # 定义一个数组,用于保存聚类结果group_dict中每一类的个数
sum_j = np.zeros(k) # 定义一个数组,用于保存外部标签y_dict中每一类的个数
for i in range(k):
for j in range(k):
sum_i[i] += group_array[i][j]
sum_j[j] += group_array[i][j]
if group_array[i][j] >= 2:
RI += comb(group_array[i][j], 2) # comb用于计算group_array[i][j]中两两组合的组合数
ci = 0 # ci保存聚类结果中同一类中的两两组合数之和
cj = 0 # cj保存外部标签中同一类中的两两组合数之和
for i in range(k):
if sum_i[i] >= 2:
ci += comb(sum_i[i], 2)
for j in range(k):
if sum_j[j] >= 2:
cj += comb(sum_j[j], 2)
E_RI = ci * cj / comb(len(Ylist), 2) # 计算RI的期望
max_RI = (ci + cj) / 2 # 计算RI的最大值
return (RI - E_RI) / (max_RI - E_RI) # 返回调整兰德系数的值
if __name__ == "__main__":
print("开始测试马哈拉诺比斯距离矩阵……")
X = np.array([[0, 0, 1, 5, 5],
[2, 0, 0, 0, 2]])
print(mahalanobis_distance(X))
print("------------------------------------------")
print("开始测试相关系数矩阵……")
X = np.array([[0, 0, 1, 5, 5],
[2, 0, 0, 0, 2]])
print(correlation_coefficient(X))
print("------------------------------------------")
print("开始测试夹角余弦矩阵……")
X = np.array([[0, 0, 1, 5, 5],
[2, 0, 0, 0, 2]])
print(cosine(X))
print("------------------------------------------")
print("开始测试类的样本散布矩阵与样本协方差矩阵……")
X = np.array([[0, 0, 1, 5, 5],
[2, 0, 0, 0, 2]])
print(scatter_matrix(X, [2, 3, 4]))
print(covariance_matrix(X, [2, 3, 4]))
print("------------------------------------------")
print("开始测试聚合聚类算法……")
Xarray, Ylist = load_data('./data/iris.data') # 加载数据
start = time.time() # 保存开始时间
Xarray = normalize(Xarray) # 对特征数据进行标准化处理
k = 3 # 设定聚类数为3
dists = distances(Xarray) # 计算特征数据的距离数组
print(dists)
group_dict = hierarchical_clustering(Xarray, k, dists) # 进行层次聚类
end = time.time() # 保存结束时间
print(group_dict)
ARI = adjusted_rand_index(group_dict, Ylist, k) # 计算ARI用来评估聚类结果
print('Adjusted Rand Index:', ARI)
print('Time:', end - start)
print("------------------------------------------")
print("开始测试K-means算法……")
Xarray, Ylist = load_data('./data/iris.data') # 加载数据
start = time.time() # 保存开始时间
Xarray = normalize(Xarray) # 对特征数据进行标准化处理
k = 3 # 设定聚类数为3
iters = 2 # 设定迭代次数为2
group_dict, scores = k_means(Xarray, k, iters) # 进行k均值聚类
end = time.time() # 保存结束时间
print('Time:', end - start)
plt.plot(range(iters), scores) # 绘制ARI得分折线图
plt.show()