From 52da01dd7857d82a05b1c31cb29d542205f96331 Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Mon, 24 May 2021 20:38:14 -0700 Subject: [PATCH] Flux update (#359) * flux0.26: add version checking during connection Problem: there are several versions of the Flux adaptor and it is very easy to use too new of an adaptor with too old of a Flux version (and vice-versa) Solution: check both the adaptor version and the Flux broker version when making a connection via a new Flux handle and ensure the versions match. If the adaptor version is newer, log an error. If the broker version is newer, log a debug message letting the user know they might benefit from choosing a newer adaptor. * flux0.26: handle the case where the ngpus argument is a string Problem: the `ngpus` argument to `submit` (as well as `parallelize`) can sometimes be a string (e.g., when it isn't the default of 0). This causes validation errors in the Flux jobspec class, which does runtime type checking of the value Solution: explicitly check if `ngpus` is a string, specifically a digit, and if so, convert it to an `int` before passing to Flux * flux0.26: fix number of slots for nested flux launches Problem: when launching with multiple tasks in a nested Flux instance (i.e., `force broker = True`), then the number of slots for the nested instance should be set as the number of tasks not the number of nodes * flux: make `force_broker` the default action Problem: when creating maestro specs that need to be portable across schedulers, it is assumed that a `${LAUNCHER}` call is needed in many scripts, but when `force_broker` is `False`, this results in a doubling up of the parallelism. The call to the `submit` function within the Flux adaptor launches multiple processes and then the `${LAUNCHER}` call within the script gets expanded by the `parallelize` function to also launch multiple processes. Solution: make `force_broker` the default behavior to maximize compatibility with other schedulers. 
If users want to avoid the extra overhead of a nested Flux instance, they can always opt out of the default behavior with `force_broker = False`, and then elide their use of the `${LAUNCHER}` variable in the script. * Moved integer check to FluxScriptAdapter main class. * Move the version check to FluxInterface base class. * Update prior interfaces to use connect_to_flux method. * Fix for an empty gpu string in a specification. * Removal of gpu conversion in 0.26.0 Flux backend. Co-authored-by: Francesco Di Natale --- maestrowf/abstracts/interfaces/flux.py | 41 +++++++++++++++++++ .../interfaces/script/_flux/flux0_17_0.py | 12 ++---- .../interfaces/script/_flux/flux0_18_0.py | 13 ++---- .../interfaces/script/_flux/flux0_26_0.py | 21 ++++------ .../interfaces/script/fluxscriptadapter.py | 13 ++++-- 5 files changed, 63 insertions(+), 37 deletions(-) diff --git a/maestrowf/abstracts/interfaces/flux.py b/maestrowf/abstracts/interfaces/flux.py index 77ab2248c..a9ab59455 100644 --- a/maestrowf/abstracts/interfaces/flux.py +++ b/maestrowf/abstracts/interfaces/flux.py @@ -1,9 +1,50 @@ from abc import ABC, abstractclassmethod, abstractmethod, \ abstractstaticmethod +import logging + +LOGGER = logging.getLogger(__name__) + +try: + import flux +except ImportError: + LOGGER.info("Failed to import Flux. 
Continuing.") class FluxInterface(ABC): + @classmethod + def connect_to_flux(cls): + if not cls.flux_handle: + cls.flux_handle = flux.Flux() + LOGGER.debug("New Flux handle created.") + broker_version = cls.flux_handle.attr_get("version") + adaptor_version = cls.key + LOGGER.debug( + "Connected to Flux broker running version %s using Maestro " + "adapter version %s.", broker_version, adaptor_version) + try: + from distutils.version import StrictVersion + adaptor_version = StrictVersion(adaptor_version) + broker_version = StrictVersion(broker_version) + if adaptor_version > broker_version: + LOGGER.error( + "Maestro adapter version (%s) is too new for the Flux " + "broker version (%s). Functionality not present in " + "this Flux version may be required by the adapter and " + "cause errors. Please switch to an older adapter.", + adaptor_version, broker_version + ) + elif adaptor_version < broker_version: + LOGGER.debug( + "Maestro adaptor version (%s) is older than the Flux " + "broker version (%s). This is usually OK, but if a " + "newer Maestro adapter is available, please consider " + "upgrading to maximize performance and compatibility.", + adaptor_version, broker_version + ) + except ImportError: + pass + @abstractclassmethod def get_statuses(cls, joblist): """ diff --git a/maestrowf/interfaces/script/_flux/flux0_17_0.py b/maestrowf/interfaces/script/_flux/flux0_17_0.py index 1b3fc4422..3ccf36cb0 100644 --- a/maestrowf/interfaces/script/_flux/flux0_17_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_17_0.py @@ -60,9 +60,7 @@ def submit( cls, nodes, procs, cores_per_task, path, cwd, walltime, ngpus=0, job_name=None, force_broker=False ): - if not cls.flux_handle: - cls.flux_handle = flux.Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() # NOTE: This previously placed everything under a broker. 
However, # if there's a job that schedules items to Flux, it will schedule all @@ -160,9 +158,7 @@ def status_callback(future, args): def get_statuses(cls, joblist): # We need to import flux here, as it may not be installed on # all systems. - if not cls.flux_handle: - cls.flux_handle = flux.Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() LOGGER.debug( "Handle address -- %s", hex(id(cls.flux_handle))) @@ -245,9 +241,7 @@ def cancel(cls, joblist): """ # We need to import flux here, as it may not be installed on # all systems. - if not cls.flux_handle: - cls.flux_handle = flux.Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() LOGGER.debug( "Handle address -- %s", hex(id(cls.flux_handle))) diff --git a/maestrowf/interfaces/script/_flux/flux0_18_0.py b/maestrowf/interfaces/script/_flux/flux0_18_0.py index 91b074273..61c6d0448 100644 --- a/maestrowf/interfaces/script/_flux/flux0_18_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_18_0.py @@ -13,7 +13,6 @@ try: from flux import constants as flux_constants from flux import job as flux_job - from flux import Flux except ImportError: LOGGER.info("Failed to import Flux. Continuing.") @@ -64,9 +63,7 @@ def submit( cls, nodes, procs, cores_per_task, path, cwd, walltime, ngpus=0, job_name=None, force_broker=False ): - if not cls.flux_handle: - cls.flux_handle = Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() # NOTE: This previously placed everything under a broker. However, # if there's a job that schedules items to Flux, it will schedule all @@ -174,9 +171,7 @@ def status_callback(future, args): def get_statuses(cls, joblist): # We need to import flux here, as it may not be installed on # all systems. 
- if not cls.flux_handle: - cls.flux_handle = Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() LOGGER.debug( "Handle address -- %s", hex(id(cls.flux_handle))) @@ -257,9 +252,7 @@ def cancel(cls, joblist): """ # We need to import flux here, as it may not be installed on # all systems. - if not cls.flux_handle: - cls.flux_handle = Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() LOGGER.debug( "Handle address -- %s", hex(id(cls.flux_handle))) diff --git a/maestrowf/interfaces/script/_flux/flux0_26_0.py b/maestrowf/interfaces/script/_flux/flux0_26_0.py index 4c4f658b9..322d265cb 100644 --- a/maestrowf/interfaces/script/_flux/flux0_26_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_26_0.py @@ -24,11 +24,9 @@ class FluxInterface_0260(FluxInterface): @classmethod def submit( cls, nodes, procs, cores_per_task, path, cwd, walltime, - ngpus=0, job_name=None, force_broker=False + ngpus=0, job_name=None, force_broker=True ): - if not cls.flux_handle: - cls.flux_handle = flux.Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() # NOTE: This previously placed everything under a broker. However, # if there's a job that schedules items to Flux, it will schedule all @@ -45,7 +43,7 @@ def submit( ngpus_per_slot = int(ceil(ngpus / nodes)) jobspec = flux.job.JobspecV1.from_nest_command( [path], num_nodes=nodes, cores_per_slot=cores_per_task, - num_slots=nodes, gpus_per_slot=ngpus_per_slot) + num_slots=procs, gpus_per_slot=ngpus_per_slot) else: LOGGER.debug( "Launch under root Flux broker. [force_broker=%s, nodes=%d]", @@ -102,9 +100,8 @@ def parallelize(cls, procs, nodes=None, **kwargs): if "gpus" in kwargs: ngpus = str(kwargs["gpus"]) - if ngpus.isdecimal(): - args.append("-g") - args.append(ngpus) + args.append("-g") + args.append(ngpus) # flux has additional arguments that can be passed via the '-o' flag. 
addtl = [] @@ -122,9 +119,7 @@ def parallelize(cls, procs, nodes=None, **kwargs): def get_statuses(cls, joblist): # We need to import flux here, as it may not be installed on # all systems. - if not cls.flux_handle: - cls.flux_handle = flux.Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() LOGGER.debug( "Handle address -- %s", hex(id(cls.flux_handle))) @@ -157,9 +152,7 @@ def cancel(cls, joblist): """ # We need to import flux here, as it may not be installed on # all systems. - if not cls.flux_handle: - cls.flux_handle = flux.Flux() - LOGGER.debug("New Flux instance created.") + cls.connect_to_flux() LOGGER.debug( "Handle address -- %s", hex(id(cls.flux_handle))) diff --git a/maestrowf/interfaces/script/fluxscriptadapter.py b/maestrowf/interfaces/script/fluxscriptadapter.py index 2cadc8478..7f07c13cc 100644 --- a/maestrowf/interfaces/script/fluxscriptadapter.py +++ b/maestrowf/interfaces/script/fluxscriptadapter.py @@ -172,7 +172,7 @@ def submit(self, step, path, cwd, job_map=None, env=None): # walltime = self._convert_walltime_to_seconds(step.run["walltime"]) nodes = step.run.get("nodes") processors = step.run.get("procs", 0) - force_broker = step.run.get("use_broker", False) + force_broker = step.run.get("use_broker", True) walltime = step.run.get("walltime", "inf") # Compute cores per task @@ -183,9 +183,14 @@ def submit(self, step, path, cwd, job_map=None, env=None): "'cores per task' set to a non-value. Populating with a " "sensible default. (cores per task = %d", cores_per_task) - # Calculate ngpus - ngpus = step.run.get("gpus", 0) - ngpus = 0 if not ngpus else ngpus + try: + # Calculate ngpus + ngpus = step.run.get("gpus", "0") + ngpus = int(ngpus) if ngpus else 0 + except ValueError as val_error: + msg = f"Specified gpus '{ngpus}' is not a decimal value." + LOGGER.error(msg) + raise val_error # Calculate nprocs ncores = cores_per_task * nodes