-
Notifications
You must be signed in to change notification settings - Fork 0
/
convert4.py
107 lines (91 loc) · 2.73 KB
/
convert4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import math
import numpy
import mdp
import sys
import array
def main(file_name):
fs = 2.5
input_data = read_file(file_name)
normalized_input_data = normalize_samples(input_data)
g_modes = ['gaus']
for g in g_modes:
channel2 = apply_ica(normalized_input_data, 1, g)
x = calc_heart_rate(channel2, fs)
return x
def main1(file_name):
fs = 2.5
input_data = read_file(file_name)
normalized_input_data = normalize_samples(input_data)
g_modes = ['gaus']
for g in g_modes:
channel2 = apply_ica(normalized_input_data, 1, g)
return channel2
def read_file(file_name):
'''
Receive rgb data from a file in this format r,g,b, on each line
'''
a = open(file_name).read().split('\n')
def my_split(x):
_float = float
return map(_float, x.split(','))
return map(my_split, a[:-1])
def normalize_sample(x):
'''
x is a 1-dimensional raw list of receive data
Return a normalized list
'''
#calc mean value. This may be not correct
N = len(x)
mean = float(sum(x))/N
x1 = [i**2 for i in x]
mean_x1 = float(sum(x1))/N
variance = mean_x1 - mean**2
normalized_x = [float(i-mean)/variance for i in x]
return normalized_x
def normalize_samples(x):
'''
Normalize all the 3 channels
x is a 3-dimensional list of raw data. Just normalize each of them
'''
f_n = normalize_sample
return map(f_n, x)
def apply_ica(x, i, g):
'''
Seperate the original rgb sources into 3 different channels
i is between 0 to 2, which is the channel
g is the mode to converge
'''
#x1 = numpy.transpose(x)
x1 = numpy.array(x)
#channels = mdp.fastica(x1, input_dim=3)
#channels = mdp.fastica(x1, input_dim=3, verbose=True, g=g)
channels = mdp.fastica(x1, input_dim=3, g=g, verbose=True)
print channels
tmp = channels.transpose()
return tmp[i]
def calc_heart_rate(x, fs):
'''
x is a discrete function, list of values.
Returns a DFT of these samples.
fs is the sampling frequency
N is the total number of samples
'''
N = len(x)
#use discrete frequency transform for the samplings
dft_x = numpy.fft.fft(x)
amp_max = 0
tar_freq = 0
def my_compare(x,y):
return cmp(y[1], x[1])
tmp_list = [(fs*i/float(N),math.sqrt(dft_x[i].real**2 + dft_x[i].imag**2)) for i in range(N)]
#tmp_list.sort(my_compare)
#for i in range(1, N):
# tmp = dft_x[i]
# f = fs * i / float(N)
# amp = math.sqrt(tmp.real**2 + tmp.imag**2)
# if amp_max < amp:
# amp_max = amp
# tar_freq = f
return tmp_list
if __name__ == '__main__':
main()