Skip to content

Commit

Permalink
Flux update (#359)
Browse files Browse the repository at this point in the history
* flux0.26: add version checking during connection

Problem: there are several versions of the Flux adaptor and it is very
easy to use too new of an adaptor with too old of a Flux version (and
vice-versa)

Solution: check both the adaptor version and the Flux broker version
when making a connection via a new Flux handle and ensure the versions
match.  If the adaptor version is newer, log an error.  If the broker
version is newer, log a debug message letting the user know they might
benefit from choosing a newer adaptor.

* flux0.26: handle the case where the ngpus argument is a string

Problem: the `ngpus` argument to `submit` (as well as `parallelize`) can
sometimes be a string (e.g., when it isn't the default of 0). This
causes validation errors in the Flux jobspec class, which does runtime
type checking of the value

Solution: explicitly check if `ngpus` is a string, specifically a digit,
and if so, convert it to an `int` before passing to Flux

* flux0.26: fix number of slots for nested flux launches

Problem: when launching with multiple tasks in a nested Flux
instance (i.e., `force broker = True`), then the number of slots for the
nested instance should be set as the number of tasks not the number of
nodes

* flux: make `force_broker` the default action

Problem: when creating maestro specs that need to be portable across
schedulers, it is assumed that a `${LAUNCHER}` call is needed in many
scripts, but when `force_broker` is `False`, this results in a doubling
up of the parallelism.  The call to the `submit` function within the
Flux adaptor launches multiple processes and then then `${LAUNCHER}`
call within the script gets expanded by the `parallelize` function to
also launch multiple processes.

Solution: make `force_broker` the default behavior to maximize
compatibilty with other schedulers.  If users want to avoid the extra
overhead of a nested Flux instance, they can always opt out of the
default behavior with `force_broker = False`, and then elide their use
of the `${LAUNCHER}` variable in the script.

* Moved integer check to FluxScriptAdapter main class.

* Move the version check to FluxInterface base class.

* Update prior interfaces to use connect_to_flux method.

* Fix for an empty gpu string in a specification.

* Removal of gpu conversion in 0.26.0 Flux backend.

Co-authored-by: Francesco Di Natale <[email protected]>
  • Loading branch information
SteVwonder and FrankD412 committed May 28, 2022
1 parent 83e703c commit 52da01d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 37 deletions.
41 changes: 41 additions & 0 deletions maestrowf/abstracts/interfaces/flux.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,50 @@
from abc import ABC, abstractclassmethod, abstractmethod, \
abstractstaticmethod
import logging

LOGGER = logging.getLogger(__name__)

try:
import flux
except ImportError:
LOGGER.info("Failed to import Flux. Continuing.")


class FluxInterface(ABC):

@classmethod
def connect_to_flux(cls):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux handle created.")
broker_version = cls.flux_handle.attr_get("version")
adaptor_version = cls.key
LOGGER.debug(
"Connected to Flux broker running version %s using Maestro "
"adapter version %s.", broker_version, adaptor_version)
try:
from distutils.version import StrictVersion
adaptor_version = StrictVersion(adaptor_version)
broker_version = StrictVersion(broker_version)
if adaptor_version > broker_version:
LOGGER.error(
"Maestro adapter version (%s) is too new for the Flux "
"broker version (%s). Functionality not present in "
"this Flux version may be required by the adapter and "
"cause errors. Please switch to an older adapter.",
adaptor_version, broker_version
)
elif adaptor_version < broker_version:
LOGGER.debug(
"Maestro adaptor version (%s) is older than the Flux "
"broker version (%s). This is usually OK, but if a "
"newer Maestro adapter is available, please consider "
"upgrading to maximize performance and compatibility.",
adaptor_version, broker_version
)
except ImportError:
pass

@abstractclassmethod
def get_statuses(cls, joblist):
"""
Expand Down
12 changes: 3 additions & 9 deletions maestrowf/interfaces/script/_flux/flux0_17_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=False
):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
Expand Down Expand Up @@ -160,9 +158,7 @@ def status_callback(future, args):
def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down Expand Up @@ -245,9 +241,7 @@ def cancel(cls, joblist):
"""
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down
13 changes: 3 additions & 10 deletions maestrowf/interfaces/script/_flux/flux0_18_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
try:
from flux import constants as flux_constants
from flux import job as flux_job
from flux import Flux
except ImportError:
LOGGER.info("Failed to import Flux. Continuing.")

Expand Down Expand Up @@ -64,9 +63,7 @@ def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=False
):
if not cls.flux_handle:
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
Expand Down Expand Up @@ -174,9 +171,7 @@ def status_callback(future, args):
def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down Expand Up @@ -257,9 +252,7 @@ def cancel(cls, joblist):
"""
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down
21 changes: 7 additions & 14 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ class FluxInterface_0260(FluxInterface):
@classmethod
def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=False
ngpus=0, job_name=None, force_broker=True
):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
Expand All @@ -45,7 +43,7 @@ def submit(
ngpus_per_slot = int(ceil(ngpus / nodes))
jobspec = flux.job.JobspecV1.from_nest_command(
[path], num_nodes=nodes, cores_per_slot=cores_per_task,
num_slots=nodes, gpus_per_slot=ngpus_per_slot)
num_slots=procs, gpus_per_slot=ngpus_per_slot)
else:
LOGGER.debug(
"Launch under root Flux broker. [force_broker=%s, nodes=%d]",
Expand Down Expand Up @@ -102,9 +100,8 @@ def parallelize(cls, procs, nodes=None, **kwargs):

if "gpus" in kwargs:
ngpus = str(kwargs["gpus"])
if ngpus.isdecimal():
args.append("-g")
args.append(ngpus)
args.append("-g")
args.append(ngpus)

# flux has additional arguments that can be passed via the '-o' flag.
addtl = []
Expand All @@ -122,9 +119,7 @@ def parallelize(cls, procs, nodes=None, **kwargs):
def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down Expand Up @@ -157,9 +152,7 @@ def cancel(cls, joblist):
"""
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
LOGGER.debug("New Flux instance created.")
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
Expand Down
13 changes: 9 additions & 4 deletions maestrowf/interfaces/script/fluxscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def submit(self, step, path, cwd, job_map=None, env=None):
# walltime = self._convert_walltime_to_seconds(step.run["walltime"])
nodes = step.run.get("nodes")
processors = step.run.get("procs", 0)
force_broker = step.run.get("use_broker", False)
force_broker = step.run.get("use_broker", True)
walltime = step.run.get("walltime", "inf")

# Compute cores per task
Expand All @@ -183,9 +183,14 @@ def submit(self, step, path, cwd, job_map=None, env=None):
"'cores per task' set to a non-value. Populating with a "
"sensible default. (cores per task = %d", cores_per_task)

# Calculate ngpus
ngpus = step.run.get("gpus", 0)
ngpus = 0 if not ngpus else ngpus
try:
# Calculate ngpus
ngpus = step.run.get("gpus", "0")
ngpus = int(ngpus) if ngpus else 0
except ValueError as val_error:
msg = f"Specified gpus '{ngpus}' is not a decimal value."
LOGGER.error(msg)
raise val_error

# Calculate nprocs
ncores = cores_per_task * nodes
Expand Down

0 comments on commit 52da01d

Please sign in to comment.