Skip to content

Commit

Permalink
Add support for Flux core 0.26.0 (#357)
Browse files Browse the repository at this point in the history
* flux: fix version typo in comment

* flux: add 0.26.0 adaptor

Notable differences from the 0.18.0 version:
- Use Flux's builtin statustostr & remove adaptor-local statustostr and
  attr list
- Drop the individual calls to `job_id_list` for a single call to `JobList`

* Removal of forced Flux default version.

* Addition of [email protected] dockerfile.

* Removal of unused cb_args variable.

Co-authored-by: Francesco Di Natale <[email protected]>
  • Loading branch information
SteVwonder and FrankD412 committed May 28, 2022
1 parent 076cf87 commit f6ecffe
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 3 deletions.
19 changes: 19 additions & 0 deletions docker/flux/0.26.0/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Build on the flux-core 0.26.0 image (CentOS 7 base, runs as "fluxuser").
FROM fluxrm/flux-core:centos7-v0.26.0
# Make user-level (pip --user) installs resolvable on PATH.
ENV PATH="/home/fluxuser/.local/bin:$PATH"

# Build and install Open MPI 4.0.5 from source.
RUN curl -sL https://download.open-mpi.org/release/open-mpi/v4.0/openmpi-4.0.5.tar.gz | tar xz && \
cd ./openmpi-4.0.5 && \
./configure && \
make && \
sudo make install

# Copy the maestrowf source tree into the image.
COPY . /home/fluxuser/maestrowf
WORKDIR /home/fluxuser/maestrowf
# Log interpreter and pip locations/versions to aid build-time debugging.
RUN echo `which python3` && python3 --version
RUN echo `which pip3` && pip3 --version
# Upgrade pip, then install maestrowf from the copied tree (user install).
RUN pip3 install -U --user pip
RUN echo "$PWD" && ls -la && pip3 install --user .
RUN pip3 install -U ipython
WORKDIR /home/fluxuser

LABEL maintainer="Francesco Di Natale [email protected]"
2 changes: 1 addition & 1 deletion maestrowf/interfaces/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def lookup_status(self, cancel_status):
class FluxFactory(object):
"""A factory for swapping out Flux's backend interface based on version."""

latest = "0.17.0"
latest = "0.26.0"

def _iter_flux():
"""
Expand Down
2 changes: 1 addition & 1 deletion maestrowf/interfaces/script/_flux/flux0_18_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


class FluxInterface_0190(FluxInterface):
# This utility class is for Flux 0.17.0
# This utility class is for Flux 0.19.0
key = "0.19.0"

_FIELDATTRS = {
Expand Down
199 changes: 199 additions & 0 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from datetime import datetime
import logging
from math import ceil
import os

from maestrowf.abstracts.enums import CancelCode, JobStatusCode, State, \
SubmissionCode
from maestrowf.abstracts.interfaces.flux import FluxInterface

LOGGER = logging.getLogger(__name__)

try:
import flux
except ImportError:
LOGGER.info("Failed to import Flux. Continuing.")


class FluxInterface_0260(FluxInterface):
    """Adaptor implementing the FluxInterface for flux-core 0.26.0."""

    # This utility class is for Flux 0.26.0
    key = "0.26.0"

    # Lazily-created, class-wide handle to the local Flux broker.
    flux_handle = None

    @classmethod
    def submit(
        cls, nodes, procs, cores_per_task, path, cwd, walltime,
        ngpus=0, job_name=None, force_broker=False
    ):
        """
        Submit a job specification to the connected Flux instance.

        :param nodes: Number of nodes to allocate.
        :param procs: Number of tasks (processes) to launch.
        :param cores_per_task: Cores reserved for each task.
        :param path: Path of the script to execute.
        :param cwd: Working directory for the submitted job.
        :param walltime: "H:M:S" duration string; falsy or "inf" means no
            time limit.
        :param ngpus: Total number of GPUs to request (default 0).
        :param job_name: Optional name recorded with the submission.
        :param force_broker: If True, launch under a Flux sub-broker even
            when only a single node is requested.
        :return: Tuple of (job identifier, return code, SubmissionCode).
        """
        if not cls.flux_handle:
            cls.flux_handle = flux.Flux()
            LOGGER.debug("New Flux instance created.")

        # NOTE: This previously placed everything under a broker. However,
        # if there's a job that schedules items to Flux, it will schedule all
        # new jobs to the sub-broker. Sometimes this is desired, but it's
        # incorrect to make that the general case. If we are asking for a
        # single node, don't use a broker -- but introduce a flag that can
        # force a single node to run in a broker.
        if force_broker or nodes > 1:
            LOGGER.debug(
                "Launch under Flux sub-broker. [force_broker=%s, nodes=%d]",
                force_broker, nodes
            )
            # Spread the requested GPUs evenly across the node-sized slots.
            ngpus_per_slot = int(ceil(ngpus / nodes))
            jobspec = flux.job.JobspecV1.from_nest_command(
                [path], num_nodes=nodes, cores_per_slot=cores_per_task,
                num_slots=nodes, gpus_per_slot=ngpus_per_slot)
        else:
            LOGGER.debug(
                "Launch under root Flux broker. [force_broker=%s, nodes=%d]",
                force_broker, nodes
            )
            jobspec = flux.job.JobspecV1.from_command(
                [path], num_tasks=procs, num_nodes=nodes,
                cores_per_task=cores_per_task, gpus_per_task=ngpus)

        LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
        if job_name:
            jobspec.setattr("system.job.name", job_name)
        jobspec.cwd = cwd
        jobspec.environment = dict(os.environ)

        if walltime and walltime != "inf":
            # Convert the "H:M:S" walltime into a plain number of seconds;
            # Flux accepts the duration as seconds.
            delta = \
                datetime.strptime(walltime, "%H:%M:%S") - datetime(1900, 1, 1)
            jobspec.duration = delta.total_seconds()

        # "{{id}}" is a Flux mustache template expanded to the job id.
        jobspec.stdout = f"{job_name}.{{{{id}}}}.out"
        jobspec.stderr = f"{job_name}.{{{{id}}}}.err"

        try:
            # Submit our job spec.
            jobid = \
                flux.job.submit(cls.flux_handle, jobspec, waitable=True)
            submit_status = SubmissionCode.OK
            retcode = 0

            LOGGER.info("Submission returned status OK. -- "
                        "Assigned identifier (%s)", jobid)
        except Exception as exception:
            LOGGER.error(
                "Submission failed -- Message (%s).", exception)
            jobid = -1
            retcode = -1
            submit_status = SubmissionCode.ERROR

        return jobid, retcode, submit_status

    @classmethod
    def parallelize(cls, procs, nodes=None, **kwargs):
        """
        Build a ``flux mini run`` command string for a parallel task.

        :param procs: Number of tasks to launch.
        :param nodes: Number of nodes to spread tasks over (default 1).
        :param kwargs: Optional settings -- "cores per task", "gpus", and
            "addtl_args" (a mapping passed through via the '-o' flag).
        :return: The launcher command as a single space-joined string.
        """
        args = ["flux", "mini", "run", "-n", str(procs)]

        # If no node count was specified, fall back to a single node.
        ntasks = nodes if nodes else 1
        args.append("-N")
        args.append(str(ntasks))

        if "cores per task" in kwargs:
            args.append("-c")
            args.append(str(kwargs["cores per task"]))

        if "gpus" in kwargs:
            ngpus = str(kwargs["gpus"])
            # Only request GPUs when the value is a plain decimal count.
            if ngpus.isdecimal():
                args.append("-g")
                args.append(ngpus)

        # flux has additional arguments that can be passed via the '-o' flag.
        addtl = []
        addtl_args = kwargs.get("addtl_args", {})
        for key, value in addtl_args.items():
            addtl.append(f"{key}={value}")

        if addtl:
            args.append("-o")
            args.append(",".join(addtl))

        return " ".join(args)

    @classmethod
    def get_statuses(cls, joblist):
        """
        Look up the status of each job in ``joblist`` with one JobList RPC.

        :param joblist: Iterable of Flux job identifiers to query.
        :return: Tuple of (JobStatusCode, dict mapping job id to State).
        """
        if not cls.flux_handle:
            cls.flux_handle = flux.Flux()
            LOGGER.debug("New Flux instance created.")

        LOGGER.debug(
            "Handle address -- %s", hex(id(cls.flux_handle)))

        # A single JobList RPC replaces the per-job queries used by the
        # 0.18.0 adaptor.
        jobs_rpc = flux.job.list.JobList(cls.flux_handle, ids=joblist)

        statuses = {}
        for jobinfo in jobs_rpc.jobs():
            statuses[jobinfo.id] = cls.state(jobinfo.status_abbrev)

        chk_status = JobStatusCode.OK
        # Print all errors accumulated in JobList RPC:
        try:
            for err in jobs_rpc.errors:
                chk_status = JobStatusCode.ERROR
                LOGGER.error("Error in JobList RPC %s", err)
        except EnvironmentError:
            pass

        return chk_status, statuses

    @classmethod
    def cancel(cls, joblist):
        """
        Cancel jobs using the Flux 0.26.0 cancellation API.

        :param joblist: A list of job identifiers to cancel.
        :return: Tuple of (CancelCode, integer return code) reflecting
            whether every cancellation call succeeded.
        """
        if not cls.flux_handle:
            cls.flux_handle = flux.Flux()
            LOGGER.debug("New Flux instance created.")

        LOGGER.debug(
            "Handle address -- %s", hex(id(cls.flux_handle)))
        LOGGER.debug(
            "Attempting to cancel jobs.\nJoblist:\n%s",
            "\n".join(str(j) for j in joblist)
        )

        cancel_code = CancelCode.OK
        cancel_rcode = 0
        for job in joblist:
            try:
                LOGGER.debug("Cancelling Job %s...", job)
                flux.job.cancel(cls.flux_handle, int(job))
            except Exception as exception:
                # Keep cancelling the remaining jobs; report failure overall.
                LOGGER.error(str(exception))
                cancel_code = CancelCode.ERROR
                cancel_rcode = 1

        return cancel_code, cancel_rcode

    @staticmethod
    def state(state):
        """
        Map a Flux status abbreviation to a maestrowf State.

        :param state: Flux status abbreviation (e.g. "CD", "F", "R").
        :return: Corresponding State member; State.UNKNOWN if unmapped.
        """
        # NOTE(review): flux-core abbreviates canceled inactive jobs as
        # "CA"; the "C" mapping is kept for backward compatibility with the
        # prior adaptor -- confirm against flux's statustostr output.
        if state == "CD":
            return State.FINISHED
        elif state == "F":
            return State.FAILED
        elif state == "R":
            return State.RUNNING
        elif state == "PD":
            return State.PENDING
        elif state in ("C", "CA"):
            return State.CANCELLED
        elif state == "TO":
            return State.TIMEDOUT
        else:
            return State.UNKNOWN
1 change: 0 additions & 1 deletion samples/lulesh/lulesh_sample1_unix_flux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ env:

batch:
type : flux
version : "0.19.0"
host : quartz
bank : baasic
queue : pbatch
Expand Down

0 comments on commit f6ecffe

Please sign in to comment.