diff --git a/docker/flux/0.26.0/Dockerfile b/docker/flux/0.26.0/Dockerfile new file mode 100644 index 000000000..6a2dfd89d --- /dev/null +++ b/docker/flux/0.26.0/Dockerfile @@ -0,0 +1,19 @@ +FROM fluxrm/flux-core:centos7-v0.26.0 +ENV PATH="/home/fluxuser/.local/bin:$PATH" + +RUN curl -sL https://download.open-mpi.org/release/open-mpi/v4.0/openmpi-4.0.5.tar.gz | tar xz && \ + cd ./openmpi-4.0.5 && \ + ./configure && \ + make && \ + sudo make install + +COPY . /home/fluxuser/maestrowf +WORKDIR /home/fluxuser/maestrowf +RUN echo `which python3` && python3 --version +RUN echo `which pip3` && pip3 --version +RUN pip3 install -U --user pip +RUN echo "$PWD" && ls -la && pip3 install --user . +RUN pip3 install -U ipython +WORKDIR /home/fluxuser + +LABEL maintainer="Francesco Di Natale dinatale3@llnl.gov" diff --git a/maestrowf/interfaces/script/__init__.py b/maestrowf/interfaces/script/__init__.py index b141fa307..cabde60f5 100644 --- a/maestrowf/interfaces/script/__init__.py +++ b/maestrowf/interfaces/script/__init__.py @@ -147,7 +147,7 @@ def lookup_status(self, cancel_status): class FluxFactory(object): """A factory for swapping out Flux's backend interface based on version.""" - latest = "0.17.0" + latest = "0.26.0" def _iter_flux(): """ diff --git a/maestrowf/interfaces/script/_flux/flux0_18_0.py b/maestrowf/interfaces/script/_flux/flux0_18_0.py index b9999397e..91b074273 100644 --- a/maestrowf/interfaces/script/_flux/flux0_18_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_18_0.py @@ -19,7 +19,7 @@ class FluxInterface_0190(FluxInterface): - # This utility class is for Flux 0.17.0 + # This utility class is for Flux 0.19.0 key = "0.19.0" _FIELDATTRS = { diff --git a/maestrowf/interfaces/script/_flux/flux0_26_0.py b/maestrowf/interfaces/script/_flux/flux0_26_0.py new file mode 100644 index 000000000..4c4f658b9 --- /dev/null +++ b/maestrowf/interfaces/script/_flux/flux0_26_0.py @@ -0,0 +1,199 @@ +from datetime import datetime +import logging +from math import ceil 
+import os + +from maestrowf.abstracts.enums import CancelCode, JobStatusCode, State, \ + SubmissionCode +from maestrowf.abstracts.interfaces.flux import FluxInterface + +LOGGER = logging.getLogger(__name__) + +try: + import flux +except ImportError: + LOGGER.info("Failed to import Flux. Continuing.") + + +class FluxInterface_0260(FluxInterface): + # This utility class is for Flux 0.26.0 + key = "0.26.0" + + flux_handle = None + + @classmethod + def submit( + cls, nodes, procs, cores_per_task, path, cwd, walltime, + ngpus=0, job_name=None, force_broker=False + ): + if not cls.flux_handle: + cls.flux_handle = flux.Flux() + LOGGER.debug("New Flux instance created.") + + # NOTE: This previously placed everything under a broker. However, + # if there's a job that schedules items to Flux, it will schedule all + # new jobs to the sub-broker. Sometimes this is desired, but it's + # incorrect to make that the general case. If we are asking for a + # single node, don't use a broker -- but introduce a flag that can + # force a single node to run in a broker. + + if force_broker or nodes > 1: + LOGGER.debug( + "Launch under Flux sub-broker. [force_broker=%s, nodes=%d]", + force_broker, nodes + ) + ngpus_per_slot = int(ceil(ngpus / nodes)) + jobspec = flux.job.JobspecV1.from_nest_command( + [path], num_nodes=nodes, cores_per_slot=cores_per_task, + num_slots=nodes, gpus_per_slot=ngpus_per_slot) + else: + LOGGER.debug( + "Launch under root Flux broker. 
[force_broker=%s, nodes=%d]", + force_broker, nodes + ) + jobspec = flux.job.JobspecV1.from_command( + [path], num_tasks=procs, num_nodes=nodes, + cores_per_task=cores_per_task, gpus_per_task=ngpus) + + LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle))) + if job_name: + jobspec.setattr("system.job.name", job_name) + jobspec.cwd = cwd + jobspec.environment = dict(os.environ) + + if walltime and walltime != "inf": + seconds = datetime.strptime(walltime, "%H:%M:%S") + seconds = seconds - datetime(1900, 1, 1) + jobspec.duration = seconds + + jobspec.stdout = f"{job_name}.{{{{id}}}}.out" + jobspec.stderr = f"{job_name}.{{{{id}}}}.err" + + try: + # Submit our job spec. + jobid = \ + flux.job.submit(cls.flux_handle, jobspec, waitable=True) + submit_status = SubmissionCode.OK + retcode = 0 + + LOGGER.info("Submission returned status OK. -- " + "Assigned identifier (%s)", jobid) + except Exception as exception: + LOGGER.error( + "Submission failed -- Message (%s).", exception) + jobid = -1 + retcode = -1 + submit_status = SubmissionCode.ERROR + + return jobid, retcode, submit_status + + @classmethod + def parallelize(cls, procs, nodes=None, **kwargs): + args = ["flux", "mini", "run", "-n", str(procs)] + + # if we've specified nodes, add that to wreckrun + ntasks = nodes if nodes else 1 + args.append("-N") + args.append(str(ntasks)) + + if "cores per task" in kwargs: + args.append("-c") + args.append(str(kwargs["cores per task"])) + + if "gpus" in kwargs: + ngpus = str(kwargs["gpus"]) + if ngpus.isdecimal(): + args.append("-g") + args.append(ngpus) + + # flux has additional arguments that can be passed via the '-o' flag. 
+ addtl = [] + addtl_args = kwargs.get("addtl_args", {}) + for key, value in addtl_args.items(): + addtl.append(f"{key}={value}") + + if addtl: + args.append("-o") + args.append(",".join(addtl)) + + return " ".join(args) + + @classmethod + def get_statuses(cls, joblist): + # Lazily create the shared Flux handle here; Flux may not be + # available on all systems. + if not cls.flux_handle: + cls.flux_handle = flux.Flux() + LOGGER.debug("New Flux instance created.") + + LOGGER.debug( + "Handle address -- %s", hex(id(cls.flux_handle))) + + jobs_rpc = flux.job.list.JobList(cls.flux_handle, ids=joblist) + + statuses = {} + for jobinfo in jobs_rpc.jobs(): + statuses[jobinfo.id] = cls.state(jobinfo.status_abbrev) + + chk_status = JobStatusCode.OK + # Print all errors accumulated in JobList RPC: + try: + for err in jobs_rpc.errors: + chk_status = JobStatusCode.ERROR + LOGGER.error("Error in JobList RPC %s", err) + except EnvironmentError: + pass + + return chk_status, statuses + + @classmethod + def cancel(cls, joblist): + """ + Cancel a job using the Flux 0.26.0 cancellation API. + + :param joblist: A list of job identifiers to cancel. + :return: CancelCode enumeration that reflects result of cancellation, + and a cancel return code indicating how the cancellation call exited. + """ + # Lazily create the shared Flux handle here; Flux may not be + # available on all systems. 
+ if not cls.flux_handle: + cls.flux_handle = flux.Flux() + LOGGER.debug("New Flux instance created.") + + LOGGER.debug( + "Handle address -- %s", hex(id(cls.flux_handle))) + LOGGER.debug( + "Attempting to cancel jobs.\nJoblist:\n%s", + "\n".join(str(j) for j in joblist) + ) + + cancel_code = CancelCode.OK + cancel_rcode = 0 + for job in joblist: + try: + LOGGER.debug("Cancelling Job %s...", job) + flux.job.cancel(cls.flux_handle, int(job)) + except Exception as exception: + LOGGER.error(str(exception)) + cancel_code = CancelCode.ERROR + cancel_rcode = 1 + + return cancel_code, cancel_rcode + + @staticmethod + def state(state): + if state == "CD": + return State.FINISHED + elif state == "F": + return State.FAILED + elif state == "R": + return State.RUNNING + elif state == "PD": + return State.PENDING + elif state == "C": + return State.CANCELLED + elif state == "TO": + return State.TIMEDOUT + else: + return State.UNKNOWN diff --git a/samples/lulesh/lulesh_sample1_unix_flux.yaml b/samples/lulesh/lulesh_sample1_unix_flux.yaml index f9cad06d5..330507681 100644 --- a/samples/lulesh/lulesh_sample1_unix_flux.yaml +++ b/samples/lulesh/lulesh_sample1_unix_flux.yaml @@ -17,7 +17,6 @@ env: batch: type : flux - version : "0.19.0" host : quartz bank : baasic queue : pbatch