Skip to content

Commit

Permalink
Merge pull request #12202 from amaltaro/fix-12195-wma237
Browse files Browse the repository at this point in the history
Adopt MSPileup data into PileupFetcher - wmagent branch
  • Loading branch information
amaltaro authored Dec 10, 2024
2 parents cde16c2 + 81ce033 commit f1649e8
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
13 changes: 12 additions & 1 deletion src/python/Utils/Patterns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Patterns module provides a set of CS patterns
"""

import re

class Singleton(type):
"""Implementation of Singleton class"""
Expand All @@ -11,3 +11,14 @@ def __call__(cls, *args, **kwargs):
cls._instances[cls] = \
super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


def getDomainName(urlStr):
    """
    Given a URL string, return the domain name.

    The domain name is the part of the host that precedes the ".cern.ch"
    suffix. The suffix must terminate the host, i.e. it has to be followed
    by a port (":"), a path ("/"), a query ("?"), a fragment ("#") or the
    end of the string. URLs that do not point to a ".cern.ch" host yield
    an empty string.
    :param urlStr: URL string
    :return: a string with the domain name (e.g. "cmsweb-prod"), or an
        empty string if no http(s) URL under ".cern.ch" is found
    """
    # The host cannot contain '/', '?' or '#', which keeps the capture from
    # running into a query string or fragment. The lookahead ensures that
    # ".cern.ch" is the real end of the host, so a look-alike such as
    # "cmsweb.cern.chevil.com" is not accepted.
    domainPattern = re.compile(r'https?://([^/?#]+)\.cern\.ch(?=[:/?#]|$)')
    match = domainPattern.search(urlStr)
    return match.group(1) if match else ""
72 changes: 55 additions & 17 deletions src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
of pileup files in the job sandbox for the dataset.
"""
from __future__ import print_function

from future.utils import viewitems

import datetime
import os
import hashlib
Expand All @@ -15,8 +11,10 @@
import logging
from json import JSONEncoder
import WMCore.WMSpec.WMStep as WMStep
from Utils.Patterns import getDomainName
from Utils.Utilities import encodeUnicodeToBytes
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs
from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.WMSpec.Steps.Fetchers.FetcherInterface import FetcherInterface

Expand All @@ -34,7 +32,6 @@ def __init__(self):
Prepare module setup
"""
super(PileupFetcher, self).__init__()
# FIXME: find a way to pass the Rucio account name to this fetcher module
self.rucioAcct = "wmcore_pileup"
self.rucio = None

Expand All @@ -52,41 +49,81 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
"BlockB": {"FileList": [], "PhEDExNodeName": []}, ....}
"""
resultDict = {}
# first, figure out which instance of MSPileup and Rucio to use
pileupInstance = getDomainName(dbsReader.dbsURL)
msPileupUrl = f"https://{pileupInstance}.cern.ch/ms-pileup/data/pileup"
# FIXME: this juggling with Rucio is tough! We can get away without it,
# but for that we would have to use testbed MSPileup against Prod Rucio
if pileupInstance == "cmsweb-prod" or pileupInstance == "cmsweb":
rucioAuthUrl, rucioUrl = "cms-rucio-auth", "cms-rucio"
else:
rucioAuthUrl, rucioUrl = "cms-rucio-auth-int", "cms-rucio-int"
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct,
authUrl=f"https://{rucioAuthUrl}.cern.ch",
hostUrl=f"http://{rucioUrl}.cern.ch")

# iterate over input pileup types (e.g. "cosmics", "minbias")
for pileupType in stepHelper.data.pileup.listSections_():
# the format here is: step.data.pileup.cosmics.dataset = [/some/data/set]
datasets = getattr(getattr(stepHelper.data.pileup, pileupType), "dataset")
# each dataset input can generally be a list, iterate over dataset names
blockDict = {}
for dataset in datasets:

# using the original dataset, resolve blocks, files and number of events with DBS
fCounter = 0
for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True):
blockDict.setdefault(fileInfo['block_name'], {'FileList': [],
'NumberOfEvents': 0,
'PhEDExNodeNames': []})
blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name'])
blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count']
fCounter += 1

self._getDatasetLocation(dataset, blockDict)
logging.info(f"Found {len(blockDict)} blocks in DBS for dataset {dataset} with {fCounter} files")
self._getDatasetLocation(dataset, blockDict, msPileupUrl)

resultDict[pileupType] = blockDict
return resultDict

def _getDatasetLocation(self, dset, blockDict):
def _getDatasetLocation(self, dset, blockDict, msPileupUrl):
"""
Given a dataset name, query MSPileup and Rucio and resolve the block location
:param dset: string with the dataset name
:param blockDict: dictionary with DBS summary info
:param msPileupUrl: string with the MSPileup url
:return: update blockDict in place
"""
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct)
blockReplicas = self.rucio.getPileupLockedAndAvailable(dset, account=self.rucioAcct)
for blockName, blockLocation in viewitems(blockReplicas):
try:
blockDict[blockName]['PhEDExNodeNames'] = list(blockLocation)
except KeyError:
logging.warning("Block '%s' present in Rucio but not in DBS", blockName)
# fetch the pileup configuration from MSPileup
try:
queryDict = {'query': {'pileupName': dset},
'filters': ['pileupName', 'customName', 'containerFraction', 'currentRSEs']}
doc = getPileupDocs(msPileupUrl, queryDict, method='POST')[0]
msg = f'Pileup dataset {doc["pileupName"]} with:\n\tcustom name: {doc["customName"]},'
msg += f'\n\tcurrent RSEs: {doc["currentRSEs"]}\n\tand container fraction: {doc["containerFraction"]}'
logging.info(msg)
except Exception as ex:
logging.error(f'Error querying MSPileup for dataset {dset}. Details: {str(ex)}')
raise ex

# custom dataset name means there was a container fraction change, use different scope
puScope = 'cms'
if doc["customName"]:
dset = doc["customName"]
puScope = 'group.wmcore'

blockReplicas = self.rucio.getBlocksInContainer(container=dset, scope=puScope)
logging.info(f"Found {len(blockReplicas)} blocks in container {dset} for scope {puScope}")

# Finally, update blocks present in Rucio with the MSPileup currentRSEs.
# Blocks not present in Rucio - hence only in DBS - are meant to be removed.
for blockName in list(blockDict):
if blockName not in blockReplicas:
logging.warning(f"Block {blockName} present in DBS but not in Rucio. Removing it.")
blockDict.pop(blockName)
else:
blockDict[blockName]['PhEDExNodeNames'] = doc["currentRSEs"]
logging.info(f"Final pileup dataset {dset} has a total of {len(blockDict)} blocks.")

def _getCacheFilePath(self, stepHelper):

Expand Down Expand Up @@ -171,7 +208,8 @@ def createPileupConfigFile(self, helper):
"""
Stores pileup JSON configuration file in the working
directory / sandbox.
:param helper: WMStepHelper instance
:return: None
"""
if self._isCacheValid(helper):
# if file already exist don't make a new dbs call and overwrite the file.
Expand Down

0 comments on commit f1649e8

Please sign in to comment.