importing.py
import os
import glob
import asyncio
from concurrent.futures import ProcessPoolExecutor
from helpers import logged
from storage import GUIDataStore
from data_types import FileFormat, SampleFull, TooManyRequestsException
from compressing import GUIBlobCompressor
from importing.dqmio_importer import DQMIOImporter
from importing.dqmclassic_importer import DQMCLASSICImporter
from importing.protobuf_importer import ProtobufImporter


class GUIImportManager:
    """
    This is a ROOT file importing manager.
    It picks the correct importer based on the specific file format that
    has to be imported and delegates the actual importing to it.
    """

    __EOSPATH = '/eos/cms/store/group/comm_dqm/DQMGUI_data/Run*/*/R000*/DQM_*.root'
    __EOSPREFIX = 'root://eoscms.cern.ch/'

    store = GUIDataStore()
    compressor = GUIBlobCompressor()

    semaphore = asyncio.Semaphore(100)

    @classmethod
    async def initialize(cls, files=__EOSPATH):
        """
        Imports all samples from the given ROOT files if no samples are present in the DB.
        The format is assumed to be TDirectory.
        If files is a string, it is globbed to get the list of files.
        If files is a list, all of these files get imported.
        """
        if await cls.store.is_samples_empty():
            if files is None:
                files = cls.__EOSPATH
            if isinstance(files, list) and len(files) >= 1:
                # Make sure that a directory ends in '/*.root' for glob to search for files
                if os.path.isdir(files[0]):
                    suffix = '*.root' if files[0].endswith('/') else '/*.root'
                    files = files[0] + suffix
            if isinstance(files, str):
                print('Listing files for registration, this might take a few minutes...')
                files = glob.glob(files)

            print(f'Found {len(files)} files, registering...')

            importer = cls.__pick_importer(FileFormat.DQMCLASSIC)
            samples = []

            # Parse filenames to get the metadata
            for file in files:
                try:
                    run, dataset = importer.parse_filename(file)
                    samples.append(SampleFull(dataset=dataset, run=int(run), lumi=0,
                                              file=cls.__EOSPREFIX + file,
                                              fileformat=FileFormat.DQMCLASSIC))
                except Exception:
                    print('Unable to import file: %s' % file)

            await cls.register_samples(samples)
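
    # A hedged usage sketch, not part of the original file: `initialize` is a
    # coroutine, so it must be awaited from an async context. The paths below
    # are hypothetical:
    #
    #   await GUIImportManager.initialize()                      # default EOS glob
    #   await GUIImportManager.initialize(files='/data/*.root')  # a glob pattern
    #   await GUIImportManager.initialize(files=['/data/dqm/'])  # a directory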

    @classmethod
    async def destroy(cls):
        pass

    @classmethod
    async def register_samples(cls, samples):
        """No need to go to an importer as this is format agnostic. The samples array is of type SampleFull."""
        await cls.store.register_samples(samples)

    @classmethod
    async def import_blobs(cls, dataset, run, lumi=0):
        """
        Imports the ME list and ME info blobs into the database from a ROOT/PB file.
        It is required to first import the blobs before samples can be used.
        """
        filename, fileformat = await cls.store.get_sample_file_info(dataset, run, lumi)
        if not filename:  # Sample doesn't exist
            return False

        # Delegate the hard work to a process pool.
        samples, blob_descriptions = await cls.import_in_worker(fileformat, filename, dataset, run, lumi)

        await cls.store.register_samples(samples)
        for blob_description in blob_descriptions:
            # import_sync prepares everything
            await cls.store.add_blobs(**blob_description)
        return True
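
    # A hedged example (the dataset and run number are hypothetical): import
    # the blobs of a registered sample, then check whether it was found:
    #
    #   ok = await GUIImportManager.import_blobs('/A/B/DQM', run=315512)
    #   if not ok:
    #       ...  # no such sample is registered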

    @classmethod
    def __pick_importer(cls, file_format):
        """
        Picks the correct importer based on the file format that's being imported.
        If a new file format is added, an importer has to be registered in this method.
        """
        if file_format == FileFormat.DQMCLASSIC:
            return DQMCLASSICImporter()
        elif file_format == FileFormat.DQMIO:
            return DQMIOImporter()
        elif file_format == FileFormat.PROTOBUF:
            return ProtobufImporter()
        return None
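
    # Extension sketch (hypothetical names): supporting another format means
    # adding a value to FileFormat and a branch here, e.g.
    #
    #   elif file_format == FileFormat.NEWFORMAT:
    #       return NewFormatImporter()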

    @classmethod
    @logged
    async def import_in_worker(cls, fileformat, filename, dataset, run, lumi):
        """
        This function will call `import_sync` using a process pool.
        """
        if cls.semaphore.locked():
            # It takes very long to handle all 100 imports in parallel.
            # If that happens, just rate limit the user. It is most probably an
            # erroneous tool requesting to import everything in an endless loop.
            raise TooManyRequestsException()
        async with cls.semaphore:
            with ProcessPoolExecutor(1) as executor:
                return await asyncio.get_event_loop().run_in_executor(
                    executor, cls.import_sync, fileformat, filename, dataset, run, lumi)
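
    # Design note: a fresh single-worker ProcessPoolExecutor is created per
    # call, so every import runs in its own short-lived process; combined with
    # the semaphore above, at most 100 import processes exist at a time.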

    @classmethod
    def import_sync(cls, fileformat, filename, dataset, run, lumi):
        """
        This function should be called in a different process (via multiprocessing)
        to actually perform the import.
        """
        # This must be a top-level, sync, public function, so multiprocessing
        # can import this module on the (clean, forked before initialization)
        # worker process and then call it using the pickle'd arguments.
        # To call the async function, we set up and tear down an event loop just
        # for this one call. The worker is left 'clean', without anything running.
        # return asyncio.run(cls.import_async(fileformat, filename, dataset, run, lumi))
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(cls.import_async(fileformat, filename, dataset, run, lumi))
        loop.close()
        return result

    # But we need an async function to call async stuff, so here it is.
    @classmethod
    async def import_async(cls, fileformat, filename, dataset, run, lumi):
        """
        This function performs the actual import work, inside a dedicated event
        loop in a process pool worker.
        """
        importer = cls.__pick_importer(fileformat)
        mes_lists = await importer.get_me_lists(filename, dataset, run, lumi)

        # It's possible that some samples that exist in this file were not yet
        # registered as samples (through the register API endpoint). So we (re)create
        # all samples that we have found.
        samples = [SampleFull(dataset, run=int(key[0]), lumi=int(key[1]), file=filename, fileformat=fileformat)
                   for key in mes_lists]
        blob_descriptions = []
        for key in mes_lists:
            mes = mes_lists[key]
            mes.sort()

            # Separate lists
            names_list = b'\n'.join(name for name, _ in mes)
            infos_list = [info for _, info in mes]

            # Compress blobs. These go straight into GUIDataStore.add_blobs(...),
            # but back in the main process.
            names_blob = await cls.compressor.compress_names_list(names_list)
            infos_blob = await cls.compressor.compress_infos_list(infos_list)

            blob_descriptions.append({
                'names_blob': names_blob,
                'infos_blob': infos_blob,
                'dataset': dataset,
                'filename': filename,
                'run': key[0],
                'lumi': key[1],
            })

        return samples, blob_descriptions
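

# A minimal end-to-end sketch, not part of the original module: the glob
# pattern and dataset name are hypothetical, and the backing GUIDataStore must
# be reachable for this to succeed.
if __name__ == '__main__':
    async def _demo():
        # Register samples found by a local glob pattern.
        await GUIImportManager.initialize(files='/data/dqm/*.root')
        # Import ME blobs for one (hypothetical) registered sample.
        imported = await GUIImportManager.import_blobs('/A/B/DQM', run=1, lumi=0)
        print('imported' if imported else 'sample not registered')

    asyncio.run(_demo())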