Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
update line to 150
Browse files Browse the repository at this point in the history
marameref committed Mar 6, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 4ff6b5d commit ab9131e
Showing 14 changed files with 42 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E126, E402, E129, W504
max-line-length = 151
max-line-length = 150
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
# E741 disallows ambiguous single letter names which look like numbers
10 changes: 9 additions & 1 deletion docs/userguide/mpi_apps.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
MPI Apps
========

.. note::

Parsl's support for MPI Apps described below is pending release.
Please use the ``mpi_experimental_3`` branch to use the functionality
described in this document. To install directly from github:

>> pip install git+https://github.com/Parsl/parsl.git@mpi_experimental_3

MPI applications run multiple copies of a program that complete a single task by
coordinating using messages passed within or across nodes.
Starting an MPI application requires invoking a "launcher" code (e.g., ``mpiexec``) from one node
@@ -138,7 +146,7 @@ Writing MPI-Compatible Apps
++++++++++++++++++++++++++++
In MPI mode, the :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` can execute both Python or Bash Apps which invokes the MPI application.
However, it is important to note that Python Apps that directly use ``mpi4py`` is not supported.
However, it is important to note that Python Apps that directly use ``mpi4py`` is not supported.
For multi-node MPI applications, especially when running multiple applications within a single batch job,
it is important to specify the resource requirements for the app so that the Parsl worker can provision
Binary file added parsl/dataflow/.taskrecord.py.swo
Binary file not shown.
5 changes: 2 additions & 3 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
@@ -1143,9 +1143,8 @@ def add_executors(self, executors):

def atexit_cleanup(self) -> None:
if not self.cleanup_called:
logger.warning("Python is exiting with a DFK still running. "
"You should call parsl.dfk().cleanup() before "
"exiting to release any resources")
logger.info("DFK cleanup because python process is exiting")
self.cleanup()
else:
logger.info("python process is exiting, but DFK has already been cleaned up")

2 changes: 1 addition & 1 deletion parsl/dataflow/taskrecord.py
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ class TaskRecord(TypedDict, total=False):
# these three could be more strongly typed perhaps but I'm not thinking about that now
func: Callable
fn_hash: str
args: Sequence[Any] # in some places we uses a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
args: Sequence[Any] # In some places we use a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
kwargs: Dict[str, Any]

time_invoked: Optional[datetime.datetime]
12 changes: 4 additions & 8 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
@@ -328,7 +328,7 @@ def __init__(self,
def _warn_deprecated(self, old: str, new: str):
warnings.warn(
f"{old} is deprecated and will be removed in a future release. "
f"Please use {new} instead.",
            f"Please use {new} instead.",
DeprecationWarning,
stacklevel=2
)
@@ -630,7 +630,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
        The outgoing_q is a queue that an external process listens on
        for new work. This method behaves like a
submit call as described here `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
submit call as described here `Python docs: <https://docs.python.org/3/library/
concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
Args:
- func (callable) : Callable function
@@ -739,12 +740,7 @@ class BlockInfo:
block_info[b_id].tasks += manager['tasks']
block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])

# The scaling policy is that longest idle blocks should be scaled down
# in preference to least idle (most recently used) blocks.
# Other policies could be implemented here.

sorted_blocks = sorted(block_info.items(), key=lambda item: (-item[1].idle, item[1].tasks))

sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1].idle, item[1].tasks))
logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
if max_idletime is None:
block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
8 changes: 7 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
@@ -392,7 +392,13 @@ def start(self) -> None:
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.warning("Exiting")

def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
hub_channel: Optional[zmq.Socket],
kill_event: threading.Event
) -> None:

"""Process one message from manager on the task_outgoing channel.
Note that this message flow is in contradiction to the name of the
channel - it is not an outgoing message and it is not a task.
5 changes: 4 additions & 1 deletion parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
@@ -245,7 +245,10 @@ def _general_strategy(self, status_list, *, strategy_type):
exec_status.scale_in(active_blocks - min_blocks)

else:
logger.debug(f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s for executor {label}; not scaling in")
logger.debug(
f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s "
f"for executor {label}; not scaling in"
)

# Case 2
# More tasks than the available slots.
12 changes: 2 additions & 10 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
@@ -250,12 +250,6 @@ class Resource(Base):
'psutil_process_disk_write', Float, nullable=True)
psutil_process_status = Column(
'psutil_process_status', Text, nullable=True)
psutil_cpu_num = Column(
'psutil_cpu_num', Text, nullable=True)
psutil_process_num_ctx_switches_voluntary = Column(
'psutil_process_num_ctx_switches_voluntary', Float, nullable=True)
psutil_process_num_ctx_switches_involuntary = Column(
'psutil_process_num_ctx_switches_involuntary', Float, nullable=True)
__table_args__ = (
PrimaryKeyConstraint('try_id', 'task_id', 'run_id', 'timestamp'),
)
@@ -524,10 +518,8 @@ def start(self,
reprocessable_first_resource_messages.append(msg)
else:
if task_try_id in deferred_resource_messages:
logger.error(
"Task {} already has a deferred resource message. "
"Discarding previous message.".format(msg['task_id'])
)
logger.error(f"Task {msg['task_id']} already has a deferred resource message. Discarding previous message.")

deferred_resource_messages[task_try_id] = msg
elif msg['last_msg']:
# This assumes that the primary key has been added
29 changes: 0 additions & 29 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
@@ -201,8 +201,6 @@ def monitor(pid: int,

children_user_time = {} # type: Dict[int, float]
children_system_time = {} # type: Dict[int, float]
children_num_ctx_switches_voluntary = {} # type: Dict[int, float]
children_num_ctx_switches_involuntary = {} # type: Dict[int, float]

def accumulate_and_prepare() -> Dict[str, Any]:
d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
@@ -220,15 +218,6 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.debug("got children")

d["psutil_cpu_count"] = psutil.cpu_count()

# note that this will be the CPU number of the base process, not anything launched by it
d["psutil_cpu_num"] = pm.cpu_num()

pctxsw = pm.num_ctx_switches()

d["psutil_process_num_ctx_switches_voluntary"] = pctxsw.voluntary
d["psutil_process_num_ctx_switches_involuntary"] = pctxsw.involuntary

d['psutil_process_memory_virtual'] = pm.memory_info().vms
d['psutil_process_memory_resident'] = pm.memory_info().rss
d['psutil_process_time_user'] = pm.cpu_times().user
@@ -249,11 +238,6 @@ def accumulate_and_prepare() -> Dict[str, Any]:
child_system_time = child.cpu_times().system
children_user_time[child.pid] = child_user_time
children_system_time[child.pid] = child_system_time

pctxsw = child.num_ctx_switches()
children_num_ctx_switches_voluntary[child.pid] = pctxsw.voluntary
children_num_ctx_switches_involuntary[child.pid] = pctxsw.involuntary

d['psutil_process_memory_virtual'] += child.memory_info().vms
d['psutil_process_memory_resident'] += child.memory_info().rss
try:
@@ -264,27 +248,14 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.exception("Exception reading IO counters for child {k}. Recorded IO usage may be incomplete".format(k=k), exc_info=True)
d['psutil_process_disk_write'] += 0
d['psutil_process_disk_read'] += 0

total_children_user_time = 0.0
for child_pid in children_user_time:
total_children_user_time += children_user_time[child_pid]

total_children_system_time = 0.0
for child_pid in children_system_time:
total_children_system_time += children_system_time[child_pid]

total_children_num_ctx_switches_voluntary = 0.0
for child_pid in children_num_ctx_switches_voluntary:
total_children_num_ctx_switches_voluntary += children_num_ctx_switches_voluntary[child_pid]

total_children_num_ctx_switches_involuntary = 0.0
for child_pid in children_num_ctx_switches_involuntary:
total_children_num_ctx_switches_involuntary += children_num_ctx_switches_involuntary[child_pid]

d['psutil_process_time_user'] += total_children_user_time
d['psutil_process_time_system'] += total_children_system_time
d['psutil_process_num_ctx_switches_voluntary'] += total_children_num_ctx_switches_voluntary
d['psutil_process_num_ctx_switches_involuntary'] += total_children_num_ctx_switches_involuntary
logging.debug("sending message")
return d

7 changes: 0 additions & 7 deletions parsl/monitoring/visualization/models.py
Original file line number Diff line number Diff line change
@@ -102,12 +102,5 @@ class Resource(db.Model):
'psutil_process_disk_write', db.Float, nullable=True)
psutil_process_status = db.Column(
'psutil_process_status', db.Text, nullable=True)
psutil_cpu_num = db.Column(
'psutil_cpu_num', db.Text, nullable=True)
psutil_process_num_ctx_switches_voluntary = db.Column(
'psutil_process_num_ctx_switches_voluntary', db.Float, nullable=True)
psutil_process_num_ctx_switches_involuntary = db.Column(
'psutil_process_num_ctx_switches_involuntary', db.Float, nullable=True)

__table_args__ = (
db.PrimaryKeyConstraint('task_id', 'run_id', 'timestamp'),)
8 changes: 7 additions & 1 deletion parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
@@ -290,7 +290,13 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
else:
logger.error("Submit command failed")
logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
raise SubmitException(
job_name,
"Could not read job ID from submit command standard output",
stdout=stdout,
stderr=stderr,
retcode=retcode
)

def cancel(self, job_ids):
''' Cancels the jobs specified by a list of job ids
4 changes: 4 additions & 0 deletions t.err
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
--> executable follows <--
echo "Hello world"; echo "Hello world" >&2
--> end executable <--
Hello world
1 change: 1 addition & 0 deletions test.memo.stdout.x
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
X

0 comments on commit ab9131e

Please sign in to comment.