Skip to content

Commit

Permalink
Shrink fwjr whenever we fail to commit documents
Browse files Browse the repository at this point in the history
Set object attr before using it
  • Loading branch information
amaltaro committed Apr 21, 2023
1 parent cb6b352 commit 0897da5
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 15 deletions.
11 changes: 9 additions & 2 deletions src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ def _reset_queue(self):
"""
self._queue = []

def getQueueSize(self):
"""
Return the current size of the queue, i.e., how
many documents are already queued up
"""
return len(self._queue)

def timestamp(self, data, label=''):
"""
Time stamp each doc in a list
Expand Down Expand Up @@ -239,8 +246,8 @@ def queue(self, doc, timestamp=False, viewlist=None, callback=None):
if timestamp:
self.timestamp(doc, timestamp)
# TODO: Thread this off so that it's non blocking...
if len(self._queue) >= self._queue_size:
print('queue larger than %s records, committing' % self._queue_size)
if self.getQueueSize() >= self._queue_size:
logging.warning('queue larger than %s records, committing', self._queue_size)
self.commit(viewlist=viewlist, callback=callback)
self._queue.append(doc)

Expand Down
73 changes: 60 additions & 13 deletions src/python/WMCore/JobStateMachine/ChangeState.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
Propagate a job from one state to another.
"""

import json
import sys
from builtins import str
import logging
import re
Expand Down Expand Up @@ -59,6 +60,28 @@ def discardConflictingDocument(couchDbInstance, data, result):
return result


def shrinkLargeFJR(couchDbInstance, sizeLimit=8e6):
"""
Look at the CouchDB database queue and empty documents
with a 'fwjr' field larger than sizeLimit.
:param couchDbInstance: couchdb database instance
:param sizeLimit: integer with the limit number of bytes
:return: None
"""
for doc in couchDbInstance._queue:
if sys.getsizeof(json.dumps(doc)) > sizeLimit:
if 'fwjr' in doc:
errMsg = f"The 'fwjr' attribute for jobid: {doc.get('jobid')} will "
errMsg += f"be empty because it is too large."
logging.warning(errMsg)
doc['fwjr'] = dict()
else:
errMsg = f"Found a large Couch document for job id: {doc.get('jobid')}, "
errMsg += "but it does not have a 'fwjr' field. Contact a developer!"
logging.error(errMsg)
return


def getDataFromSpecFile(specFile):
workload = WMWorkloadHelper()
workload.load(specFile)
Expand Down Expand Up @@ -88,6 +111,8 @@ def __init__(self, config, couchDbName=None):
self.jsumdatabase = None
self.statsumdatabase = None

# max total number of documents to be committed in the same Couch operation
self.maxBulkCommit = 250
self.couchdb = CouchServer(self.config.JobStateMachine.couchurl)
self._connectDatabases()

Expand All @@ -109,15 +134,17 @@ def _connectDatabases(self):
"""
if not hasattr(self, 'jobsdatabase') or self.jobsdatabase is None:
try:
self.jobsdatabase = self.couchdb.connectDatabase("%s/jobs" % self.dbname, size=250)
self.jobsdatabase = self.couchdb.connectDatabase("%s/jobs" % self.dbname,
size=self.maxBulkCommit)
except Exception as ex:
logging.error("Error connecting to couch db '%s/jobs': %s", self.dbname, str(ex))
self.jobsdatabase = None
return False

if not hasattr(self, 'fwjrdatabase') or self.fwjrdatabase is None:
try:
self.fwjrdatabase = self.couchdb.connectDatabase("%s/fwjrs" % self.dbname, size=250)
self.fwjrdatabase = self.couchdb.connectDatabase("%s/fwjrs" % self.dbname,
size=self.maxBulkCommit)
except Exception as ex:
logging.error("Error connecting to couch db '%s/fwjrs': %s", self.dbname, str(ex))
self.fwjrdatabase = None
Expand All @@ -126,7 +153,8 @@ def _connectDatabases(self):
if not hasattr(self, 'jsumdatabase') or self.jsumdatabase is None:
dbname = getattr(self.config.JobStateMachine, 'jobSummaryDBName')
try:
self.jsumdatabase = self.couchdb.connectDatabase(dbname, size=250)
self.jsumdatabase = self.couchdb.connectDatabase(dbname,
size=self.maxBulkCommit)
except Exception as ex:
logging.error("Error connecting to couch db '%s': %s", dbname, str(ex))
self.jsumdatabase = None
Expand All @@ -135,7 +163,8 @@ def _connectDatabases(self):
if not hasattr(self, 'statsumdatabase') or self.statsumdatabase is None:
dbname = getattr(self.config.JobStateMachine, 'summaryStatsDBName')
try:
self.statsumdatabase = self.couchdb.connectDatabase(dbname, size=250)
self.statsumdatabase = self.couchdb.connectDatabase(dbname,
size=self.maxBulkCommit)
except Exception as ex:
logging.error("Error connecting to couch db '%s': %s", dbname, str(ex))
self.jsumdatabase = None
Expand Down Expand Up @@ -212,6 +241,7 @@ def recordInCouch(self, jobs, newstate, oldstate, updatesummary=False):
timestamp = int(time.time())
couchRecordsToUpdate = []

countDocs = 0
for job in jobs:
couchDocID = job.get("couch_record", None)

Expand Down Expand Up @@ -273,6 +303,8 @@ def recordInCouch(self, jobs, newstate, oldstate, updatesummary=False):

couchRecordsToUpdate.append({"jobid": job["id"],
"couchid": jobDocument["_id"]})
if countDocs >= self.jobsdatabase.getQueueSize():
self.jobsdatabase.commit(callback=discardConflictingDocument)
self.jobsdatabase.queue(jobDocument, callback=discardConflictingDocument)
else:
# We send a PUT request to the stateTransition update handler.
Expand Down Expand Up @@ -354,13 +386,18 @@ def recordInCouch(self, jobs, newstate, oldstate, updatesummary=False):
"archivestatus": archStatus,
"fwjr": jsonFWJR,
"type": "fwjr"}
try:
self.fwjrdatabase.queue(fwjrDocument, timestamp=True, callback=discardConflictingDocument)
except CouchRequestTooLargeError as exc:
errMsg = f"FJR for jobid: {fwjrDocument['jobid']} will be skipped. Error: {str(exc)}"
logging.error(errMsg)
except Exception:
raise
if countDocs >= self.fwjrdatabase.getQueueSize():
try:
self.fwjrdatabase.commit(callback=discardConflictingDocument)
except CouchRequestTooLargeError as exc:
msg = "Failed to commit bulk of framework job report to CouchDB."
msg += f" Error: {str(exc)}"
logging.warning(msg)
shrinkLargeFJR(self.fwjrdatabase, 8e6)
# now all the documents should fit in
self.fwjrdatabase.commit(callback=discardConflictingDocument)

self.fwjrdatabase.queue(fwjrDocument, timestamp=True, callback=discardConflictingDocument)

updateSummaryDB(self.statsumdatabase, job)

Expand Down Expand Up @@ -458,15 +495,25 @@ def recordInCouch(self, jobs, newstate, oldstate, updatesummary=False):
jobSummary[prop] = jobSummary[prop] if jobSummary[prop] else currentJobDoc.get(prop, [])
except CouchNotFoundError:
pass
if countDocs >= self.fwjrdatabase.getQueueSize():
self.jsumdatabase.commit()
self.jsumdatabase.queue(jobSummary, timestamp=True)

if len(couchRecordsToUpdate) > 0:
self.setCouchDAO.execute(bulkList=couchRecordsToUpdate,
conn=self.getDBConn(),
transaction=self.existingTransaction())

try:
self.fwjrdatabase.commit(callback=discardConflictingDocument)
except CouchRequestTooLargeError as exc:
msg = "Failed to commit bulk of framework job report to CouchDB."
msg += f" Error: {str(exc)}"
logging.warning(msg)
shrinkLargeFJR(self.fwjrdatabase, 8e6)
# now all the documents should fit in
self.fwjrdatabase.commit(callback=discardConflictingDocument)
self.jobsdatabase.commit(callback=discardConflictingDocument)
self.fwjrdatabase.commit(callback=discardConflictingDocument)
self.jsumdatabase.commit()
return

Expand Down

0 comments on commit 0897da5

Please sign in to comment.