Skip to content

Commit

Permalink
Use specific vstring to pass selected accelerators instead of overwri…
Browse files Browse the repository at this point in the history
…ting options.accelerators

Also add unit tests for two ProcessAccelerator objects in the Process
  • Loading branch information
makortel committed Feb 21, 2022
1 parent fc821ee commit 5caf7b8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 28 deletions.
13 changes: 10 additions & 3 deletions FWCore/Framework/src/ensureAvailableAccelerators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@

namespace edm {
void ensureAvailableAccelerators(edm::ParameterSet const& parameterSet) {
auto const& selectedAccelerators =
parameterSet.getUntrackedParameter<std::vector<std::string>>("@selected_accelerators");
ParameterSet const& optionsPset(parameterSet.getUntrackedParameterSet("options"));
auto accelerators = optionsPset.getUntrackedParameter<std::vector<std::string>>("accelerators");
if (accelerators.empty()) {
if (selectedAccelerators.empty()) {
Exception ex(errors::UnavailableAccelerator);
ex << "The system has no compute accelerators that match the patterns specified in "
"process.options.accelerators.\nThe following compute accelerators are available:\n";
"process.options.accelerators:\n";
auto const& patterns = optionsPset.getUntrackedParameter<std::vector<std::string>>("accelerators");
for (auto const& pat : patterns) {
ex << " " << pat << "\n";
}
ex << "\nThe following compute accelerators are available:\n";
auto const& availableAccelerators =
parameterSet.getUntrackedParameter<std::vector<std::string>>("@available_accelerators");
for (auto const& acc : availableAccelerators) {
ex << " " << acc << "\n";
}

throw ex;
}
}
Expand Down
95 changes: 72 additions & 23 deletions FWCore/ParameterSet/python/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,9 +1159,10 @@ def _insertManyInto(self, parameterSet, label, itemDict, tracked):
def _insertSwitchProducersInto(self, parameterSet, labelModules, labelAliases, itemDict, tracked):
modules = parameterSet.getVString(tracked, labelModules)
aliases = parameterSet.getVString(tracked, labelAliases)
accelerators = parameterSet.getVString(False, "@selected_accelerators")
for name,value in itemDict.items():
value.appendToProcessDescLists_(modules, aliases, name)
value.insertInto(parameterSet, name, self.options.accelerators)
value.insertInto(parameterSet, name, accelerators)
modules.sort()
aliases.sort()
parameterSet.addVString(tracked, labelModules, modules)
Expand Down Expand Up @@ -1443,20 +1444,21 @@ def validate(self):
pass

def handleProcessAccelerators(self, parameterSet):
allAccelerators = set(label for acc in self.__dict__['_Process__accelerators'].values() for label in acc.labels())
# 'cpu' accelerator is always implicitly there
allAccelerators = set(["cpu"])
availableAccelerators = set(["cpu"])
for acc in self.__dict__['_Process__accelerators'].values():
for l in acc.enabledLabels():
availableAccelerators.add(l)
allAccelerators.update(acc.labels())
availableAccelerators.update(acc.enabledLabels())
availableAccelerators = sorted(list(availableAccelerators))
parameterSet.addVString(False, "@available_accelerators", availableAccelerators)

# Resolve wildcards
selectedAccelerators = []
if "*" in self.options.accelerators:
if len(self.options.accelerators) >= 2:
raise ValueError("process.options.accelerators may contain '*' only as the only element, now it has {} elements".format(len(self.options.accelerators)))
self.options.accelerators = availableAccelerators
selectedAccelerators = availableAccelerators
else:
import fnmatch
resolved = set()
Expand All @@ -1474,11 +1476,12 @@ def handleProcessAccelerators(self, parameterSet):
"s" if len(invalid) > 2 else "",
",".join(invalid),
",".join(sorted(list(allAccelerators)))))
self.options.accelerators = sorted(list(resolved))
selectedAccelerators = sorted(list(resolved))
parameterSet.addVString(False, "@selected_accelerators", selectedAccelerators)

# Customize
for acc in self.__dict__['_Process__accelerators'].values():
acc.apply(self)
acc.apply(self, selectedAccelerators)

def prefer(self, esmodule,*args,**kargs):
"""Prefer this ES source or producer. The argument can
Expand Down Expand Up @@ -1912,13 +1915,14 @@ def enabledLabels(self):
"""Override to return a list of strings for the accelerator labels
that are enabled in the system the job is being run on."""
return []
def apply(self, process):
def apply(self, process, accelerators):
"""Override if need to customize the Process at worker node. The
available accelerator labels are available via
'process.options.accelerators' (the patterns, e.g. '*' have been expanded
to concrete labels at this point).
selected available accelerator labels are given in the
'accelerators' argument (the patterns, e.g. '*' have been
expanded to concrete labels).
This function may touch only untracked parameters."""
This function may touch only untracked parameters.
"""
pass

# Need to be a module-level function for the configuration with a
Expand Down Expand Up @@ -2056,11 +2060,33 @@ def labels(self):
return self._labels
def enabledLabels(self):
return self._enabled
def apply(self, process):
def apply(self, process, accelerators):
process.acceleratorTestProducer = EDProducer("AcceleratorTestProducer")
process.acceleratorTestPath = Path(process.acceleratorTestProducer)
specialImportRegistry.registerSpecialImportForType(ProcessAcceleratorTest, "from test import ProcessAcceleratorTest")

class ProcessAcceleratorTest2(ProcessAccelerator):
def __init__(self, enabled=["anothertest3", "anothertest4"]):
super(ProcessAcceleratorTest2,self).__init__()
self._labels = ["anothertest3", "anothertest4"]
self.setEnabled(enabled)
def setEnabled(self, enabled):
invalid = set(enabled).difference(set(self._labels))
if len(invalid) > 0:
raise Exception("Tried to enabled nonexistent test accelerators {}".format(",".join(invalid)))
self._enabled = enabled[:]
def dumpPythonImpl(self,options):
result = "{}enabled = [{}]".format(options.indentation(),
", ".join(["'{}'".format(e) for e in self._enabled]))
return result
def labels(self):
return self._labels
def enabledLabels(self):
return self._enabled
def apply(self, process, accelerators):
pass
specialImportRegistry.registerSpecialImportForType(ProcessAcceleratorTest2, "from test import ProcessAcceleratorTest2")

class TestModuleCommand(unittest.TestCase):
def setUp(self):
"""Nothing to do """
Expand Down Expand Up @@ -3922,10 +3948,9 @@ def testProcessAccelerator(self):
proc = Process("TEST")
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertFalse(p.values["options"][1].values["accelerators"][0])
self.assertTrue(["cpu"], p.values["options"][1].values["accelerators"][1])
self.assertFalse(p.values["@available_accelerators"][0])
self.assertTrue(["cpu"], p.values["@available_accelerators"][1])
self.assertFalse(p.values["@selected_accelerators"][0])
self.assertTrue(["cpu"], p.values["@selected_accelerators"][1])

proc = Process("TEST")
self.assertRaises(TypeError, setattr, proc, "processAcceleratorTest", ProcessAcceleratorTest())
Expand Down Expand Up @@ -3984,8 +4009,9 @@ def testProcessAccelerator(self):
""")
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual(["*"], p.values["options"][1].values["accelerators"][1])
self.assertFalse(p.values["options"][1].values["accelerators"][0])
self.assertTrue(["anothertest3", "cpu", "test1", "test2"], p.values["options"][1].values["accelerators"][1])
self.assertTrue(["anothertest3", "cpu", "test1", "test2"], p.values["@selected_accelerators"][1])
self.assertEqual((True, "AcceleratorTestProducer"), p.values["acceleratorTestProducer"][1].values["@module_type"])
self.assertFalse(p.values["@available_accelerators"][0])
self.assertTrue(["anothertest3", "cpu", "test1", "test2"], p.values["@available_accelerators"][1])
Expand All @@ -3994,34 +4020,40 @@ def testProcessAccelerator(self):
proc.ProcessAcceleratorTest = ProcessAcceleratorTest(enabled=["test1"])
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual(["cpu", "test1"], p.values["options"][1].values["accelerators"][1])
self.assertEqual(["cpu", "test1"], p.values["@selected_accelerators"][1])
self.assertEqual(["cpu", "test1"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.options.accelerators = ["test2"]
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual(["test2"], p.values["options"][1].values["accelerators"][1])
availableAccelerators = p.values["@available_accelerators"][1]
self.assertEqual(["test2"], p.values["@selected_accelerators"][1])
self.assertEqual(["anothertest3", "cpu", "test1", "test2"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.options.accelerators = ["test*"]
proc.fillProcessDesc(p)
accelerators = p.values["options"][1].values["accelerators"][1]
self.assertEqual(["test1", "test2"], p.values["options"][1].values["accelerators"][1])
self.assertEqual(["test1", "test2"], p.values["@selected_accelerators"][1])
self.assertEqual(["anothertest3", "cpu", "test1", "test2"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest(enabled=["test1"])
proc.options.accelerators = ["test2"]
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual([], p.values["options"][1].values["accelerators"][1])
self.assertEqual([], p.values["@selected_accelerators"][1])
self.assertEqual(["cpu", "test1"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.options.accelerators = ["cpu*"]
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual(["cpu"], p.values["@selected_accelerators"][1])
self.assertEqual(["anothertest3", "cpu", "test1", "test2"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.options.accelerators = ["test3"]
Expand All @@ -4034,6 +4066,23 @@ def testProcessAccelerator(self):
p = TestMakePSet()
self.assertRaises(ValueError, proc.fillProcessDesc, p)

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.ProcessAcceleratorTest2 = ProcessAcceleratorTest2()
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual(["anothertest3", "anothertest4", "cpu", "test1", "test2"], p.values["@selected_accelerators"][1])
self.assertEqual(["anothertest3", "anothertest4", "cpu", "test1", "test2"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.ProcessAcceleratorTest2 = ProcessAcceleratorTest2()
proc.options.accelerators = ["*test3", "c*"]
p = TestMakePSet()
proc.fillProcessDesc(p)
self.assertEqual(["anothertest3", "cpu"], p.values["@selected_accelerators"][1])
self.assertEqual(["anothertest3", "anothertest4", "cpu", "test1", "test2"], p.values["@available_accelerators"][1])

proc = Process("TEST")
proc.ProcessAcceleratorTest = ProcessAcceleratorTest()
proc.sp = SwitchProducerTest2(test2 = EDProducer("Foo",
Expand Down
4 changes: 2 additions & 2 deletions HeterogeneousCore/CUDACore/python/ProcessAcceleratorCUDA.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def enabledLabels(self):
return self.labels()
else:
return []
def apply(self, process):
def apply(self, process, accelerators):
if not hasattr(process, "CUDAService"):
process.load("HeterogeneousCore.CUDAServices.CUDAService_cfi")

if self._label in process.options.accelerators:
if self._label in accelerators:
process.CUDAService.enabled = True
process.MessageLogger.CUDAService = cms.untracked.PSet()
else:
Expand Down

0 comments on commit 5caf7b8

Please sign in to comment.