-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimage_analysis.py
executable file
·254 lines (231 loc) · 8.39 KB
/
image_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#! /usr/local/enthought/bin/python
"""
Script for converting .mat files into a more useful format
more to come...
Can be called inside ipython: %run matconvert (same directory)
If not called as main, super_grid = make_super_grid must be called
"""
import scipy as sp
from scipy.io import loadmat
from scipy.interpolate import griddata
from scipy.linalg import det
from scipy import sparse
import numpy as np
from numpy import sum, sqrt, einsum
import os
import matplotlib.pyplot as plt
import re
from socket import gethostname
hostname = gethostname()
if hostname == 'ept':
STUFFDIR = '/afs/msc.cornell.edu/home/jsethna/cc2285/storage/ERL/ERLstuff'
elif hostname == 'heveled':
STUFFDIR = '/home/colinc/storage/ERL/ERLstuff'
elif hostname == 'dain':
STUFFDIR = '/c/colin/ERL/ERLstuff'
walktree = os.walk(STUFFDIR,topdown=True, followlinks=False)
allfiles=[]
for folder in walktree:
for files in folder[2]:
allfiles.append(os.path.join(folder[0],files))
howmany=len(allfiles) #number of files accessible
#allfiles = [ os.path.join(STUFFDIR,dir,fl) for dir in os.listdir(STUFFDIR)
#for fl in os.listdir(os.path.join(STUFFDIR,dir)) ] #(By Alex)
def sortfiles():
"""Takes allfiles and makes a dictionary with 4 different kinds of file
paths, sorts according to different measurements denoted by B1ver, B1hor,
A4ver, A4hor."""
B1ver=re.compile('B1ver')
B1hor=re.compile('B1hor')
A4ver=re.compile('A4ver')
A4hor=re.compile('A4hor')
bv=[]
bh=[]
av=[]
ah=[]
outdict={}
for files in allfiles:
if B1ver.search(files)!=None:
bv.append(files)
elif B1hor.search(files)!=None:
bh.append(files)
elif A4ver.search(files)!=None:
av.append(files)
elif A4hor.search(files)!=None:
ah.append(files)
outdict['B1ver']=bv
outdict['B1hor']=bh
outdict['A4ver']=av
outdict['A4hor']=ah
return outdict
sortedfiles=sortfiles()
bvlen=len(sortedfiles['B1ver'])
bhlen=len(sortedfiles['B1hor'])
avlen=len(sortedfiles['A4ver'])
ahlen=len(sortedfiles['A4hor'])
def loadnum(num, kind = 'B1hor', res = 0):
"""Loads .mat file and outputs dictionary. First argument accepts one of
four kinds B1ver, B1hor, A4ver, A4hor as strings. Second argument
takes ints."""
loaded = loadmat(sortedfiles[kind][num])['data']
#loaded is of type dtype (access with loaded.dtype)
outdict = {}
res = int( sqrt(len(loaded['x'][0,0])) )
outdict['x'] = loaded['x'][0,0].reshape(res,res)
outdict['y'] = loaded['y'][0,0].reshape(res,res)
outdict['z'] = -loaded['z'][0,0].reshape(res,res)
outdict['res'] = res
#note outdict['z'] is inverted to correct data
return outdict
def load_all_images(kind = 'B1hor'):
"""Select measurement kind and return all raw images"""
length = len(sortedfiles[kind])
outlist = []
for fil in range(length):
tp = loadmat(sortedfiles[kind][fil])['data']
tpx = tp['x'][0,0]
tpy = tp['y'][0,0]
tpz = - tp['z'][0,0] #fixes inversion
p_val = tp['p'][0,0][0]
if p_val == 0.0:
#print 'file {} has p = 0.0'.format(fil)
continue
tpx = tpx/p_val
outlist.append([tpx, tpy, tpz, p_val])
outfile = open('B1hor_raw.npy','w+')
np.savez(outfile, data = outlist)
outfile.close()
return None
if __name__ == '__main__':
load_all_images()
with open('B1hor_raw.npy','r') as infile:
data = np.load(infile)
B1hor_raw = data['data']
del data
max_number = len(B1hor_raw)
def super_grid_parameters():
"""Find the size and resolution of a grid which contains all data"""
all_x_range, all_y_range, all_pixel = [], [], []
for image in range(B1hor_raw.shape[0]):
all_x_range.append(B1hor_raw[image,0].ptp())
all_y_range.append(B1hor_raw[image,1].ptp())
all_pixel.append(int(sqrt(len(B1hor_raw[image,0]))))
x_ranges = np.array(all_x_range)
y_ranges = np.array(all_y_range)
pixels = np.array(all_pixel)
deltaXs = x_ranges/pixels
deltaYs = y_ranges/pixels
deltaX = np.median(deltaXs)/2. #factors chosen arbitrarily
deltaY = np.median(deltaYs)/2.
min_dX = deltaXs.min()
min_dY = deltaYs.min()
Lx = x_ranges.max()
Ly = y_ranges.max()
Nx = Lx/deltaX
Ny = Ly/deltaY
return {'Lx': Lx, 'Ly': Ly, 'deltaX': deltaXs, 'deltaY': deltaYs,
'x_ranges': x_ranges, 'y_ranges': y_ranges, 'pixel_no': pixels,
'deltaX': deltaX, 'deltaY': deltaY,
'Nx': Nx, 'Ny': Ny, 'min_dX': min_dX, 'min_dY': min_dY}
grid_params = super_grid_parameters()
def subtract_background(num):
"""Returns background subtracted from numpy array of B1hor image data"""
temp = B1hor_raw[num]
counts, bins = np.histogram(temp[2], bins = 300)
pos = [pos for pos,value in enumerate(counts) if value == max(counts)]
background_level = bins[pos[0]]
temp[2] = temp[2]-background_level #subtract background
temp[2][abs(temp[2])<100.] = 0.0 #clip small numbers
return temp
def make_super_grid():
"""For optimal resolution and range, leave custom_params = None. In order to
create custom super_grid, specify tuple custom = (Lx, Ly, min_dX,
min_dY)"""
Lx = grid_params['Lx']
Ly = grid_params['Ly']
min_dX = grid_params['min_dX']
min_dY = grid_params['min_dY']
new_x = np.arange(-Lx/2., Lx/2., min_dX)
new_y = np.arange(-Ly/2., Ly/2., min_dY)
new_x_grid, new_y_grid = np.meshgrid(new_x, new_y)
return np.array([new_x_grid, new_y_grid])
super_grid = make_super_grid()
def interpolate_to_center(num):
"""Select number from 0 to max_number-1. Outputs interpolated data points
associated to super_grid specified"""
Lx = grid_params['Lx']
Ly = grid_params['Ly']
min_dX = grid_params['min_dX']
min_dY = grid_params['min_dY']
deltaX = grid_params['deltaX']
deltaY = grid_params['deltaY']
indata = subtract_background(num)
measuredpoints = np.vstack((indata[0].squeeze(),indata[1].squeeze())).T
res = int(np.sqrt(len(indata[0])))
x_space = np.linspace(indata[0].min(), indata[0].max(), 2*res)
y_space = np.linspace(indata[1].min(), indata[1].max(), 2*res)
x_grid, y_grid = np.meshgrid(x_space, y_space)
try:
z_grid = griddata(measuredpoints, indata[2], (x_grid, y_grid),
method = 'linear', fill_value = 0).squeeze()
except:
print "**First Interpolation failed**"
raise
mass = sum(z_grid)
#print mass
if mass == 0.0:
print 'Zero mass error'
return None
x_center = sum(x_grid * z_grid)/mass
y_center = sum(y_grid * z_grid)/mass
#print x_center, y_center
#new_x = np.arange(x_center - Lx/2., x_center + Lx/2., min_dX)
#new_y = np.arange(y_center - Ly/2., y_center + Ly/2., min_dY)
#new_x_grid, new_y_grid = np.meshgrid(new_x, new_y)
new_x_grid = super_grid[0] + x_center
new_y_grid = super_grid[1] + y_center
try:
z_grid = griddata(measuredpoints, indata[2], (new_x_grid, new_y_grid),
method = 'linear', fill_value = 0).squeeze()
except:
print "**Second Interpolation failed**"
raise
#mass = sum(z_grid)
#x_center = sum(new_x_grid * z_grid)/mass
#y_center = sum(new_y_grid * z_grid)/mass
#new_x_grid = new_x_grid - x_center
#new_y_grid = new_y_grid - y_center
return z_grid
def emittance(data):
"""Calculate the r.m.s. emittance of a uniform [X,Y,F(X,Y)] data array"""
#tp = interpolate_to_center(num)
xy = super_grid
F = data
norm = abs(F.sum())
if norm == 0.0:
return None
return 4*sqrt(det( 1./norm * einsum('iXY,jXY,XY->ij',xy,xy,F) ))
def make_sparse_array(num):
"""Select num in range from 0 to len(B1hor_raw) = 2074 (at some time).
Outputs sparse array and accompanying emittance in a dictionary."""
data = interpolate_to_center(num)
emit = emittance(data)
if data is None:
return None
pos = data.nonzero()
values = [data[pos[0][i],pos[0][i]] for i in range(len(pos[0]))]
sparse_data = sparse.coo_matrix((values, pos), shape = data.shape)
return {'sparse_data': sparse_data.tocsc(), 'emittance': emit}
def plot_interpolated_image(num):
"""Select number and super_grid parameters, will plot result"""
data = interpolate_to_center(num)
grid = super_grid
if plt.fignum_exists(num):
plt.close(num)
f1=plt.figure(num)
ax=f1.add_subplot(111)
cax1=ax.pcolormesh(super_grid[0], super_grid[1],data)
f1.colorbar(cax1)
plt.show()
print data.shape
return f1