forked from ptodev/Distributed-Resilient-Storage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget-ec.py
executable file
·312 lines (263 loc) · 12.4 KB
/
get-ec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
#!/usr/bin/env python
'''
A script to download an erasure coded file from the catalogue.
The required input parameters are "input_file", "temporary_directory" and "output_directory"
IMPORTANT: ONLY ABSOLUTE PATHS SHOULD BE USED
--input_file could be /gridpp/ptodev/_Penguins.jpg/ or just Penguins.jpg
$ python get-ec.py --temporary_directory /home/paulin/Distributed-Resilient-Storage/fec/
--output_directory /home/paulin/Desktop
--input_file /gridpp/ptodev/_Penguins.jpg/
--processes 1
"temporary_directory" is where the .fec (intermediate) files will be stored.
"output_directory" is where the reconstructed file will be stored.
"input_file" is either a directory on the catalogue with erasure coded files
(eg. "/gridpp/user/_Data.txt") or the filename (eg. "Data.txt").
If it is the filename, the catalogue will be searched for the relevant files via metadata.
"processes" is the number of concurrent processes that will be run.
'''
import zfec, sys, os, glob, multiprocessing, time, itertools, math
from DIRAC import S_OK, S_ERROR, gLogger, exit
from DIRAC.Core.Base import Script
class Params:
    """Holder for the command-line options of this script.

    Every set* method is registered as a DIRAC switch callback: it stores the
    raw switch value and acknowledges it with S_OK(). The matching get* method
    returns the stored value. Only the process count has a default, so
    getTempDir/getOutDir/getInputFile raise AttributeError if their switch
    was never supplied.
    """

    def __init__(self):
        # Default to half of the available CPUs, rounded up.
        cpus = multiprocessing.cpu_count()
        self.Processes = (cpus + 1) // 2

    def _store(self, attribute, value):
        # Shared setter body: remember the value, then acknowledge it to DIRAC.
        setattr(self, attribute, value)
        return S_OK()

    def setTempDir(self, value):
        return self._store('TempDir', value)

    def getTempDir(self):
        return self.TempDir

    def setOutDir(self, value):
        return self._store('OutDir', value)

    def getOutDir(self):
        return self.OutDir

    def setInputFile(self, value):
        return self._store('InputFile', value)

    def getInputFile(self):
        return self.InputFile

    def setProcesses(self, value):
        # Stored as given (the switch value); the caller converts it with int().
        return self._store('Processes', value)

    def getProcesses(self):
        return self.Processes
# Instantiate the params class; its setters are registered below as switch callbacks.
cliParams = Params()
# Register accepted switches and their callbacks
Script.registerSwitch("td:", "temporary_directory=", "Directory where the files will be downloaded.", cliParams.setTempDir)
Script.registerSwitch("od:", "output_directory=", "Location of the reconstructed file.", cliParams.setOutDir)
Script.registerSwitch("i:", "input_file=", "Name/LFN of the file to be downloaded.", cliParams.setInputFile)
Script.registerSwitch("pr:", "processes=", "Number of processes to run concurrently.", cliParams.setProcesses)
# Parse the command line and initialize DIRAC; registration must happen first
# so the callbacks above receive the switch values.
Script.parseCommandLine(ignoreErrors = False)
# Switches DIRAC did not consume itself (not referenced further in this script).
switches = dict(Script.getUnprocessedSwitches())
# Positional command-line arguments (not referenced further in this script).
servicesList = Script.getPositionalArgs()
import DIRAC.Interfaces.API.Dirac as dirac_api
import DIRAC.Resources.Catalog.FileCatalogClient as FCC
class Counter(object):
    """Integer counter shared between processes through a multiprocessing manager.

    ``manager`` must provide ``Value(typecode, initial)`` and ``Lock()`` (as
    ``multiprocessing.Manager()`` does). Every read and update of the value
    happens while holding the lock.
    """

    def __init__(self, initval, manager):
        self.val = manager.Value('i', initval)
        self.lock = manager.Lock()

    def increment(self):
        """Add one to the counter and return the new count."""
        with self.lock:
            updated = self.val.value + 1
            self.val.value = updated
            return updated

    def value(self):
        """Return the current count."""
        with self.lock:
            current = self.val.value
        return current
def getFileCC(ec_file, local_dir):
    """Download one LFN into ``local_dir`` and report the outcome on ``result_queue``.

    Intended to run in a pool worker. Reads the module-level ``counter`` (a
    Counter) and ``result_queue`` names that the main block creates before
    starting the pool. Puts ``[completion_number, ec_file, elapsed_seconds_str,
    result]`` on the queue, where ``result`` is the DIRAC result dictionary.

    Any exception raised while downloading is turned into an S_ERROR result,
    so a report is always queued. Otherwise the main loop, which blocks on
    ``result_queue.get()``, would wait forever for this file.
    """
    time1 = time.time()
    try:
        dirac = dirac_api.Dirac()
        output = dirac.getFile(ec_file, local_dir)
    except Exception as exc:  # worker boundary: report the failure instead of dying silently
        output = S_ERROR('Exception while downloading %s: %s' % (ec_file, exc))
    time2 = time.time()
    local_counter = counter.increment()
    result_queue.put([local_counter, ec_file, str(time2 - time1), output])
def getFileCC_func(args):
    """Adapter for ``Pool.map_async``: call ``getFileCC`` with the items of ``args``.

    ``map_async`` hands each work item over as a single object, so the
    ``(ec_file, local_dir)`` pair is spread into positional arguments here.
    """
    result = getFileCC(*args)
    return result
def sanitize_directory(input_str):
    """Return ``input_str`` with a trailing ``/`` appended if it lacks one.

    Raises:
        ValueError: if ``input_str`` is empty. An empty value is rejected
            rather than being turned into '/', which would point the script
            at the filesystem root. (Previously it failed with an opaque
            IndexError.)
    """
    if not input_str:
        raise ValueError('An empty directory path was given')
    if not input_str.endswith('/'):
        input_str = input_str + '/'
    return input_str
def sanitize_remote_directory(input_str):
    """Append a trailing '/' to catalogue directory paths (inputs starting with '/').

    Bare file names (no leading '/') are returned unchanged, because they are
    resolved later through catalogue metadata rather than as paths.

    Raises:
        ValueError: if ``input_str`` is empty, since it is then neither a
            directory nor a file name. (Previously it failed with an opaque
            IndexError.)
    """
    if not input_str:
        raise ValueError('An empty input file name was given')
    if input_str.startswith('/') and not input_str.endswith('/'):
        input_str = input_str + '/'
    return input_str
def sanitizeProcesses(processes):
    """Return 1 for a non-positive process count; otherwise return the count unchanged."""
    return 1 if processes <= 0 else processes
if __name__ == '__main__':
input_str = cliParams.getInputFile()
local_dir = cliParams.getTempDir()
output_dir = cliParams.getOutDir()
processes = int(cliParams.getProcesses())
ec_files = []
fc = FCC.FileCatalogClient()
#######################################################################
########################### INPUT SANITIZING ##########################
#######################################################################
input_str = sanitize_remote_directory(input_str)
local_dir = sanitize_directory(local_dir)
output_dir = sanitize_directory(output_dir)
processes = sanitizeProcesses(processes)
#######################################################################
############### GENERATE A LIST WITH ERASURE CODED FILES ##############
#######################################################################
print 'Generating a list with LFNs... ',
# The input is a directory if it starts with /
# In that case use an LFN
if(input_str[0] == '/'):
output = fc.listDirectory(input_str)
if(not output['Value']['Successful']):
print 'ERROR: ' + output['Value']['Failed'][input_str]
sys.exit()
ec_files = output['Value']['Successful'][input_str[:-1]]['Files'].keys()
# If the input does not start with / it is assumed to be a filename
# In that case search for the file using metadata
else:
output = fc.findFilesByMetadata({'EC_FILE': input_str}, '/')
if(output['Value'] == []):
print 'No such file was found!'
sys.exit()
ec_files = output['Value']
# Check if the metadata has the right filename
ec_files_tmp = []
ec_data = [ (e,fc.getFileUserMetadata(e)) for e in ec_files]
ec_files = [ (x[0],x[1]['Value']) for x in ec_data if x[1]['OK']==True and x[0].split('/')[-2][1:]==x[1]['Value']['EC_FILE'] ]
# check consistency of metadata
# Check if the metadata of the file matches the given filename
# output = fc.getFileUserMetadata(ec_file)
# if(output['Value']['EC_FILE'] == ec_file.split('/')[-2][1:]):
# ec_files_tmp.append(ec_file)
#ec_files = ec_files_tmp
print 'done!'
#######################################################################
#################### FIND THE NUMBER OF NEEDED FILES ##################
#######################################################################
#SCS - make this more resilient (compare metadata across files, and repair)
number_of_needed_files = 0
number_of_needed_files = reduce(lambda x,y: max(x,y[1]['SPLIT']), ec_files, 0)
#for ecf in ec_files:
# output = fc.getFileUserMetadata(ec_files[0])
# if(output['OK']==True):
# number_of_needed_files = int(output['Value']['SPLIT'])
# break
if (number_of_needed_files == 0):
print "Error, unable to obtain EC data from any chunk metadata, exiting"
sys.exit(1)
print 'Number of files needed for reconstruction: ' + str(number_of_needed_files)
#######################################################################
################### START A MULTIPROCESSING DOWNLOAD ##################
#######################################################################
sharefiles = []
print 'Downloading files to ' + local_dir
successful_downloads = 0
if(processes == 1):
dirac = dirac_api.Dirac()
# A counter showing the number of the current downloded file
i = 1
time1 = time.time()
for ec_file in ec_files:
# Counstruct a counter, i.e. [2/15] which
# shows which file i being downloaded
counter = '[' + str(i) + '/' + str(number_of_needed_files) + '] '
i += 1
print counter +'Downloading ' + ec_file[0] + '... ',
# Download the file from the grid
time_file_1 = time.time()
output = dirac.getFile(ec_file[0], local_dir)
time_file_2 = time.time()
# See if the download was successful
if(not output['OK']):
print ''
print 'ERROR: ' + output['Message']
else:
successful_downloads += 1
print 'done in ' + str(time_file_2-time_file_1)[:4] + ' seconds!'
# Get the local adress of the file from the dirac output to be decoded later
sharefiles.append(output['Value']['Successful'][ec_file[0]])
if(successful_downloads == number_of_needed_files):
print 'The number of necessary files has been reached.'
print 'Terminating download... ',
break
print 'done!'
time2 = time.time()
print "Total time for download: " + str(time2-time1)[:4] + ' seconds!'
else:
manager = multiprocessing.Manager()
# The counter counts how many downloads have been completed
counter = Counter(0, manager)
# The queue is for the threads to give real time information
# to the main function about the downloads
result_queue = manager.Queue()
# The Pool() class takes an intit argument for how many
# processes th pool should have
pool = multiprocessing.Pool(4)
# This is the timer for all the downloads
time1 = time.time()
# map_async() is different from map(), because it doesn't
# block the main function
pool.map_async(getFileCC_func, itertools.izip(ec_files, itertools.repeat(local_dir)))
# queue_results contains lists like:
# [number_of_download, name_of_file, elapsed_time]
queue_results = []
# Get results from the queue
while(True):
queue_results.append(result_queue.get())
print '[' + str(queue_results[-1][0]) + '/' + str(number_of_needed_files) + ']',
print 'Downloaded ' + queue_results[-1][1] + ' in ' + str(queue_results[-1][2])[:4] + ' seconds!'
# See if the download was successful
if(not queue_results[-1][3]['OK']):
print ''
print 'ERROR: ' + output['Message']
else:
successful_downloads += 1
# Get the local adress of the file from the dirac output to be decoded later
ec_file = queue_results[-1][1]
sharefiles.append(queue_results[-1][3]['Value']['Successful'][ec_file])
if(successful_downloads == number_of_needed_files):
print 'The number of necessary files has been reached.'
print 'Terminating download... ',
# terminate() has to be followed by join() in order to give the background machinery
# time to update the status of the object to reflect the termination.
pool.terminate()
pool.join()
print 'done!'
break
time2 = time.time()
print 'Total time for download: ' + str(time2-time1)[:4] + ' seconds!'
#######################################################################
###################### GENERATE THE ORIGINAL FILE #####################
#######################################################################
print 'Beginning decoding... ',
# Check if the input is a directory or a file name
if(input_str[0] == '/'):
# This code could convert input_str = "/gridpp/ptodev/_Penguins.jpg/"
# with input_str.split('/')[-2][1:] into Penguins.jpg (The filename)
zfec_ouput_file_name = output_dir + input_str.split('/')[-2][1:]
else:
# If the input string does not start with / we assume that it is the filename
zfec_ouput_file_name = output_dir + input_str
# Create a file object where the original will be recreated
zfec_ouput_file = open(zfec_ouput_file_name, 'wb')
# Craete file objects for the erasure coded segments
zfec_input_files = []
for sharefile in sharefiles:
zfec_input_files.append(open(sharefile, 'rb'))
# Decode the file
zfec.filefec.decode_from_files(zfec_ouput_file, zfec_input_files)
print 'done!'
print 'The restored original file has been is located at: ' + zfec_ouput_file_name
#######################################################################
################# CLEAN THE LOCAL ERASURE CODED FILES #################
#######################################################################
print "Cleaning up the local EC files... ",
files = glob.glob(local_dir + '*')
for f in files:
os.remove(f)
print "done!"