forked from FlorentF9/DeepTemporalClustering
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTSClusteringLayer.py
90 lines (79 loc) · 4.61 KB
/
TSClusteringLayer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
Implementation of the Deep Temporal Clustering model
Time Series Clustering layer
@author Florent Forest (FlorentF9)
"""
from keras.engine.topology import Layer, InputSpec
import keras.backend as K
class TSClusteringLayer(Layer):
    """
    Clustering layer converts input sample (feature) to soft label, i.e. a vector that represents the probability of the
    sample belonging to each cluster. The probability is calculated with student's t-distribution.

    # Arguments
        n_clusters: number of clusters.
        weights: list of Numpy array with shape `(n_clusters, timesteps, n_features)` which represents the initial
            cluster centers.
        alpha: parameter in Student's t-distribution. Default to 1.0.
        dist_metric: distance metric between sequences used in similarity kernel ('eucl', 'cid', 'cor' or 'acf').
            'acf' is recognized but raises `NotImplementedError` when the layer is called.
    # Input shape
        3D tensor with shape: `(n_samples, timesteps, n_features)`.
    # Output shape
        2D tensor with shape: `(n_samples, n_clusters)`.
    """

    def __init__(self, n_clusters, weights=None, alpha=1.0, dist_metric='eucl', **kwargs):
        # Legacy convenience: translate `input_dim` into `input_shape` when only the former is given.
        if 'input_shape' not in kwargs and 'input_dim' in kwargs:
            kwargs['input_shape'] = (kwargs.pop('input_dim'),)
        super(TSClusteringLayer, self).__init__(**kwargs)
        self.n_clusters = n_clusters
        self.alpha = alpha
        self.dist_metric = dist_metric
        self.initial_weights = weights
        self.input_spec = InputSpec(ndim=3)
        self.clusters = None
        self.built = False

    def build(self, input_shape):
        """
        Create the trainable cluster centers, shape `(n_clusters, timesteps, n_features)`, and load the initial
        weights passed to the constructor, if any.

        Arguments:
            input_shape: 3-element shape `(n_samples, timesteps, n_features)`.
        """
        assert len(input_shape) == 3
        input_dim = input_shape[2]
        input_steps = input_shape[1]
        self.input_spec = InputSpec(dtype=K.floatx(), shape=(None, input_steps, input_dim))
        self.clusters = self.add_weight(shape=(self.n_clusters, input_steps, input_dim),
                                        initializer='glorot_uniform',
                                        name='cluster_centers')
        if self.initial_weights is not None:
            self.set_weights(self.initial_weights)
            # Release the reference to the initial arrays but keep the attribute defined.
            self.initial_weights = None
        self.built = True

    def _euclidean_distance(self, inputs):
        """Per-feature Euclidean distance over time, summed over features. shape=(n_samples, n_clusters)"""
        diff = K.expand_dims(inputs, axis=1) - self.clusters  # shape (n_samples, n_clusters, timesteps, n_features)
        return K.sum(K.sqrt(K.sum(K.square(diff), axis=2)), axis=-1)

    def _cid_distance(self, inputs):
        """Complexity-invariant distance: Euclidean distance scaled by a complexity ratio. shape=(n_samples, n_clusters)"""
        ce_x = K.sqrt(K.sum(K.square(inputs[:, 1:, :] - inputs[:, :-1, :]), axis=1))  # shape (n_samples, n_features)
        ce_w = K.sqrt(K.sum(K.square(self.clusters[:, 1:, :] - self.clusters[:, :-1, :]), axis=1))  # shape (n_clusters, n_features)
        # A sequence that is constant along time has a complexity estimate of 0, which would make the ratio below
        # a division by zero (inf or NaN). Clamping both estimates keeps the ratio finite, and equal to 1 when both
        # are zero. Estimates >= K.epsilon() are left untouched.
        ce_x = K.expand_dims(K.maximum(ce_x, K.epsilon()), axis=1)  # shape (n_samples, 1, n_features)
        ce_w = K.maximum(ce_w, K.epsilon())
        ce = K.maximum(ce_x, ce_w) / K.minimum(ce_x, ce_w)  # shape (n_samples, n_clusters, n_features)
        ed = K.sqrt(K.sum(K.square(K.expand_dims(inputs, axis=1) - self.clusters), axis=2))  # shape (n_samples, n_clusters, n_features)
        return K.sum(ed * ce, axis=-1)

    def _correlation_distance(self, inputs):
        """Correlation-based distance sqrt(2 * (1 - pcc)), summed over features. shape=(n_samples, n_clusters)"""
        # Standard deviations are clamped to K.epsilon() so constant sequences normalize to zeros instead of NaN.
        inputs_std = K.maximum(K.std(inputs, axis=1), K.epsilon())
        clusters_std = K.maximum(K.std(self.clusters, axis=1), K.epsilon())
        inputs_norm = (inputs - K.expand_dims(K.mean(inputs, axis=1), axis=1)) \
            / K.expand_dims(inputs_std, axis=1)  # shape (n_samples, timesteps, n_features)
        clusters_norm = (self.clusters - K.expand_dims(K.mean(self.clusters, axis=1), axis=1)) \
            / K.expand_dims(clusters_std, axis=1)  # shape (n_clusters, timesteps, n_features)
        pcc = K.mean(K.expand_dims(inputs_norm, axis=1) * clusters_norm, axis=2)  # Pearson correlation coefficients
        return K.sum(K.sqrt(2.0 * (1.0 - pcc)), axis=-1)

    def call(self, inputs, **kwargs):
        """
        Student t-distribution kernel, probability of assigning encoded sequence i to cluster k.
            q_{ik} = (1 + dist(z_i, m_k)^2 / alpha)^{-(alpha + 1) / 2} / normalization.

        Arguments:
            inputs: encoded input sequences, shape=(n_samples, timesteps, n_features)
        Return:
            q: soft labels for each sample. shape=(n_samples, n_clusters)
        Raises:
            NotImplementedError: if `dist_metric` is 'acf'.
            ValueError: if `dist_metric` is not one of the known metrics.
        """
        if self.dist_metric == 'eucl':
            distance = self._euclidean_distance(inputs)
        elif self.dist_metric == 'cid':
            distance = self._cid_distance(inputs)
        elif self.dist_metric == 'cor':
            distance = self._correlation_distance(inputs)
        elif self.dist_metric == 'acf':
            raise NotImplementedError
        else:
            raise ValueError('Available distances are eucl, cid, cor and acf!')
        q = 1.0 / (1.0 + K.square(distance) / self.alpha)
        q = q ** ((self.alpha + 1.0) / 2.0)
        # Normalize each row so the soft assignments of a sample sum to one.
        return q / K.sum(q, axis=1, keepdims=True)

    def compute_output_shape(self, input_shape):
        """Return `(n_samples, n_clusters)` for a 3D `input_shape`."""
        assert input_shape and len(input_shape) == 3
        return input_shape[0], self.n_clusters

    def get_config(self):
        """Return the layer configuration, including `alpha` so a reloaded layer keeps its kernel parameter."""
        config = super(TSClusteringLayer, self).get_config()
        config.update({'n_clusters': self.n_clusters,
                       'alpha': self.alpha,
                       'dist_metric': self.dist_metric})
        return config