#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Mario Lassnig, [email protected], 2016-2017
# - Daniel Drizhuk, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2021
# - Wen Guan, [email protected], 2018
from __future__ import print_function # Python 2
import os
import time
import hashlib
import random
import socket
import logging
try:
    import Queue as queue  # noqa: N813
#except ModuleNotFoundError:  # Python 3
except Exception:
    import queue  # Python 3
from json import dumps #, loads
from re import findall
from glob import glob
from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import ExcThread, PilotException #, JobAlreadyRunning
from pilot.info import infosys, JobData, InfoService, JobInfoProvider
from pilot.util import https
from pilot.util.auxiliary import get_batchsystem_jobid, get_job_scheduler_id, get_pilot_id, \
    set_pilot_state, get_pilot_state, check_for_final_server_update, pilot_version_banner, is_virtual_machine, \
    is_python3, show_memory_usage, has_instruction_sets, locate_core_file, get_display_info
from pilot.util.config import config
from pilot.util.common import should_abort, was_pilot_killed
from pilot.util.constants import PILOT_MULTIJOB_START_TIME, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_KILL_SIGNAL, LOG_TRANSFER_NOT_DONE, \
    LOG_TRANSFER_IN_PROGRESS, LOG_TRANSFER_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_TROUBLE, SERVER_UPDATE_FINAL, \
    SERVER_UPDATE_UPDATING, SERVER_UPDATE_NOT_DONE
from pilot.util.container import execute
from pilot.util.filehandling import find_text_files, tail, is_json, copy, remove, write_json, establish_logging, write_file, \
    create_symlink
from pilot.util.harvester import request_new_jobs, remove_job_request_file, parse_job_definition_file, \
    is_harvester_mode, get_worker_attributes_file, publish_job_report, publish_work_report, get_event_status_file, \
    publish_stageout_files
from pilot.util.jobmetrics import get_job_metrics
from pilot.util.math import mean
from pilot.util.middleware import containerise_general_command
from pilot.util.monitoring import job_monitor_tasks, check_local_space
from pilot.util.monitoringtime import MonitoringTime
from pilot.util.processes import cleanup, threads_aborted, kill_process, kill_processes
from pilot.util.proxy import get_distinguished_name
from pilot.util.queuehandling import scan_for_jobs, put_in_queue, queue_report, purge_queue
from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model
logger = logging.getLogger(__name__)
errors = ErrorCodes()
def control(queues, traces, args):
    """
    Main function of job control.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    targets = {'validate': validate, 'retrieve': retrieve, 'create_data_payload': create_data_payload,
               'queue_monitor': queue_monitor, 'job_monitor': job_monitor, 'fast_job_monitor': fast_job_monitor}
    threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
                         name=name) for name, target in list(targets.items())]  # Python 2/3

    [thread.start() for thread in threads]

    # if an exception is thrown, the graceful_stop will be set by the ExcThread class run() function
    while not args.graceful_stop.is_set():
        for thread in threads:
            bucket = thread.get_bucket()
            try:
                exc = bucket.get(block=False)
            except queue.Empty:
                pass
            else:
                exc_type, exc_obj, exc_trace = exc
                logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)

                # deal with the exception
                # ..

            thread.join(0.1)
        time.sleep(0.1)

    time.sleep(0.5)

    logger.debug('job control ending since graceful_stop has been set')

    if args.abort_job.is_set():
        if traces.pilot['command'] == 'aborting':
            logger.warning('jobs are aborting')
        elif traces.pilot['command'] == 'abort':
            logger.warning('job control detected a set abort_job (due to a kill signal)')
            traces.pilot['command'] = 'aborting'

            # find all running jobs and stop them, find all jobs in queues relevant to this module
            #abort_jobs_in_queues(queues, args.signal)

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] control thread has finished')
    # test kill signal during end of generic workflow
    #import signal
    #os.kill(os.getpid(), signal.SIGBUS)
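
# Illustrative sketch (not part of the original module): how a single worker thread
# from the targets dictionary above is created and how its exception bucket is
# drained; names and values are taken directly from the code above.
#
#   t = ExcThread(bucket=queue.Queue(), target=validate,
#                 kwargs={'queues': queues, 'traces': traces, 'args': args},
#                 name='validate')
#   t.start()
#   try:
#       exc_type, exc_obj, exc_trace = t.get_bucket().get(block=False)
#   except queue.Empty:
#       pass  # the thread has not reported any exception
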
def _validate_job(job):
    """
    Verify job parameters for specific problems.

    :param job: job object.
    :return: Boolean.
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
    container = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [user], 0)  # Python 2/3

    # should a container be used for the payload?
    try:
        kwargs = {'job': job}
        job.usecontainer = container.do_use_container(**kwargs)
    except Exception as error:
        logger.warning('exception caught: %s', error)

    return True if user.verify_job(job) else False
def verify_error_code(job):
    """
    Make sure an error code is properly set.

    This makes sure that job.piloterrorcode is always set for a failed/holding job, i.e. that it never happens
    that only job.piloterrorcodes is populated while job.piloterrorcode is left at zero. This function also negates
    the sign of the error code if the error is found to be recoverable by a later job (user jobs only).

    :param job: job object.
    :return:
    """

    if job.piloterrorcode == 0 and len(job.piloterrorcodes) > 0:
        logger.warning('piloterrorcode set to first piloterrorcodes list entry: %s', str(job.piloterrorcodes))
        job.piloterrorcode = job.piloterrorcodes[0]

    if job.piloterrorcode != 0 and job.is_analysis():
        if errors.is_recoverable(code=job.piloterrorcode):
            job.piloterrorcode = -abs(job.piloterrorcode)
            job.state = 'failed'
            logger.info('failed user job is recoverable (error code=%s)', job.piloterrorcode)
        else:
            logger.info('failed user job is not recoverable')
    else:
        logger.info('verified error code')
def get_proper_state(job, state):
    """
    Return a proper job state to send to server.

    This function should only return 'starting', 'running', 'finished', 'holding' or 'failed'.
    If the internal job.serverstate is not yet set, it means this is the first server update, i.e. 'starting'
    should be sent.

    :param job: job object.
    :param state: internal pilot state (string).
    :return: valid server state (string).
    """

    if job.serverstate == "finished" or job.serverstate == "failed":
        pass
    elif job.serverstate == "" and state != "finished" and state != "failed":
        job.serverstate = 'starting'
    elif state == "finished" or state == "failed" or state == "holding":
        job.serverstate = state
    else:
        job.serverstate = 'running'

    return job.serverstate
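
# Illustrative examples (not part of the original module) of the state mapping above,
# assuming job.serverstate starts out as an empty string:
#
#   get_proper_state(job, 'starting')  # -> 'starting' (first server update)
#   get_proper_state(job, 'stagein')   # -> 'running' (any intermediate internal state)
#   get_proper_state(job, 'holding')   # -> 'holding'
#   get_proper_state(job, 'finished')  # -> 'finished' (sticky: later calls keep it)
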
def publish_harvester_reports(state, args, data, job, final):
    """
    Publish all reports needed by Harvester.

    :param state: job state (string).
    :param args: pilot args object.
    :param data: data structure for server update (dictionary).
    :param job: job object.
    :param final: is this the final update? (Boolean).
    :return: True if successful, False otherwise (Boolean).
    """

    # write part of the heartbeat message to worker attributes files needed by Harvester
    path = get_worker_attributes_file(args)

    # add jobStatus (state) for Harvester
    data['jobStatus'] = state

    # publish work report
    if not publish_work_report(data, path):
        logger.debug('failed to write to workerAttributesFile %s', path)
        return False

    # check if we are in final state then write out information for output files
    if final:
        # use the job information to write Harvester event_status.dump file
        event_status_file = get_event_status_file(args)
        if publish_stageout_files(job, event_status_file):
            logger.debug('wrote log and output files to file %s', event_status_file)
        else:
            logger.warning('could not write log and output files to file %s', event_status_file)
            return False

        # publish job report
        _path = os.path.join(job.workdir, config.Payload.jobreport)
        if os.path.exists(_path):
            if publish_job_report(job, args, config.Payload.jobreport):
                logger.debug('wrote job report file')
                return True
            else:
                logger.warning('failed to write job report file')
                return False
    else:
        logger.info('finished writing various report files in Harvester mode')

    return True
def write_heartbeat_to_file(data):
    """
    Write heartbeat dictionary to file.
    This is only done when server updates are not wanted.

    :param data: server data (dictionary).
    :return: True if successful, False otherwise (Boolean).
    """

    path = os.path.join(os.environ.get('PILOT_HOME'), config.Pilot.heartbeat_message)
    if write_json(path, data):
        logger.debug('heartbeat dictionary: %s', data)
        logger.debug('wrote heartbeat to file %s', path)
        return True
    else:
        return False
def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False):
    """
    Update the server (send heartbeat message).
    Interpret and handle any server instructions arriving with the updateJob back channel.

    :param job: job object.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :param state: job state (string).
    :param xml: optional metadata xml (string).
    :param metadata: job report metadata read as a string.
    :param test_tobekilled: emulate a tobekilled command (boolean).
    :return: boolean (True if successful, False otherwise).
    """

    state = get_proper_state(job, state)

    # should the pilot make any server updates?
    if not args.update_server:
        logger.info('pilot will not update the server (heartbeat message will be written to file)')
    tag = 'sending' if args.update_server else 'writing'

    if state == 'finished' or state == 'failed' or state == 'holding':
        final = True
        os.environ['SERVER_UPDATE'] = SERVER_UPDATE_UPDATING
        logger.info('job %s has %s - %s final server update', job.jobid, state, tag)

        # make sure that job.state is 'failed' if there's a set error code
        if job.piloterrorcode or job.piloterrorcodes:
            logger.warning('making sure that job.state is set to failed since a pilot error code is set')
            state = 'failed'
            job.state = state
        # make sure an error code is properly set
        elif state != 'finished':
            verify_error_code(job)
    else:
        final = False
        logger.info('job %s has state \'%s\' - %s heartbeat', job.jobid, state, tag)

    # build the data structure needed for getJob, updateJob
    data = get_data_structure(job, state, args, xml=xml, metadata=metadata)

    # write the heartbeat message to file if the server is not to be updated by the pilot (Nordugrid mode)
    if not args.update_server:
        logger.debug('is_harvester_mode(args) : {0}'.format(is_harvester_mode(args)))
        # if in harvester mode write to files required by harvester
        if is_harvester_mode(args):
            return publish_harvester_reports(state, args, data, job, final)
        else:
            # store the file in the main workdir
            return write_heartbeat_to_file(data)

    try:
        if config.Pilot.pandajob == 'real':
            time_before = int(time.time())
            max_attempts = 10
            attempt = 0
            done = False
            while attempt < max_attempts and not done:
                logger.info('job update attempt %d/%d', attempt + 1, max_attempts)

                # get the URL for the PanDA server from pilot options or from config
                pandaserver = get_panda_server(args.url, args.port)
                res = https.request('{pandaserver}/server/panda/updateJob'.format(pandaserver=pandaserver), data=data)
                if res is not None:
                    done = True
                attempt += 1

            time_after = int(time.time())
            logger.info('server updateJob request completed in %ds for job %s', time_after - time_before, job.jobid)
            logger.info("server responded with: res = %s", str(res))

            show_memory_usage()

            if res is not None:
                # does the server update contain any backchannel information? if so, update the job object
                handle_backchannel_command(res, job, args, test_tobekilled=test_tobekilled)

                if final:
                    os.environ['SERVER_UPDATE'] = SERVER_UPDATE_FINAL
                    logger.debug('set SERVER_UPDATE=SERVER_UPDATE_FINAL')
                return True
        else:
            logger.info('skipping job update for fake test job')
            return True
    except Exception as error:
        logger.warning('exception caught while sending https request: %s', error)
        logger.warning('possibly offending data: %s', data)

    if final:
        os.environ['SERVER_UPDATE'] = SERVER_UPDATE_TROUBLE
        logger.debug('set SERVER_UPDATE=SERVER_UPDATE_TROUBLE')

    return False
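
# Illustrative usage (not part of the original module): a typical heartbeat from a
# monitoring loop followed by a final update; 'running' and 'finished' are internal
# pilot states that get_proper_state() maps to valid server states.
#
#   send_state(job, args, 'running')   # periodic heartbeat
#   send_state(job, args, 'finished')  # final update, sets SERVER_UPDATE_FINAL on success
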
def get_job_status_from_server(job_id, url, port):
    """
    Return the current status of job <jobId> from the dispatcher.

    Typical dispatcher response: 'status=finished&StatusCode=0'
    StatusCode  0: succeeded
               10: time-out
               20: general error
               30: failed
    In the case of time-out, the dispatcher will be asked one more time after 10 s.

    :param job_id: PanDA job id (int).
    :param url: PanDA server URL (string).
    :param port: PanDA server port (int).
    :return: status (string; e.g. holding), attempt_nr (int), status_code (int)
    """

    status = 'unknown'
    attempt_nr = 0
    status_code = 0
    if config.Pilot.pandajob == 'fake':
        return status, attempt_nr, status_code

    data = {}
    data['ids'] = job_id

    # get the URL for the PanDA server from pilot options or from config
    pandaserver = get_panda_server(url, port)

    # ask dispatcher about lost job status
    trial = 1
    max_trials = 2

    while trial <= max_trials:
        try:
            # open connection
            ret = https.request('{pandaserver}/server/panda/getStatus'.format(pandaserver=pandaserver), data=data)
            response = ret[1]
            logger.info("response: %s", str(response))
            if response:
                try:
                    # decode the response
                    # eg. var = ['status=notfound', 'attemptNr=0', 'StatusCode=0']
                    # = response
                    status = response['status']  # e.g. 'holding'
                    attempt_nr = int(response['attemptNr'])  # e.g. '0'
                    status_code = int(response['StatusCode'])  # e.g. '0'
                except Exception as error:
                    logger.warning(
                        "exception: dispatcher did not return allowed values: %s, %s", str(ret), error)
                    status = "unknown"
                    attempt_nr = -1
                    status_code = 20
                else:
                    logger.debug('server job status=%s, attempt_nr=%d, status_code=%d', status, attempt_nr, status_code)
            else:
                logger.warning("dispatcher did not return allowed values: %s", str(ret))
                status = "unknown"
                attempt_nr = -1
                status_code = 20
        except Exception as error:
            logger.warning("could not interpret job status from dispatcher: %s", error)
            status = 'unknown'
            attempt_nr = -1
            status_code = -1
            break
        else:
            if status_code == 0:  # success
                break
            elif status_code == 10:  # time-out
                trial += 1
                time.sleep(10)
                continue
            elif status_code == 20:  # other error
                if ret[0] == 13056 or ret[0] == '13056':
                    logger.warning("wrong certificate used with curl operation? (encountered error 13056)")
                break
            else:  # general error
                break

    return status, attempt_nr, status_code
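
# Illustrative example (not part of the original module): a decoded dispatcher
# response as consumed above; the raw form is 'status=holding&attemptNr=0&StatusCode=0'
# and the job id below is a made-up value.
#
#   response = {'status': 'holding', 'attemptNr': '0', 'StatusCode': '0'}
#   status, attempt_nr, status_code = get_job_status_from_server(1234567890, url, port)
#   # -> ('holding', 0, 0) for the response above
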
def get_panda_server(url, port):
    """
    Get the URL for the PanDA server.

    :param url: URL string, if set in pilot option (port not included).
    :param port: port number, if set in pilot option (int).
    :return: full URL (either from pilot options or from config file)
    """

    if url.startswith('https://'):
        url = url.replace('https://', '')

    if url != '' and port != 0:
        pandaserver = '%s:%s' % (url, port) if ":" not in url else url
    else:
        pandaserver = config.Pilot.pandaserver

    if not pandaserver.startswith('http'):
        pandaserver = 'https://' + pandaserver

    # add randomization for PanDA server
    default = 'pandaserver.cern.ch'
    if default in pandaserver:
        rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])])
        pandaserver = pandaserver.replace(default, rnd)
        logger.debug('updated %s to %s', default, pandaserver)

    return pandaserver
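
# Illustrative examples (not part of the original module); the randomized host in the
# last case is hypothetical since the replacement depends on the DNS lookup result:
#
#   get_panda_server('', 0)                           # -> config.Pilot.pandaserver ('https://' added if missing)
#   get_panda_server('https://my.server.org', 25443)  # -> 'https://my.server.org:25443'
#   get_panda_server('https://pandaserver.cern.ch', 25443)
#   # -> e.g. 'https://pandaserver01.cern.ch:25443' (hypothetical FQDN chosen at random)
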
def get_debug_command(cmd):
    """
    Identify and filter the given debug command.

    Note: only a single command will be allowed from a predefined list: tail, ls, gdb, ps, du.

    :param cmd: raw debug command from job definition (string).
    :return: debug_mode (Boolean, True if command is deemed ok), debug_command (string).
    """

    debug_mode = False
    debug_command = ""

    allowed_commands = ['tail', 'ls', 'ps', 'gdb', 'du']
    forbidden_commands = ['rm']

    # remove any 'debug,' command that the server might send redundantly
    if ',' in cmd and 'debug' in cmd:
        cmd = cmd.replace('debug,', '').replace(',debug', '')
    try:
        tmp = cmd.split(' ')
        com = tmp[0]
    except Exception as error:
        logger.warning('failed to identify debug command: %s', error)
    else:
        if com not in allowed_commands:
            logger.warning('command=%s is not in the list of allowed commands: %s', com, str(allowed_commands))
        elif ';' in cmd:
            logger.warning('debug command cannot contain \';\': \'%s\'', cmd)
        elif com in forbidden_commands:
            logger.warning('command=%s is not allowed', com)
        else:
            debug_mode = True
            debug_command = cmd

    return debug_mode, debug_command
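
# Illustrative examples (not part of the original module):
#
#   get_debug_command('debug,tail -20 payload.stdout')  # -> (True, 'tail -20 payload.stdout')
#   get_debug_command('rm -rf important_dir')           # -> (False, '') since 'rm' is not an allowed command
#   get_debug_command('ls -l; rm -rf .')                # -> (False, '') since ';' is rejected
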
def handle_backchannel_command(res, job, args, test_tobekilled=False):
    """
    Does the server update contain any backchannel information? if so, update the job object.

    :param res: server response (dictionary).
    :param job: job object.
    :param args: pilot args object.
    :param test_tobekilled: emulate a tobekilled command (boolean).
    :return:
    """

    if test_tobekilled:
        logger.info('faking a \'tobekilled\' command')
        res['command'] = 'tobekilled'

    if 'command' in res and res.get('command') != 'NULL':
        # warning: server might return comma-separated string, 'debug,tobekilled'
        cmd = res.get('command')
        # is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du ..
        if ' ' in cmd and 'tobekilled' not in cmd:
            try:
                job.debug, job.debug_command = get_debug_command(cmd)
            except Exception as error:
                logger.debug('exception caught in get_debug_command(): %s', error)
        elif 'tobekilled' in cmd:
            logger.info('pilot received a panda server signal to kill job %s at %s', job.jobid, time_stamp())
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL)
            if job.pid:
                logger.debug('killing payload process')
                kill_process(job.pid)
            else:
                logger.debug('no pid to kill')
            args.abort_job.set()
        elif 'softkill' in cmd:
            logger.info('pilot received a panda server signal to softkill job %s at %s', job.jobid, time_stamp())
            # event service kill instruction
            job.debug_command = 'softkill'
        elif 'debug' in cmd:
            logger.info('pilot received a command to turn on standard debug mode from the server')
            job.debug = True
            job.debug_command = 'debug'
        elif 'debugoff' in cmd:
            logger.info('pilot received a command to turn off debug mode from the server')
            job.debug = False
            job.debug_command = 'debugoff'
        else:
            logger.warning('received unknown server command via backchannel: %s', cmd)

    # for testing debug mode
    # job.debug = True
    # job.debug_command = 'du -sk'
    # job.debug_command = 'tail -30 payload.stdout'
    # job.debug_command = 'ls -ltr workDir'  # not really tested
    # job.debug_command = 'ls -ltr %s' % job.workdir
    # job.debug_command = 'ps -ef'
    # job.debug_command = 'ps axo pid,ppid,pgid,args'
    # job.debug_command = 'gdb --pid % -ex \'generate-core-file\''
def add_data_structure_ids(data, version_tag):
    """
    Add pilot, batch and scheduler ids to the data structure for getJob, updateJob.

    :param data: data structure (dict).
    :param version_tag: pilot version tag (string).
    :return: updated data structure (dict).
    """

    schedulerid = get_job_scheduler_id()
    if schedulerid:
        data['schedulerID'] = schedulerid

    pilotid = get_pilot_id()
    if pilotid:
        pilotversion = os.environ.get('PILOT_VERSION')
        # report the batch system job id, if available
        batchsystem_type, batchsystem_id = get_batchsystem_jobid()
        if batchsystem_type:
            data['pilotID'] = "%s|%s|%s|%s" % (pilotid, batchsystem_type, version_tag, pilotversion)
            data['batchID'] = batchsystem_id
        else:
            data['pilotID'] = "%s|%s|%s" % (pilotid, version_tag, pilotversion)

    return data
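
# Illustrative result (not part of the original module); all id values below are
# made up for the example:
#
#   data = add_data_structure_ids({}, 'PR')
#   # with pilot id '1234', batch system 'SLURM' and PILOT_VERSION '3.1.2.3' this yields
#   # {'pilotID': '1234|SLURM|PR|3.1.2.3', 'batchID': '987654', ...}
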
def get_data_structure(job, state, args, xml=None, metadata=None):
    """
    Build the data structure needed for getJob, updateJob.

    :param job: job object.
    :param state: state of the job (string).
    :param args: Pilot arguments object.
    :param xml: optional XML string.
    :param metadata: job report metadata read as a string.
    :return: data structure (dictionary).
    """

    data = {'jobId': job.jobid,
            'state': state,
            'timestamp': time_stamp(),
            'siteName': os.environ.get('PILOT_SITENAME'),  # args.site,
            'node': get_node_name(),
            'attemptNr': job.attemptnr}

    # add pilot, batch and scheduler ids to the data structure
    data = add_data_structure_ids(data, args.version_tag)

    starttime = get_postgetjob_time(job.jobid, args)
    if starttime:
        data['startTime'] = starttime

    job_metrics = get_job_metrics(job)
    if job_metrics:
        data['jobMetrics'] = job_metrics

    if xml is not None:
        data['xml'] = xml
    if metadata is not None:
        data['metaData'] = metadata

    # in debug mode, also send a tail of the latest log file touched by the payload
    if job.debug:
        data['stdout'] = process_debug_mode(job)

    # add the core count
    if job.corecount and job.corecount != 'null' and job.corecount != 'NULL':
        data['coreCount'] = job.corecount
        #data['coreCount'] = mean(job.corecounts) if job.corecounts else job.corecount
    if job.corecounts:
        _mean = mean(job.corecounts)
        logger.info('mean actualcorecount: %f', _mean)
        data['meanCoreCount'] = _mean

    # get the number of events, should report in heartbeat in case of preempted.
    if job.nevents != 0:
        data['nEvents'] = job.nevents
        logger.info("total number of processed events: %d (read)", job.nevents)
    else:
        logger.info("payload/TRF did not report the number of read events")

    # get the CPU consumption time
    constime = get_cpu_consumption_time(job.cpuconsumptiontime)
    if constime and constime != -1:
        data['cpuConsumptionTime'] = constime
        data['cpuConversionFactor'] = job.cpuconversionfactor
        data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + get_cpu_model()

    instruction_sets = has_instruction_sets(['AVX2'])
    product, vendor = get_display_info()
    if instruction_sets:
        if 'cpuConsumptionUnit' in data:
            data['cpuConsumptionUnit'] += '+' + instruction_sets
        else:
            data['cpuConsumptionUnit'] = instruction_sets
    if product and vendor:
        logger.debug('cpuConsumptionUnit: could have added: product=%s, vendor=%s', product, vendor)

    # add memory information if available
    add_memory_info(data, job.workdir, name=job.memorymonitor)

    if state == 'finished' or state == 'failed':
        add_timing_and_extracts(data, job, state, args)
        add_error_codes(data, job)

    return data
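
# Illustrative example (not part of the original module) of a minimal heartbeat
# dictionary produced above for a running job; all values, including the timestamp
# format, are made up:
#
#   {'jobId': '4711', 'state': 'running', 'timestamp': '2021-01-01T12:00:00+00:00',
#    'siteName': 'MY_SITE', 'node': 'worker-node-01', 'attemptNr': 0,
#    'pilotID': '1234|SLURM|PR|3.1.2.3', 'batchID': '987654', 'coreCount': 8}
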
def process_debug_mode(job):
    """
    Handle debug mode - preprocess debug command, get the output and kill the payload in case of gdb.

    :param job: job object.
    :return: stdout from debug command (string).
    """

    # for gdb commands, use the proper gdb version (the system one may be too old)
    if job.debug_command.startswith('gdb '):
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
        user.preprocess_debug_command(job)

    stdout = get_debug_stdout(job)
    if stdout:
        # in case gdb was successfully used, the payload can now be killed
        if job.debug_command.startswith('gdb ') and job.pid:
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL,
                                                                             msg='payload was killed after gdb produced requested core file')
            logger.debug('will proceed to kill payload processes')
            kill_processes(job.pid)

    return stdout
def get_debug_stdout(job):
    """
    Return the requested output from a given debug command.

    :param job: job object.
    :return: output (string).
    """

    if job.debug_command == 'debug':
        return get_payload_log_tail(job.workdir)
    elif 'tail ' in job.debug_command:
        return get_requested_log_tail(job.debug_command, job.workdir)
    elif 'ls ' in job.debug_command:
        return get_ls(job.debug_command, job.workdir)
    elif 'ps ' in job.debug_command or 'gdb ' in job.debug_command:
        return get_general_command_stdout(job)
    else:
        # general command, execute and return output
        _, stdout, _ = execute(job.debug_command)
        logger.info('debug_command: %s:\n\n%s\n', job.debug_command, stdout)
        return stdout
def get_general_command_stdout(job):
    """
    Return the output from the requested debug command.

    :param job: job object.
    :return: output (string).
    """

    stdout = ''

    # for gdb, we might have to process the debug command (e.g. to identify the proper pid to debug)
    if 'gdb ' in job.debug_command and '--pid %' in job.debug_command:
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
        job.debug_command = user.process_debug_command(job.debug_command, job.jobid)

    if job.debug_command:
        _containerisation = False  # set this with some logic instead - not used for now
        if _containerisation:
            try:
                containerise_general_command(job, job.infosys.queuedata.container_options,
                                             label='general',
                                             container_type='container')
            except PilotException as error:
                logger.warning('general containerisation threw a pilot exception: %s', error)
            except Exception as error:
                logger.warning('general containerisation threw an exception: %s', error)
        else:
            _, stdout, stderr = execute(job.debug_command)
            logger.debug("%s (stdout):\n\n%s\n\n", job.debug_command, stdout)
            logger.debug("%s (stderr):\n\n%s\n\n", job.debug_command, stderr)

        # in case a core file was produced, locate it
        path = locate_core_file(cmd=job.debug_command) if 'gdb ' in job.debug_command else ''
        if path:
            # copy it to the working directory (so it will be saved in the log)
            try:
                copy(path, job.workdir)
            except Exception:
                pass

    return stdout
def get_ls(debug_command, workdir):
    """
    Return the requested ls debug command.

    :param debug_command: full debug command (string).
    :param workdir: job work directory (string).
    :return: output (string).
    """

    items = debug_command.split(' ')
    # cmd = items[0]
    options = ' '.join(items[1:])
    path = options.split(' ')[-1] if ' ' in options else options
    if path.startswith('-'):
        path = '.'
    finalpath = os.path.join(workdir, path)
    debug_command = debug_command.replace(path, finalpath)

    _, stdout, _ = execute(debug_command)
    logger.debug("%s:\n\n%s\n\n", debug_command, stdout)

    return stdout
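
# Illustrative example (not part of the original module); the directory names are
# made up. The relative path in the command is resolved against the job work directory:
#
#   get_ls('ls -ltr workDir', '/scratch/job_1234')
#   # executes 'ls -ltr /scratch/job_1234/workDir' and returns its stdout
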
def get_requested_log_tail(debug_command, workdir):
    """
    Return the tail of the requested debug log.

    Examples:
      tail workdir/tmp.stdout*  <- pilot finds the requested log file in the specified relative path
      tail log.RAWtoALL         <- pilot finds the requested log file

    :param debug_command: full debug command (string).
    :param workdir: job work directory (string).
    :return: output (string).
    """

    _tail = ""
    items = debug_command.split(' ')
    cmd = items[0]
    options = ' '.join(items[1:])
    logger.debug('debug command: %s', cmd)
    logger.debug('debug options: %s', options)

    # assume that the path is the last of the options; <some option> <some path>
    path = options.split(' ')[-1] if ' ' in options else options
    fullpath = os.path.join(workdir, path)

    # find all files with the given pattern and pick the latest updated file (if several)
    files = glob(fullpath)
    if files:
        logger.info('files found: %s', str(files))
        _tail = get_latest_log_tail(files)
    else:
        logger.warning('did not find \'%s\' in path %s', path, fullpath)

    if _tail:
        logger.debug('tail =\n\n%s\n\n', _tail)

    return _tail
def add_error_codes(data, job):
    """
    Add error codes to data structure.

    :param data: data dictionary.
    :param job: job object.
    :return:
    """

    # error codes
    pilot_error_code = job.piloterrorcode
    pilot_error_codes = job.piloterrorcodes
    if pilot_error_codes != []:
        logger.warning('pilotErrorCodes = %s (will report primary/first error code)', str(pilot_error_codes))
        data['pilotErrorCode'] = pilot_error_codes[0]
    else:
        data['pilotErrorCode'] = pilot_error_code

    # add error info
    pilot_error_diag = job.piloterrordiag
    pilot_error_diags = job.piloterrordiags
    if pilot_error_diags != []:
        logger.warning('pilotErrorDiags = %s (will report primary/first error diag)', str(pilot_error_diags))
        data['pilotErrorDiag'] = pilot_error_diags[0]
    else:
        data['pilotErrorDiag'] = pilot_error_diag

    data['transExitCode'] = job.transexitcode
    data['exeErrorCode'] = job.exeerrorcode
    data['exeErrorDiag'] = job.exeerrordiag
def get_cpu_consumption_time(cpuconsumptiontime):
    """
    Get the CPU consumption time.
    The function makes sure that the value exists and is within allowed limits (< 10^9).

    :param cpuconsumptiontime: CPU consumption time (int/None).
    :return: properly set CPU consumption time (int/None).
    """

    constime = None

    try:
        constime = int(cpuconsumptiontime)
    except Exception:
        constime = None
    if constime and constime > 10 ** 9:
        logger.warning("unrealistic cpuconsumptiontime: %d (reset to -1)", constime)
        constime = -1

    return constime
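
# Illustrative examples (not part of the original module):
#
#   get_cpu_consumption_time('12345')   # -> 12345 (string converted to int)
#   get_cpu_consumption_time(None)      # -> None (unparseable value)
#   get_cpu_consumption_time(10 ** 10)  # -> -1 (above the 10^9 sanity limit)
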
def add_timing_and_extracts(data, job, state, args):
    """
    Add timing info and log extracts to data structure for a completed job (finished or failed) to be sent to server.
    Note: this function updates the data dictionary.

    :param data: data structure (dictionary).
    :param job: job object.
    :param state: state of the job (string).
    :param args: pilot args.
    :return:
    """

    time_getjob, time_stagein, time_payload, time_stageout, time_total_setup = timing_report(job.jobid, args)
    data['pilotTiming'] = "%s|%s|%s|%s|%s" % \
                          (time_getjob, time_stagein, time_payload, time_stageout, time_total_setup)

    # add log extracts (for failed/holding jobs or for jobs with outbound connections)
    extracts = ""
    if state == 'failed' or state == 'holding':
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
        extracts = user.get_log_extracts(job, state)

    if extracts != "":
        logger.warning('\nXXXXXXXXXXXXXXXXXXXXX[begin log extracts]\n%s\nXXXXXXXXXXXXXXXXXXXXX[end log extracts]', extracts)
        data['pilotLog'] = extracts[:1024]

    data['endTime'] = time.time()
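
# Illustrative example (not part of the original module) of the pilotTiming field
# format set above; the values are made-up second counts:
#
#   data['pilotTiming'] = '5|42|3600|120|15'
#   # i.e. getjob|stagein|payload|stageout|total_setup
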
def add_memory_info(data, workdir, name=""):
    """
    Add memory information (if available) to the data structure that will be sent to the server with job updates.
    Note: this function updates the data dictionary.

    :param data: data structure (dictionary).
    :param workdir: working directory of the job (string).
    :param name: name of memory monitor (string).
    :return:
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    utilities = __import__('pilot.user.%s.utilities' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
    try:
        utility_node = utilities.get_memory_monitor_info(workdir, name=name)
        data.update(utility_node)
    except Exception as error:
        logger.info('memory information not available: %s', error)
def remove_pilot_logs_from_list(list_of_files):
    """
    Remove any pilot logs from the list of last updated files.

    :param list_of_files: list of last updated files (list).
    :return: list of files (list).
    """

    # note: better to move experiment specific files to user area

    # ignore the pilot log files
    try:
        to_be_removed = [config.Pilot.pilotlog, config.Pilot.stageinlog, config.Pilot.stageoutlog,
                         config.Pilot.timing_file, config.Pilot.remotefileverification_dictionary,
                         config.Pilot.remotefileverification_log, config.Pilot.base_trace_report,
                         config.Container.container_script, config.Container.release_setup,
                         config.Container.stagein_status_dictionary, config.Container.stagein_replica_dictionary,
                         'eventLoopHeartBeat.txt', 'memory_monitor_output.txt', 'memory_monitor_summary.json_snapshot']
    except Exception as error:
        logger.warning('exception caught: %s', error)
        to_be_removed = []

    new_list_of_files = []
    for filename in list_of_files:
        if os.path.basename(filename) not in to_be_removed and '/pilot/' not in filename and 'prmon' not in filename:
            new_list_of_files.append(filename)

    return new_list_of_files
def get_payload_log_tail(workdir):
    """
    Return the tail of the payload stdout or its latest updated log file.

    :param workdir: job work directory (string).
    :return: tail of stdout (string).
    """

    # find the latest updated log file
    # list_of_files = get_list_of_log_files()
    # find the latest updated text file
    list_of_files = find_text_files()
    list_of_files = remove_pilot_logs_from_list(list_of_files)

    if not list_of_files:
        logger.info('no log files were found (will use default %s)', config.Payload.payloadstdout)
        list_of_files = [os.path.join(workdir, config.Payload.payloadstdout)]

    return get_latest_log_tail(list_of_files)
def get_latest_log_tail(files):
    """
    Get the tail of the latest updated file from the given file list.

    :param files: files (list).
    :return: tail of the latest updated file (string).
    """

    stdout_tail = ""
    try:
        latest_file = max(files, key=os.path.getmtime)
        logger.info('tail of file %s will be added to heartbeat', latest_file)
        # the section was cut off at this point; the closing lines below are a minimal
        # reconstruction that tails the file using the tail() helper imported above
        stdout_tail = tail(latest_file)
    except Exception as error:
        logger.warning('failed to get payload stdout tail: %s', error)

    return stdout_tail