Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit bad input file reporting #8801

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 106 additions & 59 deletions src/python/TaskWorker/Actions/RetryJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import socket
from collections import namedtuple

from ServerUtilities import executeCommand
from ServerUtilities import executeCommand, getLock
from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY

if 'useHtcV2' in os.environ:
Expand All @@ -17,6 +17,12 @@

JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2)

# strings in fatal root exception text which indicate code problem, not corrupted file
# a small "knowledge data base"
NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [
"already deleted (list name = TList)",
]

# Without this environment variable set, HTCondor takes a write lock per logfile entry
os.environ['_condor_ENABLE_USERLOG_LOCKING'] = 'false'

Expand Down Expand Up @@ -413,7 +419,9 @@ def check_corrupted_file(self, exitCode):

corruptedFile = False
suspiciousFile = False
fatalLine = False
inputFileName = 'NotAvailable'
errorLines = []
RSE = self.site
RSE = RSE if not RSE.startswith('T1') else f"{RSE}_Disk"
fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
Expand All @@ -424,70 +432,109 @@ def check_corrupted_file(self, exitCode):
# remember last opened file, in case of 8021 that's the one that matters
if line.startswith("== CMSSW:") and ' Successfully opened file' in line:
inputFileName = f"/store/{line.split('/store/')[1]}" # strip protocol part
if line.startswith("== CMSSW:") and "Fatal Root Error:" in line:
corruptedFile = True
self.logger.info("Corrupted input file found")
self.logger.debug(line)
errorLines = [line]
# file name is in next line
continue
if corruptedFile:
errorLines.append(line)
if '/store/' in line and '.root' in line:
# this may be better done in the script which processes the BadInputFiles reports
# if '/store/user' in line or '/store/group' in line and not 'rucio' in line:
# # no point in reporting files unknown to Rucio
# corruptedFile = False
# break

# extract the '/store/...root' part of this line
fragment1 = line.split('/store/')[1]
fragment2 = fragment1.split('.root')[0]
inputFileName = f"/store/{fragment2}.root"
self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}")

else:
corruptedFile = False
suspiciousFile = True
errorLines.append('NOT CLEARLY CORRUPTED, OTHER ROOT ERROR ?')
errorLines.append('DID Identification may not be correct')
self.logger.info("RootFatalError does not contain file info")
# extract the Exception message
if fatalLine:
fatalExceptionLines.append(line)
if line.startswith("== CMSSW:") and " ----- Begin Fatal Exception" in line:
fatalExceptionLines = []
fatalLine = True
if line.startswith("== CMSSW:") and " ----- End Fatal Exception" in line:
break
if corruptedFile or suspiciousFile:
# add pointers to logs
schedHostname = socket.gethostname().split('.')[0]
schedId = schedHostname.removeprefix('vocms') # vomcs059 -> 059, vocms0106 -> 0106 etc,
username = self.reqname.split(':')[1].split('_')[0]
webDirUrl = f"https://cmsweb.cern.ch:8443/scheddmon/{schedId}/{username}/{self.reqname}"
stdoutUrl = f"{webDirUrl}/job_out.{self.job_id}.{self.crab_retry}.txt"
postJobUrl = f"{webDirUrl}/postjob.{self.job_id}.{self.crab_retry}.txt"
errorLines.append(f"stdout: {stdoutUrl}")
errorLines.append(f"postjob: {postJobUrl}")
# note things down
reportFileName = f'Badfile.job.{self.job_id}.{self.crab_retry}.json'
corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE,
'exitCode': exitCode, 'message': errorLines}
with open(reportFileName, 'w', encoding='utf-8') as fp:
json.dump(corruptionMessage, fp)
self.logger.info('corruption message prepared, gfal-copy to EOS')
proxy = os.getenv('X509_USER_PROXY')
self.logger.info(f"X509_USER_PROXY = {proxy}")
reportLocation = 'davs://eoscms.cern.ch:443/eos/cms/store/temp/user/BadInputFiles/'
# there can be so many that we better split by task
# parse fatal exception text
for line in fatalExceptionLines:
for falsePositive in NOT_FILE_RELATED_FATAL_ROOT_ERRORS:
if falsePositive in line:
return False
for line in fatalExceptionLines:
if "Fatal Root Error:" in fatalExceptionLines:
corruptedFile = True
self.logger.info("Corrupted input file found")
self.logger.debug(line)
errorLines = [line]
# file name is in next line
continue
if corruptedFile:
reportLocation += f'corrupted/new/{self.reqname}/'
if suspiciousFile:
reportLocation += f'suspicious/new/{self.reqname}/'

destination = reportLocation + reportFileName
cmd = f'gfal-copy -vp -t 60 {reportFileName} {destination}'
out, err, ec = executeCommand(cmd)
if ec:
self.logger.error(f'gfal-copy failed with out: {out} err: {err}')
errorLines.append(line)
if '/store/' in line and '.root' in line:
# this may be better done in the script which processes the BadInputFiles reports
# if '/store/user' in line or '/store/group' in line and not 'rucio' in line:
# # no point in reporting files unknown to Rucio
# corruptedFile = False
# break

# extract the '/store/...root' part of this line
fragment1 = line.split('/store/')[1]
fragment2 = fragment1.split('.root')[0]
inputFileName = f"/store/{fragment2}.root"
self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}")
else:
corruptedFile = False
suspiciousFile = True
errorLines.append('NOT CLEARLY CORRUPTED, OTHER ROOT ERROR ?')
errorLines.append('DID Identification may not be correct')
self.logger.info("RootFatalError does not contain file info")
if corruptedFile or suspiciousFile:
corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE,
'exitCode': exitCode, 'message': errorLines}
self.reportBadInputFile(corruptedFile, suspiciousFile, corruptionMessage)
return corruptedFile

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def reportBadInputFile(self, corruptedFile, suspiciousFile, corruptionMessage):
"""
report bad file via a file on EOS
"""
taskName = self.reqname
username = taskName.split(':')[1].split('_')[0]
jobId = f"{self.job_id}.{self.crab_retry}"
# do not report HammerCloud
if username == 'sciaba':
return
# count reports for this task, too many indicates software error, not bad file(s)
fname = 'BadInputFileCount.json'
with getLock(fname): # use lock to avoid races with concurrent PostJobs
if os.path.exists(fname):
with open(fname, 'r', encoding='utf-8') as fp:
oldCount = int(json.load(fp))
else:
oldCount = 0
count = str(oldCount + 1)
with open(fname, 'w', encoding='utf-8') as fp:
json.dump(count, fp)
if count > 30:
return

# add pointers to logs
schedHostname = socket.gethostname().split('.')[0]
schedId = schedHostname.removeprefix('vocms') # vomcs059 -> 059, vocms0106 -> 0106 etc,
webDirUrl = f"https://cmsweb.cern.ch:8443/scheddmon/{schedId}/{username}/{taskName}"
stdoutUrl = f"{webDirUrl}/job_out.{jobId}.txt"
postJobUrl = f"{webDirUrl}/postjob.{jobId}.txt"
corruptionMessage['errorLines'].append(f"stdout: {stdoutUrl}")
corruptionMessage['errorLines'].append(f"postjob: {postJobUrl}")
# note things down
reportFileName = f'Badfile.job.{jobId}.json'
with open(reportFileName, 'w', encoding='utf-8') as fp:
json.dump(corruptionMessage, fp)
self.logger.info('corruption message prepared, gfal-copy to EOS')
proxy = os.getenv('X509_USER_PROXY')
self.logger.info(f"X509_USER_PROXY = {proxy}")
reportLocation = 'davs://eoscms.cern.ch:443/eos/cms/store/temp/user/BadInputFiles/'
# there can be so many that we better split by task
if corruptedFile:
reportLocation += f'corrupted/new/{taskName}/'
if suspiciousFile:
reportLocation += f'suspicious/new/{taskName}/'

destination = reportLocation + reportFileName
cmd = f'gfal-copy -vp -t 60 {reportFileName} {destination}'
out, err, ec = executeCommand(cmd)
if ec:
self.logger.error(f'gfal-copy failed with out: {out} err: {err}')

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def check_empty_report(self):
"""
Need a doc string here.
Expand Down